diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 7669bf74bb..82057a045f 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -77,7 +77,6 @@ class WorkflowResponseConverter: data=WorkflowStartStreamResponse.Data( id=workflow_execution.id, workflow_id=workflow_execution.workflow_id, - sequence_number=workflow_execution.sequence_number, inputs=workflow_execution.inputs, created_at=int(workflow_execution.started_at.timestamp()), ), @@ -126,7 +125,6 @@ class WorkflowResponseConverter: data=WorkflowFinishStreamResponse.Data( id=workflow_execution.id, workflow_id=workflow_execution.workflow_id, - sequence_number=workflow_execution.sequence_number, status=workflow_execution.status, outputs=workflow_execution.outputs, error=workflow_execution.error_message, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 9b2bfcbf61..3f03196a5f 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -189,7 +189,6 @@ class WorkflowStartStreamResponse(StreamResponse): id: str workflow_id: str - sequence_number: int inputs: Mapping[str, Any] created_at: int @@ -210,7 +209,6 @@ class WorkflowFinishStreamResponse(StreamResponse): id: str workflow_id: str - sequence_number: int status: str outputs: Optional[Mapping[str, Any]] = None error: Optional[str] = None diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index c1a71b45d0..4ea71696de 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -106,7 +106,6 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): return WorkflowExecution( id=db_model.id, workflow_id=db_model.workflow_id, - sequence_number=db_model.sequence_number, type=WorkflowType(db_model.type), workflow_version=db_model.version, graph=graph, @@ -146,7 +145,22 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): db_model.app_id = self._app_id db_model.workflow_id = domain_model.workflow_id db_model.triggered_from = self._triggered_from - db_model.sequence_number = domain_model.sequence_number + + # Check if this is a new record + with self._session_factory() as session: + existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id)) + if not existing: + # For new records, get the next sequence number + stmt = select(WorkflowRun.sequence_number).where( + WorkflowRun.app_id == self._app_id, + WorkflowRun.tenant_id == self._tenant_id, + ) + max_sequence = session.scalar(stmt.order_by(WorkflowRun.sequence_number.desc())) + db_model.sequence_number = (max_sequence or 0) + 1 + else: + # For updates, keep the existing sequence number + db_model.sequence_number = existing.sequence_number + db_model.type = domain_model.type db_model.version = domain_model.workflow_version db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None diff --git a/api/core/workflow/entities/workflow_execution_entities.py b/api/core/workflow/entities/workflow_execution_entities.py index 200d4697b5..80c3f65205 100644 --- a/api/core/workflow/entities/workflow_execution_entities.py +++ b/api/core/workflow/entities/workflow_execution_entities.py @@ -39,8 +39,6 @@ class WorkflowExecution(BaseModel): id: str = Field(...) workflow_id: str = Field(...) workflow_version: str = Field(...) - sequence_number: int = Field(...) - type: WorkflowType = Field(...) graph: Mapping[str, Any] = Field(...) @@ -71,7 +69,6 @@ class WorkflowExecution(BaseModel): *, id: str, workflow_id: str, - sequence_number: int, type: WorkflowType, workflow_version: str, graph: Mapping[str, Any], @@ -81,7 +78,6 @@ class WorkflowExecution(BaseModel): return WorkflowExecution( id=id, workflow_id=workflow_id, - sequence_number=sequence_number, type=type, workflow_version=workflow_version, graph=graph, diff --git a/api/core/workflow/workflow_cycle_manager.py b/api/core/workflow/workflow_cycle_manager.py index 24e23af093..c41770e2eb 100644 --- a/api/core/workflow/workflow_cycle_manager.py +++ b/api/core/workflow/workflow_cycle_manager.py @@ -3,7 +3,7 @@ from datetime import UTC, datetime from typing import Any, Optional, Union from uuid import uuid4 -from sqlalchemy import func, select +from sqlalchemy import select from sqlalchemy.orm import Session from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity @@ -31,7 +31,6 @@ from core.workflow.repository.workflow_node_execution_repository import Workflow from core.workflow.workflow_entry import WorkflowEntry from models import ( Workflow, - WorkflowRun, WorkflowRunStatus, ) @@ -61,13 +60,6 @@ class WorkflowCycleManager: if not workflow: raise ValueError(f"Workflow not found: {workflow_id}") - max_sequence_stmt = select(func.max(WorkflowRun.sequence_number)).where( - WorkflowRun.tenant_id == workflow.tenant_id, - WorkflowRun.app_id == workflow.app_id, - ) - max_sequence = session.scalar(max_sequence_stmt) or 0 - new_sequence_number = max_sequence + 1 - inputs = {**self._application_generate_entity.inputs} for key, value in (self._workflow_system_variables or {}).items(): if key.value == "conversation": @@ -83,7 +75,6 @@ class WorkflowCycleManager: execution = WorkflowExecution.new( id=execution_id, workflow_id=workflow.id, - sequence_number=new_sequence_number, type=WorkflowType(workflow.type), workflow_version=workflow.version, graph=workflow.graph_dict, diff --git a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py index 9c955fc086..3e17fe0e4e 100644 --- a/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py +++ b/api/tests/unit_tests/core/workflow/test_workflow_cycle_manager.py @@ -184,7 +184,6 @@ def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_wo # Verify the result assert workflow_execution.workflow_id == real_workflow.id - assert workflow_execution.sequence_number == 6 # max_sequence + 1 # Verify the workflow_execution_repository.save was called workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution) @@ -198,7 +197,6 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu id="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, @@ -233,7 +231,6 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut id="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, @@ -272,7 +269,6 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu id="test-workflow-execution-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, @@ -326,7 +322,6 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work id="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"}, @@ -399,7 +394,6 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl id="test-workflow-run-id", workflow_id="test-workflow-id", workflow_version="1.0", - sequence_number=1, type=WorkflowType.CHAT, graph={"nodes": [], "edges": []}, inputs={"query": "test query"},