diff --git a/assets/workflow/auto-dev-loop/workflow_engine/runner.py b/assets/workflow/auto-dev-loop/workflow_engine/runner.py index f6025a0..ddd854e 100644 --- a/assets/workflow/auto-dev-loop/workflow_engine/runner.py +++ b/assets/workflow/auto-dev-loop/workflow_engine/runner.py @@ -23,6 +23,7 @@ STEP_MAP = { } STEP_FLOW = ["step1", "step2", "step3", "step4", "step5"] +MAX_RETRY_COUNT = 3 def load_state() -> dict: @@ -80,6 +81,12 @@ def run_step(step: str, state: dict, input_data: dict = None): "output": f"[MOCK] {step} completed", "timestamp": datetime.now().isoformat() } + # 可选:通过环境变量模拟 step4 验证失败,用于本地验证重试/熔断逻辑 + # 默认不生效,不影响正常使用 + if step == "step4": + mock_verify = os.environ.get("VIBE_WORKFLOW_MOCK_VERIFY_STATUS") + if mock_verify in ("success", "failed"): + result["verify"] = {"status": mock_verify} # ======================== # 保存产物 @@ -117,79 +124,51 @@ def start_workflow(input_file: str = None): print(f"[START] 新工作流 run_id={run_id}") - # 依次执行 step1 -> step5 - for step in STEP_FLOW: + # 显式状态机:支持 step5 失败回跳 step2,并可多次重试(带熔断) + step_idx = 0 + while step_idx < len(STEP_FLOW): + step = STEP_FLOW[step_idx] result = run_step(step, state, input_data) - + if not result: state["status"] = "error" save_state(state) return - + # step4 后检查验证结果 if step == "step4": verify_status = result.get("verify", {}).get("status", "success") state["verify"] = {"status": verify_status} - - # step5 决定下一步 + + # step5 决定下一步(回跳/完成) if step == "step5": - # 模拟 step5 的决策逻辑 if state.get("verify", {}).get("status") == "failed": - state["target_step"] = "step2" # 失败回跳 - state["status"] = "retry" - print(f"[RETRY] 验证失败,返回 step2 重规划") - else: - state["target_step"] = "done" - state["status"] = "completed" - print(f"[COMPLETE] 工作流完成") - - save_state(state) - - # 如果需要回跳,递归处理(带熔断) - if state.get("target_step") == "step2": - retry_count = state.get("retry_count", 0) + 1 - if retry_count > 3: - print(f"[FATAL] 超过最大重试次数") - state["status"] = "fatal_error" - save_state(state) - return - state["retry_count"] = retry_count - print(f"[RETRY {retry_count}/3] 从 step2 重新执行") - - # 递归调用自身,复用完整的验证逻辑 - state["target_step"] = None # 清除回跳标记 - save_state(state) - - for retry_step in STEP_FLOW[1:]: # step2 onwards - result = run_step(retry_step, state, input_data) - if not result: - state["status"] = "error" + next_retry_count = state.get("retry_count", 0) + 1 + if next_retry_count > MAX_RETRY_COUNT: + print(f"[FATAL] 超过最大重试次数") + state["retry_count"] = next_retry_count + state["target_step"] = "step2" + state["status"] = "fatal_error" save_state(state) return - - # step4 后检查验证结果 - if retry_step == "step4": - verify_status = result.get("verify", {}).get("status", "success") - state["verify"] = {"status": verify_status} - - # step5 决定下一步 - if retry_step == "step5": - if state.get("verify", {}).get("status") == "failed": - state["target_step"] = "step2" - state["status"] = "retry" - print(f"[RETRY] 验证仍失败,继续重试") - break # 跳出内层循环,触发外层重试检查 - else: - state["target_step"] = "done" - state["status"] = "completed" - print(f"[COMPLETE] 重试成功,工作流完成") - + + state["retry_count"] = next_retry_count + state["target_step"] = "step2" + state["status"] = "retry" save_state(state) - - # 如果 step5 设置了新的回跳目标,继续递归 - if state.get("target_step") == "step2": - continue # 继续外层循环的下一次迭代(实际会被 break 跳出) - break + + print(f"[RETRY {next_retry_count}/{MAX_RETRY_COUNT}] 验证失败,返回 step2 重规划") + step_idx = STEP_FLOW.index("step2") + continue + + state["target_step"] = "done" + state["status"] = "completed" + save_state(state) + print(f"[COMPLETE] 工作流完成") + return + + save_state(state) + step_idx += 1 def main():