|
|
|
|
@ -72,7 +72,7 @@ from services.entities.knowledge_entities.rag_pipeline_entities import (
|
|
|
|
|
)
|
|
|
|
|
from services.errors.app import WorkflowHashNotEqualError
|
|
|
|
|
from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory
|
|
|
|
|
from services.workflow_draft_variable_service import DraftVarLoader
|
|
|
|
|
from services.workflow_draft_variable_service import DraftVarLoader, DraftVariableSaver
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
@ -389,7 +389,7 @@ class RagPipelineService:
|
|
|
|
|
|
|
|
|
|
def run_draft_workflow_node(
|
|
|
|
|
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account
|
|
|
|
|
) -> WorkflowNodeExecution:
|
|
|
|
|
) -> WorkflowNodeExecutionModel:
|
|
|
|
|
"""
|
|
|
|
|
Run draft workflow node
|
|
|
|
|
"""
|
|
|
|
|
@ -400,6 +400,13 @@ class RagPipelineService:
|
|
|
|
|
|
|
|
|
|
# run draft workflow node
|
|
|
|
|
start_at = time.perf_counter()
|
|
|
|
|
node_config = draft_workflow.get_node_config_by_id(node_id)
|
|
|
|
|
|
|
|
|
|
eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config)
|
|
|
|
|
if eclosing_node_type_and_id:
|
|
|
|
|
_, enclosing_node_id = eclosing_node_type_and_id
|
|
|
|
|
else:
|
|
|
|
|
enclosing_node_id = None
|
|
|
|
|
|
|
|
|
|
workflow_node_execution = self._handle_node_run_result(
|
|
|
|
|
getter=lambda: WorkflowEntry.single_step_run(
|
|
|
|
|
@ -426,9 +433,30 @@ class RagPipelineService:
|
|
|
|
|
)
|
|
|
|
|
workflow_node_execution.workflow_id = draft_workflow.id
|
|
|
|
|
|
|
|
|
|
db.session.add(workflow_node_execution)
|
|
|
|
|
db.session.commit()
|
|
|
|
|
# Create repository and save the node execution
|
|
|
|
|
repository = SQLAlchemyWorkflowNodeExecutionRepository(
|
|
|
|
|
session_factory=db.engine,
|
|
|
|
|
user=account,
|
|
|
|
|
app_id=pipeline.id,
|
|
|
|
|
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
|
|
|
|
|
)
|
|
|
|
|
repository.save(workflow_node_execution)
|
|
|
|
|
|
|
|
|
|
# Convert node_execution to WorkflowNodeExecution after save
|
|
|
|
|
workflow_node_execution = repository.to_db_model(workflow_node_execution)
|
|
|
|
|
|
|
|
|
|
with Session(bind=db.engine) as session, session.begin():
|
|
|
|
|
draft_var_saver = DraftVariableSaver(
|
|
|
|
|
session=session,
|
|
|
|
|
app_id=pipeline.id,
|
|
|
|
|
node_id=workflow_node_execution.node_id,
|
|
|
|
|
node_type=NodeType(workflow_node_execution.node_type),
|
|
|
|
|
enclosing_node_id=enclosing_node_id,
|
|
|
|
|
node_execution_id=workflow_node_execution.id,
|
|
|
|
|
)
|
|
|
|
|
draft_var_saver.save(process_data=workflow_node_execution.process_data,
|
|
|
|
|
outputs=workflow_node_execution.outputs)
|
|
|
|
|
session.commit()
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
|
|
|
|
|
def run_datasource_workflow_node(
|
|
|
|
|
|