refactor(workflow): Remove sequence_number from WorkflowExecution

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/20455/head
-LAN- 12 months ago
parent cd0a05f114
commit 2a39ca7d70
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -77,7 +77,6 @@ class WorkflowResponseConverter:
data=WorkflowStartStreamResponse.Data( data=WorkflowStartStreamResponse.Data(
id=workflow_execution.id, id=workflow_execution.id,
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
sequence_number=workflow_execution.sequence_number,
inputs=workflow_execution.inputs, inputs=workflow_execution.inputs,
created_at=int(workflow_execution.started_at.timestamp()), created_at=int(workflow_execution.started_at.timestamp()),
), ),
@ -126,7 +125,6 @@ class WorkflowResponseConverter:
data=WorkflowFinishStreamResponse.Data( data=WorkflowFinishStreamResponse.Data(
id=workflow_execution.id, id=workflow_execution.id,
workflow_id=workflow_execution.workflow_id, workflow_id=workflow_execution.workflow_id,
sequence_number=workflow_execution.sequence_number,
status=workflow_execution.status, status=workflow_execution.status,
outputs=workflow_execution.outputs, outputs=workflow_execution.outputs,
error=workflow_execution.error_message, error=workflow_execution.error_message,

@ -189,7 +189,6 @@ class WorkflowStartStreamResponse(StreamResponse):
id: str id: str
workflow_id: str workflow_id: str
sequence_number: int
inputs: Mapping[str, Any] inputs: Mapping[str, Any]
created_at: int created_at: int
@ -210,7 +209,6 @@ class WorkflowFinishStreamResponse(StreamResponse):
id: str id: str
workflow_id: str workflow_id: str
sequence_number: int
status: str status: str
outputs: Optional[Mapping[str, Any]] = None outputs: Optional[Mapping[str, Any]] = None
error: Optional[str] = None error: Optional[str] = None

@ -106,7 +106,6 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
return WorkflowExecution( return WorkflowExecution(
id=db_model.id, id=db_model.id,
workflow_id=db_model.workflow_id, workflow_id=db_model.workflow_id,
sequence_number=db_model.sequence_number,
type=WorkflowType(db_model.type), type=WorkflowType(db_model.type),
workflow_version=db_model.version, workflow_version=db_model.version,
graph=graph, graph=graph,
@ -146,7 +145,22 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository):
db_model.app_id = self._app_id db_model.app_id = self._app_id
db_model.workflow_id = domain_model.workflow_id db_model.workflow_id = domain_model.workflow_id
db_model.triggered_from = self._triggered_from db_model.triggered_from = self._triggered_from
db_model.sequence_number = domain_model.sequence_number
# Check if this is a new record
with self._session_factory() as session:
existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id))
if not existing:
# For new records, get the next sequence number
stmt = select(WorkflowRun.sequence_number).where(
WorkflowRun.app_id == self._app_id,
WorkflowRun.tenant_id == self._tenant_id,
)
max_sequence = session.scalar(stmt.order_by(WorkflowRun.sequence_number.desc()))
db_model.sequence_number = (max_sequence or 0) + 1
else:
# For updates, keep the existing sequence number
db_model.sequence_number = existing.sequence_number
db_model.type = domain_model.type db_model.type = domain_model.type
db_model.version = domain_model.workflow_version db_model.version = domain_model.workflow_version
db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None db_model.graph = json.dumps(domain_model.graph) if domain_model.graph else None

@ -39,8 +39,6 @@ class WorkflowExecution(BaseModel):
id: str = Field(...) id: str = Field(...)
workflow_id: str = Field(...) workflow_id: str = Field(...)
workflow_version: str = Field(...) workflow_version: str = Field(...)
sequence_number: int = Field(...)
type: WorkflowType = Field(...) type: WorkflowType = Field(...)
graph: Mapping[str, Any] = Field(...) graph: Mapping[str, Any] = Field(...)
@ -71,7 +69,6 @@ class WorkflowExecution(BaseModel):
*, *,
id: str, id: str,
workflow_id: str, workflow_id: str,
sequence_number: int,
type: WorkflowType, type: WorkflowType,
workflow_version: str, workflow_version: str,
graph: Mapping[str, Any], graph: Mapping[str, Any],
@ -81,7 +78,6 @@ class WorkflowExecution(BaseModel):
return WorkflowExecution( return WorkflowExecution(
id=id, id=id,
workflow_id=workflow_id, workflow_id=workflow_id,
sequence_number=sequence_number,
type=type, type=type,
workflow_version=workflow_version, workflow_version=workflow_version,
graph=graph, graph=graph,

@ -3,7 +3,7 @@ from datetime import UTC, datetime
from typing import Any, Optional, Union from typing import Any, Optional, Union
from uuid import uuid4 from uuid import uuid4
from sqlalchemy import func, select from sqlalchemy import select
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, WorkflowAppGenerateEntity
@ -31,7 +31,6 @@ from core.workflow.repository.workflow_node_execution_repository import Workflow
from core.workflow.workflow_entry import WorkflowEntry from core.workflow.workflow_entry import WorkflowEntry
from models import ( from models import (
Workflow, Workflow,
WorkflowRun,
WorkflowRunStatus, WorkflowRunStatus,
) )
@ -61,13 +60,6 @@ class WorkflowCycleManager:
if not workflow: if not workflow:
raise ValueError(f"Workflow not found: {workflow_id}") raise ValueError(f"Workflow not found: {workflow_id}")
max_sequence_stmt = select(func.max(WorkflowRun.sequence_number)).where(
WorkflowRun.tenant_id == workflow.tenant_id,
WorkflowRun.app_id == workflow.app_id,
)
max_sequence = session.scalar(max_sequence_stmt) or 0
new_sequence_number = max_sequence + 1
inputs = {**self._application_generate_entity.inputs} inputs = {**self._application_generate_entity.inputs}
for key, value in (self._workflow_system_variables or {}).items(): for key, value in (self._workflow_system_variables or {}).items():
if key.value == "conversation": if key.value == "conversation":
@ -83,7 +75,6 @@ class WorkflowCycleManager:
execution = WorkflowExecution.new( execution = WorkflowExecution.new(
id=execution_id, id=execution_id,
workflow_id=workflow.id, workflow_id=workflow.id,
sequence_number=new_sequence_number,
type=WorkflowType(workflow.type), type=WorkflowType(workflow.type),
workflow_version=workflow.version, workflow_version=workflow.version,
graph=workflow.graph_dict, graph=workflow.graph_dict,

@ -184,7 +184,6 @@ def test_handle_workflow_run_start(workflow_cycle_manager, mock_session, real_wo
# Verify the result # Verify the result
assert workflow_execution.workflow_id == real_workflow.id assert workflow_execution.workflow_id == real_workflow.id
assert workflow_execution.sequence_number == 6 # max_sequence + 1
# Verify the workflow_execution_repository.save was called # Verify the workflow_execution_repository.save was called
workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution) workflow_cycle_manager._workflow_execution_repository.save.assert_called_once_with(workflow_execution)
@ -198,7 +197,6 @@ def test_handle_workflow_run_success(workflow_cycle_manager, mock_workflow_execu
id="test-workflow-run-id", id="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
sequence_number=1,
type=WorkflowType.CHAT, type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
@ -233,7 +231,6 @@ def test_handle_workflow_run_failed(workflow_cycle_manager, mock_workflow_execut
id="test-workflow-run-id", id="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
sequence_number=1,
type=WorkflowType.CHAT, type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
@ -272,7 +269,6 @@ def test_handle_node_execution_start(workflow_cycle_manager, mock_workflow_execu
id="test-workflow-execution-id", id="test-workflow-execution-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
sequence_number=1,
type=WorkflowType.CHAT, type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
@ -326,7 +322,6 @@ def test_get_workflow_execution_or_raise_error(workflow_cycle_manager, mock_work
id="test-workflow-run-id", id="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
sequence_number=1,
type=WorkflowType.CHAT, type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},
@ -399,7 +394,6 @@ def test_handle_workflow_run_partial_success(workflow_cycle_manager, mock_workfl
id="test-workflow-run-id", id="test-workflow-run-id",
workflow_id="test-workflow-id", workflow_id="test-workflow-id",
workflow_version="1.0", workflow_version="1.0",
sequence_number=1,
type=WorkflowType.CHAT, type=WorkflowType.CHAT,
graph={"nodes": [], "edges": []}, graph={"nodes": [], "edges": []},
inputs={"query": "test query"}, inputs={"query": "test query"},

Loading…
Cancel
Save