mirror of
https://github.com/TheFunny/ArisuAutoSweeper
synced 2025-12-16 22:05:12 +00:00
315 lines
10 KiB
Python
315 lines
10 KiB
Python
import re
|
||
from datetime import datetime, timedelta
|
||
from enum import Enum
|
||
|
||
import numpy as np
|
||
|
||
from module.base.base import ModuleBase
|
||
from module.base.timer import Timer
|
||
from module.base.utils import area_offset, area_size
|
||
from module.config.utils import get_server_next_update
|
||
from module.logger import logger
|
||
from module.ocr.ocr import Ocr
|
||
from tasks.cafe.assets.assets_cafe import *
|
||
|
||
|
||
class InvitationOcr(Ocr):
    """
    OCR used for the invitation name list.

    Identical to the base `Ocr` except for one extra text correction applied
    after the base post-processing.
    """

    def after_process(self, result):
        """
        Post-process a single OCR result.

        Args:
            result (str): Raw OCR text.

        Returns:
            str: Text after the parent class's post-processing, with every
                occurrence of 'モI' rewritten as 'モエ'.
        """
        # NOTE(review): presumably corrects katakana 'エ' being misread as a
        # Latin 'I' right after 'モ' - confirm against real OCR output.
        return super().after_process(result).replace('モI', 'モエ')
|
||
|
||
|
||
class InvitationStatus(Enum):
    """
    States of the invitation state machine driven by handle_invitation_status().

    The numeric values are arbitrary identifiers; FINISHED (-1) is the only
    terminal state.
    """
    # Entry state: pre-checks, then open the invitation list.
    MOMOTALK = 0
    # Read the name list and click the invite button of the current target.
    OCR = 1
    # Wait for one of the follow-up dialogs (confirm / in-second / substitute).
    SELECT = 2
    # Confirm dialog is showing: click it until it disappears.
    CONFIRM = 3
    # Substitute dialog is showing: accept or close it depending on config.
    SUBSTITUTE = 4
    # "In second" dialog is showing: close it and treat the attempt as failed.
    IN_SECOND = 5
    # Invitation went through: close the list.
    INVITED = 6
    # Attempt failed: close the list, drop the current target, start over.
    FAILED = 7
    # Terminal state: nothing left to do.
    FINISHED = -1
|
||
|
||
|
||
class Invitation:
    """
    State and screen helpers for inviting a student from the name list.

    Holds the queue of names still to try (`target_names`), the names that
    were invited (`invited`) and the names currently recognised on screen
    (`current_names`, a list of `(text, box)` tuples).
    """
    # Bounds of the random swipe length, as a fraction of the list height.
    swipe_vector_range = (0.65, 0.85)
    # Update times handed to get_server_next_update() by `is_invitation`.
    cafe_update = ["04:00", "16:00"]

    def __init__(self, name: str):
        """
        Args:
            name: Identifier used in `str()` and as prefix of the swipe record name.
        """
        self.name = name
        self.ocr = InvitationOcr(OCR_NAME)
        self.list = OCR_NAME
        self.item = MOMOTALK_ITEM
        self.invite = MOMOTALK_INVITE

        # Configuration, filled in by the caller before the state machine runs.
        self.target_names = []
        self.waiting_hour = None
        self.substitute = None
        self.choice = None

        # Runtime results.
        self.invited = []
        self.current_names = []

    def __str__(self):
        return f'Invitation({self.name})'

    __repr__ = __str__

    def __eq__(self, other):
        # Equality is purely by string representation.
        return str(self) == str(other)

    def __hash__(self):
        return hash(self.name)

    def swipe_page(self, main: ModuleBase, vector_range=None, reverse=False):
        """
        Swipe the name list vertically by a random fraction of its height.

        Args:
            main: Module whose `device` performs the swipe.
            vector_range (tuple[float, float]): Lower and upper bound of the
                swipe length as a fraction of the list height.
                Defaults to `swipe_vector_range`.
            reverse (bool): Negate the swipe vector.
        """
        if vector_range is None:
            vector_range = self.swipe_vector_range
        ratio = np.random.uniform(*vector_range)
        _, height = area_size(self.list.button)
        vector = (0, -ratio * height)

        if reverse:
            vector = (-vector[0], -vector[1])
        name = f'{self.name}_SWIPE'
        main.device.swipe_vector(vector, self.list.button, name=name)
        # Drop the swipe from the click record right after performing it.
        main.device.click_record_remove(name)

    def load_names(self, main: ModuleBase):
        """
        OCR the current screenshot and rebuild `current_names`.

        When OCR returns nothing a warning is logged and the previous
        `current_names` is kept untouched.

        Args:
            main: Module whose `device.image` is read.
        """
        results = self.ocr.detect_and_ocr(main.device.image)
        if not results:
            logger.warning(f'No valid names in {self.ocr.name}')
            return

        current = []
        for result in results:
            text: str = result.ocr_text.replace(' ', '')
            if text.isdigit():
                # Pure numbers are not names.
                continue
            if text.startswith('('):
                # A fragment starting with '(' is glued onto the previous
                # name, keeping that name's box; without a previous name it
                # is dropped.
                # NOTE(review): presumably a "(variant)" suffix that OCR
                # returned as a separate text line - confirm.
                if current:
                    prev_text, prev_box = current[-1]
                    current[-1] = (prev_text + text, prev_box)
                continue
            current.append((text, result.box))
        self.current_names = current

    @property
    def is_invitation(self) -> bool:
        """
        True when the time left until the next `cafe_update`, as reported by
        get_server_next_update(), is longer than `waiting_hour` hours.

        `waiting_hour` must be set before this is read.
        """
        # NOTE(review): compares against naive datetime.now(); assumes
        # get_server_next_update() returns a naive local datetime - confirm.
        return get_server_next_update(self.cafe_update) - datetime.now() > timedelta(hours=self.waiting_hour)

    @property
    def names(self):
        """list[str]: Text of every entry in `current_names`."""
        return [entry[0] for entry in self.current_names]

    @property
    def target_name(self):
        """str | None: First name in `target_names`, or None when it is empty."""
        return self.target_names[0] if self.target_names else None

    def on_success(self):
        """Record the current target as invited and remove it from the queue."""
        logger.info(f'Invited {self.target_name}')
        # Guarded: with an empty queue there is nothing to record or remove,
        # and an unguarded pop(0) would raise IndexError.
        if self.target_names:
            self.invited.append(self.target_names.pop(0))

    def on_failed(self):
        """Give up on the current target and remove it from the queue."""
        logger.warning(f'Failed to invite {self.target_name}')
        # Guarded: the failure path is also reached when the queue is already
        # empty (e.g. no names were recognised), where pop(0) would raise.
        if self.target_names:
            self.target_names.pop(0)

    def insight_name(self, name: str, main: ModuleBase, skip_first_screenshot=True) -> bool:
        """
        Swipe through the list until `name` is among the recognised names.

        Args:
            name: Name to look for.
            main: Module providing `device` and `wait_until_stable`.
            skip_first_screenshot: Reuse the current screenshot on the first pass.

        Returns:
            bool: True once `name` is on screen; False when two consecutive
                pages show the same non-empty set of names, i.e. swiping no
                longer changes the list.
        """
        logger.info(f'Insight name: {name}')
        last_names: set[str] = set()

        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                main.device.screenshot()

            self.load_names(main)
            names = self.names

            if name in names:
                return True

            if names and last_names == set(names):
                logger.warning(f'Name not found: {name}')
                return False
            last_names = set(names)

            self.swipe_page(main)

            main.wait_until_stable(
                self.list.button,
                timer=Timer(0, 0),
                timeout=Timer(1.5, 5)
            )

    def select_name_invite(
            self,
            main: ModuleBase,
            name: str = None,
            insight: bool = True,
            skip_first_screenshot: bool = True
    ) -> bool:
        """
        Click the invite button that belongs to `name`.

        Args:
            main: Module providing `device`.
            name: Name to invite. Defaults to `target_name`.
            insight: Scroll the list with `insight_name` first. When False the
                names are re-read periodically inside the loop instead.
            skip_first_screenshot: Reuse the current screenshot on the first pass.

        Returns:
            bool: True if the invite button was clicked; False if there is no
                name to invite, the name could not be scrolled into view, or
                the timeout expired.
        """
        if name is None:
            if not self.target_name:
                return False
            name = self.target_name
        if insight and not self.insight_name(name, main, skip_first_screenshot):
            return False

        logger.info(f'Select name: {name}')
        click_interval = Timer(1, 2)
        load_names_interval = Timer(1, 5)
        timeout = Timer(10, 10).start()
        while 1:
            if skip_first_screenshot:
                skip_first_screenshot = False
            else:
                main.device.screenshot()

            # Checked before the `continue` branches below: with the check at
            # the end of the loop a name or button that never shows up would
            # skip it on every pass and spin forever.
            if timeout.reached():
                logger.warning(f'No clickable {self.invite.name}')
                return False

            if load_names_interval.reached_and_reset() and not insight:
                self.load_names(main)

            name_box = next((entry for entry in self.current_names if entry[0] == name), None)
            if name_box is None:
                logger.warning(f'No name {name} in {self.ocr.name}')
                continue

            # Search for the invite button inside an item-sized area anchored
            # at the upper-left corner of the recognised name box.
            search_box = area_offset((0, 0, *area_size(self.item.area)), name_box[1][:2])
            self.invite.load_search(search_box)
            click_button = self.invite.match_multi_template(main.device.image)

            if not click_button:
                logger.warning(f'No clickable {self.invite.name}')
                continue

            if click_interval.reached_and_reset():
                main.device.click(click_button[0])
                return True
|
||
|
||
|
||
# Module-level instance shared by handle_invitation() and
# handle_invitation_status(); its state (choice, target_names, invited, ...)
# therefore persists between calls for as long as the module stays loaded.
invitation = Invitation('CafeInvitation')
|
||
|
||
|
||
def handle_invitation_status(status: InvitationStatus, main: ModuleBase) -> InvitationStatus:
    """
    Run one step of the invitation state machine on the current screenshot.

    Args:
        status: Current state.
        main: Module providing `appear`, `appear_then_click`, `match_color`
            and `device`.

    Returns:
        InvitationStatus: The next state, or `status` unchanged when the
            screen does not yet allow a transition.
    """
    if status == InvitationStatus.MOMOTALK:
        # Pre-checks first; any of them ends the whole task.
        if not invitation.is_invitation:
            logger.info('Invitation waiting until next refresh')
            return InvitationStatus.FINISHED
        if main.match_color(CAFE_INVITED):
            logger.info('Invitation in cooldown')
            return InvitationStatus.FINISHED
        if invitation.choice != 'list_top' and invitation.target_name is None:
            logger.warning('No student to be invited or all invitations failed')
            return InvitationStatus.FINISHED
        if main.appear(CHECK_MOMOTALK):
            return InvitationStatus.OCR
        main.appear_then_click(CAFE_INVITE)

    elif status == InvitationStatus.OCR:
        if invitation.choice == 'list_top':
            # Take whatever names are on screen as the target queue, then
            # switch to 'list' so this is done only once.
            invitation.load_names(main)
            invitation.target_names = invitation.names
            invitation.choice = 'list'
        if invitation.select_name_invite(main):
            return InvitationStatus.SELECT
        return InvitationStatus.FAILED

    elif status == InvitationStatus.SELECT:
        # Branch on whichever dialog follows the invite click.
        if main.appear(INVITE_CONFIRM):
            return InvitationStatus.CONFIRM
        if main.appear(INVITE_IN_SECOND):
            return InvitationStatus.IN_SECOND
        if main.appear(INVITE_SUBSTITUTE):
            return InvitationStatus.SUBSTITUTE

    elif status == InvitationStatus.CONFIRM:
        main.appear_then_click(INVITE_CONFIRM)
        # Only advance once the confirm button is gone.
        if not main.appear(INVITE_CONFIRM):
            invitation.on_success()
            return InvitationStatus.INVITED

    elif status == InvitationStatus.IN_SECOND:
        main.appear_then_click(INVITE_IN_SECOND_CLOSE)
        return InvitationStatus.FAILED

    elif status == InvitationStatus.SUBSTITUTE:
        if not invitation.substitute:
            main.appear_then_click(INVITE_SUBSTITUTE_CLOSE)
            return InvitationStatus.FAILED
        main.appear_then_click(INVITE_SUBSTITUTE)
        # NOTE(review): unlike CONFIRM, this success path does not call
        # invitation.on_success(), so the target stays queued - confirm
        # whether that is intended.
        if not main.appear(INVITE_SUBSTITUTE):
            return InvitationStatus.INVITED

    elif status == InvitationStatus.INVITED:
        main.appear_then_click(MOMOTALK_CLOSE)
        if not main.appear(MOMOTALK_CLOSE):
            return InvitationStatus.FINISHED

    elif status == InvitationStatus.FAILED:
        main.appear_then_click(MOMOTALK_CLOSE)
        # Once the list is closed, drop the failed target and start over.
        if not main.appear(CHECK_MOMOTALK):
            invitation.on_failed()
            return InvitationStatus.MOMOTALK

    elif status != InvitationStatus.FINISHED:
        # FINISHED needs no action; anything else is not a known state.
        logger.warning(f'Invalid status: {status}')

    return status
|
||
|
||
|
||
# Normalisation applied to the configured student names: whitespace is
# removed, look-alike "greater than" characters become the ASCII separator
# '>', and fullwidth parentheses become ASCII ones. The fullwidth characters
# are written as escapes so they cannot be silently folded to ASCII again.
_INVITATION_NAME_TABLE = str.maketrans({
    ' ': None,
    '\t': None,
    '\r': None,
    '\n': None,
    '\uff1e': '>',  # fullwidth greater-than sign
    '﹥': '>',
    '›': '>',
    '˃': '>',
    'ᐳ': '>',
    '❯': '>',
    '\uff08': '(',  # fullwidth left parenthesis
    '\uff09': ')',  # fullwidth right parenthesis
})


def _parse_invitation_names(raw: str) -> list[str]:
    """Split a 'A > B > C' style setting into normalised, non-empty names."""
    normalized = raw.translate(_INVITATION_NAME_TABLE)
    return [part for part in normalized.split('>') if part]


def handle_invitation(main: ModuleBase):
    """
    Run the whole invitation flow according to `main.config`.

    Copies the Invitation_* settings onto the shared `invitation` object,
    builds the target queue when inviting by name, then takes screenshots and
    steps handle_invitation_status() until it reports FINISHED.

    Args:
        main: Module providing `config` and `device`.

    Returns:
        bool: Always True, including when the feature is disabled or no
            student name is configured.
    """
    if not main.config.Invitation_Enable:
        logger.info('Invitation disabled')
        return True
    invitation.waiting_hour = main.config.Invitation_WaitingHour
    invitation.substitute = main.config.Invitation_Substitute
    # `choice` is only taken from config the first time; the state machine
    # may change it afterwards.
    if invitation.choice is None:
        invitation.choice = main.config.Invitation_Choice
    if invitation.choice == 'by_name' and not invitation.target_names:
        name = main.config.Invitation_Name
        # The former re.sub(r'(', ...) / re.sub(r')', ...) calls were invalid
        # patterns and raised re.error here; a translate table needs no
        # escaping. A value that is blank after normalisation is treated the
        # same as an unset one instead of queueing an empty name.
        targets = _parse_invitation_names(name) if name is not None else []
        if not targets:
            logger.warning('Choose By Name but Inviting Student Name is blank')
            return True
        invitation.target_names = targets
    status = InvitationStatus.MOMOTALK
    action_timer = Timer(1, 1)
    # NOTE(review): loading_timer is never started or reset in this function;
    # how many frames it skips depends on Timer.reached() for an unstarted
    # timer - confirm against module.base.timer.
    loading_timer = Timer(1, 1)
    while 1:
        main.device.screenshot()

        if not loading_timer.reached():
            continue

        if action_timer.reached_and_reset():
            logger.attr('Status', status)
            status = handle_invitation_status(status, main)

        if status == InvitationStatus.FINISHED:
            return True
|