mirror of
https://github.com/TheFunny/ArisuAutoSweeper
synced 2026-06-09 20:04:52 +00:00
feat:lesson
This commit is contained in:
@@ -3,6 +3,61 @@ from module.base.button import Button, ButtonWrapper
|
||||
# This file was auto-generated, do not modify it manually. To generate:
|
||||
# ``` python -m dev_tools.button_extract ```
|
||||
|
||||
CONFIRM = ButtonWrapper(
|
||||
name='CONFIRM',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/CONFIRM.png',
|
||||
area=(532, 528, 748, 589),
|
||||
search=(512, 508, 768, 609),
|
||||
color=(110, 207, 241),
|
||||
button=(532, 528, 748, 589),
|
||||
),
|
||||
)
|
||||
FIRST_ITEM = ButtonWrapper(
|
||||
name='FIRST_ITEM',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/FIRST_ITEM.png',
|
||||
area=(727, 137, 1103, 239),
|
||||
search=(707, 117, 1123, 259),
|
||||
color=(200, 209, 220),
|
||||
button=(727, 137, 1103, 239),
|
||||
),
|
||||
)
|
||||
LOCATIONS = ButtonWrapper(
|
||||
name='LOCATIONS',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/LOCATIONS.png',
|
||||
area=(1075, 638, 1256, 693),
|
||||
search=(1055, 618, 1276, 713),
|
||||
color=(107, 202, 237),
|
||||
button=(1075, 638, 1256, 693),
|
||||
),
|
||||
)
|
||||
LOCATIONS_POPUP = ButtonWrapper(
|
||||
name='LOCATIONS_POPUP',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/LOCATIONS_POPUP.png',
|
||||
area=(534, 101, 750, 135),
|
||||
search=(514, 81, 770, 155),
|
||||
color=(194, 202, 210),
|
||||
button=(534, 101, 750, 135),
|
||||
),
|
||||
)
|
||||
OCR_TICKET = ButtonWrapper(
|
||||
name='OCR_TICKET',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/OCR_TICKET.png',
|
||||
area=(220, 79, 266, 121),
|
||||
search=(200, 59, 286, 141),
|
||||
color=(214, 225, 229),
|
||||
button=(220, 79, 266, 121),
|
||||
),
|
||||
)
|
||||
SCROLL = ButtonWrapper(
|
||||
name='SCROLL',
|
||||
jp=Button(
|
||||
@@ -20,3 +75,14 @@ SCROLL = ButtonWrapper(
|
||||
button=(727, 137, 1103, 671),
|
||||
),
|
||||
)
|
||||
START_LESSON = ButtonWrapper(
|
||||
name='START_LESSON',
|
||||
jp=None,
|
||||
en=Button(
|
||||
file='./assets/en/schedule/START_LESSON.png',
|
||||
area=(506, 523, 773, 585),
|
||||
search=(486, 503, 793, 605),
|
||||
color=(110, 205, 239),
|
||||
button=(506, 523, 773, 585),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
from enum import Flag
|
||||
|
||||
from module.base.timer import Timer
|
||||
from module.exception import RequestHumanTakeover
|
||||
from module.logger import logger
|
||||
from tasks.base.assets.assets_base_page import BACK
|
||||
from tasks.base.page import page_schedule
|
||||
from tasks.schedule.ui import ScheduleUI
|
||||
from tasks.base.assets.assets_base_page import SCHEDULE_CHECK
|
||||
|
||||
import re
|
||||
|
||||
class ScheduleStatus(Flag):
|
||||
OCR = 0
|
||||
ENTER = 1
|
||||
SELECT = 2
|
||||
END = 3
|
||||
FINISH = 4
|
||||
|
||||
|
||||
class Schedule(ScheduleUI):
|
||||
@property
|
||||
def schedule_info(self):
|
||||
info = []
|
||||
input_valid = True
|
||||
schedule_config = self.config.cross_get("Schedule")
|
||||
choices = ["Choice1", "Choice2", "Choice3", "Choice4", "Choice5"]
|
||||
|
||||
for choice in choices:
|
||||
location, classrooms = schedule_config[choice]["Location"], schedule_config[choice]["Classrooms"]
|
||||
if location == "None" or not classrooms or (isinstance(classrooms, str) and classrooms.replace(" ", "") == ""):
|
||||
continue
|
||||
elif isinstance(classrooms, int):
|
||||
classrooms_list = [str(classrooms)]
|
||||
else:
|
||||
classrooms = classrooms.strip()
|
||||
classrooms = re.sub(r'[ \t\r\n]', '', classrooms)
|
||||
classrooms = re.sub(r'[>﹥›˃ᐳ❯]', '>', classrooms)
|
||||
classrooms_list = list(set(classrooms.split('>')))
|
||||
|
||||
if self.valid_classroom(classrooms_list):
|
||||
info.append([location, classrooms_list])
|
||||
else:
|
||||
logger.error(f"Failed to read {choice}")
|
||||
input_valid = False
|
||||
|
||||
return info if input_valid else []
|
||||
|
||||
def valid_classroom(self, classrooms_list):
|
||||
if not classrooms_list:
|
||||
return False
|
||||
for classroom in classrooms_list:
|
||||
if not classroom.isdigit():
|
||||
return False
|
||||
if not 1 <= int(classroom) <= 9:
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def valid_task(self) -> list:
|
||||
task = self.schedule_info
|
||||
if not task:
|
||||
logger.warning('Lessons enabled but no task set')
|
||||
self.error_handler()
|
||||
return task
|
||||
|
||||
def error_handler(self):
|
||||
action = self.config.Schedule_OnError
|
||||
if action == 'stop':
|
||||
raise RequestHumanTakeover
|
||||
elif action == 'skip':
|
||||
with self.config.multi_set():
|
||||
self.config.task_delay(server_update=True)
|
||||
self.config.task_stop()
|
||||
|
||||
@property
|
||||
def current_location(self):
|
||||
return self.task[0][0]
|
||||
|
||||
@property
|
||||
def current_classrooms(self):
|
||||
return self.task[0][1]
|
||||
|
||||
def handle_schedule(self, status):
|
||||
match status:
|
||||
case ScheduleStatus.OCR:
|
||||
if self.task:
|
||||
self.ticket = self.get_ticket()
|
||||
if self.ticket not in [0, None]:
|
||||
return ScheduleStatus.ENTER
|
||||
return ScheduleStatus.FINISH
|
||||
case ScheduleStatus.ENTER:
|
||||
if self.enter_location(self.current_location):
|
||||
return ScheduleStatus.SELECT
|
||||
else:
|
||||
self.error_handler()
|
||||
case ScheduleStatus.SELECT:
|
||||
if self.select_classrooms(self.ticket, self.current_classrooms):
|
||||
self.task.pop(0)
|
||||
return ScheduleStatus.END
|
||||
return ScheduleStatus.FINISH
|
||||
case ScheduleStatus.END:
|
||||
if self.appear(SCHEDULE_CHECK):
|
||||
return ScheduleStatus.OCR
|
||||
self.click_with_interval(BACK, interval=2)
|
||||
case ScheduleStatus.FINISH:
|
||||
return status
|
||||
case _:
|
||||
logger.warning(f'Invalid status: {status}')
|
||||
return status
|
||||
|
||||
def run(self):
|
||||
self.ui_ensure(page_schedule)
|
||||
self.task = self.valid_task
|
||||
action_timer = Timer(0.5, 1)
|
||||
status = ScheduleStatus.OCR
|
||||
|
||||
while 1:
|
||||
self.device.screenshot()
|
||||
|
||||
if self.ui_additional():
|
||||
continue
|
||||
|
||||
if action_timer.reached_and_reset():
|
||||
logger.attr('Status', status)
|
||||
status = self.handle_schedule(status)
|
||||
|
||||
if status == ScheduleStatus.FINISH:
|
||||
break
|
||||
|
||||
self.config.task_delay(server_update=True)
|
||||
@@ -0,0 +1,149 @@
|
||||
"""
|
||||
Original Author: sanmusen214(https://github.com/sanmusen214)
|
||||
Adapted from https://github.com/sanmusen214/BAAH/blob/1.2/modules/AllTask/SubTask/ScrollSelect.py
|
||||
"""
|
||||
|
||||
from module.logger import logger
|
||||
from module.base.timer import Timer
|
||||
|
||||
|
||||
class ScrollSelect:
|
||||
"""
|
||||
Scroll and select the corresponding level by clicking on the right-side window.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
targetind : int
|
||||
Index of the target level
|
||||
window_starty:
|
||||
Y-coordinate of the upper edge of the window
|
||||
first_item_endy:
|
||||
Y-coordinate of the lower edge of the first item
|
||||
window_endy:
|
||||
Y-coordinate of the lower edge of the window
|
||||
clickx: int
|
||||
Base X-coordinate for sliding and clicking the button
|
||||
hasexpectimage: function
|
||||
Function to determine the appearance of the expected image after clicking, returns a boolean
|
||||
swipeoffsetx: int
|
||||
X offset of the base X-coordinate during sliding to prevent accidental button clicks
|
||||
finalclick: bool
|
||||
Whether to click on clickx and the last row after the sliding ends
|
||||
"""
|
||||
def __init__(self, window_button, first_item_button, expected_button, clickx, swipeoffsetx=-100, finalclick=True) -> None:
|
||||
# TODO: Actually, only concerned about the height of one element, completely displaying the Y of the first button, completely displaying the Y of the bottom button, the number of complete elements that the window can contain, the height of the last element in the window, and the left offset and response distance.
|
||||
self.window_starty = window_button.area[1]
|
||||
self.window_endy = window_button.area[3]
|
||||
self.first_item_endy = first_item_button.area[3]
|
||||
self.windowheight = window_button.height
|
||||
self.itemheight = first_item_button.height
|
||||
self.clickx = clickx
|
||||
self.expected_button = expected_button
|
||||
self.swipeoffsetx = swipeoffsetx
|
||||
self.responsey = 40
|
||||
self.finalclick = finalclick
|
||||
|
||||
def compute_swipe(self, main, x1, y1, distance, responsey):
|
||||
"""
|
||||
Swipe vertically from bottom to top, actual swipe distance calculated based on the distance between two target points, considering inertia.
|
||||
"""
|
||||
distance = abs(distance)
|
||||
logger.info(f"Swipe distance: {distance}")
|
||||
# 0-50
|
||||
if distance < 50:
|
||||
main.device.swipe((x1, y1), (x1, y1 - (distance + responsey)), duration=2)
|
||||
else:
|
||||
# Effective swipe distance for the Chinese server is 60
|
||||
main.device.swipe((x1, y1), (x1, int(y1 - (distance + responsey - 4 * (1 + distance / 100)))), duration=1 + distance / 100)
|
||||
|
||||
def select_location(self, main, target_index) -> None:
|
||||
click_coords = main.device.click_methods.get(main.config.Emulator_ControlMethod, main.device.click_adb)
|
||||
logger.info("Scroll and select the {}-th level".format(target_index + 1))
|
||||
self.scroll_right_up(main, scrollx=self.clickx + self.swipeoffsetx)
|
||||
# Calculate how many complete elements are on one page
|
||||
itemcount = self.windowheight // self.itemheight
|
||||
# Calculate how much height the last incomplete element on this page occupies
|
||||
lastitemheight = self.windowheight % self.itemheight
|
||||
# Height below the incomplete element
|
||||
hiddenlastitemheight = self.itemheight - lastitemheight
|
||||
# Center point of the height of the first element
|
||||
start_center_y = self.window_starty + self.itemheight // 2
|
||||
# Center point of the last complete element on this page
|
||||
end_center_y = start_center_y + (itemcount - 1) * self.itemheight
|
||||
# If the target element is on the current page
|
||||
if target_index < itemcount:
|
||||
# Center point of the target element
|
||||
target_center_y = start_center_y + self.itemheight * target_index
|
||||
self.run_until(main,
|
||||
lambda: click_coords(self.clickx, target_center_y),
|
||||
lambda: main.appear(self.expected_button),
|
||||
)
|
||||
else:
|
||||
# Start scrolling from the gap in the middle of the levels
|
||||
scroll_start_from_y = self.window_endy - self.itemheight // 2
|
||||
# The target element is on subsequent pages
|
||||
# Calculate how much the page should be scrolled
|
||||
scrolltotal_distance = (target_index - itemcount) * self.itemheight + hiddenlastitemheight
|
||||
logger.info("Height hidden by the last element: %d" % hiddenlastitemheight)
|
||||
# First, slide up the hidden part, add a little distance to let the system recognize it as a swipe event
|
||||
self.compute_swipe(main, self.clickx + self.swipeoffsetx, scroll_start_from_y, hiddenlastitemheight, self.responsey)
|
||||
logger.info(f"Swipe distance: {hiddenlastitemheight}")
|
||||
# Update scrolltotal_distance
|
||||
scrolltotal_distance -= hiddenlastitemheight
|
||||
# Still need to scroll up (target_index - itemcount) * self.itemheight
|
||||
# Important: slide the height of (itemcount - 1) elements each time
|
||||
if itemcount == 1:
|
||||
scroll_distance = itemcount * self.itemheight
|
||||
else:
|
||||
scroll_distance = (itemcount - 1) * self.itemheight
|
||||
while scroll_distance <= scrolltotal_distance:
|
||||
self.compute_swipe(main, self.clickx + self.swipeoffsetx, scroll_start_from_y, scroll_distance, self.responsey)
|
||||
scrolltotal_distance -= scroll_distance
|
||||
if scrolltotal_distance > 5:
|
||||
# Last slide
|
||||
self.compute_swipe(main, self.clickx + self.swipeoffsetx, scroll_start_from_y, scrolltotal_distance, self.responsey)
|
||||
if self.finalclick:
|
||||
# Click on the last row
|
||||
self.run_until(main,
|
||||
lambda: click_coords(self.clickx, self.window_endy - self.itemheight // 2),
|
||||
lambda: main.appear(self.expected_button)
|
||||
)
|
||||
|
||||
def run_until(self, main, func1, func2, times=6, sleeptime=1.5) -> bool:
|
||||
"""
|
||||
Repeat the execution of func1 up to a maximum of times or until func2 evaluates to True.
|
||||
|
||||
func1 should perform a single valid operation or internally call a screenshot function.
|
||||
A screenshot is triggered before evaluating func2.
|
||||
|
||||
After each execution of func1, wait for sleeptime seconds.
|
||||
|
||||
If func2 evaluates to True, exit and return True. Otherwise, return False.
|
||||
|
||||
Note: The comment assumes that func1 produces a meaningful operation or internally calls a screenshot function,
|
||||
and func2 is evaluated after each execution of func1.
|
||||
"""
|
||||
for i in range(times):
|
||||
main.device.screenshot()
|
||||
if func2():
|
||||
return True
|
||||
func1()
|
||||
timer = Timer(sleeptime).start()
|
||||
while not timer.reached_and_reset():
|
||||
pass
|
||||
main.device.screenshot()
|
||||
if func2():
|
||||
return True
|
||||
logger.warning("run_until exceeded max times")
|
||||
return False
|
||||
|
||||
def scroll_right_up(self, main, scrollx=928, times=3):
|
||||
"""
|
||||
scroll to top
|
||||
"""
|
||||
for i in range(times):
|
||||
main.device.swipe((scrollx, 226), (scrollx, 561), duration=0.2)
|
||||
timer = Timer(0.5).start()
|
||||
while not timer.reached_and_reset():
|
||||
pass
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
from module.base.timer import Timer
|
||||
from module.logger import logger
|
||||
from module.ocr.ocr import DigitCounter
|
||||
from tasks.base.ui import UI
|
||||
from tasks.base.assets.assets_base_page import SCHEDULE_CHECK
|
||||
from tasks.schedule.assets.assets_schedule import *
|
||||
from tasks.schedule.scroll_select import ScrollSelect
|
||||
import numpy as np
|
||||
|
||||
|
||||
SCROLL_SELECT = ScrollSelect(window_button=SCROLL, first_item_button=FIRST_ITEM, expected_button=LOCATIONS, clickx=1114)
|
||||
xs = np.linspace(299, 995, 3, dtype=int)
|
||||
ys = np.linspace(268, 573, 3, dtype=int)
|
||||
|
||||
class ScheduleUI(UI):
|
||||
def select_then_check(self, dest_enter: ButtonWrapper, dest_check: ButtonWrapper):
|
||||
timer = Timer(8, 10).start()
|
||||
while 1:
|
||||
self.device.screenshot()
|
||||
self.appear_then_click(dest_enter, interval=1)
|
||||
self.handle_affection_level_up()
|
||||
if self.appear(dest_check):
|
||||
return True
|
||||
|
||||
if timer.reached():
|
||||
return False
|
||||
|
||||
def click_then_check(self, coords, dest_check: ButtonWrapper):
|
||||
click_coords = self.device.click_methods.get(self.config.Emulator_ControlMethod, self.device.click_adb)
|
||||
timer = Timer(3, 5).start()
|
||||
wait = Timer(1).start()
|
||||
while 1:
|
||||
click_coords(*coords)
|
||||
self.device.screenshot()
|
||||
if self.appear_then_click(dest_check):
|
||||
return True
|
||||
while not wait.reached_and_reset():
|
||||
pass
|
||||
if timer.reached():
|
||||
return False
|
||||
|
||||
def enter_location(self, location):
|
||||
SCROLL_SELECT.select_location(self, location)
|
||||
if not self.appear(LOCATIONS):
|
||||
logger.error("Unable to navigate to page for location {}".format(location + 1))
|
||||
return False
|
||||
return self.select_then_check(LOCATIONS, LOCATIONS_POPUP)
|
||||
|
||||
def select_classrooms(self, ticket, classrooms):
|
||||
for classroom in classrooms:
|
||||
if ticket == 0:
|
||||
return False
|
||||
classroom = int(classroom) - 1
|
||||
col = int(classroom % len(xs))
|
||||
row = int((classroom - col) / len(ys))
|
||||
targetloc = (xs[col], ys[row])
|
||||
if not self.click_then_check(targetloc, START_LESSON):
|
||||
logger.info(f"Classroom {classroom + 1} does not exist or has already been clicked")
|
||||
continue
|
||||
if self.select_then_check(START_LESSON, CONFIRM):
|
||||
ticket -= 1
|
||||
if not self.select_then_check(CONFIRM, LOCATIONS_POPUP):
|
||||
break
|
||||
return True
|
||||
|
||||
def get_ticket(self):
|
||||
"""
|
||||
Page:
|
||||
in: page_bounty
|
||||
"""
|
||||
if not self.appear(SCHEDULE_CHECK):
|
||||
logger.warning('OCR failed due to invalid page')
|
||||
return False
|
||||
ticket, _, total = DigitCounter(OCR_TICKET).ocr_single_line(self.device.image)
|
||||
if total == 0:
|
||||
logger.warning('Invalid ticket')
|
||||
return False
|
||||
logger.attr('ScheduleTicket', ticket)
|
||||
#self.config.stored.BountyTicket.set(ticket)
|
||||
return ticket
|
||||
Reference in New Issue
Block a user