feat: enhance answer node functionality and output handling

- Updated `AnswerNode` to resolve its configured output variables from the variable pool and include them in the node's run result.
- Modified `AnswerStreamProcessor` to track which nodes have already produced output and separate chunks from different nodes, ensuring proper event processing.
- Added `outputs` field to relevant components and TypeScript types for better integration.
- Added debug output in the task pipeline and answer stream processing to aid troubleshooting.
pull/20921/head
GuanMu 8 months ago
parent 53a2bca784
commit cf4fd0ecb4

@ -165,7 +165,7 @@ class AdvancedChatAppGenerateTaskPipeline:
)
generator = self._wrapper_process_stream_response(trace_manager=self._application_generate_entity.trace_manager)
print(f"generator: {generator}")
print(f"generator: {generator}=======")
if self._base_task_pipeline._stream:
return self._to_stream_response(generator)
else:
@ -183,7 +183,6 @@ class AdvancedChatAppGenerateTaskPipeline:
extras = {}
if stream_response.metadata:
extras["metadata"] = stream_response.metadata
print(f"stream_response: {stream_response}")
return ChatbotAppBlockingResponse(
task_id=stream_response.task_id,
data=ChatbotAppBlockingResponse.Data(
@ -210,7 +209,6 @@ class AdvancedChatAppGenerateTaskPipeline:
:return:
"""
for stream_response in generator:
print(f"stream_response: {stream_response}")
yield ChatbotAppStreamResponse(
conversation_id=self._conversation_id,
message_id=self._message_id,
@ -361,7 +359,6 @@ class AdvancedChatAppGenerateTaskPipeline:
self._recorded_files.extend(
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
)
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager.handle_workflow_node_execution_success(
event=event
@ -494,6 +491,7 @@ class AdvancedChatAppGenerateTaskPipeline:
if not graph_runtime_state:
raise ValueError("workflow run not initialized.")
print(f"outputsfff: {event.outputs}")
with Session(db.engine, expire_on_commit=False) as session:
workflow_execution = self._workflow_cycle_manager.handle_workflow_run_success(
workflow_run_id=self._workflow_run_id,

@ -49,7 +49,7 @@ class WorkflowTaskState(TaskState):
"""
answer: str = ""
outputs: Optional[Mapping[str, Any]] = None
class StreamEvent(Enum):
"""

@ -45,7 +45,18 @@ class AnswerNode(BaseNode[AnswerNodeData]):
part = cast(TextGenerateRouteChunk, part)
answer += part.text
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"answer": answer, "files": files})
output_variables = self.node_data.outputs
outputs = {}
for variable_selector in output_variables:
variable = self.graph_runtime_state.variable_pool.get(variable_selector.value_selector)
value = variable.to_object() if variable is not None else None
outputs[variable_selector.variable] = value
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={"answer": answer, "files": files, "outputs": outputs},
)
@classmethod
def _extract_variable_selector_to_variable_mapping(

@ -23,6 +23,7 @@ class AnswerStreamGeneratorRouter:
"""
# parse stream output node value selectors of answer nodes
answer_generate_route: dict[str, list[GenerateRouteChunk]] = {}
answer_stream_variable_selectors_mapping: dict[str, list[list[str]]] = {}
for answer_node_id, node_config in node_id_config_mapping.items():
if node_config.get("data", {}).get("type") != NodeType.ANSWER.value:
continue
@ -30,6 +31,7 @@ class AnswerStreamGeneratorRouter:
# get generate route for stream output
generate_route = cls._extract_generate_route_selectors(node_config)
answer_generate_route[answer_node_id] = generate_route
answer_stream_variable_selectors_mapping[answer_node_id] = generate_route
# fetch answer dependencies
answer_node_ids = list(answer_generate_route.keys())
@ -38,9 +40,18 @@ class AnswerStreamGeneratorRouter:
reverse_edge_mapping=reverse_edge_mapping,
node_id_config_mapping=node_id_config_mapping,
)
answer_end_ids = list(answer_stream_variable_selectors_mapping.keys())
answer_end_dependencies = cls._fetch_answers_dependencies(
answer_node_ids=answer_end_ids,
reverse_edge_mapping=reverse_edge_mapping,
node_id_config_mapping=node_id_config_mapping,
)
return AnswerStreamGenerateRoute(
answer_generate_route=answer_generate_route, answer_dependencies=answer_dependencies
answer_generate_route=answer_generate_route,
answer_dependencies=answer_dependencies,
answer_stream_variable_selectors_mapping=answer_stream_variable_selectors_mapping,
answer_end_dependencies=answer_end_dependencies,
)
@classmethod
@ -89,6 +100,7 @@ class AnswerStreamGeneratorRouter:
:param config: node config
:return:
"""
print(f"config: {config}=======")
node_data = AnswerNodeData(**config.get("data", {}))
return cls.extract_generate_route_from_node_data(node_data)

@ -26,9 +26,12 @@ class AnswerStreamProcessor(StreamProcessor):
for answer_node_id in self.generate_routes.answer_generate_route:
self.route_position[answer_node_id] = 0
self.current_stream_chunk_generating_node_ids: dict[str, list[str]] = {}
self.has_output = False
self.output_node_ids: set[str] = set()
def process(self, generator: Generator[GraphEngineEvent, None, None]) -> Generator[GraphEngineEvent, None, None]:
for event in generator:
if isinstance(event, NodeRunStartedEvent):
if event.route_node_state.node_id == self.graph.root_node_id and not self.rest_node_ids:
self.reset()
@ -36,6 +39,11 @@ class AnswerStreamProcessor(StreamProcessor):
yield event
elif isinstance(event, NodeRunStreamChunkEvent):
if event.in_iteration_id or event.in_loop_id:
if self.has_output and event.node_id not in self.output_node_ids:
print(f"event22: {event}")
event.chunk_content = "\n" + event.chunk_content
self.output_node_ids.add(event.node_id)
self.has_output = True
yield event
continue
@ -50,6 +58,10 @@ class AnswerStreamProcessor(StreamProcessor):
)
for _ in stream_out_answer_node_ids:
if self.has_output and event.node_id not in self.output_node_ids:
event.chunk_content = "\n" + event.chunk_content
self.output_node_ids.add(event.node_id)
self.has_output = True
yield event
elif isinstance(event, NodeRunSucceededEvent | NodeRunExceptionEvent):
yield event

@ -4,7 +4,7 @@ from enum import Enum
from pydantic import BaseModel, Field
from core.workflow.nodes.base import BaseNodeData
from core.workflow.entities.variable_entities import VariableSelector
class AnswerNodeData(BaseNodeData):
"""
@ -12,6 +12,7 @@ class AnswerNodeData(BaseNodeData):
"""
answer: str = Field(..., description="answer template string")
outputs: list[VariableSelector]
class GenerateRouteChunk(BaseModel):

@ -7,6 +7,7 @@ const nodeDefault: NodeDefault<AnswerNodeType> = {
defaultValue: {
variables: [],
answer: '',
outputs: [],
},
getAvailablePrevNodes(isChatMode: boolean) {
const nodes = isChatMode

@ -7,6 +7,9 @@ import Editor from '@/app/components/workflow/nodes/_base/components/prompt/edit
import type { NodePanelProps } from '@/app/components/workflow/types'
import useAvailableVarList from '@/app/components/workflow/nodes/_base/hooks/use-available-var-list'
const i18nPrefix = 'workflow.nodes.answer'
import Field from '@/app/components/workflow/nodes/_base/components/field'
import AddButton from '@/app/components/base/button/add-button'
import VarList from '@/app/components/workflow/nodes/_base/components/variable/var-list'
const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
id,
@ -46,7 +49,7 @@ const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
isSupportFileVar
/>
</div>
{/* <div className='space-y-4 px-4 pb-4'>
<div className='space-y-4 px-4 pb-4'>
<Field
title={t(`${i18nPrefix}.outputVars`)}
operations={
@ -60,7 +63,7 @@ const Panel: FC<NodePanelProps<AnswerNodeType>> = ({
onChange={handleVarListChange}
/>
</Field>
</div> */}
</div>
</div>
)
}

@ -16,6 +16,7 @@ const useConfig = (id: string, payload: AnswerNodeType) => {
const { handleVarListChange, handleAddVariable } = useVarList<AnswerNodeType>({
inputs,
setInputs,
varKey: 'outputs',
})
const handleAnswerChange = useCallback((value: string) => {

Loading…
Cancel
Save