|
|
|
|
@ -373,9 +373,6 @@ class RagPipelineService:
|
|
|
|
|
tenant_id=pipeline.tenant_id,
|
|
|
|
|
node_id=node_id,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
workflow_node_execution.app_id = pipeline.id
|
|
|
|
|
workflow_node_execution.created_by = account.id
|
|
|
|
|
workflow_node_execution.workflow_id = draft_workflow.id
|
|
|
|
|
|
|
|
|
|
db.session.add(workflow_node_execution)
|
|
|
|
|
@ -409,8 +406,6 @@ class RagPipelineService:
|
|
|
|
|
node_id=node_id,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
workflow_node_execution.app_id = pipeline.id
|
|
|
|
|
workflow_node_execution.created_by = account.id
|
|
|
|
|
workflow_node_execution.workflow_id = published_workflow.id
|
|
|
|
|
|
|
|
|
|
db.session.add(workflow_node_execution)
|
|
|
|
|
@ -568,18 +563,17 @@ class RagPipelineService:
|
|
|
|
|
node_run_result = None
|
|
|
|
|
error = e.error
|
|
|
|
|
|
|
|
|
|
workflow_node_execution = WorkflowNodeExecution()
|
|
|
|
|
workflow_node_execution.id = str(uuid4())
|
|
|
|
|
workflow_node_execution.tenant_id = tenant_id
|
|
|
|
|
workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
|
|
|
|
|
workflow_node_execution.index = 1
|
|
|
|
|
workflow_node_execution.node_id = node_id
|
|
|
|
|
workflow_node_execution.node_type = node_instance.node_type
|
|
|
|
|
workflow_node_execution.title = node_instance.node_data.title
|
|
|
|
|
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
|
|
|
|
|
workflow_node_execution.created_by_role = CreatorUserRole.ACCOUNT.value
|
|
|
|
|
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
|
|
|
|
|
workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
|
|
|
|
|
workflow_node_execution = WorkflowNodeExecution(
|
|
|
|
|
id=str(uuid4()),
|
|
|
|
|
workflow_id=node_instance.workflow_id,
|
|
|
|
|
index=1,
|
|
|
|
|
node_id=node_id,
|
|
|
|
|
node_type=node_instance.node_type,
|
|
|
|
|
title=node_instance.node_data.title,
|
|
|
|
|
elapsed_time=time.perf_counter() - start_at,
|
|
|
|
|
finished_at=datetime.now(UTC).replace(tzinfo=None),
|
|
|
|
|
created_at=datetime.now(UTC).replace(tzinfo=None),
|
|
|
|
|
)
|
|
|
|
|
if run_succeeded and node_run_result:
|
|
|
|
|
# create workflow node execution
|
|
|
|
|
inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
|
|
|
|
|
@ -590,20 +584,18 @@ class RagPipelineService:
|
|
|
|
|
)
|
|
|
|
|
outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
|
|
|
|
|
|
|
|
|
|
workflow_node_execution.inputs = json.dumps(inputs)
|
|
|
|
|
workflow_node_execution.process_data = json.dumps(process_data)
|
|
|
|
|
workflow_node_execution.outputs = json.dumps(outputs)
|
|
|
|
|
workflow_node_execution.execution_metadata = (
|
|
|
|
|
json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
|
|
|
|
|
)
|
|
|
|
|
workflow_node_execution.inputs = inputs
|
|
|
|
|
workflow_node_execution.process_data = process_data
|
|
|
|
|
workflow_node_execution.outputs = outputs
|
|
|
|
|
workflow_node_execution.metadata = node_run_result.metadata
|
|
|
|
|
if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED
|
|
|
|
|
elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION
|
|
|
|
|
workflow_node_execution.error = node_run_result.error
|
|
|
|
|
else:
|
|
|
|
|
# create workflow node execution
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
|
|
|
|
|
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED
|
|
|
|
|
workflow_node_execution.error = error
|
|
|
|
|
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
@ -678,18 +670,18 @@ class RagPipelineService:
|
|
|
|
|
break
|
|
|
|
|
if not datasource_node_data:
|
|
|
|
|
raise ValueError("Datasource node data not found")
|
|
|
|
|
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
|
|
|
|
|
if datasource_parameters:
|
|
|
|
|
datasource_parameters_map = {
|
|
|
|
|
item["variable"]: item for item in datasource_parameters
|
|
|
|
|
variables = datasource_node_data.get("variables", {})
|
|
|
|
|
if variables:
|
|
|
|
|
variables_map = {
|
|
|
|
|
item["variable"]: item for item in variables
|
|
|
|
|
}
|
|
|
|
|
else:
|
|
|
|
|
datasource_parameters_map = {}
|
|
|
|
|
variables = datasource_node_data.get("variables", {})
|
|
|
|
|
variables_map = {}
|
|
|
|
|
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
|
|
|
|
|
user_input_variables = []
|
|
|
|
|
for key, value in variables.items():
|
|
|
|
|
for key, value in datasource_parameters.items():
|
|
|
|
|
if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]):
|
|
|
|
|
user_input_variables.append(datasource_parameters_map.get(key, {}))
|
|
|
|
|
user_input_variables.append(variables_map.get(key, {}))
|
|
|
|
|
return user_input_variables
|
|
|
|
|
|
|
|
|
|
def get_draft_first_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
|
|
|
|
|
@ -710,18 +702,19 @@ class RagPipelineService:
|
|
|
|
|
break
|
|
|
|
|
if not datasource_node_data:
|
|
|
|
|
raise ValueError("Datasource node data not found")
|
|
|
|
|
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
|
|
|
|
|
if datasource_parameters:
|
|
|
|
|
datasource_parameters_map = {
|
|
|
|
|
item["variable"]: item for item in datasource_parameters
|
|
|
|
|
variables = datasource_node_data.get("variables", {})
|
|
|
|
|
if variables:
|
|
|
|
|
variables_map = {
|
|
|
|
|
item["variable"]: item for item in variables
|
|
|
|
|
}
|
|
|
|
|
else:
|
|
|
|
|
datasource_parameters_map = {}
|
|
|
|
|
variables = datasource_node_data.get("variables", {})
|
|
|
|
|
variables = {}
|
|
|
|
|
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
|
|
|
|
|
|
|
|
|
|
user_input_variables = []
|
|
|
|
|
for key, value in variables.items():
|
|
|
|
|
for key, value in datasource_parameters.items():
|
|
|
|
|
if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]):
|
|
|
|
|
user_input_variables.append(datasource_parameters_map.get(key, {}))
|
|
|
|
|
user_input_variables.append(variables_map.get(key, {}))
|
|
|
|
|
return user_input_variables
|
|
|
|
|
|
|
|
|
|
def get_draft_second_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
|
|
|
|
|
@ -845,10 +838,8 @@ class RagPipelineService:
|
|
|
|
|
order_config=order_config,
|
|
|
|
|
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
|
|
|
|
|
)
|
|
|
|
|
# Convert domain models to database models
|
|
|
|
|
workflow_node_executions = [repository.to_db_model(node_execution) for node_execution in node_executions]
|
|
|
|
|
|
|
|
|
|
return workflow_node_executions
|
|
|
|
|
return list(node_executions)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def publish_customized_pipeline_template(cls, pipeline_id: str, args: dict):
|
|
|
|
|
|