feat: enhance task state management and output handling in advanced chat pipeline

pull/20921/head
GuanMu 11 months ago
parent fb420781ab
commit e36b6e2f26

@ -1,3 +1,4 @@
import json
import logging import logging
import time import time
from collections.abc import Generator, Mapping from collections.abc import Generator, Mapping
@ -506,7 +507,8 @@ class AdvancedChatAppGenerateTaskPipeline:
task_id=self._application_generate_entity.task_id, task_id=self._application_generate_entity.task_id,
workflow_execution=workflow_execution, workflow_execution=workflow_execution,
) )
print(f"workflow_finish_resp: {workflow_finish_resp}") self._task_state.metadata.data = workflow_finish_resp.data.outputs.get('outputs', {}).get('outputs')
print(f"self._task_state.metadata.data: {self._task_state.metadata.data}")
yield workflow_finish_resp yield workflow_finish_resp
self._base_task_pipeline._queue_manager.publish( self._base_task_pipeline._queue_manager.publish(
QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE QueueAdvancedChatMessageEndEvent(), PublishFrom.TASK_PIPELINE
@ -630,6 +632,7 @@ class AdvancedChatAppGenerateTaskPipeline:
tts_publisher.publish(queue_message) tts_publisher.publish(queue_message)
self._task_state.answer += delta_text self._task_state.answer += delta_text
print(f"self._task_state.answer: {self._task_state.answer}")
yield self._message_cycle_manager.message_to_stream_response( yield self._message_cycle_manager.message_to_stream_response(
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
) )
@ -651,7 +654,7 @@ class AdvancedChatAppGenerateTaskPipeline:
answer=output_moderation_answer, answer=output_moderation_answer,
reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION, reason=QueueMessageReplaceEvent.MessageReplaceReason.OUTPUT_MODERATION,
) )
print(f"graph_runtime_state: {graph_runtime_state}")
# Save message # Save message
with Session(db.engine, expire_on_commit=False) as session: with Session(db.engine, expire_on_commit=False) as session:
self._save_message(session=session, graph_runtime_state=graph_runtime_state) self._save_message(session=session, graph_runtime_state=graph_runtime_state)

@ -24,6 +24,7 @@ class AnnotationReply(BaseModel):
class TaskStateMetadata(BaseModel): class TaskStateMetadata(BaseModel):
annotation_reply: AnnotationReply | None = None annotation_reply: AnnotationReply | None = None
retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list) retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list)
data: Optional[Mapping[str, Any]] = None
usage: LLMUsage | None = None usage: LLMUsage | None = None

@ -194,6 +194,7 @@ class GraphEngine:
self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[ self.graph_runtime_state.outputs["answer"] = self.graph_runtime_state.outputs[
"answer" "answer"
].strip() ].strip()
self.graph_runtime_state.outputs["outputs"] = item.route_node_state.node_run_result.outputs
except Exception as e: except Exception as e:
logger.exception("Graph run failed") logger.exception("Graph run failed")
yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions)) yield GraphRunFailedEvent(error=str(e), exceptions_count=len(handle_exceptions))

@ -21,6 +21,8 @@ function useVarList<T>({
const handleAddVariable = useCallback(() => { const handleAddVariable = useCallback(() => {
const newInputs = produce(inputs, (draft: any) => { const newInputs = produce(inputs, (draft: any) => {
if (!draft[varKey])
draft[varKey] = []
draft[varKey].push({ draft[varKey].push({
variable: '', variable: '',
value_selector: [], value_selector: [],

@ -38,7 +38,6 @@ const Node: FC<NodeProps<AnswerNodeType>> = ({
const { outputs = [] } = data const { outputs = [] } = data
const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0) const filteredOutputs = (outputs as Variable[]).filter(({ value_selector }) => value_selector.length > 0)
console.log('filteredOutputs', filteredOutputs)
if (!filteredOutputs.length) if (!filteredOutputs.length)
return null return null

@ -26,7 +26,7 @@ const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
handleAddVariable, handleAddVariable,
} = useConfig(id, data) } = useConfig(id, data)
const outputs = inputs.outputs const outputs = inputs?.outputs || []
const { availableVars, availableNodesWithParent } = useAvailableVarList(id, { const { availableVars, availableNodesWithParent } = useAvailableVarList(id, {
onlyLeafNodeVar: false, onlyLeafNodeVar: false,

Loading…
Cancel
Save