断点恢复机制允许在任务中断后,从最近的安全点继续执行,而不需要重新开始整个流程。
关键组件:
- 状态存储服务: 高可用数据库(如Redis集群、MongoDB)
- 检查点管理器: 决定何时创建检查点
- 状态恢复器: 负责从检查点重建执行环境
持久化内容:
任务元数据
执行图状态
中间结果数据
class CheckpointManager:
    """Decides when to create workflow checkpoints and persists them.

    Frequency modes (``checkpoint_frequency``):
      - ``"node"``:   checkpoint after every completed node
      - ``"time"``:   checkpoint once ``time_interval`` seconds have elapsed
      - ``"hybrid"``: both of the above
    Critical nodes always force a checkpoint regardless of mode.
    """

    def __init__(self, storage_client, config):
        self.storage = storage_client
        # node / time / hybrid
        self.checkpoint_frequency = config.get("checkpoint_frequency", "node")
        self.time_interval = config.get("time_interval", 300)  # seconds

    def should_checkpoint(self, workflow_context):
        """Return True if a checkpoint should be taken right now."""
        # Node-completion checkpoint.
        # BUG FIX: "hybrid" is documented as node + time, but the original
        # only honored node completion in pure "node" mode.
        if (self.checkpoint_frequency in ("node", "hybrid")
                and workflow_context.node_completed):
            return True
        # Time-interval checkpoint.
        if self.checkpoint_frequency in ("time", "hybrid"):
            elapsed = time.time() - workflow_context.last_checkpoint_time
            if elapsed >= self.time_interval:
                return True
        # Critical nodes always force a checkpoint.
        if workflow_context.current_node.is_critical:
            return True
        return False

    def create_checkpoint(self, workflow_context):
        """Snapshot the workflow state, persist it, and return the checkpoint id."""
        checkpoint_data = {
            "task_id": workflow_context.task_id,
            "timestamp": time.time(),
            "completed_nodes": workflow_context.completed_nodes,
            "current_node": workflow_context.current_node.id,
            # NOTE(review): the two serialization helpers below are referenced
            # but not defined in this excerpt — presumably implemented elsewhere.
            "node_states": self._serialize_node_states(workflow_context),
            "global_context": workflow_context.global_context,
            "resources": self._get_resource_references(workflow_context),
        }
        checkpoint_id = f"{workflow_context.task_id}:cp:{uuid.uuid4()}"
        self.storage.save(checkpoint_id, checkpoint_data)
        # Track the latest checkpoint id so recovery can find it in O(1).
        self.storage.save(f"{workflow_context.task_id}:latest_checkpoint", checkpoint_id)
        return checkpoint_id
class WorkflowRecovery:
    """Rebuilds a workflow's execution context from its latest checkpoint."""

    def __init__(self, storage_client, workflow_engine):
        self.storage = storage_client
        self.engine = workflow_engine

    def recover_workflow(self, task_id):
        """Restore *task_id* from its most recent checkpoint and resume execution.

        Raises:
            RecoveryError: if no checkpoint exists for the task.
        """
        # Look up the pointer to the most recent checkpoint.
        checkpoint_id = self.storage.get(f"{task_id}:latest_checkpoint")
        if not checkpoint_id:
            raise RecoveryError("No checkpoint found for task")
        snapshot = self.storage.get(checkpoint_id)

        # Rebuild the workflow context from the persisted snapshot.
        context = WorkflowContext(
            task_id=snapshot["task_id"],
            completed_nodes=snapshot["completed_nodes"],
            global_context=snapshot["global_context"],
        )
        # Re-attach external resources first, then per-node state.
        self._restore_resources(context, snapshot["resources"])
        self._restore_node_states(context, snapshot["node_states"])

        # Work out where execution should pick up.
        resume_node_id = self._determine_next_node(snapshot)
        # Leave an audit trail of the recovery before restarting.
        self._log_recovery_event(task_id, checkpoint_id)
        return self.engine.resume_workflow(context, resume_node_id)
替代方案自动启动机制允许在主要执行路径失败时,自动切换到备选方案,确保任务完成。
检测机制:
超时监控
错误模式识别
质量检测
# Example failure-handling policy for a task node.
# (Structure restored: the original excerpt had its YAML indentation
# flattened, making the document invalid.)
node_id: generate_power_analysis_report
primary_model: power_analysis_v2
fallback_strategies:
  # Transient timeouts: retry with exponential backoff.
  - condition: "error.type == 'TIMEOUT'"
    actions:
      - type: retry
        max_attempts: 3
        backoff: exponential
        base_delay: 5
  # Resource shortage: fall back to the previous model generation.
  - condition: "error.type == 'RESOURCE_UNAVAILABLE'"
    actions:
      - type: switch_model
        alternative: power_analysis_v1
  # Low-quality output: use a general model adapted to the power domain.
  - condition: "error.type == 'QUALITY_LOW' AND error.score < 0.6"
    actions:
      - type: switch_model
        alternative: general_report_model
        with_params:
          domain_adaptation: "power_industry"
  # Fatal errors: switch to a simplified alternative execution path.
  - condition: "error.type == 'FATAL'"
    actions:
      - type: switch_path
        alternative_path: simplified_report_generation
  # Anything else: escalate to a human expert and block until they respond.
  - condition: "default"
    actions:
      - type: notify_human
        role: "domain_expert"
        wait_for_input: true
class FallbackManager:
    """Applies per-node failure-handling strategies (retry, model switch,
    path switch, human escalation) when a workflow node fails."""

    def __init__(self, workflow_engine, model_registry, config):
        self.workflow_engine = workflow_engine
        self.model_registry = model_registry
        self.strategies = self._load_strategies(config)
        self.execution_log = ExecutionHistoryStore()

    def handle_failure(self, workflow_context, node_id, error):
        """Record *error* for *node_id* and execute the first applicable strategy.

        Returns the result of the strategy's last action, or the result of
        ``_handle_unrecoverable_error`` when no strategy matches.
        """
        # Per-node strategies, falling back to the "default" strategy set.
        node_strategies = self.strategies.get(node_id, self.strategies.get("default", []))
        # Normalize the error into the dict that strategy conditions match on.
        error_context = {
            "type": error.error_type,
            "message": error.message,
            "timestamp": time.time(),
            "node_id": node_id,
            "attempt": workflow_context.get_attempt_count(node_id),
            # Quality score is only present on quality-related errors.
            "score": error.quality_score if hasattr(error, "quality_score") else None,
        }
        self.execution_log.record_failure(workflow_context.task_id, error_context)
        applicable_strategy = self._find_applicable_strategy(node_strategies, error_context)
        if not applicable_strategy:
            return self._handle_unrecoverable_error(workflow_context, error_context)
        return self._execute_strategy_actions(applicable_strategy, workflow_context, error_context)

    def _execute_strategy_actions(self, strategy, workflow_context, error_context):
        """Run every action of *strategy* in order; return the last action's result."""
        results = []
        for action in strategy["actions"]:
            action_type = action["type"]
            if action_type == "retry":
                result = self._handle_retry(action, workflow_context, error_context)
            elif action_type == "switch_model":
                result = self._handle_model_switch(action, workflow_context, error_context)
            elif action_type == "switch_path":
                result = self._handle_path_switch(action, workflow_context, error_context)
            elif action_type == "notify_human":
                result = self._handle_human_intervention(action, workflow_context, error_context)
            else:
                # BUG FIX: the original left ``result`` unbound (NameError on the
                # first action) or silently reused a stale result for unknown
                # action types; fail explicitly instead.
                result = {"success": False, "reason": f"Unknown action type: {action_type}"}
            results.append(result)
        # Audit which strategy fired and what each action produced.
        self.execution_log.record_recovery_attempt(
            workflow_context.task_id,
            strategy["condition"],
            [a["type"] for a in strategy["actions"]],
            results,
        )
        return results[-1]  # result of the final action

    def _handle_retry(self, action, workflow_context, error_context):
        """Schedule a delayed retry unless the attempt budget is exhausted."""
        attempt = error_context["attempt"]
        if attempt >= action.get("max_attempts", 3):
            return {"success": False, "reason": "Max retry attempts exceeded"}
        delay = self._calculate_backoff_delay(action, attempt)
        self.workflow_engine.schedule_retry(
            workflow_context,
            error_context["node_id"],
            delay=delay,
        )
        return {
            "success": True,
            "action": "retry",
            "delay": delay,
            "attempt": attempt + 1,
        }

    def _handle_model_switch(self, action, workflow_context, error_context):
        """Swap the failing node onto an alternative model and restart it."""
        alternative_model = action["alternative"]
        params = action.get("with_params", {})
        if not self.model_registry.is_available(alternative_model):
            return {"success": False, "reason": f"Alternative model {alternative_model} not available"}
        # BUG FIX: capture the outgoing model BEFORE switching; the original read
        # it after ``update_node_model`` and therefore reported the new model.
        previous_model = workflow_context.get_node_model(error_context["node_id"])
        self.workflow_engine.update_node_model(
            workflow_context,
            error_context["node_id"],
            alternative_model,
            additional_params=params,
        )
        self.workflow_engine.restart_node(workflow_context, error_context["node_id"])
        return {
            "success": True,
            "action": "model_switch",
            "from": previous_model,
            "to": alternative_model,
        }
class AdaptiveFallbackSelector:
    """Ranks alternative models for a failed task using historical performance."""

    def __init__(self, model_registry, execution_history):
        self.registry = model_registry
        self.history = execution_history
        self.success_rate_cache = {}

    def select_best_alternative(self, task_type, failed_model, error_context):
        """Return the highest-scoring alternative model, or None if none exist.

        Composite score = 0.5 * historical success rate
                        + 0.3 * performance on this error type
                        + 0.2 * resource efficiency
        """
        # Candidates: every model registered for the task except the one
        # that just failed.
        candidates = [
            model
            for model in self.registry.get_models_for_task(task_type)
            if model.id != failed_model.id
        ]
        if not candidates:
            return None

        def composite_score(model):
            success_rate = self._get_model_success_rate(model.id, task_type)
            error_perf = self._get_performance_for_error(model.id, error_context["type"])
            efficiency = self._calculate_resource_efficiency(model)
            return (0.5 * success_rate) + (0.3 * error_perf) + (0.2 * efficiency)

        # ``max`` returns the first of tied candidates, matching the original
        # stable sort + take-first behavior.
        return max(candidates, key=composite_score)

    def _get_model_success_rate(self, model_id, task_type):
        """Cached historical success rate for (model, task); 0.5 when unknown."""
        cache_key = f"{model_id}:{task_type}"
        try:
            return self.success_rate_cache[cache_key]
        except KeyError:
            pass
        records = self.history.get_model_execution_history(model_id, task_type)
        total = sum(records.values()) if records else 0
        if total == 0:
            # No history yet: neutral prior. Deliberately not cached so fresh
            # history is picked up on the next call (matches original behavior).
            return 0.5
        rate = records.get("success", 0) / total
        self.success_rate_cache[cache_key] = rate
        return rate
在某些情况下,不需要从检查点完全恢复,只需恢复部分状态:
class IncrementalRecovery:
    """Recovers a failed node by re-running only its invalidated dependencies
    instead of restoring the entire workflow from a checkpoint."""

    def recover_partial(self, workflow_context, failed_node_id):
        """Re-run stale dependencies of *failed_node_id*, then restart it."""
        dependencies = workflow_context.get_node_dependencies(failed_node_id)
        # Validate every dependency first, then restart only the stale ones
        # (looping an empty list is a no-op, so no emptiness guard is needed).
        stale = [dep for dep in dependencies
                 if not self._validate_output(workflow_context, dep)]
        for dep in stale:
            self.engine.restart_node(workflow_context, dep)
        # With all dependencies valid again, restart the failed node itself.
        return self.engine.restart_node(workflow_context, failed_node_id)
class FailurePredictor:
    """Predicts which pending workflow nodes are at risk of failing and
    recommends preventive actions before they run."""

    def __init__(self, model_registry, execution_history, monitoring_service):
        self.registry = model_registry
        self.history = execution_history
        self.monitoring = monitoring_service
        self.prediction_model = self._load_prediction_model()

    def predict_failures(self, workflow_context, risk_threshold=0.7):
        """Return risk assessments for every high-risk pending node.

        GENERALIZATION: the 0.7 high-risk cutoff was hard-coded; it is now a
        keyword parameter (default unchanged) so callers can tune sensitivity.

        Returns:
            dict mapping node id to {"risk_score", "recommended_actions"}
            for each pending node whose predicted risk exceeds *risk_threshold*.
        """
        risk_assessments = {}
        for node in workflow_context.get_pending_nodes():
            features = self._collect_node_features(workflow_context, node)
            risk_score = self.prediction_model.predict(features)
            if risk_score > risk_threshold:
                preventive_actions = self._recommend_preventive_actions(
                    workflow_context, node, risk_score
                )
                risk_assessments[node.id] = {
                    "risk_score": risk_score,
                    "recommended_actions": preventive_actions,
                }
        return risk_assessments

    def _recommend_preventive_actions(self, workflow_context, node, risk_score):
        """Map the node's detected risk factors to concrete preventive actions."""
        actions = []
        risk_factors = self._analyze_risk_factors(workflow_context, node)
        if "resource_contention" in risk_factors:
            actions.append({
                "type": "resource_allocation",
                "action": "increase_priority",
                "reason": "Potential resource contention detected",
            })
        if "model_reliability" in risk_factors:
            # Pre-stage a backup model so a switch is instant if the primary fails.
            alternative = self._find_reliable_alternative(node.model_id)
            if alternative:
                actions.append({
                    "type": "prepare_alternative",
                    "model": alternative,
                    "reason": "Primary model has shown instability",
                })
        if "data_quality" in risk_factors:
            actions.append({
                "type": "data_validation",
                "action": "enhance_preprocessing",
                "reason": "Input data may cause processing issues",
            })
        return actions
class AdaptiveCheckpointStrategy:
    """Tunes checkpoint frequency and detail per node based on observed
    failure rates and current system load."""

    def __init__(self, execution_history, system_monitor):
        self.history = execution_history
        self.monitor = system_monitor
        self.node_failure_rates = {}
        self.update_interval = 3600  # refresh failure statistics hourly
        self.last_update = 0

    def get_checkpoint_strategy(self, workflow_id, node_id):
        """Return the checkpoint policy dict for *node_id*."""
        now = time.time()
        # Periodically refresh the cached per-node failure statistics.
        if now - self.last_update > self.update_interval:
            self._update_failure_statistics()
            self.last_update = now

        failure_rate = self.node_failure_rates.get(node_id, 0.05)  # 5% prior
        system_load = self.monitor.get_system_load()

        # Flaky node: checkpoint everything, every time.
        if failure_rate > 0.20:
            return {"frequency": "always", "detail_level": "full"}
        # Moderately risky node, or a loaded system where replays are costly.
        if failure_rate > 0.10 or system_load > 0.85:
            return {"frequency": "node", "detail_level": "essential"}
        # Reliable node: sparse, lightweight checkpoints every N nodes.
        return {
            "frequency": "sparse",
            "interval": max(3, int(10 * (1 - failure_rate))),
            "detail_level": "minimal",
        }
这些机制结合起来,形成了一个强大的系统,可以应对各种故障情况,最大限度地确保任务的可靠完成,同时优化系统资源使用。