refactor(workflow_cycle_manager): Rename all handle_workflow_xxx methods to public

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20067/head
-LAN- 1 year ago
parent 4b57591e37
commit 6ac40eb4c6
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -297,7 +297,7 @@ class AdvancedChatAppGenerateTaskPipeline:
with Session(db.engine, expire_on_commit=False) as session:
# init workflow run
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start(
session=session,
workflow_id=self._workflow_id,
)
@ -320,7 +320,7 @@ class AdvancedChatAppGenerateTaskPipeline:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id, event=event
)
node_retry_resp = self._workflow_cycle_manager.workflow_node_retry_to_stream_response(
@ -336,7 +336,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event
)
@ -356,7 +356,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
event=event
)
@ -376,7 +376,7 @@ class AdvancedChatAppGenerateTaskPipeline:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event
)
@ -486,7 +486,7 @@ class AdvancedChatAppGenerateTaskPipeline:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -512,7 +512,7 @@ class AdvancedChatAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -538,7 +538,7 @@ class AdvancedChatAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -564,7 +564,7 @@ class AdvancedChatAppGenerateTaskPipeline:
elif isinstance(event, QueueStopEvent):
if self._workflow_run_id and graph_runtime_state:
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,

@ -263,7 +263,7 @@ class WorkflowAppGenerateTaskPipeline:
with Session(db.engine, expire_on_commit=False) as session:
# init workflow run
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start(
session=session,
workflow_id=self._workflow_id,
)
@ -281,7 +281,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id,
event=event,
)
@ -298,7 +298,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event
)
node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response(
@ -310,7 +310,7 @@ class WorkflowAppGenerateTaskPipeline:
if node_start_response:
yield node_start_response
elif isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
event=event
)
node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
@ -328,7 +328,7 @@ class WorkflowAppGenerateTaskPipeline:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event,
)
node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
@ -445,7 +445,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -472,7 +472,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -500,7 +500,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,

@ -263,7 +263,7 @@ class WorkflowAppGenerateTaskPipeline:
with Session(db.engine, expire_on_commit=False) as session:
# init workflow run
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_start(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_start(
session=session,
workflow_id=self._workflow_id,
)
@ -281,7 +281,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_retried(
workflow_execution_id=self._workflow_run_id,
event=event,
)
@ -298,7 +298,7 @@ class WorkflowAppGenerateTaskPipeline:
if not self._workflow_run_id:
raise ValueError("workflow run not initialized.")
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
workflow_node_execution = self._workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=self._workflow_run_id, event=event
)
node_start_response = self._workflow_cycle_manager.workflow_node_start_to_stream_response(
@ -310,7 +310,7 @@ class WorkflowAppGenerateTaskPipeline:
if node_start_response:
yield node_start_response
elif isinstance(event, QueueNodeSucceededEvent):
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
event=event
)
node_success_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
@ -328,7 +328,7 @@ class WorkflowAppGenerateTaskPipeline:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event,
)
node_failed_response = self._workflow_cycle_manager.workflow_node_finish_to_stream_response(
@ -445,7 +445,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -472,7 +472,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_partial_success(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,
@ -500,7 +500,7 @@ class WorkflowAppGenerateTaskPipeline:
raise ValueError("graph runtime state not initialized.")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager._handle_workflow_run_failed(
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id=self._workflow_run_id,
total_tokens=graph_runtime_state.total_tokens,
total_steps=graph_runtime_state.node_run_steps,

@ -85,7 +85,7 @@ class WorkflowCycleManager:
self._workflow_execution_repository = workflow_execution_repository
self._workflow_node_execution_repository = workflow_node_execution_repository
def _handle_workflow_run_start(
def handle_workflow_run_start(
self,
*,
session: Session,
@ -130,7 +130,7 @@ class WorkflowCycleManager:
return execution
def _handle_workflow_run_success(
def handle_workflow_run_success(
self,
*,
workflow_run_id: str,
@ -162,7 +162,7 @@ class WorkflowCycleManager:
return workflow_execution
def _handle_workflow_run_partial_success(
def handle_workflow_run_partial_success(
self,
*,
workflow_run_id: str,
@ -195,7 +195,7 @@ class WorkflowCycleManager:
return execution
def _handle_workflow_run_failed(
def handle_workflow_run_failed(
self,
*,
workflow_run_id: str,
@ -246,7 +246,7 @@ class WorkflowCycleManager:
return execution
def _handle_node_execution_start(
def handle_node_execution_start(
self,
*,
workflow_execution_id: str,
@ -282,7 +282,7 @@ class WorkflowCycleManager:
return domain_execution
def _handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> NodeExecution:
def handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> NodeExecution:
# Get the domain model from repository
domain_execution = self._workflow_node_execution_repository.get_by_node_execution_id(event.node_execution_id)
if not domain_execution:
@ -315,7 +315,7 @@ class WorkflowCycleManager:
return domain_execution
def _handle_workflow_node_execution_failed(
def handle_workflow_node_execution_failed(
self,
*,
event: QueueNodeFailedEvent
@ -365,7 +365,7 @@ class WorkflowCycleManager:
return domain_execution
def _handle_workflow_node_execution_retried(
def handle_workflow_node_execution_retried(
self, *, workflow_execution_id: str, event: QueueNodeRetryEvent
) -> NodeExecution:
workflow_execution = self._get_workflow_execution_or_raise_error(workflow_execution_id)

@ -172,12 +172,12 @@ def test_init(
def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_workflow):
"""Test _handle_workflow_run_start method"""
"""Test handle_workflow_run_start method"""
# Mock session.scalar to return the workflow and max sequence
mock_session.scalar.side_effect = [real_workflow, 5]
# Call the method
workflow_execution = workflow_cycle_manager._handle_workflow_run_start(
workflow_execution = workflow_cycle_manager.handle_workflow_run_start(
session=mock_session,
workflow_id="test-workflow-id",
)
@ -191,7 +191,7 @@ def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_wo
def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execution_repository):
"""Test _handle_workflow_run_success method"""
"""Test handle_workflow_run_success method"""
# Create a real WorkflowExecution
workflow_execution = WorkflowExecution(
@ -209,7 +209,7 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu
workflow_cycle_manager._workflow_execution_repository.get.return_value = workflow_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_run_success(
result = workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id="test-workflow-run-id",
total_tokens=100,
total_steps=5,
@ -226,7 +226,7 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu
def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execution_repository):
"""Test _handle_workflow_run_failed method"""
"""Test handle_workflow_run_failed method"""
# Create a real WorkflowExecution
workflow_execution = WorkflowExecution(
@ -247,7 +247,7 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut
workflow_cycle_manager._workflow_node_execution_repository.get_running_executions.return_value = []
# Call the method
result = workflow_cycle_manager._handle_workflow_run_failed(
result = workflow_cycle_manager.handle_workflow_run_failed(
workflow_run_id="test-workflow-run-id",
total_tokens=50,
total_steps=3,
@ -265,7 +265,7 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut
def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execution_repository):
"""Test _handle_node_execution_start method"""
"""Test handle_node_execution_start method"""
# Create a real WorkflowExecution
workflow_execution = WorkflowExecution(
@ -300,7 +300,7 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu
event.in_loop_id = "test-loop-id"
# Call the method
result = workflow_cycle_manager._handle_node_execution_start(
result = workflow_cycle_manager.handle_node_execution_start(
workflow_execution_id=workflow_execution.id,
event=event,
)
@ -351,7 +351,7 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work
def test_handle_workflow_node_execution_success(workflow_cycle_manager):
"""Test _handle_workflow_node_execution_success method"""
"""Test handle_workflow_node_execution_success method"""
# Create a mock event
event = MagicMock(spec=QueueNodeSucceededEvent)
event.node_execution_id = "test-node-execution-id"
@ -379,7 +379,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager):
workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_node_execution_success(
result = workflow_cycle_manager.handle_workflow_node_execution_success(
event=event,
)
@ -392,7 +392,7 @@ def test_handle_workflow_node_execution_success(workflow_cycle_manager):
def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workflow_execution_repository):
"""Test _handle_workflow_run_partial_success method"""
"""Test handle_workflow_run_partial_success method"""
# Create a real WorkflowExecution
workflow_execution = WorkflowExecution(
@ -410,7 +410,7 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl
workflow_cycle_manager._workflow_execution_repository.get.return_value = workflow_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_run_partial_success(
result = workflow_cycle_manager.handle_workflow_run_partial_success(
workflow_run_id="test-workflow-run-id",
total_tokens=75,
total_steps=4,
@ -429,7 +429,7 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl
def test_handle_workflow_node_execution_failed(workflow_cycle_manager):
"""Test _handle_workflow_node_execution_failed method"""
"""Test handle_workflow_node_execution_failed method"""
# Create a mock event
event = MagicMock(spec=QueueNodeFailedEvent)
event.node_execution_id = "test-node-execution-id"
@ -458,7 +458,7 @@ def test_handle_workflow_node_execution_failed(workflow_cycle_manager):
workflow_cycle_manager._workflow_node_execution_repository.get_by_node_execution_id.return_value = node_execution
# Call the method
result = workflow_cycle_manager._handle_workflow_node_execution_failed(
result = workflow_cycle_manager.handle_workflow_node_execution_failed(
event=event,
)

Loading…
Cancel
Save