方案:任务管理器模式

创建一个中央任务管理器 (TaskManager),它负责:

  • 接收创建任务的请求
  • 统一管理和存储所有活跃的任务
  • 提供启动、停止、取消特定任务或所有任务的方法
  • 支持不同类型的定时任务(每日、延迟)

这样可以避免手动管理多个独立的 QTimer 对象,使代码更清晰、更易维护。

代码实现

from PyQt5.QtCore import QObject, QTimer, QTime, pyqtSignal
import re
import uuid

class ScheduledTask(QObject):
    """
    代表一个具体的定时任务(内部类,由TaskManager管理)。
    """
    task_triggered = pyqtSignal(str)

    def __init__(self, task_id, task_type, execution_time, task_function):
        super().__init__()
        self.id = task_id
        self.type = task_type  # 'daily', 'once_delay'
        self.execution_time = execution_time # 对于daily是QTime, 对于once_delay是总毫秒数
        self.task_function = task_function
        self.timer = QTimer(self)
        self.timer.timeout.connect(self._run_task)
        
        if self.type == 'daily':
            # 每日任务:检查时间的定时器
            self.timer.setInterval(60 * 1000) # 每分钟检查一次
        elif self.type == 'once_delay':
            # 单次延迟任务:一次性定时器
            self.timer.setSingleShot(True)
            self.timer.setInterval(self.execution_time)
        else:
            raise ValueError(f"Unknown task type: {self.type}")

    def start(self):
        if not self.timer.isActive():
            self.timer.start()
            print(f"Task [{self.id}] started (Type: {self.type}).")

    def stop(self):
        if self.timer.isActive():
            self.timer.stop()
            print(f"Task [{self.id}] stopped.")

    def _run_task(self):
        if self.type == 'daily':
            # 检查是否到达每日执行时间
            current_time = QTime.currentTime()
            if (current_time.hour() == self.execution_time.hour() and
                current_time.minute() == self.execution_time.minute() and
                current_time.second() == self.execution_time.second()):
                    
                print(f"Executing daily task [{self.id}] at {current_time.toString('hh:mm:ss')}")
                self._execute_user_task()
        elif self.type == 'once_delay':
            # 单次延迟任务,直接执行
            print(f"Executing once-delay task [{self.id}] after delay.")
            self._execute_user_task()
            # 单次任务执行完后自动停止
            self.timer.stop()

    def _execute_user_task(self):
        """执行用户定义的任务函数。"""
        try:
            result = self.task_function()
            success_msg = f"Task [{self.id}] executed. Result: {result}"
            self.task_triggered.emit(success_msg)
            print(success_msg)
        except Exception as e:
            error_msg = f"Task [{self.id}] failed: {e}"
            self.task_triggered.emit(error_msg)
            print(error_msg)

class TaskManager(QObject):
    """
    任务管理器,用于创建、管理和控制多个定时任务。
    """
    # 信号:当任何任务执行时发出
    any_task_executed = pyqtSignal(str)

    def __init__(self, parent=None):
        super().__init__(parent)
        self.active_tasks = {} # {task_id: ScheduledTask_instance}

    def create_daily_task(self, execution_time_hms, task_function, custom_task_id=None):
        """
        创建一个每日定时任务。

        Args:
            execution_time_hms (str): 执行时间,格式为 "HH:MM:SS"。
            task_function (callable): 要执行的任务函数。
            custom_task_id (str, optional): 自定义任务ID。如果不提供,则自动生成。

        Returns:
            str: 任务的唯一ID。
        """
        task_id = custom_task_id or f"daily_{uuid.uuid4().hex[:8]}"
        execution_qtime = QTime.fromString(execution_time_hms, "hh:mm:ss")
        
        if not execution_qtime.isValid():
            raise ValueError(f"Invalid daily time format: {execution_time_hms}. Expected HH:MM:SS")

        task_instance = ScheduledTask(task_id, 'daily', execution_qtime, task_function)
        task_instance.task_triggered.connect(self.any_task_executed)
        
        self.active_tasks[task_id] = task_instance
        print(f"Created daily task [{task_id}] for time {execution_time_hms}.")
        return task_id

    def create_once_delay_task(self, delay_time_hms, task_function, custom_task_id=None):
        """
        创建一个单次延迟任务。

        Args:
            delay_time_hms (str): 延迟时间,格式为 "HH:MM:SS" (例如 "00:05:30" 表示 5分30秒后执行)。
            task_function (callable): 要执行的任务函数。
            custom_task_id (str, optional): 自定义任务ID。如果不提供,则自动生成。

        Returns:
            str: 任务的唯一ID。
        """
        task_id = custom_task_id or f"delay_{uuid.uuid4().hex[:8]}"
        
        # 解析延迟时间字符串
        total_seconds = self._parse_hhmmss_to_seconds(delay_time_hms)
        if total_seconds < 0:
            raise ValueError(f"Invalid delay time format: {delay_time_hms}. Expected HH:MM:SS")

        delay_ms = total_seconds * 1000
        task_instance = ScheduledTask(task_id, 'once_delay', delay_ms, task_function)
        task_instance.task_triggered.connect(self.any_task_executed)
        
        self.active_tasks[task_id] = task_instance
        print(f"Created once-delay task [{task_id}] with delay {delay_time_hms} ({delay_ms} ms).")
        return task_id

    def _parse_hhmmss_to_seconds(self, time_str):
        """将 HH:MM:SS 格式的字符串解析为总秒数。"""
        pattern = r'^(\d{2}):(\d{2}):(\d{2})$'
        match = re.match(pattern, time_str)
        if not match:
            return -1
        hours, minutes, seconds = map(int, match.groups())
        return hours * 3600 + minutes * 60 + seconds

    def start_task(self, task_id):
        """启动指定ID的任务。"""
        if task_id in self.active_tasks:
            self.active_tasks[task_id].start()
        else:
            print(f"Warning: Cannot start task [{task_id}], it does not exist.")

    def stop_task(self, task_id):
        """停止指定ID的任务(对于daily任务有效)。"""
        if task_id in self.active_tasks:
            self.active_tasks[task_id].stop()
        else:
            print(f"Warning: Cannot stop task [{task_id}], it does not exist.")

    def cancel_task(self, task_id):
        """取消(停止)指定ID的任务。对于once_delay任务,如果已启动则无法取消,但对于daily任务有效。"""
        self.stop_task(task_id) # 对于once_delay,stop就是cancel
        if task_id in self.active_tasks:
             # 可以选择性地从管理器中移除已完成或被取消的任务
             # del self.active_tasks[task_id]
             pass # 暂时不删除,方便状态查询

    def stop_all_tasks(self):
        """停止所有活跃的任务。"""
        for task in self.active_tasks.values():
            task.stop()
        print("All tasks have been stopped.")

    def get_task_status(self, task_id):
        """获取指定任务的状态。"""
        if task_id in self.active_tasks:
            task = self.active_tasks[task_id]
            return {
                'id': task.id,
                'type': task.type,
                'is_active': task.timer.isActive(),
                'execution_info': task.execution_time.toString('hh:mm:ss') if task.type == 'daily' else f"{task.execution_time} ms"
            }
        else:
            return None

    def list_all_task_ids(self):
        """列出所有任务的ID。"""
        return list(self.active_tasks.keys())


# --- 使用示例 ---
def example_task_1():
    print("Task 1 is running!")
    return "Result from Task 1"

def example_task_2():
    print("Task 2 is running!")
    return "Result from Task 2"

def example_delay_task():
    print("Delay Task is running!")
    return "Result from Delay Task"

# --- 创建任务管理器 ---
task_manager = TaskManager()

# 连接信号,监听所有任务的执行情况
task_manager.any_task_executed.connect(lambda msg: print(f"*** Manager Log: {msg} ***"))

# --- 创建任务 ---
task1_id = task_manager.create_daily_task("17:50:00", example_task_1, custom_task_id="my_daily_job_1")
task2_id = task_manager.create_daily_task("17:51:00", example_task_2, custom_task_id="my_daily_job_2")
delay_task_id = task_manager.create_once_delay_task("00:00:10", example_delay_task, custom_task_id="my_delay_job_1") # 10秒后执行

# --- 启动任务 ---
task_manager.start_task(task1_id)
task_manager.start_task(task2_id)
task_manager.start_task(delay_task_id) # 这个会10秒后执行一次

print(f"All created task IDs: {task_manager.list_all_task_ids()}")
print(f"Status of {task1_id}: {task_manager.get_task_status(task1_id)}")

# --- 在某个时刻取消一个任务 ---
# QTimer.singleShot(5000, lambda: task_manager.cancel_task(task1_id)) # 5秒后取消task1

# --- 在程序结束前停止所有任务 ---
# import atexit
# atexit.register(task_manager.stop_all_tasks)

如何在你的数据获取线程中使用

# 在你的 QueryWorker_2 或主线程的槽函数中
def _on_data_fetched(self, fetched_data_list):
    """
    假设 fetched_data_list 是一个包含多个任务配置的列表。
    例如: 
    [
        {'type': 'daily', 'time': '08:00:00', 'func': func1},
        {'type': 'once_delay', 'delay': '00:05:00', 'func': func2},
        ...
    ]
    """
    for task_config in fetched_data_list:
        task_type = task_config.get('type')
        task_func = task_config.get('func')
        
        if task_type == 'daily':
            time_hms = task_config.get('time')
            task_id = self.task_manager.create_daily_task(time_hms, task_func)
            self.task_manager.start_task(task_id)
        elif task_type == 'once_delay':
            delay_hms = task_config.get('delay')
            task_id = self.task_manager.create_once_delay_task(delay_hms, task_func)
            self.task_manager.start_task(task_id)
        else:
            print(f"Unknown task type in config: {task_config}")

# --- 在 MainWindow 或其他地方初始化 ---
self.task_manager = TaskManager()
# 连接信号以显示日志
self.task_manager.any_task_executed.connect(self.update_log_display)

def update_log_display(self, message):
    self.log_display.append(message)

优势

  • 集中管理: 所有任务由一个对象统一管理,易于启动、停止、查询和取消。
  • 灵活性: 支持多种任务类型(每日、延迟)。
  • 可扩展性: 很容易添加新的任务类型或管理功能。
  • 清晰的生命周期: 任务的创建、启动、停止都有明确的接口。
  • 错误隔离: 单个任务失败不会影响其他任务。

这个 TaskManager 方案非常适合需要动态生成和管理多个定时任务的场景。