方案:任务管理器模式
创建一个中央任务管理器 (TaskManager),它负责:
- 接收创建任务的请求
- 统一管理和存储所有活跃的任务
- 提供启动、停止、取消特定任务或所有任务的方法
- 支持不同类型的定时任务(每日、延迟)
这样可以避免手动管理多个独立的 QTimer 对象,使代码更清晰、更易维护。
代码实现
from PyQt5.QtCore import QObject, QTimer, QTime, pyqtSignal
import re
import uuid
class ScheduledTask(QObject):
"""
代表一个具体的定时任务(内部类,由TaskManager管理)。
"""
task_triggered = pyqtSignal(str)
def __init__(self, task_id, task_type, execution_time, task_function):
super().__init__()
self.id = task_id
self.type = task_type # 'daily', 'once_delay'
self.execution_time = execution_time # 对于daily是QTime, 对于once_delay是总毫秒数
self.task_function = task_function
self.timer = QTimer(self)
self.timer.timeout.connect(self._run_task)
if self.type == 'daily':
# 每日任务:检查时间的定时器
self.timer.setInterval(60 * 1000) # 每分钟检查一次
elif self.type == 'once_delay':
# 单次延迟任务:一次性定时器
self.timer.setSingleShot(True)
self.timer.setInterval(self.execution_time)
else:
raise ValueError(f"Unknown task type: {self.type}")
def start(self):
if not self.timer.isActive():
self.timer.start()
print(f"Task [{self.id}] started (Type: {self.type}).")
def stop(self):
if self.timer.isActive():
self.timer.stop()
print(f"Task [{self.id}] stopped.")
def _run_task(self):
if self.type == 'daily':
# 检查是否到达每日执行时间
current_time = QTime.currentTime()
if (current_time.hour() == self.execution_time.hour() and
current_time.minute() == self.execution_time.minute() and
current_time.second() == self.execution_time.second()):
print(f"Executing daily task [{self.id}] at {current_time.toString('hh:mm:ss')}")
self._execute_user_task()
elif self.type == 'once_delay':
# 单次延迟任务,直接执行
print(f"Executing once-delay task [{self.id}] after delay.")
self._execute_user_task()
# 单次任务执行完后自动停止
self.timer.stop()
def _execute_user_task(self):
"""执行用户定义的任务函数。"""
try:
result = self.task_function()
success_msg = f"Task [{self.id}] executed. Result: {result}"
self.task_triggered.emit(success_msg)
print(success_msg)
except Exception as e:
error_msg = f"Task [{self.id}] failed: {e}"
self.task_triggered.emit(error_msg)
print(error_msg)
class TaskManager(QObject):
"""
任务管理器,用于创建、管理和控制多个定时任务。
"""
# 信号:当任何任务执行时发出
any_task_executed = pyqtSignal(str)
def __init__(self, parent=None):
super().__init__(parent)
self.active_tasks = {} # {task_id: ScheduledTask_instance}
def create_daily_task(self, execution_time_hms, task_function, custom_task_id=None):
"""
创建一个每日定时任务。
Args:
execution_time_hms (str): 执行时间,格式为 "HH:MM:SS"。
task_function (callable): 要执行的任务函数。
custom_task_id (str, optional): 自定义任务ID。如果不提供,则自动生成。
Returns:
str: 任务的唯一ID。
"""
task_id = custom_task_id or f"daily_{uuid.uuid4().hex[:8]}"
execution_qtime = QTime.fromString(execution_time_hms, "hh:mm:ss")
if not execution_qtime.isValid():
raise ValueError(f"Invalid daily time format: {execution_time_hms}. Expected HH:MM:SS")
task_instance = ScheduledTask(task_id, 'daily', execution_qtime, task_function)
task_instance.task_triggered.connect(self.any_task_executed)
self.active_tasks[task_id] = task_instance
print(f"Created daily task [{task_id}] for time {execution_time_hms}.")
return task_id
def create_once_delay_task(self, delay_time_hms, task_function, custom_task_id=None):
"""
创建一个单次延迟任务。
Args:
delay_time_hms (str): 延迟时间,格式为 "HH:MM:SS" (例如 "00:05:30" 表示 5分30秒后执行)。
task_function (callable): 要执行的任务函数。
custom_task_id (str, optional): 自定义任务ID。如果不提供,则自动生成。
Returns:
str: 任务的唯一ID。
"""
task_id = custom_task_id or f"delay_{uuid.uuid4().hex[:8]}"
# 解析延迟时间字符串
total_seconds = self._parse_hhmmss_to_seconds(delay_time_hms)
if total_seconds < 0:
raise ValueError(f"Invalid delay time format: {delay_time_hms}. Expected HH:MM:SS")
delay_ms = total_seconds * 1000
task_instance = ScheduledTask(task_id, 'once_delay', delay_ms, task_function)
task_instance.task_triggered.connect(self.any_task_executed)
self.active_tasks[task_id] = task_instance
print(f"Created once-delay task [{task_id}] with delay {delay_time_hms} ({delay_ms} ms).")
return task_id
def _parse_hhmmss_to_seconds(self, time_str):
"""将 HH:MM:SS 格式的字符串解析为总秒数。"""
pattern = r'^(\d{2}):(\d{2}):(\d{2})$'
match = re.match(pattern, time_str)
if not match:
return -1
hours, minutes, seconds = map(int, match.groups())
return hours * 3600 + minutes * 60 + seconds
def start_task(self, task_id):
"""启动指定ID的任务。"""
if task_id in self.active_tasks:
self.active_tasks[task_id].start()
else:
print(f"Warning: Cannot start task [{task_id}], it does not exist.")
def stop_task(self, task_id):
"""停止指定ID的任务(对于daily任务有效)。"""
if task_id in self.active_tasks:
self.active_tasks[task_id].stop()
else:
print(f"Warning: Cannot stop task [{task_id}], it does not exist.")
def cancel_task(self, task_id):
"""取消(停止)指定ID的任务。对于once_delay任务,如果已启动则无法取消,但对于daily任务有效。"""
self.stop_task(task_id) # 对于once_delay,stop就是cancel
if task_id in self.active_tasks:
# 可以选择性地从管理器中移除已完成或被取消的任务
# del self.active_tasks[task_id]
pass # 暂时不删除,方便状态查询
def stop_all_tasks(self):
"""停止所有活跃的任务。"""
for task in self.active_tasks.values():
task.stop()
print("All tasks have been stopped.")
def get_task_status(self, task_id):
"""获取指定任务的状态。"""
if task_id in self.active_tasks:
task = self.active_tasks[task_id]
return {
'id': task.id,
'type': task.type,
'is_active': task.timer.isActive(),
'execution_info': task.execution_time.toString('hh:mm:ss') if task.type == 'daily' else f"{task.execution_time} ms"
}
else:
return None
def list_all_task_ids(self):
"""列出所有任务的ID。"""
return list(self.active_tasks.keys())
# --- 使用示例 ---
def example_task_1():
print("Task 1 is running!")
return "Result from Task 1"
def example_task_2():
print("Task 2 is running!")
return "Result from Task 2"
def example_delay_task():
print("Delay Task is running!")
return "Result from Delay Task"
# --- 创建任务管理器 ---
task_manager = TaskManager()
# 连接信号,监听所有任务的执行情况
task_manager.any_task_executed.connect(lambda msg: print(f"*** Manager Log: {msg} ***"))
# --- 创建任务 ---
task1_id = task_manager.create_daily_task("17:50:00", example_task_1, custom_task_id="my_daily_job_1")
task2_id = task_manager.create_daily_task("17:51:00", example_task_2, custom_task_id="my_daily_job_2")
delay_task_id = task_manager.create_once_delay_task("00:00:10", example_delay_task, custom_task_id="my_delay_job_1") # 10秒后执行
# --- 启动任务 ---
task_manager.start_task(task1_id)
task_manager.start_task(task2_id)
task_manager.start_task(delay_task_id) # 这个会10秒后执行一次
print(f"All created task IDs: {task_manager.list_all_task_ids()}")
print(f"Status of {task1_id}: {task_manager.get_task_status(task1_id)}")
# --- 在某个时刻取消一个任务 ---
# QTimer.singleShot(5000, lambda: task_manager.cancel_task(task1_id)) # 5秒后取消task1
# --- 在程序结束前停止所有任务 ---
# import atexit
# atexit.register(task_manager.stop_all_tasks)如何在你的数据获取线程中使用
# 在你的 QueryWorker_2 或主线程的槽函数中
def _on_data_fetched(self, fetched_data_list):
"""
假设 fetched_data_list 是一个包含多个任务配置的列表。
例如:
[
{'type': 'daily', 'time': '08:00:00', 'func': func1},
{'type': 'once_delay', 'delay': '00:05:00', 'func': func2},
...
]
"""
for task_config in fetched_data_list:
task_type = task_config.get('type')
task_func = task_config.get('func')
if task_type == 'daily':
time_hms = task_config.get('time')
task_id = self.task_manager.create_daily_task(time_hms, task_func)
self.task_manager.start_task(task_id)
elif task_type == 'once_delay':
delay_hms = task_config.get('delay')
task_id = self.task_manager.create_once_delay_task(delay_hms, task_func)
self.task_manager.start_task(task_id)
else:
print(f"Unknown task type in config: {task_config}")
# --- 在 MainWindow 或其他地方初始化 ---
self.task_manager = TaskManager()
# 连接信号以显示日志
self.task_manager.any_task_executed.connect(self.update_log_display)
def update_log_display(self, message):
self.log_display.append(message)优势
- 集中管理: 所有任务由一个对象统一管理,易于启动、停止、查询和取消。
- 灵活性: 支持多种任务类型(每日、延迟)。
- 可扩展性: 很容易添加新的任务类型或管理功能。
- 清晰的生命周期: 任务的创建、启动、停止都有明确的接口。
- 错误隔离: 单个任务失败不会影响其他任务。
这个 TaskManager 方案非常适合需要动态生成和管理多个定时任务的场景。
评论已关闭