1
0
mirror of https://github.com/TheFunny/ArisuAutoSweeper synced 2026-06-09 20:04:52 +00:00

Upload code

This commit is contained in:
2023-11-01 15:33:35 +08:00
commit 6860f2eb72
415 changed files with 50990 additions and 0 deletions
+348
View File
@@ -0,0 +1,348 @@
{
"Alas": {
"Emulator": {
"Serial": {
"type": "input",
"value": "auto",
"valuetype": "str"
},
"PackageName": {
"type": "select",
"value": "auto",
"option": [
"auto",
"JP-Official"
]
},
"GameLanguage": {
"type": "select",
"value": "auto",
"option": [
"auto",
"jp"
]
},
"ScreenshotMethod": {
"type": "select",
"value": "scrcpy",
"option": [
"auto",
"ADB",
"ADB_nc",
"uiautomator2",
"aScreenCap",
"aScreenCap_nc",
"DroidCast",
"DroidCast_raw",
"scrcpy"
],
"display": "hide"
},
"ControlMethod": {
"type": "select",
"value": "MaaTouch",
"option": [
"minitouch",
"MaaTouch"
],
"display": "hide"
},
"AdbRestart": {
"type": "checkbox",
"value": false
}
},
"EmulatorInfo": {
"Emulator": {
"type": "select",
"value": "auto",
"option": [
"auto",
"NoxPlayer",
"NoxPlayer64",
"BlueStacks4",
"BlueStacks5",
"BlueStacks4HyperV",
"BlueStacks5HyperV",
"LDPlayer3",
"LDPlayer4",
"LDPlayer9",
"MuMuPlayer",
"MuMuPlayerX",
"MuMuPlayer12",
"MEmuPlayer"
]
},
"name": {
"type": "textarea",
"value": null
},
"path": {
"type": "textarea",
"value": null
}
},
"Error": {
"Restart": {
"type": "select",
"value": "game",
"option": [
"game",
"game_emulator"
]
},
"SaveError": {
"type": "checkbox",
"value": true
},
"ScreenshotLength": {
"type": "input",
"value": 1
},
"OnePushConfig": {
"type": "textarea",
"value": "provider: null",
"mode": "yaml"
}
},
"Optimization": {
"ScreenshotInterval": {
"type": "input",
"value": 0.2,
"display": "hide"
},
"CombatScreenshotInterval": {
"type": "input",
"value": 1.0,
"display": "hide"
},
"WhenTaskQueueEmpty": {
"type": "select",
"value": "goto_main",
"option": [
"stay_there",
"goto_main",
"close_game"
]
}
}
},
"Restart": {
"Scheduler": {
"Enable": {
"type": "state",
"value": true,
"option": [
true
],
"option_bold": [
true
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "Restart",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "04:00",
"display": "hide"
}
}
},
"Cafe": {
"Scheduler": {
"Enable": {
"type": "checkbox",
"value": true,
"option": [
true,
false
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "Cafe",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "04:00, 16:00",
"display": "hide"
}
},
"Cafe": {
"Reward": {
"type": "checkbox",
"value": true
},
"Touch": {
"type": "checkbox",
"value": true
},
"AutoAdjust": {
"type": "checkbox",
"value": true
}
}
},
"Mail": {
"Scheduler": {
"Enable": {
"type": "checkbox",
"value": true,
"option": [
true,
false
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "Mail",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "04:00",
"display": "hide"
}
}
},
"Circle": {
"Scheduler": {
"Enable": {
"type": "checkbox",
"value": true,
"option": [
true,
false
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "Circle",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "04:00",
"display": "hide"
}
}
},
"TacticalChallenge": {
"Scheduler": {
"Enable": {
"type": "checkbox",
"value": true,
"option": [
true,
false
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "TacticalChallenge",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "15:00",
"display": "hide"
}
},
"TacticalChallenge": {
"PlayerSelect": {
"type": "select",
"value": 0,
"option": [
0,
1,
2,
3
]
}
}
},
"DataUpdate": {
"Scheduler": {
"Enable": {
"type": "state",
"value": true,
"option": [
true
],
"option_bold": [
true
]
},
"NextRun": {
"type": "datetime",
"value": "2020-01-01 00:00:00",
"validate": "datetime"
},
"Command": {
"type": "input",
"value": "DataUpdate",
"display": "hide"
},
"ServerUpdate": {
"type": "input",
"value": "04:00",
"display": "hide"
}
},
"ItemStorage": {
"AP": {
"type": "stored",
"value": {},
"display": "hide",
"stored": "StoredAP",
"order": 1,
"color": "#62ea6e"
},
"Credit": {
"type": "stored",
"value": {},
"display": "hide",
"stored": "StoredInt",
"order": 2,
"color": "#fdec00"
},
"Pyroxene": {
"type": "stored",
"value": {},
"display": "hide",
"stored": "StoredInt",
"order": 3,
"color": "#21befc"
}
}
}
}
+100
View File
@@ -0,0 +1,100 @@
# --------------------
# Define arguments.
# --------------------
# ==================== Alas ====================
Scheduler:
Enable:
type: checkbox
value: false
option: [ true, false ]
NextRun: 2020-01-01 00:00:00
Command: Alas
ServerUpdate:
value: 04:00
display: hide
Emulator:
Serial:
value: auto
valuetype: str
PackageName:
value: auto
option: [ auto, ]
GameLanguage:
value: auto
option: [ auto, jp ]
ScreenshotMethod:
value: auto
option: [ auto, ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, DroidCast_raw, scrcpy ]
ControlMethod:
value: MaaTouch
option: [ minitouch, MaaTouch ]
AdbRestart: false
EmulatorInfo:
Emulator:
value: auto
option: [
auto,
NoxPlayer,
NoxPlayer64,
BlueStacks4,
BlueStacks5,
BlueStacks4HyperV,
BlueStacks5HyperV,
LDPlayer3,
LDPlayer4,
LDPlayer9,
MuMuPlayer,
MuMuPlayerX,
MuMuPlayer12,
MEmuPlayer,
]
name:
value: null
type: textarea
path:
value: null
type: textarea
Error:
Restart:
value: game
option: [ game, game_emulator ]
SaveError: true
ScreenshotLength: 1
OnePushConfig:
type: textarea
mode: yaml
value: 'provider: null'
Optimization:
ScreenshotInterval: 0.3
CombatScreenshotInterval: 1.0
WhenTaskQueueEmpty:
value: goto_main
option: [ stay_there, goto_main, close_game ]
# ==================== Daily ====================
Cafe:
Reward: true
Touch: true
AutoAdjust: true
TacticalChallenge:
PlayerSelect:
value: 0
option: [ 0, 1, 2, 3 ]
ItemStorage:
AP:
stored: StoredAP
order: 1
color: "#62ea6e"
Credit:
stored: StoredInt
order: 2
color: "#fdec00"
Pyroxene:
stored: StoredInt
order: 3
color: "#21befc"
+18
View File
@@ -0,0 +1,18 @@
# --------------------
# Define default values
# --------------------
# ==================== Alas ====================
Cafe:
Scheduler:
Enable: true
Mail:
Scheduler:
Enable: true
Circle:
Scheduler:
Enable: true
TacticalChallenge:
Scheduler:
Enable: true
+103
View File
@@ -0,0 +1,103 @@
# Translations web gui
# This will insert to `config/i18n/{lang}.json`, under key `Gui`
Aside:
Install:
Home:
Develop:
Performance:
Setting:
AddAlas:
Button:
Start:
Stop:
ScrollON:
ScrollOFF:
ClearLog:
Setting:
CheckUpdate:
ClickToUpdate:
RetryUpdate:
CancelUpdate:
Toast:
DisableTranslateMode:
ConfigSaved:
AlasIsRunning:
ClickToUpdate:
Status:
Running:
Inactive:
Warning:
Updating:
MenuAlas:
Overview:
Log:
MenuDevelop:
HomePage:
Translate:
Update:
Remote:
Utils:
Overview:
Scheduler:
Log:
Running:
Pending:
Waiting:
NoTask:
Dashboard:
# From lang.readable_time()
NoData:
TimeError:
JustNow:
MinutesAgo:
HoursAgo:
DaysAgo:
LongTimeAgo:
AddAlas:
PopupTitle:
NewName:
CopyFrom:
Confirm:
FileExist:
InvalidChar:
InvalidPrefixTemplate:
Update:
UpToDate:
HaveUpdate:
UpdateStart:
UpdateWait:
UpdateRun:
UpdateSuccess:
UpdateFailed:
UpdateChecking:
UpdateCancel:
UpdateFinish:
Local:
Upstream:
Author:
Time:
Message:
DisabledWarn:
DetailedHistory:
Remote:
Running:
NotRunning:
NotEnable:
EntryPoint:
ConfigureHint:
SSHNotInstall:
Text:
InvalidFeedBack:
Clear:
+21
View File
@@ -0,0 +1,21 @@
{
"Alas": {
"menu": "list",
"page": "setting",
"tasks": [
"Alas",
"Restart"
]
},
"Daily": {
"menu": "list",
"page": "setting",
"tasks": [
"Cafe",
"Mail",
"Circle",
"TacticalChallenge",
"DataUpdate"
]
}
}
+40
View File
@@ -0,0 +1,40 @@
# --------------------
# Define non-modifiable values
# --------------------
# ==================== Alas ====================
Alas:
Emulator:
ScreenshotMethod: scrcpy
ControlMethod: MaaTouch
Optimization:
ScreenshotInterval: 0.2
CombatScreenshotInterval: 1.0
Restart:
Scheduler:
Enable:
type: state
value: true
option: [ true, ]
option_bold: [ true, ]
ServerUpdate: 04:00
# ==================== Daily ====================
Cafe:
Scheduler:
ServerUpdate: "04:00, 16:00"
TacticalChallenge:
Scheduler:
ServerUpdate: "15:00"
DataUpdate:
Scheduler:
Enable:
type: state
value: true
option: [ true, ]
option_bold: [ true, ]
+39
View File
@@ -0,0 +1,39 @@
{
"AP": {
"name": "AP",
"path": "DataUpdate.ItemStorage.AP",
"i18n": "ItemStorage.AP.name",
"stored": "StoredAP",
"attrs": {
"time": "2020-01-01 00:00:00",
"total": 0,
"value": 0
},
"order": 1,
"color": "#62ea6e"
},
"Credit": {
"name": "Credit",
"path": "DataUpdate.ItemStorage.Credit",
"i18n": "ItemStorage.Credit.name",
"stored": "StoredInt",
"attrs": {
"time": "2020-01-01 00:00:00",
"value": 0
},
"order": 2,
"color": "#fdec00"
},
"Pyroxene": {
"name": "Pyroxene",
"path": "DataUpdate.ItemStorage.Pyroxene",
"i18n": "ItemStorage.Pyroxene.name",
"stored": "StoredInt",
"attrs": {
"time": "2020-01-01 00:00:00",
"value": 0
},
"order": 3,
"color": "#21befc"
}
}
+37
View File
@@ -0,0 +1,37 @@
# --------------------
# Define argument group of tasks.
# --------------------
# ==================== Alas ====================
Alas:
menu: 'list'
page: 'setting'
tasks:
Alas:
- Emulator
- EmulatorInfo
- Error
- Optimization
Restart:
- Scheduler
# ==================== Daily ====================
Daily:
menu: 'list'
page: 'setting'
tasks:
Cafe:
- Scheduler
- Cafe
Mail:
- Scheduler
Circle:
- Scheduler
TacticalChallenge:
- Scheduler
- TacticalChallenge
DataUpdate:
- Scheduler
- ItemStorage
+236
View File
@@ -0,0 +1,236 @@
"""
Copy-pasted from
https://github.com/untitaker/python-atomicwrites
"""
import contextlib
import io
import os
import sys
import tempfile
try:
import fcntl
except ImportError:
fcntl = None
# `fspath` was added in Python 3.6
try:
from os import fspath
except ImportError:
fspath = None
__version__ = '1.4.1'
PY2 = sys.version_info[0] == 2
text_type = unicode if PY2 else str # noqa
def _path_to_unicode(x):
if not isinstance(x, text_type):
return x.decode(sys.getfilesystemencoding())
return x
DEFAULT_MODE = "wb" if PY2 else "w"
_proper_fsync = os.fsync
if sys.platform != 'win32':
if hasattr(fcntl, 'F_FULLFSYNC'):
def _proper_fsync(fd):
# https://lists.apple.com/archives/darwin-dev/2005/Feb/msg00072.html
# https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man2/fsync.2.html
# https://github.com/untitaker/python-atomicwrites/issues/6
fcntl.fcntl(fd, fcntl.F_FULLFSYNC)
def _sync_directory(directory):
# Ensure that filenames are written to disk
fd = os.open(directory, 0)
try:
_proper_fsync(fd)
finally:
os.close(fd)
def _replace_atomic(src, dst):
os.rename(src, dst)
_sync_directory(os.path.normpath(os.path.dirname(dst)))
def _move_atomic(src, dst):
os.link(src, dst)
os.unlink(src)
src_dir = os.path.normpath(os.path.dirname(src))
dst_dir = os.path.normpath(os.path.dirname(dst))
_sync_directory(dst_dir)
if src_dir != dst_dir:
_sync_directory(src_dir)
else:
from ctypes import windll, WinError
_MOVEFILE_REPLACE_EXISTING = 0x1
_MOVEFILE_WRITE_THROUGH = 0x8
_windows_default_flags = _MOVEFILE_WRITE_THROUGH
def _handle_errors(rv):
if not rv:
raise WinError()
def _replace_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags | _MOVEFILE_REPLACE_EXISTING
))
def _move_atomic(src, dst):
_handle_errors(windll.kernel32.MoveFileExW(
_path_to_unicode(src), _path_to_unicode(dst),
_windows_default_flags
))
def replace_atomic(src, dst):
'''
Move ``src`` to ``dst``. If ``dst`` exists, it will be silently
overwritten.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
return _replace_atomic(src, dst)
def move_atomic(src, dst):
'''
Move ``src`` to ``dst``. There might a timewindow where both filesystem
entries exist. If ``dst`` already exists, :py:exc:`FileExistsError` will be
raised.
Both paths must reside on the same filesystem for the operation to be
atomic.
'''
return _move_atomic(src, dst)
class AtomicWriter(object):
'''
A helper class for performing atomic writes. Usage::
with AtomicWriter(path).open() as f:
f.write(...)
:param path: The destination filepath. May or may not exist.
:param mode: The filemode for the temporary file. This defaults to `wb` in
Python 2 and `w` in Python 3.
:param overwrite: If set to false, an error is raised if ``path`` exists.
Errors are only raised after the file has been written to. Either way,
the operation is atomic.
:param open_kwargs: Keyword-arguments to pass to the underlying
:py:func:`open` call. This can be used to set the encoding when opening
files in text-mode.
If you need further control over the exact behavior, you are encouraged to
subclass.
'''
def __init__(self, path, mode=DEFAULT_MODE, overwrite=False,
**open_kwargs):
if 'a' in mode:
raise ValueError(
'Appending to an existing file is not supported, because that '
'would involve an expensive `copy`-operation to a temporary '
'file. Open the file in normal `w`-mode and copy explicitly '
'if that\'s what you\'re after.'
)
if 'x' in mode:
raise ValueError('Use the `overwrite`-parameter instead.')
if 'w' not in mode:
raise ValueError('AtomicWriters can only be written to.')
# Attempt to convert `path` to `str` or `bytes`
if fspath is not None:
path = fspath(path)
self._path = path
self._mode = mode
self._overwrite = overwrite
self._open_kwargs = open_kwargs
def open(self):
'''
Open the temporary file.
'''
return self._open(self.get_fileobject)
@contextlib.contextmanager
def _open(self, get_fileobject):
f = None # make sure f exists even if get_fileobject() fails
try:
success = False
with get_fileobject(**self._open_kwargs) as f:
yield f
self.sync(f)
self.commit(f)
success = True
finally:
if not success:
try:
self.rollback(f)
except Exception:
pass
def get_fileobject(self, suffix="", prefix=tempfile.gettempprefix(),
dir=None, **kwargs):
'''Return the temporary file to use.'''
if dir is None:
dir = os.path.normpath(os.path.dirname(self._path))
descriptor, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
dir=dir)
# io.open() will take either the descriptor or the name, but we need
# the name later for commit()/replace_atomic() and couldn't find a way
# to get the filename from the descriptor.
os.close(descriptor)
kwargs['mode'] = self._mode
kwargs['file'] = name
return io.open(**kwargs)
def sync(self, f):
'''responsible for clearing as many file caches as possible before
commit'''
f.flush()
_proper_fsync(f.fileno())
def commit(self, f):
'''Move the temporary file to the target location.'''
if self._overwrite:
replace_atomic(f.name, self._path)
else:
move_atomic(f.name, self._path)
def rollback(self, f):
'''Clean up all temporary resources.'''
os.unlink(f.name)
def atomic_write(path, writer_cls=AtomicWriter, **cls_kwargs):
'''
Simple atomic writes. This wraps :py:class:`AtomicWriter`::
with atomic_write(path) as f:
f.write(...)
:param path: The target path to write to.
:param writer_cls: The writer class to use. This parameter is useful if you
subclassed :py:class:`AtomicWriter` to change some behavior and want to
use that new subclass.
Additional keyword arguments are passed to the writer class. See
:py:class:`AtomicWriter`.
'''
return writer_cls(path, **cls_kwargs).open()
+583
View File
@@ -0,0 +1,583 @@
import copy
import datetime
import operator
import threading
import pywebio
from module.base.decorator import cached_property, del_cached_property
from module.base.filter import Filter
from module.config.config_generated import GeneratedConfig
from module.config.config_manual import ManualConfig
from module.config.config_updater import ConfigUpdater
from module.config.stored.classes import iter_attribute
from module.config.stored.stored_generated import StoredGenerated
from module.config.utils import *
from module.config.watcher import ConfigWatcher
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
class TaskEnd(Exception):
pass
class Function:
def __init__(self, data):
self.enable = deep_get(data, keys="Scheduler.Enable", default=False)
self.command = deep_get(data, keys="Scheduler.Command", default="Unknown")
self.next_run = deep_get(data, keys="Scheduler.NextRun", default=DEFAULT_TIME)
def __str__(self):
enable = "Enable" if self.enable else "Disable"
return f"{self.command} ({enable}, {str(self.next_run)})"
__repr__ = __str__
def __eq__(self, other):
if not isinstance(other, Function):
return False
if self.command == other.command and self.next_run == other.next_run:
return True
else:
return False
def name_to_function(name):
"""
Args:
name (str):
Returns:
Function:
"""
function = Function({})
function.command = name
function.enable = True
return function
class AzurLaneConfig(ConfigUpdater, ManualConfig, GeneratedConfig, ConfigWatcher):
stop_event: threading.Event = None
bound = {}
# Class property
is_hoarding_task = True
def __setattr__(self, key, value):
if key in self.bound:
path = self.bound[key]
self.modified[path] = value
if self.auto_update:
self.update()
else:
super().__setattr__(key, value)
def __init__(self, config_name, task=None):
logger.attr("Lang", self.LANG)
# This will read ./config/<config_name>.json
self.config_name = config_name
# Raw json data in yaml file.
self.data = {}
# Modified arguments. Key: Argument path in yaml file. Value: Modified value.
# All variable modifications will be record here and saved in method `save()`.
self.modified = {}
# Key: Argument name in GeneratedConfig. Value: Path in `data`.
self.bound = {}
# If write after every variable modification.
self.auto_update = True
# Force override variables
# Key: Argument name in GeneratedConfig. Value: Modified value.
self.overridden = {}
# Scheduler queue, will be updated in `get_next_task()`, list of Function objects
# pending_task: Run time has been reached, but haven't been run due to task scheduling.
# waiting_task: Run time haven't been reached, wait needed.
self.pending_task = []
self.waiting_task = []
# Task to run and bind.
# Task means the name of the function to run in AzurLaneAutoScript class.
self.task: Function
# Template config is used for dev tools
self.is_template_config = config_name.startswith("template")
if self.is_template_config:
# For dev tools
logger.info("Using template config, which is read only")
self.auto_update = False
self.task = name_to_function("template")
else:
self.load()
if task is None:
# Bind `Alas` by default which includes emulator settings.
task = name_to_function("Alas")
else:
# Bind a specific task for debug purpose.
task = name_to_function(task)
self.bind(task)
self.task = task
self.save()
def load(self):
self.data = self.read_file(self.config_name)
self.config_override()
for path, value in self.modified.items():
deep_set(self.data, keys=path, value=value)
def bind(self, func, func_list=None):
"""
Args:
func (str, Function): Function to run
func_list (set): Set of tasks to be bound
"""
if func_list is None:
func_list = ["Alas"]
if isinstance(func, Function):
func = func.command
func_list.append(func)
logger.info(f"Bind task {func_list}")
# Bind arguments
visited = set()
self.bound.clear()
for func in func_list:
func_data = self.data.get(func, {})
for group, group_data in func_data.items():
for arg, value in group_data.items():
path = f"{group}.{arg}"
if path in visited:
continue
arg = path_to_arg(path)
super().__setattr__(arg, value)
self.bound[arg] = f"{func}.{path}"
visited.add(path)
# Override arguments
for arg, value in self.overridden.items():
super().__setattr__(arg, value)
@property
def hoarding(self):
minutes = int(
deep_get(
self.data, keys="Alas.Optimization.TaskHoardingDuration", default=0
)
)
return timedelta(minutes=max(minutes, 0))
@property
def close_game(self):
return deep_get(
self.data, keys="Alas.Optimization.CloseGameDuringWait", default=False
)
@cached_property
def stored(self) -> StoredGenerated:
stored = StoredGenerated()
# Bind config
for _, value in iter_attribute(stored):
value._bind(self)
del_cached_property(value, '_stored')
return stored
def get_next_task(self):
"""
Calculate tasks, set pending_task and waiting_task
"""
pending = []
waiting = []
error = []
now = datetime.now()
if AzurLaneConfig.is_hoarding_task:
now -= self.hoarding
for func in self.data.values():
func = Function(func)
if not func.enable:
continue
if not isinstance(func.next_run, datetime):
error.append(func)
elif func.next_run < now:
pending.append(func)
else:
waiting.append(func)
f = Filter(regex=r"(.*)", attr=["command"])
f.load(self.SCHEDULER_PRIORITY)
if pending:
pending = f.apply(pending)
if waiting:
waiting = f.apply(waiting)
waiting = sorted(waiting, key=operator.attrgetter("next_run"))
if error:
pending = error + pending
self.pending_task = pending
self.waiting_task = waiting
def get_next(self):
"""
Returns:
Function: Command to run
"""
self.get_next_task()
if self.pending_task:
AzurLaneConfig.is_hoarding_task = False
logger.info(f"Pending tasks: {[f.command for f in self.pending_task]}")
task = self.pending_task[0]
logger.attr("Task", task)
return task
else:
AzurLaneConfig.is_hoarding_task = True
if self.waiting_task:
logger.info("No task pending")
task = copy.deepcopy(self.waiting_task[0])
task.next_run = (task.next_run + self.hoarding).replace(microsecond=0)
logger.attr("Task", task)
return task
else:
logger.critical("No task waiting or pending")
logger.critical("Please enable at least one task")
raise RequestHumanTakeover
def save(self, mod_name='alas'):
if not self.modified:
return False
for path, value in self.modified.items():
deep_set(self.data, keys=path, value=value)
logger.info(
f"Save config {filepath_config(self.config_name, mod_name)}, {dict_to_kv(self.modified)}"
)
# Don't use self.modified = {}, that will create a new object.
self.modified.clear()
del_cached_property(self, 'stored')
self.write_file(self.config_name, data=self.data)
def update(self):
self.load()
self.config_override()
self.bind(self.task)
self.save()
def config_override(self):
now = datetime.now().replace(microsecond=0)
limited = set()
def limit_next_run(tasks, limit):
for task in tasks:
if task in limited:
continue
limited.add(task)
next_run = deep_get(
self.data, keys=f"{task}.Scheduler.NextRun", default=None
)
if isinstance(next_run, datetime) and next_run > limit:
deep_set(self.data, keys=f"{task}.Scheduler.NextRun", value=now)
limit_next_run(['BattlePass'], limit=now + timedelta(days=31, seconds=-1))
limit_next_run(self.args.keys(), limit=now + timedelta(hours=24, seconds=-1))
def override(self, **kwargs):
"""
Override anything you want.
Variables stall remain overridden even config is reloaded from yaml file.
Note that this method is irreversible.
"""
for arg, value in kwargs.items():
self.overridden[arg] = value
super().__setattr__(arg, value)
def set_record(self, **kwargs):
"""
Args:
**kwargs: For example, `Emotion1_Value=150`
will set `Emotion1_Value=150` and `Emotion1_Record=now()`
"""
with self.multi_set():
for arg, value in kwargs.items():
record = arg.replace("Value", "Record")
self.__setattr__(arg, value)
self.__setattr__(record, datetime.now().replace(microsecond=0))
def multi_set(self):
"""
Set multiple arguments but save once.
Examples:
with self.config.multi_set():
self.config.foo1 = 1
self.config.foo2 = 2
"""
return MultiSetWrapper(main=self)
def cross_get(self, keys, default=None):
"""
Get configs from other tasks.
Args:
keys (str, list[str]): Such as `{task}.Scheduler.Enable`
default:
Returns:
Any:
"""
return deep_get(self.data, keys=keys, default=default)
def cross_set(self, keys, value):
"""
Set configs to other tasks.
Args:
keys (str, list[str]): Such as `{task}.Scheduler.Enable`
value (Any):
Returns:
Any:
"""
self.modified[keys] = value
if self.auto_update:
self.update()
def task_delay(self, success=None, server_update=None, target=None, minute=None, task=None):
"""
Set Scheduler.NextRun
Should set at least one arguments.
If multiple arguments are set, use the nearest.
Args:
success (bool):
If True, delay Scheduler.SuccessInterval
If False, delay Scheduler.FailureInterval
server_update (bool, list, str):
If True, delay to nearest Scheduler.ServerUpdate
If type is list or str, delay to such server update
target (datetime.datetime, str, list):
Delay to such time.
minute (int, float, tuple):
Delay several minutes.
task (str):
Set across task. None for current task.
"""
def ensure_delta(delay):
return timedelta(seconds=int(ensure_time(delay, precision=3) * 60))
run = []
if success is not None:
interval = (
120
if success
else 30
)
run.append(datetime.now() + ensure_delta(interval))
if server_update is not None:
if server_update is True:
server_update = self.Scheduler_ServerUpdate
run.append(get_server_next_update(server_update))
if target is not None:
target = [target] if not isinstance(target, list) else target
target = nearest_future(target)
run.append(target)
if minute is not None:
run.append(datetime.now() + ensure_delta(minute))
if len(run):
run = min(run).replace(microsecond=0)
kv = dict_to_kv(
{
"success": success,
"server_update": server_update,
"target": target,
"minute": minute,
},
allow_none=False,
)
if task is None:
task = self.task.command
logger.info(f"Delay task `{task}` to {run} ({kv})")
self.modified[f'{task}.Scheduler.NextRun'] = run
self.update()
else:
raise ScriptError(
"Missing argument in delay_next_run, should set at least one"
)
def task_call(self, task, force_call=True):
"""
Call another task to run.
That task will run when current task finished.
But it might not be run because:
- Other tasks should run first according to SCHEDULER_PRIORITY
- Task is disabled by user
Args:
task (str): Task name to call, such as `Restart`
force_call (bool):
Returns:
bool: If called.
"""
if deep_get(self.data, keys=f"{task}.Scheduler.NextRun", default=None) is None:
raise ScriptError(f"Task to call: `{task}` does not exist in user config")
if force_call or self.is_task_enabled(task):
logger.info(f"Task call: {task}")
self.modified[f"{task}.Scheduler.NextRun"] = datetime.now().replace(
microsecond=0
)
self.modified[f"{task}.Scheduler.Enable"] = True
if self.auto_update:
self.update()
return True
else:
logger.info(f"Task call: {task} (skipped because disabled by user)")
return False
@staticmethod
def task_stop(message=""):
"""
Stop current task.
Raises:
TaskEnd:
"""
if message:
raise TaskEnd(message)
else:
raise TaskEnd
def task_switched(self):
"""
Check if needs to switch task.
Raises:
bool: If task switched
"""
# Update event
if self.stop_event is not None:
if self.stop_event.is_set():
return True
prev = self.task
self.load()
new = self.get_next()
if prev == new:
logger.info(f"Continue task `{new}`")
return False
else:
logger.info(f"Switch task `{prev}` to `{new}`")
return True
def check_task_switch(self, message=""):
"""
Stop current task when task switched.
Raises:
TaskEnd:
"""
if self.task_switched():
self.task_stop(message=message)
def is_task_enabled(self, task):
return bool(self.cross_get(keys=[task, 'Scheduler', 'Enable'], default=False))
def update_daily_quests(self):
"""
Raises:
TaskEnd: Call task `DailyQuest` and stop current task
"""
if self.stored.DailyActivity.is_expired():
logger.info('DailyActivity expired, call task to update')
self.task_call('DailyQuest')
self.task_stop()
if self.stored.DailyQuest.is_expired():
logger.info('DailyQuest expired, call task to update')
self.task_call('DailyQuest')
self.task_stop()
def update_battle_pass_quests(self):
"""
Raises:
TaskEnd: Call task `BattlePass` and stop current task
"""
if self.stored.BattlePassTodayQuest.is_expired():
if self.stored.BattlePassLevel.is_full():
logger.info('BattlePassLevel full, no updates')
else:
logger.info('BattlePassTodayQuest expired, call task to update')
self.task_call('BattlePass')
self.task_stop()
@property
def DEVICE_SCREENSHOT_METHOD(self):
return self.Emulator_ScreenshotMethod
@property
def DEVICE_CONTROL_METHOD(self):
return self.Emulator_ControlMethod
def temporary(self, **kwargs):
"""
Cover some settings, and recover later.
Usage:
backup = self.config.cover(ENABLE_DAILY_REWARD=False)
# do_something()
backup.recover()
Args:
**kwargs:
Returns:
ConfigBackup:
"""
backup = ConfigBackup(config=self)
backup.cover(**kwargs)
return backup
class ConfigBackup:
def __init__(self, config):
"""
Args:
config (AzurLaneConfig):
"""
self.config = config
self.backup = {}
self.kwargs = {}
def cover(self, **kwargs):
self.kwargs = kwargs
for key, value in kwargs.items():
self.backup[key] = self.config.__getattribute__(key)
self.config.__setattr__(key, value)
def recover(self):
for key, value in self.backup.items():
self.config.__setattr__(key, value)
class MultiSetWrapper:
def __init__(self, main):
"""
Args:
main (AzurLaneConfig):
"""
self.main = main
self.in_wrapper = False
def __enter__(self):
if self.main.auto_update:
self.main.auto_update = False
else:
self.in_wrapper = True
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if not self.in_wrapper:
self.main.update()
self.main.auto_update = True
+53
View File
@@ -0,0 +1,53 @@
import datetime
# This file was automatically generated by module/config/config_updater.py.
# Don't modify it manually.
class GeneratedConfig:
"""
Auto generated configuration
"""
# Group `Scheduler`
Scheduler_Enable = False # True, False
Scheduler_NextRun = datetime.datetime(2020, 1, 1, 0, 0)
Scheduler_Command = 'Alas'
Scheduler_ServerUpdate = '04:00'
# Group `Emulator`
Emulator_Serial = 'auto'
Emulator_PackageName = 'auto' # auto, JP-Official
Emulator_GameLanguage = 'auto' # auto, jp
Emulator_ScreenshotMethod = 'auto' # auto, ADB, ADB_nc, uiautomator2, aScreenCap, aScreenCap_nc, DroidCast, DroidCast_raw, scrcpy
Emulator_ControlMethod = 'MaaTouch' # minitouch, MaaTouch
Emulator_AdbRestart = False
# Group `EmulatorInfo`
EmulatorInfo_Emulator = 'auto' # auto, NoxPlayer, NoxPlayer64, BlueStacks4, BlueStacks5, BlueStacks4HyperV, BlueStacks5HyperV, LDPlayer3, LDPlayer4, LDPlayer9, MuMuPlayer, MuMuPlayerX, MuMuPlayer12, MEmuPlayer
EmulatorInfo_name = None
EmulatorInfo_path = None
# Group `Error`
Error_Restart = 'game' # game, game_emulator
Error_SaveError = True
Error_ScreenshotLength = 1
Error_OnePushConfig = 'provider: null'
# Group `Optimization`
Optimization_ScreenshotInterval = 0.3
Optimization_CombatScreenshotInterval = 1.0
Optimization_WhenTaskQueueEmpty = 'goto_main' # stay_there, goto_main, close_game
# Group `Cafe`
Cafe_Reward = True
Cafe_Touch = True
Cafe_AutoAdjust = True
# Group `TacticalChallenge`
TacticalChallenge_PlayerSelect = 0 # 0, 1, 2, 3
# Group `ItemStorage`
ItemStorage_AP = {}
ItemStorage_Credit = {}
ItemStorage_Pyroxene = {}
+54
View File
@@ -0,0 +1,54 @@
import module.config.server as server
class ManualConfig:
@property
def LANG(self):
return server.lang
SCHEDULER_PRIORITY = """
Restart
> Cafe > Circle > Mail > DataUpdate > TacticalChallenge
"""
"""
module.assets
"""
ASSETS_FOLDER = './assets'
ASSETS_MODULE = './tasks'
ASSETS_RESOLUTION = (1280, 720)
"""
module.base
"""
COLOR_SIMILAR_THRESHOLD = 10
BUTTON_OFFSET = (20, 20)
BUTTON_MATCH_SIMILARITY = 0.85
WAIT_BEFORE_SAVING_SCREEN_SHOT = 1
"""
module.device
"""
DEVICE_OVER_HTTP = False
FORWARD_PORT_RANGE = (20000, 21000)
REVERSE_SERVER_PORT = 7903
ASCREENCAP_FILEPATH_LOCAL = './bin/ascreencap'
ASCREENCAP_FILEPATH_REMOTE = '/data/local/tmp/ascreencap'
# 'DroidCast', 'DroidCast_raw'
DROIDCAST_VERSION = 'DroidCast'
DROIDCAST_FILEPATH_LOCAL = './bin/DroidCast/DroidCast-debug-1.1.0.apk'
DROIDCAST_FILEPATH_REMOTE = '/data/local/tmp/DroidCast.apk'
DROIDCAST_RAW_FILEPATH_LOCAL = './bin/DroidCast/DroidCastS-release-1.1.5.apk'
DROIDCAST_RAW_FILEPATH_REMOTE = '/data/local/tmp/DroidCastS.apk'
MINITOUCH_FILEPATH_REMOTE = '/data/local/tmp/minitouch'
HERMIT_FILEPATH_LOCAL = './bin/hermit/hermit.apk'
SCRCPY_FILEPATH_LOCAL = './bin/scrcpy/scrcpy-server-v1.20.jar'
SCRCPY_FILEPATH_REMOTE = '/data/local/tmp/scrcpy-server-v1.20.jar'
MAATOUCH_FILEPATH_LOCAL = './bin/MaaTouch/maatouch'
MAATOUCH_FILEPATH_REMOTE = '/data/local/tmp/maatouch'
+636
View File
@@ -0,0 +1,636 @@
from copy import deepcopy
from cached_property import cached_property
from deploy.Windows.utils import DEPLOY_TEMPLATE, poor_yaml_read, poor_yaml_write
from module.base.timer import timer
from module.config.server import VALID_SERVER
from module.config.utils import *
CONFIG_IMPORT = '''
import datetime
# This file was automatically generated by module/config/config_updater.py.
# Don't modify it manually.
class GeneratedConfig:
"""
Auto generated configuration
"""
'''.strip().split('\n')
DICT_GUI_TO_INGAME = {
'zh-CN': 'cn',
'en-US': 'en',
}
def get_generator():
from module.base.code_generator import CodeGenerator
return CodeGenerator()
class ConfigGenerator:
@cached_property
def argument(self):
"""
Load argument.yaml, and standardise its structure.
<group>:
<argument>:
type: checkbox|select|textarea|input
value:
option (Optional): Options, if argument has any options.
validate (Optional): datetime
"""
data = {}
raw = read_file(filepath_argument('argument'))
def option_add(keys, options):
options = deep_get(raw, keys=keys, default=[]) + options
deep_set(raw, keys=keys, value=options)
# Insert packages
option_add(keys='Emulator.PackageName.option', options=list(VALID_SERVER.keys()))
# Load
for path, value in deep_iter(raw, depth=2):
arg = {
'type': 'input',
'value': '',
# option
}
if not isinstance(value, dict):
value = {'value': value}
arg['type'] = data_to_type(value, arg=path[1])
if arg['type'] == 'stored':
value['value'] = {}
arg['display'] = 'hide' # Hide `stored` by default
if isinstance(value['value'], datetime):
arg['type'] = 'datetime'
arg['validate'] = 'datetime'
# Manual definition has the highest priority
arg.update(value)
deep_set(data, keys=path, value=arg)
return data
@cached_property
def task(self):
"""
<task_group>:
<task>:
<group>:
"""
return read_file(filepath_argument('task'))
@cached_property
def default(self):
"""
<task>:
<group>:
<argument>: value
"""
return read_file(filepath_argument('default'))
@cached_property
def override(self):
"""
<task>:
<group>:
<argument>: value
"""
return read_file(filepath_argument('override'))
@cached_property
def gui(self):
"""
<i18n_group>:
<i18n_key>: value, value is None
"""
return read_file(filepath_argument('gui'))
@cached_property
@timer
def args(self):
"""
Merge definitions into standardised json.
task.yaml ---+
argument.yaml ---+-----> args.json
override.yaml ---+
default.yaml ---+
"""
# Construct args
data = {}
for path, groups in deep_iter(self.task, depth=3):
if 'tasks' not in path:
continue
task = path[2]
# Add storage to all task
# groups.append('Storage')
for group in groups:
if group not in self.argument:
print(f'`{task}.{group}` is not related to any argument group')
continue
deep_set(data, keys=[task, group], value=deepcopy(self.argument[group]))
def check_override(path, value):
# Check existence
old = deep_get(data, keys=path, default=None)
if old is None:
print(f'`{".".join(path)}` is not a existing argument')
return False
# Check type
# But allow `Interval` to be different
old_value = old.get('value', None) if isinstance(old, dict) else old
value = old.get('value', None) if isinstance(value, dict) else value
if type(value) != type(old_value) \
and old_value is not None \
and path[2] not in ['SuccessInterval', 'FailureInterval']:
print(
f'`{value}` ({type(value)}) and `{".".join(path)}` ({type(old_value)}) are in different types')
return False
# Check option
if isinstance(old, dict) and 'option' in old:
if value not in old['option']:
print(f'`{value}` is not an option of argument `{".".join(path)}`')
return False
return True
# Set defaults
for p, v in deep_iter(self.default, depth=3):
if not check_override(p, v):
continue
deep_set(data, keys=p + ['value'], value=v)
# Override non-modifiable arguments
for p, v in deep_iter(self.override, depth=3):
if not check_override(p, v):
continue
if isinstance(v, dict):
typ = v.get('type')
if typ == 'state':
pass
elif typ == 'lock':
deep_default(v, keys='display', value="disabled")
elif deep_get(v, keys='value') is not None:
deep_default(v, keys='display', value='hide')
for arg_k, arg_v in v.items():
deep_set(data, keys=p + [arg_k], value=arg_v)
else:
deep_set(data, keys=p + ['value'], value=v)
deep_set(data, keys=p + ['display'], value='hide')
# Set command
for path, groups in deep_iter(self.task, depth=3):
if 'tasks' not in path:
continue
task = path[2]
if deep_get(data, keys=f'{task}.Scheduler.Command'):
deep_set(data, keys=f'{task}.Scheduler.Command.value', value=task)
deep_set(data, keys=f'{task}.Scheduler.Command.display', value='hide')
return data
@timer
def generate_code(self):
"""
Generate python code.
args.json ---> config_generated.py
"""
visited_group = set()
visited_path = set()
lines = CONFIG_IMPORT
for path, data in deep_iter(self.argument, depth=2):
group, arg = path
if group not in visited_group:
lines.append('')
lines.append(f' # Group `{group}`')
visited_group.add(group)
option = ''
if 'option' in data and data['option']:
option = ' # ' + ', '.join([str(opt) for opt in data['option']])
path = '.'.join(path)
lines.append(f' {path_to_arg(path)} = {repr(parse_value(data["value"], data=data))}{option}')
visited_path.add(path)
with open(filepath_code(), 'w', encoding='utf-8', newline='') as f:
for text in lines:
f.write(text + '\n')
@timer
def generate_stored(self):
import module.config.stored.classes as classes
gen = get_generator()
gen.add('from module.config.stored.classes import (')
with gen.tab():
for cls in sorted([name for name in dir(classes) if name.startswith('Stored')]):
gen.add(cls + ',')
gen.add(')')
gen.Empty()
gen.Empty()
gen.Empty()
gen.CommentAutoGenerage('module/config/config_updater.py')
with gen.Class('StoredGenerated'):
for path, data in deep_iter(self.args, depth=3):
cls = data.get('stored')
if cls:
gen.add(f'{path[-1]} = {cls}("{".".join(path)}")')
gen.write('module/config/stored/stored_generated.py')
@timer
def generate_i18n(self, lang):
"""
Load old translations and generate new translation file.
args.json ---+-----> i18n/<lang>.json
(old) i18n/<lang>.json ---+
"""
new = {}
old = read_file(filepath_i18n(lang))
def deep_load(keys, default=True, words=('name', 'help')):
for word in words:
k = keys + [str(word)]
d = ".".join(k) if default else str(word)
v = deep_get(old, keys=k, default=d)
deep_set(new, keys=k, value=v)
# Menu
for path, data in deep_iter(self.task, depth=3):
if 'tasks' not in path:
continue
task_group, _, task = path
deep_load(['Menu', task_group])
deep_load(['Task', task])
# Arguments
visited_group = set()
for path, data in deep_iter(self.argument, depth=2):
if path[0] not in visited_group:
deep_load([path[0], '_info'])
visited_group.add(path[0])
deep_load(path)
if 'option' in data:
deep_load(path, words=data['option'], default=False)
# Package names
# for package, server in VALID_PACKAGE.items():
# path = ['Emulator', 'PackageName', package]
# if deep_get(new, keys=path) == package:
# deep_set(new, keys=path, value=server.upper())
# for package, server_and_channel in VALID_CHANNEL_PACKAGE.items():
# server, channel = server_and_channel
# name = deep_get(new, keys=['Emulator', 'PackageName', to_package(server)])
# if lang == SERVER_TO_LANG[server]:
# value = f'{name} {channel}渠道服 {package}'
# else:
# value = f'{name} {package}'
# deep_set(new, keys=['Emulator', 'PackageName', package], value=value)
# Game server names
# for server, _list in VALID_SERVER_LIST.items():
# for index in range(len(_list)):
# path = ['Emulator', 'ServerName', f'{server}-{index}']
# prefix = server.split('_')[0].upper()
# prefix = '国服' if prefix == 'CN' else prefix
# deep_set(new, keys=path, value=f'[{prefix}] {_list[index]}')
# GUI i18n
for path, _ in deep_iter(self.gui, depth=2):
group, key = path
deep_load(keys=['Gui', group], words=(key,))
write_file(filepath_i18n(lang), new)
@cached_property
def menu(self):
"""
Generate menu definitions
task.yaml --> menu.json
"""
data = {}
for task_group in self.task.keys():
value = deep_get(self.task, keys=[task_group, 'menu'])
if value not in ['collapse', 'list']:
value = 'collapse'
deep_set(data, keys=[task_group, 'menu'], value=value)
value = deep_get(self.task, keys=[task_group, 'page'])
if value not in ['setting', 'tool']:
value = 'setting'
deep_set(data, keys=[task_group, 'page'], value=value)
tasks = deep_get(self.task, keys=[task_group, 'tasks'], default={})
tasks = list(tasks.keys())
deep_set(data, keys=[task_group, 'tasks'], value=tasks)
return data
@cached_property
def stored(self):
import module.config.stored.classes as classes
data = {}
for path, value in deep_iter(self.args, depth=3):
if value.get('type') != 'stored':
continue
name = path[-1]
stored = value.get('stored')
stored_class = getattr(classes, stored)
row = {
'name': name,
'path': '.'.join(path),
'i18n': f'{path[1]}.{path[2]}.name',
'stored': stored,
'attrs': stored_class('')._attrs,
'order': value.get('order', 0),
'color': value.get('color', '#777777')
}
data[name] = row
# sort by `order` ascending, but `order`==0 at last
data = sorted(data.items(), key=lambda kv: (kv[1]['order'] == 0, kv[1]['order']))
data = {k: v for k, v in data}
return data
@staticmethod
def generate_deploy_template():
template = poor_yaml_read(DEPLOY_TEMPLATE)
cn = {
'Repository': 'cn',
'PypiMirror': 'https://pypi.tuna.tsinghua.edu.cn/simple',
'Language': 'zh-CN',
}
aidlux = {
'GitExecutable': '/usr/bin/git',
'PythonExecutable': '/usr/bin/python',
'RequirementsFile': './deploy/AidLux/0.92/requirements.txt',
'AdbExecutable': '/usr/bin/adb',
}
docker = {
'GitExecutable': '/usr/bin/git',
'PythonExecutable': '/usr/local/bin/python',
'RequirementsFile': './deploy/docker/requirements.txt',
'AdbExecutable': '/usr/bin/adb',
}
def update(suffix, *args):
file = f'./config/deploy.{suffix}.yaml'
new = deepcopy(template)
for dic in args:
new.update(dic)
poor_yaml_write(data=new, file=file)
update('template')
update('template-cn', cn)
# update('template-AidLux', aidlux)
# update('template-AidLux-cn', aidlux, cn)
# update('template-docker', docker)
# update('template-docker-cn', docker, cn)
tpl = {
'Repository': '{{repository}}',
'GitExecutable': '{{gitExecutable}}',
'PythonExecutable': '{{pythonExecutable}}',
'AdbExecutable': '{{adbExecutable}}',
'Language': '{{language}}',
'Theme': '{{theme}}',
}
def update(file, *args):
new = deepcopy(template)
for dic in args:
new.update(dic)
poor_yaml_write(data=new, file=file)
update('./webapp/packages/main/public/deploy.yaml.tpl', tpl)
@timer
def generate(self):
_ = self.args
_ = self.menu
_ = self.stored
# _ = self.event
# self.insert_server()
write_file(filepath_args(), self.args)
write_file(filepath_args('menu'), self.menu)
write_file(filepath_args('stored'), self.stored)
self.generate_code()
self.generate_stored()
for lang in LANGUAGES:
self.generate_i18n(lang)
self.generate_deploy_template()
class ConfigUpdater:
# source, target, (optional)convert_func
# redirection = [
# ('Dungeon.Dungeon.Support', 'Dungeon.DungeonSupport.Use'),
# ('Dungeon.Dungeon.SupportCharacter', 'Dungeon.DungeonSupport.Character'),
# ('Dungeon.Dungeon.Name', 'Dungeon.Dungeon.Name', convert_daily),
# ('Dungeon.Dungeon.NameAtDoubleCalyx', 'Dungeon.Dungeon.NameAtDoubleCalyx', convert_daily),
# ('Dungeon.DungeonDaily.CalyxCrimson', 'Dungeon.DungeonDaily.CalyxCrimson', convert_daily),
# ]
@cached_property
def args(self):
return read_file(filepath_args())
def config_update(self, old, is_template=False):
"""
Args:
old (dict):
is_template (bool):
Returns:
dict:
"""
new = {}
def deep_load(keys):
data = deep_get(self.args, keys=keys, default={})
value = deep_get(old, keys=keys, default=data['value'])
typ = data['type']
display = data.get('display')
if is_template or value is None or value == '' or typ == 'lock' or (display == 'hide' and typ != 'stored'):
value = data['value']
value = parse_value(value, data=data)
deep_set(new, keys=keys, value=value)
for path, _ in deep_iter(self.args, depth=3):
deep_load(path)
# if not is_template:
# new = self.config_redirect(old, new)
# new = self.update_state(new)
return new
# def config_redirect(self, old, new):
# """
# Convert old settings to the new.
#
# Args:
# old (dict):
# new (dict):
#
# Returns:
# dict:
# """
# for row in self.redirection:
# if len(row) == 2:
# source, target = row
# update_func = None
# elif len(row) == 3:
# source, target, update_func = row
# else:
# continue
#
# if isinstance(source, tuple):
# value = []
# error = False
# for attribute in source:
# tmp = deep_get(old, keys=attribute)
# if tmp is None:
# error = True
# continue
# value.append(tmp)
# if error:
# continue
# else:
# value = deep_get(old, keys=source)
# if value is None:
# continue
#
# if update_func is not None:
# value = update_func(value)
#
# if isinstance(target, tuple):
# for k, v in zip(target, value):
# # Allow update same key
# if (deep_get(old, keys=k) is None) or (source == target):
# deep_set(new, keys=k, value=v)
# elif (deep_get(old, keys=target) is None) or (source == target):
# deep_set(new, keys=target, value=value)
#
# return new
# @staticmethod
# def update_state(data):
# def set_daily(quest, value):
# if value is True:
# value = 'achievable'
# if value is False:
# value = 'not_set'
# deep_set(data, keys=['DailyQuest', 'AchievableQuest', quest], value=value)
#
# set_daily('Complete_1_Daily_Mission', 'not_supported')
# # Dungeon
# dungeon = deep_get(data, keys='Dungeon.Scheduler.Enable')
# set_daily('Clear_Calyx_Golden_1_times',
# dungeon and deep_get(data, 'Dungeon.DungeonDaily.CalyxGolden') != 'do_not_achieve')
# set_daily('Complete_Calyx_Crimson_1_time',
# dungeon and deep_get(data, 'Dungeon.DungeonDaily.CalyxCrimson') != 'do_not_achieve')
# set_daily('Clear_Stagnant_Shadow_1_times',
# dungeon and deep_get(data, 'Dungeon.DungeonDaily.StagnantShadow') != 'do_not_achieve')
# set_daily('Clear_Cavern_of_Corrosion_1_times',
# dungeon and deep_get(data, 'Dungeon.DungeonDaily.CavernOfCorrosion') != 'do_not_achieve')
# # Combat requirements
# set_daily('In_a_single_battle_inflict_3_Weakness_Break_of_different_Types', 'achievable')
# set_daily('Inflict_Weakness_Break_5_times', 'achievable')
# set_daily('Defeat_a_total_of_20_enemies', 'achievable')
# set_daily('Enter_combat_by_attacking_enemy_Weakness_and_win_3_times', 'achievable')
# set_daily('Use_Technique_2_times', 'achievable')
# # Other game systems
# set_daily('Go_on_assignment_1_time', deep_get(data, 'Assignment.Scheduler.Enable'))
# set_daily('Take_1_photo', 'achievable')
# set_daily('Destroy_3_destructible_objects', 'achievable')
# set_daily('Complete_Forgotten_Hall_1_time', 'achievable')
# set_daily('Complete_Echo_of_War_1_times', deep_get(data, 'Weekly.Scheduler.Enable'))
# set_daily('Complete_1_stage_in_Simulated_Universe_Any_world', 'not_supported')
# set_daily('Obtain_victory_in_combat_with_support_characters_1_time',
# dungeon and deep_get(data, 'Dungeon.DungeonSupport.Use') in ['when_daily', 'always_use'])
# set_daily('Use_an_Ultimate_to_deal_the_final_blow_1_time', 'achievable')
# # Build
# set_daily('Level_up_any_character_1_time', 'not_supported')
# set_daily('Level_up_any_Light_Cone_1_time', 'not_supported')
# set_daily('Level_up_any_Relic_1_time', 'not_supported')
# # Items
# set_daily('Salvage_any_Relic', 'achievable')
# set_daily('Synthesize_Consumable_1_time', 'achievable')
# set_daily('Synthesize_material_1_time', 'achievable')
# set_daily('Use_Consumables_1_time', 'achievable')
# return data
def read_file(self, config_name, is_template=False):
"""
Read and update config file.
Args:
config_name (str): ./config/{file}.json
is_template (bool):
Returns:
dict:
"""
old = read_file(filepath_config(config_name))
new = self.config_update(old, is_template=is_template)
# The updated config did not write into file, although it doesn't matters.
# Commented for performance issue
# self.write_file(config_name, new)
return new
@staticmethod
def write_file(config_name, data, mod_name='alas'):
"""
Write config file.
Args:
config_name (str): ./config/{file}.json
data (dict):
mod_name (str):
"""
write_file(filepath_config(config_name, mod_name), data)
@timer
def update_file(self, config_name, is_template=False):
"""
Read, update and write config file.
Args:
config_name (str): ./config/{file}.json
is_template (bool):
Returns:
dict:
"""
data = self.read_file(config_name, is_template=is_template)
self.write_file(config_name, data)
return data
if __name__ == '__main__':
"""
Process the whole config generation.
task.yaml -+----------------> menu.json
argument.yaml -+-> args.json ---> config_generated.py
override.yaml -+ |
gui.yaml --------\|
||
(old) i18n/<lang>.json --------\\========> i18n/<lang>.json
(old) template.json ---------\========> template.json
"""
# Ensure running in Alas root folder
import os
os.chdir(os.path.join(os.path.dirname(__file__), '../../'))
ConfigGenerator().generate()
ConfigUpdater().update_file('template', is_template=True)
+340
View File
@@ -0,0 +1,340 @@
{
"Menu": {
"Alas": {
"name": "Menu.Alas.name",
"help": "Menu.Alas.help"
},
"Daily": {
"name": "Menu.Daily.name",
"help": "Menu.Daily.help"
}
},
"Task": {
"Alas": {
"name": "Task.Alas.name",
"help": "Task.Alas.help"
},
"Restart": {
"name": "Task.Restart.name",
"help": "Task.Restart.help"
},
"Cafe": {
"name": "Task.Cafe.name",
"help": "Task.Cafe.help"
},
"Mail": {
"name": "Task.Mail.name",
"help": "Task.Mail.help"
},
"Circle": {
"name": "Task.Circle.name",
"help": "Task.Circle.help"
},
"TacticalChallenge": {
"name": "Task.TacticalChallenge.name",
"help": "Task.TacticalChallenge.help"
},
"DataUpdate": {
"name": "Task.DataUpdate.name",
"help": "Task.DataUpdate.help"
}
},
"Scheduler": {
"_info": {
"name": "Scheduler._info.name",
"help": "Scheduler._info.help"
},
"Enable": {
"name": "Scheduler.Enable.name",
"help": "Scheduler.Enable.help",
"True": "True",
"False": "False"
},
"NextRun": {
"name": "Scheduler.NextRun.name",
"help": "Scheduler.NextRun.help"
},
"Command": {
"name": "Scheduler.Command.name",
"help": "Scheduler.Command.help"
},
"ServerUpdate": {
"name": "Scheduler.ServerUpdate.name",
"help": "Scheduler.ServerUpdate.help"
}
},
"Emulator": {
"_info": {
"name": "Emulator._info.name",
"help": "Emulator._info.help"
},
"Serial": {
"name": "Emulator.Serial.name",
"help": "Emulator.Serial.help"
},
"PackageName": {
"name": "Emulator.PackageName.name",
"help": "Emulator.PackageName.help",
"auto": "auto",
"JP-Official": "JP-Official"
},
"GameLanguage": {
"name": "Emulator.GameLanguage.name",
"help": "Emulator.GameLanguage.help",
"auto": "auto",
"jp": "jp"
},
"ScreenshotMethod": {
"name": "Emulator.ScreenshotMethod.name",
"help": "Emulator.ScreenshotMethod.help",
"auto": "auto",
"ADB": "ADB",
"ADB_nc": "ADB_nc",
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "Emulator.ControlMethod.name",
"help": "Emulator.ControlMethod.help",
"minitouch": "minitouch",
"MaaTouch": "MaaTouch"
},
"AdbRestart": {
"name": "Emulator.AdbRestart.name",
"help": "Emulator.AdbRestart.help"
}
},
"EmulatorInfo": {
"_info": {
"name": "EmulatorInfo._info.name",
"help": "EmulatorInfo._info.help"
},
"Emulator": {
"name": "EmulatorInfo.Emulator.name",
"help": "EmulatorInfo.Emulator.help",
"auto": "auto",
"NoxPlayer": "NoxPlayer",
"NoxPlayer64": "NoxPlayer64",
"BlueStacks4": "BlueStacks4",
"BlueStacks5": "BlueStacks5",
"BlueStacks4HyperV": "BlueStacks4HyperV",
"BlueStacks5HyperV": "BlueStacks5HyperV",
"LDPlayer3": "LDPlayer3",
"LDPlayer4": "LDPlayer4",
"LDPlayer9": "LDPlayer9",
"MuMuPlayer": "MuMuPlayer",
"MuMuPlayerX": "MuMuPlayerX",
"MuMuPlayer12": "MuMuPlayer12",
"MEmuPlayer": "MEmuPlayer"
},
"name": {
"name": "EmulatorInfo.name.name",
"help": "EmulatorInfo.name.help"
},
"path": {
"name": "EmulatorInfo.path.name",
"help": "EmulatorInfo.path.help"
}
},
"Error": {
"_info": {
"name": "Error._info.name",
"help": "Error._info.help"
},
"Restart": {
"name": "Error.Restart.name",
"help": "Error.Restart.help",
"game": "game",
"game_emulator": "game_emulator"
},
"SaveError": {
"name": "Error.SaveError.name",
"help": "Error.SaveError.help"
},
"ScreenshotLength": {
"name": "Error.ScreenshotLength.name",
"help": "Error.ScreenshotLength.help"
},
"OnePushConfig": {
"name": "Error.OnePushConfig.name",
"help": "Error.OnePushConfig.help"
}
},
"Optimization": {
"_info": {
"name": "Optimization._info.name",
"help": "Optimization._info.help"
},
"ScreenshotInterval": {
"name": "Optimization.ScreenshotInterval.name",
"help": "Optimization.ScreenshotInterval.help"
},
"CombatScreenshotInterval": {
"name": "Optimization.CombatScreenshotInterval.name",
"help": "Optimization.CombatScreenshotInterval.help"
},
"WhenTaskQueueEmpty": {
"name": "Optimization.WhenTaskQueueEmpty.name",
"help": "Optimization.WhenTaskQueueEmpty.help",
"stay_there": "stay_there",
"goto_main": "goto_main",
"close_game": "close_game"
}
},
"Cafe": {
"_info": {
"name": "Cafe._info.name",
"help": "Cafe._info.help"
},
"Reward": {
"name": "Cafe.Reward.name",
"help": "Cafe.Reward.help"
},
"Touch": {
"name": "Cafe.Touch.name",
"help": "Cafe.Touch.help"
},
"AutoAdjust": {
"name": "Cafe.AutoAdjust.name",
"help": "Cafe.AutoAdjust.help"
}
},
"TacticalChallenge": {
"_info": {
"name": "TacticalChallenge._info.name",
"help": "TacticalChallenge._info.help"
},
"PlayerSelect": {
"name": "TacticalChallenge.PlayerSelect.name",
"help": "TacticalChallenge.PlayerSelect.help",
"0": "0",
"1": "1",
"2": "2",
"3": "3"
}
},
"ItemStorage": {
"_info": {
"name": "ItemStorage._info.name",
"help": "ItemStorage._info.help"
},
"AP": {
"name": "ItemStorage.AP.name",
"help": "ItemStorage.AP.help"
},
"Credit": {
"name": "ItemStorage.Credit.name",
"help": "ItemStorage.Credit.help"
},
"Pyroxene": {
"name": "ItemStorage.Pyroxene.name",
"help": "ItemStorage.Pyroxene.help"
}
},
"Gui": {
"Aside": {
"Install": "Gui.Aside.Install",
"Home": "Gui.Aside.Home",
"Develop": "Gui.Aside.Develop",
"Performance": "Gui.Aside.Performance",
"Setting": "Gui.Aside.Setting",
"AddAlas": "Gui.Aside.AddAlas"
},
"Button": {
"Start": "Gui.Button.Start",
"Stop": "Gui.Button.Stop",
"ScrollON": "Gui.Button.ScrollON",
"ScrollOFF": "Gui.Button.ScrollOFF",
"ClearLog": "Gui.Button.ClearLog",
"Setting": "Gui.Button.Setting",
"CheckUpdate": "Gui.Button.CheckUpdate",
"ClickToUpdate": "Gui.Button.ClickToUpdate",
"RetryUpdate": "Gui.Button.RetryUpdate",
"CancelUpdate": "Gui.Button.CancelUpdate"
},
"Toast": {
"DisableTranslateMode": "Gui.Toast.DisableTranslateMode",
"ConfigSaved": "Gui.Toast.ConfigSaved",
"AlasIsRunning": "Gui.Toast.AlasIsRunning",
"ClickToUpdate": "Gui.Toast.ClickToUpdate"
},
"Status": {
"Running": "Gui.Status.Running",
"Inactive": "Gui.Status.Inactive",
"Warning": "Gui.Status.Warning",
"Updating": "Gui.Status.Updating"
},
"MenuAlas": {
"Overview": "Gui.MenuAlas.Overview",
"Log": "Gui.MenuAlas.Log"
},
"MenuDevelop": {
"HomePage": "Gui.MenuDevelop.HomePage",
"Translate": "Gui.MenuDevelop.Translate",
"Update": "Gui.MenuDevelop.Update",
"Remote": "Gui.MenuDevelop.Remote",
"Utils": "Gui.MenuDevelop.Utils"
},
"Overview": {
"Scheduler": "Gui.Overview.Scheduler",
"Log": "Gui.Overview.Log",
"Running": "Gui.Overview.Running",
"Pending": "Gui.Overview.Pending",
"Waiting": "Gui.Overview.Waiting",
"NoTask": "Gui.Overview.NoTask"
},
"Dashboard": {
"NoData": "Gui.Dashboard.NoData",
"TimeError": "Gui.Dashboard.TimeError",
"JustNow": "Gui.Dashboard.JustNow",
"MinutesAgo": "Gui.Dashboard.MinutesAgo",
"HoursAgo": "Gui.Dashboard.HoursAgo",
"DaysAgo": "Gui.Dashboard.DaysAgo",
"LongTimeAgo": "Gui.Dashboard.LongTimeAgo"
},
"AddAlas": {
"PopupTitle": "Gui.AddAlas.PopupTitle",
"NewName": "Gui.AddAlas.NewName",
"CopyFrom": "Gui.AddAlas.CopyFrom",
"Confirm": "Gui.AddAlas.Confirm",
"FileExist": "Gui.AddAlas.FileExist",
"InvalidChar": "Gui.AddAlas.InvalidChar",
"InvalidPrefixTemplate": "Gui.AddAlas.InvalidPrefixTemplate"
},
"Update": {
"UpToDate": "Gui.Update.UpToDate",
"HaveUpdate": "Gui.Update.HaveUpdate",
"UpdateStart": "Gui.Update.UpdateStart",
"UpdateWait": "Gui.Update.UpdateWait",
"UpdateRun": "Gui.Update.UpdateRun",
"UpdateSuccess": "Gui.Update.UpdateSuccess",
"UpdateFailed": "Gui.Update.UpdateFailed",
"UpdateChecking": "Gui.Update.UpdateChecking",
"UpdateCancel": "Gui.Update.UpdateCancel",
"UpdateFinish": "Gui.Update.UpdateFinish",
"Local": "Gui.Update.Local",
"Upstream": "Gui.Update.Upstream",
"Author": "Gui.Update.Author",
"Time": "Gui.Update.Time",
"Message": "Gui.Update.Message",
"DisabledWarn": "Gui.Update.DisabledWarn",
"DetailedHistory": "Gui.Update.DetailedHistory"
},
"Remote": {
"Running": "Gui.Remote.Running",
"NotRunning": "Gui.Remote.NotRunning",
"NotEnable": "Gui.Remote.NotEnable",
"EntryPoint": "Gui.Remote.EntryPoint",
"ConfigureHint": "Gui.Remote.ConfigureHint",
"SSHNotInstall": "Gui.Remote.SSHNotInstall"
},
"Text": {
"InvalidFeedBack": "Gui.Text.InvalidFeedBack",
"Clear": "Gui.Text.Clear"
}
}
}
+340
View File
@@ -0,0 +1,340 @@
{
"Menu": {
"Alas": {
"name": "AAS",
"help": ""
},
"Daily": {
"name": "每日",
"help": ""
}
},
"Task": {
"Alas": {
"name": "AAS设置",
"help": ""
},
"Restart": {
"name": "异常处理",
"help": ""
},
"Cafe": {
"name": "咖啡厅",
"help": ""
},
"Mail": {
"name": "邮箱",
"help": ""
},
"Circle": {
"name": "公会",
"help": "社团 / 小组"
},
"TacticalChallenge": {
"name": "战术对抗赛",
"help": "战术大赛 / 竞技场"
},
"DataUpdate": {
"name": "仪表盘更新",
"help": ""
}
},
"Scheduler": {
"_info": {
"name": "任务设置",
"help": ""
},
"Enable": {
"name": "启用该功能",
"help": "将这个任务加入调度器",
"True": "已启用",
"False": "已停用"
},
"NextRun": {
"name": "下一次运行时间",
"help": "自动计算的数值,不需要手动修改。清空后将立即运行"
},
"Command": {
"name": "内部任务名称",
"help": ""
},
"ServerUpdate": {
"name": "服务器刷新时间",
"help": "一些任务运行成功后,将推迟下一次运行至服务器刷新时间\n自动换算时区,一般不需要修改"
}
},
"Emulator": {
"_info": {
"name": "模拟器设置",
"help": ""
},
"Serial": {
"name": "模拟器 Serial",
"help": "常见的模拟器 Serial 可以查询下方列表\n填 \"auto\" 自动检测模拟器,多个模拟器正在运行或使用不支持自动检测的模拟器时无法使用 \"auto\",必须手动填写\n\n模拟器默认 Serial\n- 蓝叠模拟器 127.0.0.1:5555\n- 蓝叠模拟器4 Hyper-v版,填\"bluestacks4-hyperv\"自动连接,多开填\"bluestacks4-hyperv-2\"以此类推\n- 蓝叠模拟器5 Hyper-v版,填\"bluestacks5-hyperv\"自动连接,多开填\"bluestacks5-hyperv-1\"以此类推\n- 夜神模拟器 127.0.0.1:62001\n- 夜神模拟器64位 127.0.0.1:59865\n- MuMu模拟器/MuMu模拟器X 127.0.0.1:7555\n- MuMu模拟器12 127.0.0.1:16384\n- 逍遥模拟器 127.0.0.1:21503\n- 雷电模拟器 emulator-5554 或 127.0.0.1:5555\n- WSA,填\"wsa-0\"使游戏在后台运行,需要使用第三方软件操控或关闭(建议使用scrcpy操控)\n如果你使用了模拟器的多开功能,它们的 Serial 将不是默认的,可以在 console.bat 中执行 `adb devices` 查询,或根据模拟器官方的教程填写"
},
"PackageName": {
"name": "游戏服务器",
"help": "无法区分国际服的不同地区,请手动选择服务器",
"auto": "自动检测",
"JP-Official": "[日服]-官服"
},
"GameLanguage": {
"name": "游戏内文本语言",
"help": "",
"auto": "自动检测",
"jp": "日语"
},
"ScreenshotMethod": {
"name": "模拟器截图方案",
"help": "使用自动选择时,将执行一次性能测试并自动更改为最快的截图方案\n一般情况下的速度: DroidCast_raw >> aScreenCap_nc > ADB_nc >>> aScreenCap > uiautomator2 ~= ADB\n运行 工具 - 性能测试 以寻找最快的方案",
"auto": "自动选择最快的",
"ADB": "ADB",
"ADB_nc": "ADB_nc",
"uiautomator2": "uiautomator2",
"aScreenCap": "aScreenCap",
"aScreenCap_nc": "aScreenCap_nc",
"DroidCast": "DroidCast",
"DroidCast_raw": "DroidCast_raw",
"scrcpy": "scrcpy"
},
"ControlMethod": {
"name": "模拟器控制方案",
"help": "速度: MaaTouch = minitouch >>> uiautomator2 ~= ADB\n建议选MaaTouch",
"minitouch": "minitouch",
"MaaTouch": "MaaTouch"
},
"AdbRestart": {
"name": "在检测不到设备的时候尝试重启adb",
"help": ""
}
},
"EmulatorInfo": {
"_info": {
"name": "模拟器设置",
"help": "下列数值是根据Serial自动填充的,如果不懂请不要随意修改"
},
"Emulator": {
"name": "模拟器类型",
"help": "",
"auto": "自动检测",
"NoxPlayer": "夜神模拟器",
"NoxPlayer64": "夜神模拟器64位",
"BlueStacks4": "蓝叠模拟器4",
"BlueStacks5": "蓝叠模拟器5",
"BlueStacks4HyperV": "蓝叠模拟器4 Hyper-V",
"BlueStacks5HyperV": "蓝叠模拟器5 Hyper-V",
"LDPlayer3": "雷电模拟器3",
"LDPlayer4": "雷电模拟器4",
"LDPlayer9": "雷电模拟器9",
"MuMuPlayer": "MuMu模拟器",
"MuMuPlayerX": "MuMu模拟器X",
"MuMuPlayer12": "MuMu模拟器12",
"MEmuPlayer": "逍遥模拟器"
},
"name": {
"name": "模拟器实例名称",
"help": ""
},
"path": {
"name": "模拟器安装路径",
"help": ""
}
},
"Error": {
"_info": {
"name": "调试设置",
"help": ""
},
"Restart": {
"name": "出错时,重启游戏",
"help": "",
"game": "重启游戏",
"game_emulator": "重启模拟器和游戏"
},
"SaveError": {
"name": "出错时,保存 Log 和截图",
"help": ""
},
"ScreenshotLength": {
"name": "出错时,保留最后 X 张截图",
"help": ""
},
"OnePushConfig": {
"name": "错误推送设置",
"help": "发生无法处理的异常后,使用 Onepush 推送一条错误信息。配置方法见文档:https://github.com/LmeSzinc/AzurLaneAutoScript/wiki/Onepush-configuration-%5BCN%5D"
}
},
"Optimization": {
"_info": {
"name": "优化设置",
"help": ""
},
"ScreenshotInterval": {
"name": "放慢截图速度至 X 秒一张",
"help": "执行两次截图之间的最小间隔,限制在 0.1 ~ 0.3,对于高配置电脑能降低 CPU 占用"
},
"CombatScreenshotInterval": {
"name": "战斗中放慢截图速度至 X 秒一张",
"help": "执行两次截图之间的最小间隔,限制在 0.1 ~ 1.0,能降低战斗时的 CPU 占用"
},
"WhenTaskQueueEmpty": {
"name": "当任务队列清空后",
"help": "无任务时关闭游戏,能在收菜期间降低 CPU 占用",
"stay_there": "停在原处",
"goto_main": "前往主界面",
"close_game": "关闭游戏"
}
},
"Cafe": {
"_info": {
"name": "咖啡厅设置",
"help": ""
},
"Reward": {
"name": "领取体力",
"help": ""
},
"Touch": {
"name": "学生互动点击",
"help": "自动检测可互动的学生并点击"
},
"AutoAdjust": {
"name": "自动调整界面",
"help": "在进行学生互动点击前对咖啡馆界面进行缩放和位置调整,以增加互动成功率"
}
},
"TacticalChallenge": {
"_info": {
"name": "战术对抗赛设置",
"help": ""
},
"PlayerSelect": {
"name": "挑战对手选取",
"help": "",
"0": "随机",
"1": "第一位",
"2": "第二位",
"3": "第三位"
}
},
"ItemStorage": {
"_info": {
"name": "ItemStorage._info.name",
"help": "ItemStorage._info.help"
},
"AP": {
"name": "体力",
"help": ""
},
"Credit": {
"name": "信用点",
"help": ""
},
"Pyroxene": {
"name": "青辉石",
"help": ""
}
},
"Gui": {
"Aside": {
"Install": "安装",
"Home": "主页",
"Develop": "开发",
"Performance": "性能",
"Setting": "设置",
"AddAlas": "新增"
},
"Button": {
"Start": "启动",
"Stop": "停止",
"ScrollON": "自动滚动 开",
"ScrollOFF": "自动滚动 关",
"ClearLog": "清空日志",
"Setting": "设置",
"CheckUpdate": "检查更新",
"ClickToUpdate": "进行更新",
"RetryUpdate": "重试更新",
"CancelUpdate": "取消更新"
},
"Toast": {
"DisableTranslateMode": "点击这里关闭翻译模式",
"ConfigSaved": "设置已保存",
"AlasIsRunning": "调度器已在运行中",
"ClickToUpdate": "有更新可用,点击这里进行更新"
},
"Status": {
"Running": "运行中",
"Inactive": "闲置",
"Warning": "发生错误",
"Updating": "等待更新"
},
"MenuAlas": {
"Overview": "总览",
"Log": "运行日志"
},
"MenuDevelop": {
"HomePage": "主页",
"Translate": "翻译",
"Update": "更新器",
"Remote": "远程控制",
"Utils": "工具"
},
"Overview": {
"Scheduler": "调度器",
"Log": "日志",
"Running": "运行中",
"Pending": "队列中",
"Waiting": "等待中",
"NoTask": "无任务"
},
"Dashboard": {
"NoData": "无数据",
"TimeError": "时间错误",
"JustNow": "刚刚",
"MinutesAgo": "{time}分钟前",
"HoursAgo": "{time}小时前",
"DaysAgo": "{time}天前",
"LongTimeAgo": "很久以前"
},
"AddAlas": {
"PopupTitle": "添加新配置",
"NewName": "新的配置文件名",
"CopyFrom": "从现有的配置中复制",
"Confirm": "添加",
"FileExist": "存在同名的配置文件,请重新输入一个",
"InvalidChar": "配置文件名不能包含下列任何字符:.\\/:*?\"<>|",
"InvalidPrefixTemplate": "配置文件名不能以 template 开头"
},
"Update": {
"UpToDate": "已是最新版本",
"HaveUpdate": "有新版本可用",
"UpdateStart": "开始更新",
"UpdateWait": "等待所有 Alas 完成当前任务",
"UpdateRun": "更新中",
"UpdateSuccess": "更新成功,正在重启",
"UpdateFailed": "更新失败,可在./log/*_gui.txt中找到错误日志",
"UpdateChecking": "检查更新中",
"UpdateCancel": "取消更新,重启 Alas 中",
"UpdateFinish": "更新成功,请手动重启",
"Local": "本地",
"Upstream": "上游仓库",
"Author": "作者",
"Time": "提交时间",
"Message": "提交信息",
"DisabledWarn": "更新模块未启用,你需要手动重启 Alas 进行更新",
"DetailedHistory": "详细提交历史"
},
"Remote": {
"Running": "运行中",
"NotRunning": "未运行,与服务器的连接断开或服务器离线",
"NotEnable": "未启用,在 deploy.yaml 中设置 webui 密码并启用远程控制",
"EntryPoint": "远程访问 url 地址:",
"ConfigureHint": "配置教程:",
"SSHNotInstall": "系统中没有 ssh 工具,请参考教程下载或安装 ssh"
},
"Text": {
"InvalidFeedBack": "格式错误。 示例:{0}",
"Clear": "清除"
}
}
}
+59
View File
@@ -0,0 +1,59 @@
"""
This file stores server, such as 'cn', 'en'.
Use 'import module.config.server as server' to import, don't use 'from xxx import xxx'.
"""
lang = 'jp' # Setting default to cn, will avoid errors when using dev_tools
server = 'JP-Official'
VALID_LANG = ['jp']
VALID_SERVER = {
'JP-Official': 'com.YostarJP.BlueArchive'
}
VALID_PACKAGE = set(list(VALID_SERVER.values()))
def set_lang(lang_: str):
"""
Change language and this will affect globally,
including assets and language specific methods.
Args:
lang_: package name or server.
"""
global lang
lang = lang_
from module.base.resource import release_resources
release_resources()
def to_server(package_or_server: str) -> str:
"""
Convert package/server to server.
To unknown packages, consider they are a CN channel servers.
"""
# Can't distinguish different regions of oversea servers,
# assume it's 'OVERSEA-Asia'
# if package_or_server == 'com.HoYoverse.hkrpgoversea':
# return 'OVERSEA-Asia'
for key, value in VALID_SERVER.items():
if value == package_or_server:
return key
if key == package_or_server:
return key
raise ValueError(f'Package invalid: {package_or_server}')
def to_package(package_or_server: str) -> str:
"""
Convert package/server to package.
"""
for key, value in VALID_SERVER.items():
if value == package_or_server:
return value
if key == package_or_server:
return value
raise ValueError(f'Server invalid: {package_or_server}')
+318
View File
@@ -0,0 +1,318 @@
from datetime import datetime
from functools import cached_property as functools_cached_property
from module.base.decorator import cached_property
from module.config.utils import DEFAULT_TIME, deep_get, get_server_last_monday_update, get_server_last_update
# from module.exception import ScriptError
def now():
return datetime.now().replace(microsecond=0)
def iter_attribute(cls):
"""
Args:
cls: Class or object
Yields:
str, obj: Attribute name, attribute value
"""
for attr in dir(cls):
if attr.startswith('_'):
continue
value = getattr(cls, attr)
if type(value).__name__ in ['function', 'property']:
continue
yield attr, value
class StoredBase:
time = DEFAULT_TIME
def __init__(self, key):
self._key = key
self._config = None
@cached_property
def _name(self):
return self._key.split('.')[-1]
def _bind(self, config):
"""
Args:
config (AzurLaneConfig):
"""
self._config = config
@functools_cached_property
def _stored(self):
assert self._config is not None, 'StoredBase._bind() must be called before getting stored data'
from module.logger import logger
out = {}
stored = deep_get(self._config.data, keys=self._key, default={})
for attr, default in self._attrs.items():
value = stored.get(attr, default)
if attr == 'time':
if not isinstance(value, datetime):
try:
value = datetime.fromisoformat(value)
except ValueError:
logger.warning(f'{self._name} has invalid attr: {attr}={value}, use default={default}')
value = default
else:
if not isinstance(value, type(default)):
logger.warning(f'{self._name} has invalid attr: {attr}={value}, use default={default}')
value = default
out[attr] = value
return out
@cached_property
def _attrs(self) -> dict:
"""
All attributes defined
"""
attrs = {
# time is the first one
'time': DEFAULT_TIME
}
for attr, value in iter_attribute(self.__class__):
if attr.islower():
attrs[attr] = value
return attrs
def __setattr__(self, key, value):
if key in self._attrs:
stored = self._stored
stored['time'] = now()
stored[key] = value
self._config.modified[self._key] = stored
if self._config.auto_update:
self._config.update()
else:
super().__setattr__(key, value)
def __getattribute__(self, item):
if not item.startswith('_') and item in self._attrs:
return self._stored[item]
else:
return super().__getattribute__(item)
def is_expired(self) -> bool:
return False
def show(self):
"""
Log self
"""
from module.logger import logger
logger.attr(self._name, self._stored)
class StoredExpiredAt0400(StoredBase):
def is_expired(self):
from module.logger import logger
self.show()
expired = self.time < get_server_last_update('04:00')
logger.attr(f'{self._name} expired', expired)
return expired
class StoredExpiredAtMonday0400(StoredBase):
def is_expired(self):
from module.logger import logger
self.show()
expired = self.time < get_server_last_monday_update('04:00')
logger.attr(f'{self._name} expired', expired)
return expired
class StoredInt(StoredBase):
value = 0
class StoredCounter(StoredBase):
value = 0
total = 0
FIXED_TOTAL = 0
def set(self, value, total=0):
if self.FIXED_TOTAL:
total = self.FIXED_TOTAL
with self._config.multi_set():
self.value = value
self.total = total
def to_counter(self) -> str:
return f'{self.value}/{self.total}'
def is_full(self) -> bool:
return self.value >= self.total
def get_remain(self) -> int:
return self.total - self.value
@cached_property
def _attrs(self) -> dict:
attrs = super()._attrs
if self.FIXED_TOTAL:
attrs['total'] = self.FIXED_TOTAL
return attrs
@functools_cached_property
def _stored(self):
stored = super()._stored
if self.FIXED_TOTAL:
stored['total'] = self.FIXED_TOTAL
return stored
class StoredAP(StoredCounter):
pass
# class StoredDailyActivity(StoredCounter, StoredExpiredAt0400):
# FIXED_TOTAL = 500
#
#
# class StoredTrailblazePower(StoredCounter):
# FIXED_TOTAL = 240
#
#
# class StoredSimulatedUniverse(StoredCounter, StoredExpiredAt0400):
# pass
#
#
# class StoredAssignment(StoredCounter):
# pass
#
#
# class StoredDaily(StoredCounter, StoredExpiredAt0400):
# quest1 = ''
# quest2 = ''
# quest3 = ''
# quest4 = ''
# quest5 = ''
# quest6 = ''
#
# FIXED_TOTAL = 6
#
# def load_quests(self):
# """
# Returns:
# list[DailyQuest]: Note that must check if quests are expired
# """
# # DailyQuest should be lazy loaded
# from tasks.daily.keywords import DailyQuest
# quests = []
# for name in [self.quest1, self.quest2, self.quest3, self.quest4, self.quest5, self.quest6]:
# if not name:
# continue
# try:
# quest = DailyQuest.find(name)
# quests.append(quest)
# except ScriptError:
# pass
# return quests
#
# def write_quests(self, quests):
# """
# Args:
# quests (list[DailyQuest, str]):
# """
# from tasks.daily.keywords import DailyQuest
# quests = [q.name if isinstance(q, DailyQuest) else q for q in quests]
# with self._config.multi_set():
# self.set(value=max(self.FIXED_TOTAL - len(quests), 0))
# try:
# self.quest1 = quests[0]
# except IndexError:
# self.quest1 = ''
# try:
# self.quest2 = quests[1]
# except IndexError:
# self.quest2 = ''
# try:
# self.quest3 = quests[2]
# except IndexError:
# self.quest3 = ''
# try:
# self.quest4 = quests[3]
# except IndexError:
# self.quest4 = ''
# try:
# self.quest5 = quests[4]
# except IndexError:
# self.quest5 = ''
# try:
# self.quest6 = quests[5]
# except IndexError:
# self.quest6 = ''
#
#
# class StoredDungeonDouble(StoredExpiredAt0400):
# calyx = 0
# relic = 0
#
#
# class StoredEchoOfWar(StoredCounter, StoredExpiredAtMonday0400):
# FIXED_TOTAL = 3
#
#
# class StoredBattlePassLevel(StoredCounter):
# FIXED_TOTAL = 50
#
#
# class StoredBattlePassTodayQuest(StoredCounter, StoredExpiredAt0400):
# quest1 = ''
# quest2 = ''
# quest3 = ''
# quest4 = ''
#
# FIXED_TOTAL = 4
#
# def load_quests(self):
# """
# Returns:
# list[DailyQuest]: Note that must check if quests are expired
# """
# # BattlePassQuest should be lazy loaded
# from tasks.battle_pass.keywords import BattlePassQuest
# quests = []
# for name in [self.quest1, self.quest2, self.quest3, self.quest4]:
# if not name:
# continue
# try:
# quest = BattlePassQuest.find(name)
# quests.append(quest)
# except ScriptError:
# pass
# return quests
#
# def write_quests(self, quests):
# """
# Args:
# quests (list[DailyQuest, str]):
# """
# from tasks.battle_pass.keywords import BattlePassQuest
# quests = [q.name if isinstance(q, BattlePassQuest) else q for q in quests]
# with self._config.multi_set():
# self.set(value=max(self.FIXED_TOTAL - len(quests), 0))
# try:
# self.quest1 = quests[0]
# except IndexError:
# self.quest1 = ''
# try:
# self.quest2 = quests[1]
# except IndexError:
# self.quest2 = ''
# try:
# self.quest3 = quests[2]
# except IndexError:
# self.quest3 = ''
# try:
# self.quest4 = quests[3]
# except IndexError:
# self.quest4 = ''
+17
View File
@@ -0,0 +1,17 @@
from module.config.stored.classes import (
StoredAP,
StoredBase,
StoredCounter,
StoredExpiredAt0400,
StoredExpiredAtMonday0400,
StoredInt,
)
# This file was auto-generated, do not modify it manually. To generate:
# ``` python -m module/config/config_updater.py ```
class StoredGenerated:
AP = StoredAP("DataUpdate.ItemStorage.AP")
Credit = StoredInt("DataUpdate.ItemStorage.Credit")
Pyroxene = StoredInt("DataUpdate.ItemStorage.Pyroxene")
+664
View File
@@ -0,0 +1,664 @@
import json
import os
import random
import string
from datetime import datetime, timedelta, timezone
import yaml
from filelock import FileLock
import module.config.server as server_
from module.config.atomicwrites import atomic_write
LANGUAGES = ['zh-CN', 'en-US']
SERVER_TO_TIMEZONE = {
'JP-Official': timedelta(hours=9),
}
DEFAULT_TIME = datetime(2020, 1, 1, 0, 0)
# https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data/15423007
def str_presenter(dumper, data):
if len(data.splitlines()) > 1: # check for multiline string
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
yaml.add_representer(str, str_presenter)
yaml.representer.SafeRepresenter.add_representer(str, str_presenter)
def filepath_args(filename='args', mod_name='alas'):
return f'./module/config/argument/{filename}.json'
def filepath_argument(filename):
return f'./module/config/argument/{filename}.yaml'
def filepath_i18n(lang, mod_name='alas'):
return os.path.join('./module/config/i18n', f'{lang}.json')
def filepath_config(filename, mod_name='alas'):
if mod_name == 'alas':
return os.path.join('./config', f'{filename}.json')
else:
return os.path.join('./config', f'{filename}.{mod_name}.json')
def filepath_code():
return './module/config/config_generated.py'
def read_file(file):
"""
Read a file, support both .yaml and .json format.
Return empty dict if file not exists.
Args:
file (str):
Returns:
dict, list:
"""
folder = os.path.dirname(file)
if not os.path.exists(folder):
os.mkdir(folder)
if not os.path.exists(file):
return {}
_, ext = os.path.splitext(file)
lock = FileLock(f"{file}.lock")
with lock:
print(f'read: {file}')
if ext == '.yaml':
with open(file, mode='r', encoding='utf-8') as f:
s = f.read()
data = list(yaml.safe_load_all(s))
if len(data) == 1:
data = data[0]
if not data:
data = {}
return data
elif ext == '.json':
with open(file, mode='r', encoding='utf-8') as f:
s = f.read()
return json.loads(s)
else:
print(f'Unsupported config file extension: {ext}')
return {}
def write_file(file, data):
"""
Write data into a file, supports both .yaml and .json format.
Args:
file (str):
data (dict, list):
"""
folder = os.path.dirname(file)
if not os.path.exists(folder):
os.mkdir(folder)
_, ext = os.path.splitext(file)
lock = FileLock(f"{file}.lock")
with lock:
print(f'write: {file}')
if ext == '.yaml':
with atomic_write(file, overwrite=True, encoding='utf-8', newline='') as f:
if isinstance(data, list):
yaml.safe_dump_all(data, f, default_flow_style=False, encoding='utf-8', allow_unicode=True,
sort_keys=False)
else:
yaml.safe_dump(data, f, default_flow_style=False, encoding='utf-8', allow_unicode=True,
sort_keys=False)
elif ext == '.json':
with atomic_write(file, overwrite=True, encoding='utf-8', newline='') as f:
s = json.dumps(data, indent=2, ensure_ascii=False, sort_keys=False, default=str)
f.write(s)
else:
print(f'Unsupported config file extension: {ext}')
def iter_folder(folder, is_dir=False, ext=None):
"""
Args:
folder (str):
is_dir (bool): True to iter directories only
ext (str): File extension, such as `.yaml`
Yields:
str: Absolute path of files
"""
for file in os.listdir(folder):
sub = os.path.join(folder, file)
if is_dir:
if os.path.isdir(sub):
yield sub.replace('\\\\', '/').replace('\\', '/')
elif ext is not None:
if not os.path.isdir(sub):
_, extension = os.path.splitext(file)
if extension == ext:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')
else:
yield os.path.join(folder, file).replace('\\\\', '/').replace('\\', '/')
def alas_template():
"""
Returns:
list[str]: Name of all Alas instances, except `template`.
"""
out = []
for file in os.listdir('./config'):
name, extension = os.path.splitext(file)
if name == 'template' and extension == '.json':
out.append(f'{name}-aas')
# out.extend(mod_template())
return out
def alas_instance():
"""
Returns:
list[str]: Name of all Alas instances, except `template`.
"""
out = []
for file in os.listdir('./config'):
name, extension = os.path.splitext(file)
config_name, mod_name = os.path.splitext(name)
mod_name = mod_name[1:]
if name != 'template' and extension == '.json' and mod_name == '':
out.append(name)
# out.extend(mod_instance())
if not len(out):
out = ['aas']
return out
def deep_get(d, keys, default=None):
"""
Get values in dictionary safely.
https://stackoverflow.com/questions/25833613/safe-method-to-get-value-of-nested-dictionary
Args:
d (dict):
keys (str, list): Such as `Scheduler.NextRun.value`
default: Default return if key not found.
Returns:
"""
if isinstance(keys, str):
keys = keys.split('.')
assert type(keys) is list
if d is None:
return default
if not keys:
return d
return deep_get(d.get(keys[0]), keys[1:], default)
def deep_set(d, keys, value):
"""
Set value into dictionary safely, imitating deep_get().
"""
if isinstance(keys, str):
keys = keys.split('.')
assert type(keys) is list
if not keys:
return value
if not isinstance(d, dict):
d = {}
d[keys[0]] = deep_set(d.get(keys[0], {}), keys[1:], value)
return d
def deep_pop(d, keys, default=None):
"""
Pop value from dictionary safely, imitating deep_get().
"""
if isinstance(keys, str):
keys = keys.split('.')
assert type(keys) is list
if not isinstance(d, dict):
return default
if not keys:
return default
elif len(keys) == 1:
return d.pop(keys[0], default)
return deep_pop(d.get(keys[0]), keys[1:], default)
def deep_default(d, keys, value):
"""
Set default value into dictionary safely, imitating deep_get().
Value is set only when the dict doesn't contain such keys.
"""
if isinstance(keys, str):
keys = keys.split('.')
assert type(keys) is list
if not keys:
if d:
return d
else:
return value
if not isinstance(d, dict):
d = {}
d[keys[0]] = deep_default(d.get(keys[0], {}), keys[1:], value)
return d
def deep_iter(data, depth=0, current_depth=1):
"""
Iter a dictionary safely.
Args:
data (dict):
depth (int): Maximum depth to iter
current_depth (int):
Returns:
list: Key path
Any:
"""
if isinstance(data, dict) \
and (depth and current_depth <= depth):
for key, value in data.items():
for child_path, child_value in deep_iter(value, depth=depth, current_depth=current_depth + 1):
yield [key] + child_path, child_value
else:
yield [], data
def parse_value(value, data):
"""
Convert a string to float, int, datetime, if possible.
Args:
value (str):
data (dict):
Returns:
"""
if 'option' in data:
if value not in data['option']:
return data['value']
if isinstance(value, str):
if value == '':
return None
if value == 'true' or value == 'True':
return True
if value == 'false' or value == 'False':
return False
if '.' in value:
try:
return float(value)
except ValueError:
pass
else:
try:
return int(value)
except ValueError:
pass
try:
return datetime.fromisoformat(value)
except ValueError:
pass
return value
def data_to_type(data, **kwargs):
"""
| Condition | Type |
| ------------------------------------ | -------- |
| `value` is bool | checkbox |
| Arg has `options` | select |
| Arg has `stored` | select |
| `Filter` is in name (in data['arg']) | textarea |
| Rest of the args | input |
Args:
data (dict):
kwargs: Any additional properties
Returns:
str:
"""
kwargs.update(data)
if isinstance(kwargs.get('value'), bool):
return 'checkbox'
elif 'option' in kwargs and kwargs['option']:
return 'select'
elif 'stored' in kwargs and kwargs['stored']:
return 'stored'
elif 'Filter' in kwargs['arg']:
return 'textarea'
else:
return 'input'
def data_to_path(data):
"""
Args:
data (dict):
Returns:
str: <func>.<group>.<arg>
"""
return '.'.join([data.get(attr, '') for attr in ['func', 'group', 'arg']])
def path_to_arg(path):
"""
Convert dictionary keys in .yaml files to argument names in config.
Args:
path (str): Such as `Scheduler.ServerUpdate`
Returns:
str: Such as `Scheduler_ServerUpdate`
"""
return path.replace('.', '_')
def dict_to_kv(dictionary, allow_none=True):
"""
Args:
dictionary: Such as `{'path': 'Scheduler.ServerUpdate', 'value': True}`
allow_none (bool):
Returns:
str: Such as `path='Scheduler.ServerUpdate', value=True`
"""
return ', '.join([f'{k}={repr(v)}' for k, v in dictionary.items() if allow_none or v is not None])
def server_timezone() -> timedelta:
return SERVER_TO_TIMEZONE.get(server_.server, SERVER_TO_TIMEZONE['JP-Official'])
def server_time_offset() -> timedelta:
"""
To convert local time to server time:
server_time = local_time + server_time_offset()
To convert server time to local time:
local_time = server_time - server_time_offset()
"""
return datetime.now(timezone.utc).astimezone().utcoffset() - server_timezone()
def random_normal_distribution_int(a, b, n=3):
"""
A non-numpy implementation of the `random_normal_distribution_int` in module.base.utils
Generate a normal distribution int within the interval.
Use the average value of several random numbers to
simulate normal distribution.
Args:
a (int): The minimum of the interval.
b (int): The maximum of the interval.
n (int): The amount of numbers in simulation. Default to 3.
Returns:
int
"""
if a < b:
output = sum([random.randint(a, b) for _ in range(n)]) / n
return int(round(output))
else:
return b
def ensure_time(second, n=3, precision=3):
"""Ensure to be time.
Args:
second (int, float, tuple): time, such as 10, (10, 30), '10, 30'
n (int): The amount of numbers in simulation. Default to 5.
precision (int): Decimals.
Returns:
float:
"""
if isinstance(second, tuple):
multiply = 10 ** precision
return random_normal_distribution_int(second[0] * multiply, second[1] * multiply, n) / multiply
elif isinstance(second, str):
if ',' in second:
lower, upper = second.replace(' ', '').split(',')
lower, upper = int(lower), int(upper)
return ensure_time((lower, upper), n=n, precision=precision)
if '-' in second:
lower, upper = second.replace(' ', '').split('-')
lower, upper = int(lower), int(upper)
return ensure_time((lower, upper), n=n, precision=precision)
else:
return int(second)
else:
return second
def get_os_next_reset():
"""
Get the first day of next month.
Returns:
datetime.datetime
"""
diff = server_time_offset()
server_now = datetime.now() - diff
server_reset = (server_now.replace(day=1) + timedelta(days=32)) \
.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
local_reset = server_reset + diff
return local_reset
def get_os_reset_remain():
"""
Returns:
int: number of days before next opsi reset
"""
from module.logger import logger
next_reset = get_os_next_reset()
now = datetime.now()
logger.attr('OpsiNextReset', next_reset)
remain = int((next_reset - now).total_seconds() // 86400)
logger.attr('ResetRemain', remain)
return remain
def get_server_next_update(daily_trigger):
"""
Args:
daily_trigger (list[str], str): [ "00:00", "12:00", "18:00",]
Returns:
datetime.datetime
"""
if isinstance(daily_trigger, str):
daily_trigger = daily_trigger.replace(' ', '').split(',')
diff = server_time_offset()
local_now = datetime.now()
trigger = []
for t in daily_trigger:
h, m = [int(x) for x in t.split(':')]
future = local_now.replace(hour=h, minute=m, second=0, microsecond=0) + diff
s = (future - local_now).total_seconds() % 86400
future = local_now + timedelta(seconds=s)
trigger.append(future)
update = sorted(trigger)[0]
return update
def get_server_last_update(daily_trigger):
"""
Args:
daily_trigger (list[str], str): [ "00:00", "12:00", "18:00",]
Returns:
datetime.datetime
"""
if isinstance(daily_trigger, str):
daily_trigger = daily_trigger.replace(' ', '').split(',')
diff = server_time_offset()
local_now = datetime.now()
trigger = []
for t in daily_trigger:
h, m = [int(x) for x in t.split(':')]
future = local_now.replace(hour=h, minute=m, second=0, microsecond=0) + diff
s = (future - local_now).total_seconds() % 86400 - 86400
future = local_now + timedelta(seconds=s)
trigger.append(future)
update = sorted(trigger)[-1]
return update
def get_server_last_monday_update(daily_trigger):
"""
Args:
daily_trigger (list[str], str): [ "00:00", "12:00", "18:00",]
Returns:
datetime.datetime
"""
update = get_server_next_update(daily_trigger)
diff = update.weekday()
update = update - timedelta(days=diff)
return update
def get_server_next_monday_update(daily_trigger):
"""
Args:
daily_trigger (list[str], str): [ "00:00", "12:00", "18:00",]
Returns:
datetime.datetime
"""
update = get_server_next_update(daily_trigger)
diff = (7 - update.weekday()) % 7
update = update + timedelta(days=diff)
return update
def nearest_future(future, interval=120):
"""
Get the neatest future time.
Return the last one if two things will finish within `interval`.
Args:
future (list[datetime.datetime]):
interval (int): Seconds
Returns:
datetime.datetime:
"""
future = [datetime.fromisoformat(f) if isinstance(f, str) else f for f in future]
future = sorted(future)
next_run = future[0]
for finish in future:
if finish - next_run < timedelta(seconds=interval):
next_run = finish
return next_run
def get_nearest_weekday_date(target):
"""
Get nearest weekday date starting
from current date
Args:
target (int): target weekday to
calculate
Returns:
datetime.datetime
"""
diff = server_time_offset()
server_now = datetime.now() - diff
days_ahead = target - server_now.weekday()
if days_ahead <= 0:
# Target day has already happened
days_ahead += 7
server_reset = (server_now + timedelta(days=days_ahead)) \
.replace(hour=0, minute=0, second=0, microsecond=0)
local_reset = server_reset + diff
return local_reset
def get_server_weekday():
"""
Returns:
int: The server's current day of the week
"""
diff = server_time_offset()
server_now = datetime.now() - diff
result = server_now.weekday()
return result
def random_id(length=32):
"""
Args:
length (int):
Returns:
str: Random azurstat id.
"""
return ''.join(random.sample(string.ascii_lowercase + string.digits, length))
def to_list(text, length=1):
"""
Args:
text (str): Such as `1, 2, 3`
length (int): If there's only one digit, return a list expanded to given length,
i.e. text='3', length=5, returns `[3, 3, 3, 3, 3]`
Returns:
list[int]:
"""
if text.isdigit():
return [int(text)] * length
out = [int(letter.strip()) for letter in text.split(',')]
return out
def type_to_str(typ):
"""
Convert any types or any objects to a string
Remove <> to prevent them from being parsed as HTML tags.
Args:
typ:
Returns:
str: Such as `int`, 'datetime.datetime'.
"""
if not isinstance(typ, type):
typ = type(typ).__name__
return str(typ)
if __name__ == '__main__':
get_os_reset_remain()
+33
View File
@@ -0,0 +1,33 @@
import os
from datetime import datetime
from module.config.utils import filepath_config, DEFAULT_TIME
from module.logger import logger
class ConfigWatcher:
config_name = 'alas'
start_mtime = DEFAULT_TIME
def start_watching(self) -> None:
self.start_mtime = self.get_mtime()
def get_mtime(self) -> datetime:
"""
Last modify time of the file
"""
timestamp = os.stat(filepath_config(self.config_name)).st_mtime
mtime = datetime.fromtimestamp(timestamp).replace(microsecond=0)
return mtime
def should_reload(self) -> bool:
"""
Returns:
bool: Whether the file has been modified and configs should reload
"""
mtime = self.get_mtime()
if mtime > self.start_mtime:
logger.info(f'Config "{self.config_name}" changed at {mtime}')
return True
else:
return False