From e36b6e2f261e96521050ab4638c44b578518398f Mon Sep 17 00:00:00 2001 From: GuanMu Date: Tue, 10 Jun 2025 13:16:32 +0000 Subject: [PATCH] feat: enhance task state management and output handling in advanced chat pipeline --- api/core/app/apps/advanced_chat/generate_task_pipeline.py | 7 +++++-- api/core/app/entities/task_entities.py | 1 + api/core/workflow/graph_engine/graph_engine.py | 1 + .../components/workflow/nodes/_base/hooks/use-var-list.ts | 2 ++ web/app/components/workflow/nodes/answer/node.tsx | 1 - web/app/components/workflow/nodes/answer/panel.tsx | 2 +- 6 files changed, 10 insertions(+), 4 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 1d2b843043..c470bf3655 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -1,3 +1,4 @@ +import json import logging import time from collections.abc import Generator, Mapping @@ -506,7 +507,8 @@ class AdvancedChatAppGenerateTaskPipeline: task_id=self._application_generate_entity.task_id, workflow_execution=workflow_execution, ) - print(f"workflow_finish_resp: {workflow_finish_resp}") + self._task_state.metadata.data = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs') + print(f"self._task_state.metadata.data: {self._task_state.metadata.data}") yield workflow_finish_resp self._base_task_pipeline._queue_manager.publish( QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE @@ -630,6 +632,7 @@ class AdvancedChatAppGenerateTaskPipeline: tts_publisher.publish(queue_message) self._task_state.answer += delta_text + print(f"self._task_state.answer: {self._task_state.answer}") yield self._message_cycle_manager.message_to_stream_response( answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector ) @@ -651,7 +654,7 @@ class AdvancedChatAppGenerateTaskPipeline: answer=output_moderation_answer, reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, ) - + print(f"graph_runtime_state: {graph_runtime_state}") # Save message with Session(db.engine, expire_on_commit=False) as session: self._save_message(session=session, graph_runtime_state=graph_runtime_state) diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index ac464f8d1f..7e628fb62d 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -24,6 +24,7 @@ class AnnotationReply(BaseModel): class TaskStateMetadata(BaseModel): annotation_reply: AnnotationReply | None = None retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list) + data: Optional[Mapping[str, Any]] = None usage: LLMUsage | None = None diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 363b2ee920..9870df749d 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -194,6 +194,7 @@ class GraphEngine: self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[ "answer" ].strip() + self.graph_runtime_state.outputs["outputs"] = item.route_node_state.node_run_result.outputs except Exception as e: logger.exception("Graph run failed") yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions)) diff --git a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts index 63d284f260..fe8360465e 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-var-list.ts @@ -21,6 +21,8 @@ function useVarList({ const handleAddVariable = useCallback(() => { const newInputs = produce(inputs, (draft: any) => { + if (!draft[varKey]) + draft[varKey] = [] draft[varKey].push({ variable: '', value_selector: [], diff --git a/web/app/components/workflow/nodes/answer/node.tsx b/web/app/components/workflow/nodes/answer/node.tsx index 57e3fab7bd..3d36de4565 100644 --- a/web/app/components/workflow/nodes/answer/node.tsx +++ b/web/app/components/workflow/nodes/answer/node.tsx @@ -38,7 +38,6 @@ const Node: FC> = ({ const { outputs = [] } = data const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0) - console.log('filteredOutputs', filteredOutputs) if (!filteredOutputs.length) return null diff --git a/web/app/components/workflow/nodes/answer/panel.tsx b/web/app/components/workflow/nodes/answer/panel.tsx index f76931d745..8bfc2c8191 100644 --- a/web/app/components/workflow/nodes/answer/panel.tsx +++ b/web/app/components/workflow/nodes/answer/panel.tsx @@ -26,7 +26,7 @@ const Panel: FC> = ({ handleAddVariable, } = useConfig(id, data) - const outputs = inputs.outputs + const outputs = inputs?.outputs || [] const { availableVars, availableNodesWithParent } = useAvailableVarList(id, { onlyLeafNodeVar: false,