From 791ae24f0998dae3e8e6ef12a68ac34426bc6166 Mon Sep 17 00:00:00 2001 From: neatguycoding <15627489+NeatGuyCoding@users.noreply.github.com> Date: Mon, 16 Jun 2025 13:53:34 +0800 Subject: [PATCH] bug: fix sequence number may be duplicated when multi-threads running the same workflow #21047 --- .../sqlalchemy_workflow_execution_repository.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/api/core/repositories/sqlalchemy_workflow_execution_repository.py b/api/core/repositories/sqlalchemy_workflow_execution_repository.py index e5ead9dc56..04b41ebb83 100644 --- a/api/core/repositories/sqlalchemy_workflow_execution_repository.py +++ b/api/core/repositories/sqlalchemy_workflow_execution_repository.py @@ -151,12 +151,17 @@ class SQLAlchemyWorkflowExecutionRepository(WorkflowExecutionRepository): existing = session.scalar(select(WorkflowRun).where(WorkflowRun.id == domain_model.id_)) if not existing: # For new records, get the next sequence number - stmt = select(func.max(WorkflowRun.sequence_number)).where( - WorkflowRun.app_id == self._app_id, - WorkflowRun.tenant_id == self._tenant_id, + # in case multiple executions are created concurrently, use for update + stmt = ( + select(func.coalesce(func.max(WorkflowRun.sequence_number), 0) + 1) + .where( + WorkflowRun.app_id == self._app_id, + WorkflowRun.tenant_id == self._tenant_id, + ) + .with_for_update() ) - max_sequence = session.scalar(stmt) - db_model.sequence_number = (max_sequence or 0) + 1 + next_seq = session.scalar(stmt) + db_model.sequence_number = next_seq else: # For updates, keep the existing sequence number db_model.sequence_number = existing.sequence_number