diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py index 7ef681addc..0036e22f21 100644 --- a/api/core/workflow/nodes/llm/node.py +++ b/api/core/workflow/nodes/llm/node.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any, Optional, cast from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity from core.file import FileType, file_manager from core.helper.code_executor import CodeExecutor, CodeLanguage +from core.llm_generator.output_parser.errors import OutputParserError from core.llm_generator.output_parser.structured_output import invoke_llm_with_structured_output from core.memory.token_buffer_memory import TokenBufferMemory from core.model_manager import ModelInstance, ModelManager @@ -340,26 +341,32 @@ class LLMNode(BaseNode[LLMNodeData]): usage = LLMUsage.empty_usage() finish_reason = None full_text_buffer = io.StringIO() - for result in invoke_result: - if isinstance(result, LLMResultChunk): - contents = result.delta.message.content - for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents): - full_text_buffer.write(text_part) - yield RunStreamChunkEvent(chunk_content=text_part, from_variable_selector=[self.node_id, "text"]) - - # Update the whole metadata - if not model and result.model: - model = result.model - if len(prompt_messages) == 0: - # TODO(QuantumGhost): it seems that this update has no visable effect. - # What's the purpose of the line below? - prompt_messages = list(result.prompt_messages) - if usage.prompt_tokens == 0 and result.delta.usage: - usage = result.delta.usage - if finish_reason is None and result.delta.finish_reason: - finish_reason = result.delta.finish_reason - elif isinstance(result, LLMStructuredOutput): - yield result + # Consume the invoke result and handle generator exception + try: + for result in invoke_result: + if isinstance(result, LLMResultChunk): + contents = result.delta.message.content + for text_part in self._save_multimodal_output_and_convert_result_to_markdown(contents): + full_text_buffer.write(text_part) + yield RunStreamChunkEvent( + chunk_content=text_part, from_variable_selector=[self.node_id, "text"] + ) + + # Update the whole metadata + if not model and result.model: + model = result.model + if len(prompt_messages) == 0: + # TODO(QuantumGhost): it seems that this update has no visable effect. + # What's the purpose of the line below? + prompt_messages = list(result.prompt_messages) + if usage.prompt_tokens == 0 and result.delta.usage: + usage = result.delta.usage + if finish_reason is None and result.delta.finish_reason: + finish_reason = result.delta.finish_reason + elif isinstance(result, LLMStructuredOutput): + yield result + except OutputParserError as e: + raise LLMNodeError(f"Failed to parse structured output: {e}") yield ModelInvokeCompletedEvent(text=full_text_buffer.getvalue(), usage=usage, finish_reason=finish_reason)