智能体任务断点恢复与替代方案自动启动机制

一、断点恢复机制设计

断点恢复机制允许在任务中断后,从最近的安全点继续执行,而不需要重新开始整个流程。

1. 状态持久化设计

关键组件:

- 状态存储服务: 高可用数据库(如Redis集群、MongoDB)
- 检查点管理器: 决定何时创建检查点
- 状态恢复器: 负责从检查点重建执行环境

持久化内容:

  1. 任务元数据

    • 任务ID、类型、优先级
    • 开始时间、当前状态
    • 所有者和访问控制信息
  2. 执行图状态

    • 已完成节点列表
    • 当前执行节点
    • 待执行节点队列
  3. 中间结果数据

    • 每个节点的输出结果
    • 模型生成的临时资源(图片、文档等)
    • 上下文参数和配置

2. 检查点策略

class CheckpointManager:
    def __init__(self, storage_client, config):
        self.storage = storage_client
        self.checkpoint_frequency = config.get("checkpoint_frequency", "node")  # node/time/hybrid
        self.time_interval = config.get("time_interval", 300)  # 秒
        
    def should_checkpoint(self, workflow_context):
        # 节点完成检查点
        if self.checkpoint_frequency == "node" and workflow_context.node_completed:
            return True
            
        # 时间间隔检查点
        if self.checkpoint_frequency in ["time", "hybrid"]:
            time_since_last = time.time() - workflow_context.last_checkpoint_time
            if time_since_last >= self.time_interval:
                return True
                
        # 关键节点强制检查点
        if workflow_context.current_node.is_critical:
            return True
            
        return False
        
    def create_checkpoint(self, workflow_context):
        checkpoint_data = {
            "task_id": workflow_context.task_id,
            "timestamp": time.time(),
            "completed_nodes": workflow_context.completed_nodes,
            "current_node": workflow_context.current_node.id,
            "node_states": self._serialize_node_states(workflow_context),
            "global_context": workflow_context.global_context,
            "resources": self._get_resource_references(workflow_context)
        }
        
        checkpoint_id = f"{workflow_context.task_id}:cp:{uuid.uuid4()}"
        self.storage.save(checkpoint_id, checkpoint_data)
        
        # 更新最新检查点ID
        self.storage.save(f"{workflow_context.task_id}:latest_checkpoint", checkpoint_id)
        
        return checkpoint_id

3. 恢复执行流程

class WorkflowRecovery:
    def __init__(self, storage_client, workflow_engine):
        self.storage = storage_client
        self.engine = workflow_engine
        
    def recover_workflow(self, task_id):
        # 获取最新检查点
        checkpoint_id = self.storage.get(f"{task_id}:latest_checkpoint")
        if not checkpoint_id:
            raise RecoveryError("No checkpoint found for task")
            
        checkpoint_data = self.storage.get(checkpoint_id)
        
        # 重建工作流上下文
        workflow_context = WorkflowContext(
            task_id=checkpoint_data["task_id"],
            completed_nodes=checkpoint_data["completed_nodes"],
            global_context=checkpoint_data["global_context"]
        )
        
        # 恢复资源引用
        self._restore_resources(workflow_context, checkpoint_data["resources"])
        
        # 恢复节点状态
        self._restore_node_states(workflow_context, checkpoint_data["node_states"])
        
        # 从当前节点恢复执行
        next_node_id = self._determine_next_node(checkpoint_data)
        
        # 将恢复状态记录到审计日志
        self._log_recovery_event(task_id, checkpoint_id)
        
        # 重新启动工作流
        return self.engine.resume_workflow(workflow_context, next_node_id)

二、替代方案自动启动机制

替代方案自动启动机制允许在主要执行路径失败时,自动切换到备选方案,确保任务完成。

1. 故障检测与分类

检测机制:

  1. 超时监控

    • 每个节点设置最大执行时间
    • 全局任务设置总超时阈值
  2. 错误模式识别

    • 模型错误分类(如资源不足、API限制、模型崩溃)
    • 数据处理异常(格式错误、数据缺失)
    • 系统级错误(网络中断、服务不可用)
  3. 质量检测

    • 输出内容质量评估
    • 一致性与完整性检查

2. 策略定义

# 任务节点故障处理策略示例
node_id: generate_power_analysis_report
primary_model: power_analysis_v2
fallback_strategies:
  - condition: "error.type == 'TIMEOUT'"
    actions:
      - type: retry
        max_attempts: 3
        backoff: exponential
        base_delay: 5
        
  - condition: "error.type == 'RESOURCE_UNAVAILABLE'"
    actions:
      - type: switch_model
        alternative: power_analysis_v1
        
  - condition: "error.type == 'QUALITY_LOW' AND error.score < 0.6"
    actions:
      - type: switch_model
        alternative: general_report_model
        with_params:
          domain_adaptation: "power_industry"
          
  - condition: "error.type == 'FATAL'"
    actions:
      - type: switch_path
        alternative_path: simplified_report_generation
        
  - condition: "default"
    actions:
      - type: notify_human
        role: "domain_expert"
        wait_for_input: true

3. 实现方案

class FallbackManager:
    def __init__(self, workflow_engine, model_registry, config):
        self.workflow_engine = workflow_engine
        self.model_registry = model_registry
        self.strategies = self._load_strategies(config)
        self.execution_log = ExecutionHistoryStore()
        
    def handle_failure(self, workflow_context, node_id, error):
        # 获取节点故障处理策略
        node_strategies = self.strategies.get(node_id, self.strategies.get("default", []))
        
        # 构建错误上下文
        error_context = {
            "type": error.error_type,
            "message": error.message,
            "timestamp": time.time(),
            "node_id": node_id,
            "attempt": workflow_context.get_attempt_count(node_id),
            "score": error.quality_score if hasattr(error, "quality_score") else None
        }
        
        # 记录失败
        self.execution_log.record_failure(workflow_context.task_id, error_context)
        
        # 评估适用策略
        applicable_strategy = self._find_applicable_strategy(node_strategies, error_context)
        if not applicable_strategy:
            return self._handle_unrecoverable_error(workflow_context, error_context)
            
        # 执行策略动作
        return self._execute_strategy_actions(applicable_strategy, workflow_context, error_context)
        
    def _execute_strategy_actions(self, strategy, workflow_context, error_context):
        results = []
        
        for action in strategy["actions"]:
            if action["type"] == "retry":
                result = self._handle_retry(action, workflow_context, error_context)
                
            elif action["type"] == "switch_model":
                result = self._handle_model_switch(action, workflow_context, error_context)
                
            elif action["type"] == "switch_path":
                result = self._handle_path_switch(action, workflow_context, error_context)
                
            elif action["type"] == "notify_human":
                result = self._handle_human_intervention(action, workflow_context, error_context)
                
            results.append(result)
            
        # 记录应用的策略和结果
        self.execution_log.record_recovery_attempt(
            workflow_context.task_id, 
            strategy["condition"], 
            [a["type"] for a in strategy["actions"]],
            results
        )
        
        return results[-1]  # 返回最后一个动作的结果
        
    def _handle_retry(self, action, workflow_context, error_context):
        # 检查重试次数
        attempt = error_context["attempt"]
        if attempt >= action.get("max_attempts", 3):
            return {"success": False, "reason": "Max retry attempts exceeded"}
            
        # 计算延迟
        delay = self._calculate_backoff_delay(action, attempt)
        
        # 安排重试
        self.workflow_engine.schedule_retry(
            workflow_context,
            error_context["node_id"],
            delay=delay
        )
        
        return {
            "success": True,
            "action": "retry",
            "delay": delay,
            "attempt": attempt + 1
        }
        
    def _handle_model_switch(self, action, workflow_context, error_context):
        # 获取替代模型
        alternative_model = action["alternative"]
        params = action.get("with_params", {})
        
        # 验证替代模型可用性
        if not self.model_registry.is_available(alternative_model):
            return {"success": False, "reason": f"Alternative model {alternative_model} not available"}
            
        # 切换模型
        self.workflow_engine.update_node_model(
            workflow_context,
            error_context["node_id"],
            alternative_model,
            additional_params=params
        )
        
        # 重启节点
        self.workflow_engine.restart_node(workflow_context, error_context["node_id"])
        
        return {
            "success": True,
            "action": "model_switch",
            "from": workflow_context.get_node_model(error_context["node_id"]),
            "to": alternative_model
        }

4. 智能替代方案选择

class AdaptiveFallbackSelector:
    def __init__(self, model_registry, execution_history):
        self.registry = model_registry
        self.history = execution_history
        self.success_rate_cache = {}
        
    def select_best_alternative(self, task_type, failed_model, error_context):
        # 获取该任务类型的所有可用模型
        available_models = self.registry.get_models_for_task(task_type)
        
        # 排除已失败的模型
        alternatives = [m for m in available_models if m.id != failed_model.id]
        if not alternatives:
            return None
            
        # 计算每个替代模型的成功率
        model_scores = []
        for model in alternatives:
            # 获取历史成功率
            success_rate = self._get_model_success_rate(model.id, task_type)
            
            # 考虑错误类型的特定表现
            error_type_performance = self._get_performance_for_error(
                model.id, error_context["type"]
            )
            
            # 考虑资源消耗
            resource_score = self._calculate_resource_efficiency(model)
            
            # 计算综合得分
            score = (0.5 * success_rate) + (0.3 * error_type_performance) + (0.2 * resource_score)
            
            model_scores.append((model, score))
            
        # 选择得分最高的模型
        model_scores.sort(key=lambda x: x[1], reverse=True)
        
        return model_scores[0][0] if model_scores else None
        
    def _get_model_success_rate(self, model_id, task_type):
        cache_key = f"{model_id}:{task_type}"
        
        if cache_key in self.success_rate_cache:
            return self.success_rate_cache[cache_key]
            
        # 从历史记录中计算成功率
        history = self.history.get_model_execution_history(model_id, task_type)
        
        if not history or sum(history.values()) == 0:
            return 0.5  # 默认值
            
        success_rate = history.get("success", 0) / sum(history.values())
        
        # 更新缓存
        self.success_rate_cache[cache_key] = success_rate
        
        return success_rate

三、高级功能与优化

1. 增量恢复

在某些情况下,不需要从检查点完全恢复,只需恢复部分状态:

class IncrementalRecovery:
    def recover_partial(self, workflow_context, failed_node_id):
        # 只恢复失败节点的依赖关系
        dependencies = workflow_context.get_node_dependencies(failed_node_id)
        
        # 验证依赖节点的输出是否仍有效
        invalid_deps = [dep for dep in dependencies if not self._validate_output(workflow_context, dep)]
        
        if invalid_deps:
            # 仅重新执行无效的依赖节点
            for dep in invalid_deps:
                self.engine.restart_node(workflow_context, dep)
                
        # 然后恢复失败节点
        return self.engine.restart_node(workflow_context, failed_node_id)

2. 故障预测与预防

class FailurePredictor:
    def __init__(self, model_registry, execution_history, monitoring_service):
        self.registry = model_registry
        self.history = execution_history
        self.monitoring = monitoring_service
        self.prediction_model = self._load_prediction_model()
        
    def predict_failures(self, workflow_context):
        risk_assessments = {}
        
        for node in workflow_context.get_pending_nodes():
            # 收集节点特征
            features = self._collect_node_features(workflow_context, node)
            
            # 预测失败风险
            risk_score = self.prediction_model.predict(features)
            
            if risk_score > 0.7:  # 高风险阈值
                # 推荐预防措施
                preventive_actions = self._recommend_preventive_actions(
                    workflow_context, node, risk_score
                )
                
                risk_assessments[node.id] = {
                    "risk_score": risk_score,
                    "recommended_actions": preventive_actions
                }
                
        return risk_assessments
        
    def _recommend_preventive_actions(self, workflow_context, node, risk_score):
        actions = []
        
        # 分析风险因素
        risk_factors = self._analyze_risk_factors(workflow_context, node)
        
        # 基于风险因素推荐措施
        if "resource_contention" in risk_factors:
            actions.append({
                "type": "resource_allocation",
                "action": "increase_priority",
                "reason": "Potential resource contention detected"
            })
            
        if "model_reliability" in risk_factors:
            # 提前准备备选模型
            alternative = self._find_reliable_alternative(node.model_id)
            if alternative:
                actions.append({
                    "type": "prepare_alternative",
                    "model": alternative,
                    "reason": "Primary model has shown instability"
                })
                
        if "data_quality" in risk_factors:
            actions.append({
                "type": "data_validation",
                "action": "enhance_preprocessing",
                "reason": "Input data may cause processing issues"
            })
            
        return actions

3. 自适应检查点策略

class AdaptiveCheckpointStrategy:
    def __init__(self, execution_history, system_monitor):
        self.history = execution_history
        self.monitor = system_monitor
        self.node_failure_rates = {}
        self.update_interval = 3600  # 每小时更新一次策略
        self.last_update = 0
        
    def get_checkpoint_strategy(self, workflow_id, node_id):
        current_time = time.time()
        
        # 定期更新节点失败率
        if current_time - self.last_update > self.update_interval:
            self._update_failure_statistics()
            self.last_update = current_time
            
        # 获取节点失败率
        failure_rate = self.node_failure_rates.get(node_id, 0.05)  # 默认5%
        
        # 获取系统负载状况
        system_load = self.monitor.get_system_load()
        
        # 基于失败率和系统负载调整策略
        if failure_rate > 0.20:  # 高失败率节点
            return {
                "frequency": "always",
                "detail_level": "full"
            }
        elif failure_rate > 0.10 or system_load > 0.85:  # 中等失败率或高系统负载
            return {
                "frequency": "node",
                "detail_level": "essential"
            }
        else:  # 低失败率
            return {
                "frequency": "sparse",  # 每N个节点一次
                "interval": max(3, int(10 * (1 - failure_rate))),
                "detail_level": "minimal"
            }

这些机制结合起来,形成了一个强大的系统,可以应对各种故障情况,最大限度地确保任务的可靠完成,同时优化系统资源使用。