|
|
|
|
@ -137,7 +137,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
|
|
|
|
logger.debug(f"Queued async save for workflow node execution: {execution.id}")
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Failed to queue save operation for node execution {execution.id}: {e}")
|
|
|
|
|
logger.exception(f"Failed to queue save operation for node execution {execution.id}")
|
|
|
|
|
# In case of Celery failure, we could implement a fallback to synchronous save
|
|
|
|
|
# For now, we'll re-raise the exception
|
|
|
|
|
raise
|
|
|
|
|
@ -186,7 +186,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Failed to get workflow node executions for run {workflow_run_id}: {e}")
|
|
|
|
|
logger.exception(f"Failed to get workflow node executions for run {workflow_run_id}")
|
|
|
|
|
# Could implement fallback to direct database access here
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
@ -215,7 +215,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
|
|
|
|
logger.debug(f"Waiting for pending save to complete before read: {execution_id}")
|
|
|
|
|
task_result.get(timeout=self._async_timeout)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Failed to wait for pending save {execution_id}: {e}")
|
|
|
|
|
logger.exception(f"Failed to wait for pending save {execution_id}")
|
|
|
|
|
|
|
|
|
|
# Clean up completed tasks from both caches
|
|
|
|
|
if task_result and task_result.ready():
|
|
|
|
|
@ -243,7 +243,7 @@ class CeleryWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository):
|
|
|
|
|
del self._pending_saves[execution_id]
|
|
|
|
|
self._workflow_execution_mapping.pop(execution_id, None)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Failed to wait for save operation {execution_id}: {e}")
|
|
|
|
|
logger.exception(f"Failed to wait for save operation {execution_id}")
|
|
|
|
|
|
|
|
|
|
def get_pending_save_count(self) -> int:
|
|
|
|
|
"""
|
|
|
|
|
|