1
0
mirror of https://github.com/TheFunny/ArisuAutoSweeper synced 2026-06-09 20:04:52 +00:00

feat: mission/commissions/event

This commit is contained in:
RedDeadDepresso
2023-12-25 11:35:05 +00:00
committed by YoursFunny
parent 69a612dc55
commit bcbe10d291
40 changed files with 2068 additions and 8 deletions
+148
View File
@@ -0,0 +1,148 @@
from module.base.button import Button, ButtonWrapper
# This file was auto-generated, do not modify it manually. To generate:
# ``` python -m dev_tools.button_extract ```
CHECK_BD = ButtonWrapper(
name='CHECK_BD',
jp=None,
en=Button(
file='./assets/en/mission/CHECK_BD.png',
area=(94, 135, 325, 194),
search=(74, 115, 345, 214),
color=(208, 215, 220),
button=(94, 135, 325, 194),
),
)
CHECK_COMMISSIONS = ButtonWrapper(
name='CHECK_COMMISSIONS',
jp=None,
en=Button(
file='./assets/en/mission/CHECK_COMMISSIONS.png',
area=(646, 78, 909, 135),
search=(626, 58, 929, 155),
color=(70, 96, 124),
button=(646, 78, 909, 135),
),
)
CHECK_IR = ButtonWrapper(
name='CHECK_IR',
jp=None,
en=Button(
file='./assets/en/mission/CHECK_IR.png',
area=(97, 137, 340, 191),
search=(77, 117, 360, 211),
color=(213, 220, 223),
button=(97, 137, 340, 191),
),
)
CHECK_MISSION_SWEEP = ButtonWrapper(
name='CHECK_MISSION_SWEEP',
jp=None,
en=Button(
file='./assets/en/mission/CHECK_MISSION_SWEEP.png',
area=(654, 184, 703, 209),
search=(634, 164, 723, 229),
color=(208, 213, 219),
button=(654, 184, 703, 209),
),
)
HARD_OFF = ButtonWrapper(
name='HARD_OFF',
jp=None,
en=Button(
file='./assets/en/mission/HARD_OFF.png',
area=(947, 132, 1193, 182),
search=(927, 112, 1213, 202),
color=(242, 246, 248),
button=(947, 132, 1193, 182),
),
)
HARD_ON = ButtonWrapper(
name='HARD_ON',
jp=None,
en=Button(
file='./assets/en/mission/HARD_ON.png',
area=(940, 133, 1189, 186),
search=(920, 113, 1209, 206),
color=(200, 71, 63),
button=(940, 133, 1189, 186),
),
)
LEFT = ButtonWrapper(
name='LEFT',
jp=None,
en=Button(
file='./assets/en/mission/LEFT.png',
area=(0, 301, 89, 408),
search=(0, 281, 109, 428),
color=(193, 224, 241),
button=(0, 301, 89, 408),
),
)
NORMAL_OFF = ButtonWrapper(
name='NORMAL_OFF',
jp=None,
en=Button(
file='./assets/en/mission/NORMAL_OFF.png',
area=(682, 135, 927, 182),
search=(662, 115, 947, 202),
color=(238, 243, 246),
button=(682, 135, 927, 182),
),
)
NORMAL_ON = ButtonWrapper(
name='NORMAL_ON',
jp=None,
en=Button(
file='./assets/en/mission/NORMAL_ON.png',
area=(682, 137, 924, 185),
search=(662, 117, 944, 205),
color=(62, 81, 89),
button=(682, 137, 924, 185),
),
)
OCR_AREA = ButtonWrapper(
name='OCR_AREA',
jp=None,
en=Button(
file='./assets/en/mission/OCR_AREA.png',
area=(108, 176, 176, 219),
search=(88, 156, 196, 239),
color=(237, 238, 240),
button=(108, 176, 176, 219),
),
)
RIGHT = ButtonWrapper(
name='RIGHT',
jp=None,
en=Button(
file='./assets/en/mission/RIGHT.png',
area=(1202, 311, 1280, 412),
search=(1182, 291, 1280, 432),
color=(193, 223, 241),
button=(1202, 311, 1280, 412),
),
)
SELECT_BD = ButtonWrapper(
name='SELECT_BD',
jp=None,
en=Button(
file='./assets/en/mission/SELECT_BD.png',
area=(1016, 165, 1227, 211),
search=(996, 145, 1247, 231),
color=(205, 212, 220),
button=(1016, 165, 1227, 211),
),
)
SELECT_IR = ButtonWrapper(
name='SELECT_IR',
jp=None,
en=Button(
file='./assets/en/mission/SELECT_IR.png',
area=(1004, 267, 1237, 321),
search=(984, 247, 1257, 341),
color=(214, 220, 227),
button=(1004, 267, 1237, 321),
),
)
+260
View File
@@ -0,0 +1,260 @@
from enum import Enum
from module.base.timer import Timer
from module.exception import RequestHumanTakeover
from module.logger import logger
from tasks.mission.ui import MissionUI, CommissionsUI
from tasks.stage.ap import AP
from tasks.cafe.cafe import Cafe
from tasks.circle.circle import Circle
from tasks.task.task import Task
from tasks.mail.mail import Mail
from tasks.item.data_update import DataUpdate
import json
import math
from filelock import FileLock
from datetime import datetime
class MissionStatus(Enum):
AP = 0 # Calculate AP and decide to terminate Mission module or not
NAVIGATE = 1 # Navigate to the stage page for example the commissions page or mission page
SELECT = 2 # Select the stage mode for example hard or normal in mission
ENTER = 3 # Search and for the stage in the stage list and enter
SWEEP = 4 # Sweep the stage
RECHARGE = 5 # Recharge AP from other taks if they are enabled
FINISH = -1 # Inidicate termination of Mission module
class Mission(MissionUI, CommissionsUI):
_stage_ap = [10, 15, 15, 15]
@property
def stage_ap(self):
return self._stage_ap
@property
def mission_info(self) -> list:
"""
Read the config from MCE/config.json and extract the queue, a list of list.
If queue is empty repopulate from preferred template.
Format of each element in queue: [mode, stage, sweep_num]
e.g. ["N", "1-1", 3]
Mode Acronyms:
"N" : Normal Mission
"H" : Hard Mission
"E" : Event Quest
"IR" : Item Retrieval / Commission where you get credit
"BD" : Base Defense / Commission where you get exp
Returns:
list of list
"""
queue = []
try:
with open("MCE/config.json") as json_file:
config_data = json.load(json_file)
queue = config_data["Queue"]
self.recharge_AP = config_data["RechargeAP"]
self.reset_daily = config_data["ResetDaily"]
self.reset_time = config_data["ResetTime"]
self.last_run = config_data["LastRun"]
self.event = config_data["Event"]
if self.check_reset_daily() or not queue:
preferred_template = config_data["PreferredTemplate"]
queue = config_data["Templates"][preferred_template]
if not self.event:
queue = [x for x in queue if x[0] != "E"]
except:
logger.error("Failed to read configuration file")
finally:
return queue
def check_reset_daily(self):
# Check if it's time to reset the queue
if self.reset_daily:
current_datetime = datetime.now().replace(microsecond=0) # Round to the nearest second
current_date = current_datetime.date()
current_time = current_datetime.time()
last_run_datetime = datetime.strptime(self.last_run, "%Y-%m-%d %H:%M:%S")
reset_time = datetime.strptime(self.reset_time, "%H:%M:%S").time()
if current_date != last_run_datetime.date() and current_time >= reset_time:
self.last_run = str(datetime.now().replace(microsecond=0))
logger.info("Reset Daily activated.")
return True
return False
@property
def valid_task(self) -> list:
task = self.mission_info
if not task:
logger.warning('Mission enabled but no task set')
#self.error_handler()
return task
@property
def current_mode(self):
return self.task[0][0]
@property
def current_stage(self):
return self.task[0][1]
@property
def current_count(self):
if self.current_mode == "H" and self.task[0][2] > 3:
return 3
return self.task[0][2]
@current_count.setter
def current_count(self, value):
self.task[0][2] = value
def select(self) -> bool:
"""
A wrapper method to select the current_mode
by calling the specific method based on its type.
Return
True if selection happens without any problem, False otherwise.
"""
if self.current_mode in ["N", "H"]:
return self.select_mission(self.current_mode, self.current_stage)
elif self.current_mode in ["BD", "IR"]:
return self.select_commission(self.current_mode)
elif self.current_mode == "E":
#return self.select_mode(SWITCH_QUEST)
logger.error("Event not yet implemented")
return False
else:
logger.error("Uknown mode")
return False
def get_realistic_count(self) -> int:
"""
Calculate the possible number of sweeps based on the current AP
"""
ap_cost = 20 if self.current_mode == "H" else 10
required_ap = ap_cost * self.current_count
return math.floor(min(required_ap, self.current_ap) / ap_cost)
def update_task(self, failure=False):
"""
Update self.task and save the current state of the queue in
MCE/config.json.
"""
try:
if failure or self.current_count == self.realistic_count:
self.previous_mode = self.current_mode
self.task.pop(0)
else:
self.previous_mode = None
self.current_count -= self.realistic_count
with open("MCE/config.json", "r") as json_file:
config_data = json.load(json_file)
with open("MCE/config.json", "w") as json_file:
config_data["Queue"] = self.task
config_data["LastRun"] = self.last_run
json.dump(config_data, json_file, indent=2)
except:
logger.error("Failed to save configuration")
self.task = []
def update_ap(self):
ap_cost = 20 if self.current_mode == "H" else 10
ap = self.config.stored.AP
ap_old = ap.value
ap_new = ap_old - ap_cost * self.realistic_count
ap.set(ap_new, ap.total)
logger.info(f'Set AP: {ap_old} -> {ap_new}')
def recharge(self) -> bool:
"""
Check if AP related modules such as cafe, circle, task, mail are enabled and run them if they are.
task_call only works after the current task is finished so is not suitable.
"""
cafe_reward = self.config.cross_get(["Cafe", "Scheduler", "Enable"]) and self.config.cross_get(["Cafe", "Cafe", "Reward"])
circle = self.config.cross_get(["Circle", "Scheduler", "Enable"])
task = self.config.cross_get(["Task", "Scheduler", "Enable"])
mail = self.config.cross_get(["Mail", "Scheduler", "Enable"])
ap_tasks = [(cafe_reward,Cafe), (circle, Circle), (task, Task), (mail, Mail)]
modules = [x[1] for x in ap_tasks if x[0]]
if not modules:
logger.info("Recharge AP was enabled but no AP related modules were enabled")
return False
for module in modules:
module(config=self.config, device=self.device).run()
return True
def handle_mission(self, status):
match status:
case MissionStatus.AP:
if not self.task:
return MissionStatus.FINISH
self.realistic_count = self.get_realistic_count()
if self.realistic_count == 0 and self.recharge_AP:
self.recharge_AP = False
return MissionStatus.RECHARGE
elif self.realistic_count == 0 and not self.recharge_AP:
return MissionStatus.FINISH
else:
return MissionStatus.NAVIGATE
case MissionStatus.NAVIGATE:
self.navigate(self.previous_mode, self.current_mode)
return MissionStatus.SELECT
case MissionStatus.SELECT:
if self.select():
return MissionStatus.ENTER
self.update_task(failure=True)
return MissionStatus.AP
case MissionStatus.ENTER:
if self.enter_stage(self.current_stage):
return MissionStatus.SWEEP
self.update_task(failure=True)
return MissionStatus.AP
case MissionStatus.SWEEP:
if self.do_sweep(self.current_mode, self.realistic_count):
self.update_ap()
self.update_task()
else:
self.update_task(failure=True)
return MissionStatus.AP
case MissionStatus.RECHARGE:
if self.recharge():
DataUpdate(config=self.config, device=self.device).run()
return MissionStatus.AP
return MissionStatus.FINISH
case MissionStatus.FINISH:
return status
case _:
logger.warning(f'Invalid status: {status}')
return status
def run(self):
self.lock = FileLock("MCE/config.json.lock")
with self.lock.acquire():
self.previous_mode = None
self.task = self.valid_task
action_timer = Timer(0.5, 1)
status = MissionStatus.AP
"""Update the dashboard to accurately calculate AP"""
DataUpdate(config=self.config, device=self.device).run()
while 1:
self.device.screenshot()
if self.ui_additional():
continue
if action_timer.reached_and_reset():
logger.attr('Status', status)
status = self.handle_mission(status)
if status == MissionStatus.FINISH:
break
self.config.task_delay(server_update=True)
+147
View File
@@ -0,0 +1,147 @@
from module.base.timer import Timer
from module.logger import logger
from module.ui.switch import Switch
from module.ocr.ocr import Digit
from tasks.base.assets.assets_base_page import BACK, MISSION_CHECK
from tasks.base.page import page_mission, page_commissions
from tasks.base.ui import UI
from tasks.mission.assets.assets_mission import *
from tasks.stage.ap import AP
from tasks.stage.mission_list import StageList
from tasks.stage.sweep import StageSweep
SHARED_LIST = StageList('SharedList')
MISSION_SWEEP = StageSweep('MissionSweep', 60)
MISSION_SWEEP.set_button(button_check=CHECK_MISSION_SWEEP) # Check sweep is different for mission
SHARED_SWEEP = StageSweep('SharedSweep', 60)
SWITCH_NORMAL = Switch("Normal_switch")
SWITCH_NORMAL.add_state("on", NORMAL_ON)
SWITCH_NORMAL.add_state("off", NORMAL_OFF)
SWITCH_HARD = Switch("HARD_switch")
SWITCH_HARD.add_state("on", HARD_ON)
SWITCH_HARD.add_state("off", HARD_OFF)
SWITCH_QUEST = None
"""
A dictionary that maps the mode to a tuple where
the first element is an argument to go_back and second is for ui_ensure
Missing for "E" because there are no event in Global and no page_event
"""
MODE_TO_PAGE = {
"N": (MISSION_CHECK, page_mission),
"H": (MISSION_CHECK, page_mission),
"BD": (CHECK_BD, page_commissions),
"IR": (CHECK_IR, page_commissions),
"E" : ()
}
class MissionUI(UI, AP):
def select_mission(self, mode, stage):
area = int(stage.split("-")[0])
if not self.select_area(area):
logger.warning("Area not found")
return False
to_switch = {
"N": SWITCH_NORMAL,
"H": SWITCH_HARD
}
switch = to_switch[mode]
if not self.select_mode(switch) and not self.select_area(area):
return False
return True
def select_area(self, num):
""""
May require further error handling for these cases.
1. Fails to ocr area number
2. May trigger too many click exception when clicking left or right too many times
3. Area not unlocked. Simplest way if left or right button are still present
but problem is it is expensive to check every time and they always keep moving.
"""
tries = 0
ocr_area = Digit(OCR_AREA)
while 1:
try:
self.device.screenshot()
current_area = int(ocr_area.ocr_single_line(self.device.image))
if current_area == num:
return True
elif current_area > num:
[self.click_with_interval(LEFT, interval=1) for x in range(abs(current_area-num))]
elif current_area < num:
[self.click_with_interval(RIGHT, interval=1) for x in range(abs(current_area-num))]
except:
tries += 1
if tries > 5:
return False
def select_mode(self, switch):
"""
Set switch to on.
Returns:
True if switch is set, False if switch not found
"""
if not switch.appear(main=self):
logger.info(f'{switch.name} not found')
return False
switch.set('on', main=self)
return True
def enter_stage(self, index: int) -> bool:
if not index:
index = SHARED_LIST.insight_max_sweepable_index(self)
if SHARED_LIST.select_index_enter(self, index):
return True
return False
def do_sweep(self, mode, num: int) -> bool:
if mode in ["N", "H"]:
return MISSION_SWEEP.do_sweep(self, num=num)
else:
return SHARED_SWEEP.do_sweep(self, num=num)
def navigate(self, prev, next):
"""
go_back is called when the previous stage and next stage are in
the same game mode.
For example, "N" and "H" are in Mission so we call go_back.
If different, ui_ensure is called for example, "N" and "IR".
"""
if prev==next or (prev in ["N", "H"] and next in ["N", "H"]):
self.go_back(MODE_TO_PAGE[next][0])
elif prev in ["BD", "IR"] and next in ["BD", "IR"]:
self.go_back(CHECK_COMMISSIONS)
else:
self.ui_ensure(MODE_TO_PAGE[next][1])
def go_back(self, check):
while 1:
self.device.screenshot()
if self.match_color(check) and self.appear(check):
return True
self.click_with_interval(BACK, interval=2)
class CommissionsUI(UI, AP):
"""Works the same way as select_bounty"""
def select_commission(self, mode):
to_button = {
"IR": (SELECT_IR, CHECK_IR),
"BD": (SELECT_BD, CHECK_BD)
}
dest_enter, dest_check = to_button[mode]
timer = Timer(5, 10).start()
while 1:
self.device.screenshot()
self.appear_then_click(dest_enter, interval=1)
if self.appear(dest_check):
return True
if timer.reached():
return False