|
|
|
|
@ -414,27 +414,30 @@ class RagPipelineService:
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
|
|
|
|
|
def run_datasource_workflow_node(
|
|
|
|
|
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account, datasource_type: str
|
|
|
|
|
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account, datasource_type: str, is_published: bool
|
|
|
|
|
) -> dict:
|
|
|
|
|
"""
|
|
|
|
|
Run published workflow datasource
|
|
|
|
|
"""
|
|
|
|
|
if is_published:
|
|
|
|
|
# fetch published workflow by app_model
|
|
|
|
|
published_workflow = self.get_published_workflow(pipeline=pipeline)
|
|
|
|
|
if not published_workflow:
|
|
|
|
|
workflow = self.get_published_workflow(pipeline=pipeline)
|
|
|
|
|
else:
|
|
|
|
|
workflow = self.get_draft_workflow(pipeline=pipeline)
|
|
|
|
|
if not workflow:
|
|
|
|
|
raise ValueError("Workflow not initialized")
|
|
|
|
|
|
|
|
|
|
# run draft workflow node
|
|
|
|
|
datasource_node_data = None
|
|
|
|
|
start_at = time.perf_counter()
|
|
|
|
|
datasource_nodes = published_workflow.graph_dict.get("nodes", [])
|
|
|
|
|
datasource_nodes = workflow.graph_dict.get("nodes", [])
|
|
|
|
|
for datasource_node in datasource_nodes:
|
|
|
|
|
if datasource_node.get("id") == node_id:
|
|
|
|
|
datasource_node_data = datasource_node.get("data", {})
|
|
|
|
|
break
|
|
|
|
|
if not datasource_node_data:
|
|
|
|
|
raise ValueError("Datasource node data not found")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
datasource_parameters = datasource_node_data.get("datasource_parameters", {})
|
|
|
|
|
for key, value in datasource_parameters.items():
|
|
|
|
|
if not user_inputs.get(key):
|
|
|
|
|
@ -651,7 +654,7 @@ class RagPipelineService:
|
|
|
|
|
if item.get("belong_to_node_id") == node_id or item.get("belong_to_node_id") == "shared"
|
|
|
|
|
]
|
|
|
|
|
return datasource_provider_variables
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_published_first_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
|
|
|
|
|
"""
|
|
|
|
|
Get first step parameters of rag pipeline
|
|
|
|
|
@ -683,7 +686,7 @@ class RagPipelineService:
|
|
|
|
|
if not re.match(r"\{\{#([a-zA-Z0-9_]{1,50}(?:\.[a-zA-Z_][a-zA-Z0-9_]{0,29}){1,10})#\}\}", value["value"]):
|
|
|
|
|
user_input_variables.append(variables_map.get(key, {}))
|
|
|
|
|
return user_input_variables
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_draft_first_step_parameters(self, pipeline: Pipeline, node_id: str) -> list[dict]:
|
|
|
|
|
"""
|
|
|
|
|
Get first step parameters of rag pipeline
|
|
|
|
|
|