refactor: Refactors node attribute access and method names

Signed-off-by: -LAN- <laipz8200@outlook.com>
pull/22581/head
-LAN- 10 months ago
parent 678a39a113
commit 578601975e
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF

@ -2,7 +2,7 @@ from core.workflow.nodes.base import BaseNode
class WorkflowNodeRunFailedError(Exception): class WorkflowNodeRunFailedError(Exception):
def __init__(self, node_instance: BaseNode, error: str): def __init__(self, node: BaseNode, error: str):
self.node_instance = node_instance self.node = node
self.error = error self.error = error
super().__init__(f"Node {node_instance.node_title} run failed: {error}") super().__init__(f"Node {node.title} run failed: {error}")

@ -267,7 +267,7 @@ class GraphEngine:
previous_node_id = previous_route_node_state.node_id if previous_route_node_state else None previous_node_id = previous_route_node_state.node_id if previous_route_node_state else None
# init workflow run state # init workflow run state
node_instance = node_cls( node = node_cls(
id=route_node_state.id, id=route_node_state.id,
config=node_config, config=node_config,
graph_init_params=self.init_params, graph_init_params=self.init_params,
@ -276,11 +276,11 @@ class GraphEngine:
previous_node_id=previous_node_id, previous_node_id=previous_node_id,
thread_pool_id=self.thread_pool_id, thread_pool_id=self.thread_pool_id,
) )
node_instance.init_node_data(node_config.get("data", {})) node.init_node_data(node_config.get("data", {}))
try: try:
# run node # run node
generator = self._run_node( generator = self._run_node(
node_instance=node_instance, node=node,
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=in_parallel_id, parallel_id=in_parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
@ -308,16 +308,16 @@ class GraphEngine:
route_node_state.failed_reason = str(e) route_node_state.failed_reason = str(e)
yield NodeRunFailedEvent( yield NodeRunFailedEvent(
error=str(e), error=str(e),
id=node_instance.id, id=node.id,
node_id=next_node_id, node_id=next_node_id,
node_type=node_type, node_type=node_type,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=in_parallel_id, parallel_id=in_parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
raise e raise e
@ -339,7 +339,7 @@ class GraphEngine:
edge = edge_mappings[0] edge = edge_mappings[0]
if ( if (
previous_route_node_state.status == RouteNodeState.Status.EXCEPTION previous_route_node_state.status == RouteNodeState.Status.EXCEPTION
and node_instance.error_strategy == ErrorStrategy.FAIL_BRANCH and node.error_strategy == ErrorStrategy.FAIL_BRANCH
and edge.run_condition is None and edge.run_condition is None
): ):
break break
@ -415,8 +415,8 @@ class GraphEngine:
next_node_id = final_node_id next_node_id = final_node_id
elif ( elif (
node_instance.error_strategy == ErrorStrategy.FAIL_BRANCH node.continue_on_error
and node_instance.continue_on_error and node.error_strategy == ErrorStrategy.FAIL_BRANCH
and previous_route_node_state.status == RouteNodeState.Status.EXCEPTION and previous_route_node_state.status == RouteNodeState.Status.EXCEPTION
): ):
break break
@ -599,7 +599,7 @@ class GraphEngine:
def _run_node( def _run_node(
self, self,
node_instance: BaseNode, node: BaseNode,
route_node_state: RouteNodeState, route_node_state: RouteNodeState,
parallel_id: Optional[str] = None, parallel_id: Optional[str] = None,
parallel_start_node_id: Optional[str] = None, parallel_start_node_id: Optional[str] = None,
@ -613,29 +613,29 @@ class GraphEngine:
# trigger node run start event # trigger node run start event
agent_strategy = ( agent_strategy = (
AgentNodeStrategyInit( AgentNodeStrategyInit(
name=cast(AgentNodeData, node_instance.get_base_node_data()).agent_strategy_name, name=cast(AgentNodeData, node._get_base_node_data()).agent_strategy_name,
icon=cast(AgentNode, node_instance).agent_strategy_icon, icon=cast(AgentNode, node).agent_strategy_icon,
) )
if node_instance.node_type == NodeType.AGENT if node.type_ == NodeType.AGENT
else None else None
) )
yield NodeRunStartedEvent( yield NodeRunStartedEvent(
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
predecessor_node_id=node_instance.previous_node_id, predecessor_node_id=node.previous_node_id,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
agent_strategy=agent_strategy, agent_strategy=agent_strategy,
node_version=node_instance.version(), node_version=node.version(),
) )
max_retries = node_instance.node_retry_config.max_retries max_retries = node.retry_config.max_retries
retry_interval = node_instance.node_retry_config.retry_interval_seconds retry_interval = node.retry_config.retry_interval_seconds
retries = 0 retries = 0
should_continue_retry = True should_continue_retry = True
while should_continue_retry and retries <= max_retries: while should_continue_retry and retries <= max_retries:
@ -644,7 +644,7 @@ class GraphEngine:
retry_start_at = datetime.now(UTC).replace(tzinfo=None) retry_start_at = datetime.now(UTC).replace(tzinfo=None)
# yield control to other threads # yield control to other threads
time.sleep(0.001) time.sleep(0.001)
event_stream = node_instance.run() event_stream = node.run()
for event in event_stream: for event in event_stream:
if isinstance(event, GraphEngineEvent): if isinstance(event, GraphEngineEvent):
# add parallel info to iteration event # add parallel info to iteration event
@ -660,21 +660,21 @@ class GraphEngine:
if run_result.status == WorkflowNodeExecutionStatus.FAILED: if run_result.status == WorkflowNodeExecutionStatus.FAILED:
if ( if (
retries == max_retries retries == max_retries
and node_instance.node_type == NodeType.HTTP_REQUEST and node.type_ == NodeType.HTTP_REQUEST
and run_result.outputs and run_result.outputs
and not node_instance.continue_on_error and not node.continue_on_error
): ):
run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED
if node_instance.retry and retries < max_retries: if node.retry and retries < max_retries:
retries += 1 retries += 1
route_node_state.node_run_result = run_result route_node_state.node_run_result = run_result
yield NodeRunRetryEvent( yield NodeRunRetryEvent(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
predecessor_node_id=node_instance.previous_node_id, predecessor_node_id=node.previous_node_id,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
@ -682,17 +682,17 @@ class GraphEngine:
error=run_result.error or "Unknown error", error=run_result.error or "Unknown error",
retry_index=retries, retry_index=retries,
start_at=retry_start_at, start_at=retry_start_at,
node_version=node_instance.version(), node_version=node.version(),
) )
time.sleep(retry_interval) time.sleep(retry_interval)
break break
route_node_state.set_finished(run_result=run_result) route_node_state.set_finished(run_result=run_result)
if run_result.status == WorkflowNodeExecutionStatus.FAILED: if run_result.status == WorkflowNodeExecutionStatus.FAILED:
if node_instance.continue_on_error: if node.continue_on_error:
# if run failed, handle error # if run failed, handle error
run_result = self._handle_continue_on_error( run_result = self._handle_continue_on_error(
node_instance, node,
event.run_result, event.run_result,
self.graph_runtime_state.variable_pool, self.graph_runtime_state.variable_pool,
handle_exceptions=handle_exceptions, handle_exceptions=handle_exceptions,
@ -703,44 +703,44 @@ class GraphEngine:
for variable_key, variable_value in run_result.outputs.items(): for variable_key, variable_value in run_result.outputs.items():
# append variables to variable pool recursively # append variables to variable pool recursively
self._append_variables_recursively( self._append_variables_recursively(
node_id=node_instance.node_id, node_id=node.node_id,
variable_key_list=[variable_key], variable_key_list=[variable_key],
variable_value=variable_value, variable_value=variable_value,
) )
yield NodeRunExceptionEvent( yield NodeRunExceptionEvent(
error=run_result.error or "System Error", error=run_result.error or "System Error",
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
should_continue_retry = False should_continue_retry = False
else: else:
yield NodeRunFailedEvent( yield NodeRunFailedEvent(
error=route_node_state.failed_reason or "Unknown error.", error=route_node_state.failed_reason or "Unknown error.",
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
should_continue_retry = False should_continue_retry = False
elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED: elif run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
if ( if (
node_instance.continue_on_error node.continue_on_error
and self.graph.edge_mapping.get(node_instance.node_id) and self.graph.edge_mapping.get(node.node_id)
and node_instance.error_strategy is ErrorStrategy.FAIL_BRANCH and node.error_strategy is ErrorStrategy.FAIL_BRANCH
): ):
run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS run_result.edge_source_handle = FailBranchSourceHandle.SUCCESS
if run_result.metadata and run_result.metadata.get( if run_result.metadata and run_result.metadata.get(
@ -760,7 +760,7 @@ class GraphEngine:
for variable_key, variable_value in run_result.outputs.items(): for variable_key, variable_value in run_result.outputs.items():
# append variables to variable pool recursively # append variables to variable pool recursively
self._append_variables_recursively( self._append_variables_recursively(
node_id=node_instance.node_id, node_id=node.node_id,
variable_key_list=[variable_key], variable_key_list=[variable_key],
variable_value=variable_value, variable_value=variable_value,
) )
@ -785,26 +785,26 @@ class GraphEngine:
run_result.metadata = metadata_dict run_result.metadata = metadata_dict
yield NodeRunSucceededEvent( yield NodeRunSucceededEvent(
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
should_continue_retry = False should_continue_retry = False
break break
elif isinstance(event, RunStreamChunkEvent): elif isinstance(event, RunStreamChunkEvent):
yield NodeRunStreamChunkEvent( yield NodeRunStreamChunkEvent(
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
chunk_content=event.chunk_content, chunk_content=event.chunk_content,
from_variable_selector=event.from_variable_selector, from_variable_selector=event.from_variable_selector,
route_node_state=route_node_state, route_node_state=route_node_state,
@ -812,14 +812,14 @@ class GraphEngine:
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
elif isinstance(event, RunRetrieverResourceEvent): elif isinstance(event, RunRetrieverResourceEvent):
yield NodeRunRetrieverResourceEvent( yield NodeRunRetrieverResourceEvent(
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
retriever_resources=event.retriever_resources, retriever_resources=event.retriever_resources,
context=event.context, context=event.context,
route_node_state=route_node_state, route_node_state=route_node_state,
@ -827,7 +827,7 @@ class GraphEngine:
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
except GenerateTaskStoppedError: except GenerateTaskStoppedError:
# trigger node run failed event # trigger node run failed event
@ -835,20 +835,20 @@ class GraphEngine:
route_node_state.failed_reason = "Workflow stopped." route_node_state.failed_reason = "Workflow stopped."
yield NodeRunFailedEvent( yield NodeRunFailedEvent(
error="Workflow stopped.", error="Workflow stopped.",
id=node_instance.id, id=node.id,
node_id=node_instance.node_id, node_id=node.node_id,
node_type=node_instance.node_type, node_type=node.type_,
node_data=node_instance.get_base_node_data(), node_data=node._get_base_node_data(),
route_node_state=route_node_state, route_node_state=route_node_state,
parallel_id=parallel_id, parallel_id=parallel_id,
parallel_start_node_id=parallel_start_node_id, parallel_start_node_id=parallel_start_node_id,
parent_parallel_id=parent_parallel_id, parent_parallel_id=parent_parallel_id,
parent_parallel_start_node_id=parent_parallel_start_node_id, parent_parallel_start_node_id=parent_parallel_start_node_id,
node_version=node_instance.version(), node_version=node.version(),
) )
return return
except Exception as e: except Exception as e:
logger.exception(f"Node {node_instance.node_title} run failed") logger.exception(f"Node {node.title} run failed")
raise e raise e
def _append_variables_recursively(self, node_id: str, variable_key_list: list[str], variable_value: VariableValue): def _append_variables_recursively(self, node_id: str, variable_key_list: list[str], variable_value: VariableValue):
@ -888,22 +888,14 @@ class GraphEngine:
def _handle_continue_on_error( def _handle_continue_on_error(
self, self,
node_instance: BaseNode, node: BaseNode,
error_result: NodeRunResult, error_result: NodeRunResult,
variable_pool: VariablePool, variable_pool: VariablePool,
handle_exceptions: list[str] = [], handle_exceptions: list[str] = [],
) -> NodeRunResult: ) -> NodeRunResult:
"""
handle continue on error when self._should_continue_on_error is True
:param error_result (NodeRunResult): error run result
:param variable_pool (VariablePool): variable pool
:return: excption run result
"""
# add error message and error type to variable pool # add error message and error type to variable pool
variable_pool.add([node_instance.node_id, "error_message"], error_result.error) variable_pool.add([node.node_id, "error_message"], error_result.error)
variable_pool.add([node_instance.node_id, "error_type"], error_result.error_type) variable_pool.add([node.node_id, "error_type"], error_result.error_type)
# add error message to handle_exceptions # add error message to handle_exceptions
handle_exceptions.append(error_result.error or "") handle_exceptions.append(error_result.error or "")
node_error_args: dict[str, Any] = { node_error_args: dict[str, Any] = {
@ -911,21 +903,21 @@ class GraphEngine:
"error": error_result.error, "error": error_result.error,
"inputs": error_result.inputs, "inputs": error_result.inputs,
"metadata": { "metadata": {
WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: node_instance.error_strategy, WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: node.error_strategy,
}, },
} }
if node_instance.error_strategy is ErrorStrategy.DEFAULT_VALUE: if node.error_strategy is ErrorStrategy.DEFAULT_VALUE:
return NodeRunResult( return NodeRunResult(
**node_error_args, **node_error_args,
outputs={ outputs={
**node_instance.default_value_dict, **node.default_value_dict,
"error_message": error_result.error, "error_message": error_result.error,
"error_type": error_result.error_type, "error_type": error_result.error_type,
}, },
) )
elif node_instance.error_strategy is ErrorStrategy.FAIL_BRANCH: elif node.error_strategy is ErrorStrategy.FAIL_BRANCH:
if self.graph.edge_mapping.get(node_instance.node_id): if self.graph.edge_mapping.get(node.node_id):
node_error_args["edge_source_handle"] = FailBranchSourceHandle.FAILED node_error_args["edge_source_handle"] = FailBranchSourceHandle.FAILED
return NodeRunResult( return NodeRunResult(
**node_error_args, **node_error_args,

@ -68,22 +68,22 @@ class AgentNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = AgentNodeData.model_validate(data) self._node_data = AgentNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod
@ -185,7 +185,7 @@ class AgentNode(BaseNode):
parameters_for_log=parameters_for_log, parameters_for_log=parameters_for_log,
user_id=self.user_id, user_id=self.user_id,
tenant_id=self.tenant_id, tenant_id=self.tenant_id,
node_type=self.node_type, node_type=self.type_,
node_id=self.node_id, node_id=self.node_id,
node_execution_id=self.id, node_execution_id=self.id,
) )

@ -25,22 +25,22 @@ class AnswerNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = AnswerNodeData.model_validate(data) self._node_data = AnswerNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -122,9 +122,9 @@ class RetryConfig(BaseModel):
class BaseNodeData(ABC, BaseModel): class BaseNodeData(ABC, BaseModel):
title: str title: str
desc: Optional[str] = None desc: Optional[str] = None
version: str = "1"
error_strategy: Optional[ErrorStrategy] = None error_strategy: Optional[ErrorStrategy] = None
default_value: Optional[list[DefaultValue]] = None default_value: Optional[list[DefaultValue]] = None
version: str = "1"
retry_config: RetryConfig = RetryConfig() retry_config: RetryConfig = RetryConfig()
@property @property

@ -145,7 +145,7 @@ class BaseNode:
return {} return {}
@property @property
def node_type(self) -> NodeType: def type_(self) -> NodeType:
return self._node_type return self._node_type
@classmethod @classmethod
@ -170,32 +170,32 @@ class BaseNode:
# to BaseNodeData properties in a type-safe way # to BaseNodeData properties in a type-safe way
@abstractmethod @abstractmethod
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
"""Get the error strategy for this node.""" """Get the error strategy for this node."""
... ...
@abstractmethod @abstractmethod
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
"""Get the retry configuration for this node.""" """Get the retry configuration for this node."""
... ...
@abstractmethod @abstractmethod
def get_title(self) -> str: def _get_title(self) -> str:
"""Get the node title.""" """Get the node title."""
... ...
@abstractmethod @abstractmethod
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
"""Get the node description.""" """Get the node description."""
... ...
@abstractmethod @abstractmethod
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
"""Get the default values dictionary for this node.""" """Get the default values dictionary for this node."""
... ...
@abstractmethod @abstractmethod
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
"""Get the BaseNodeData object for this node.""" """Get the BaseNodeData object for this node."""
... ...
@ -203,24 +203,24 @@ class BaseNode:
@property @property
def error_strategy(self) -> Optional[ErrorStrategy]: def error_strategy(self) -> Optional[ErrorStrategy]:
"""Get the error strategy for this node.""" """Get the error strategy for this node."""
return self.get_error_strategy() return self._get_error_strategy()
@property @property
def node_retry_config(self) -> RetryConfig: def retry_config(self) -> RetryConfig:
"""Get the retry configuration for this node.""" """Get the retry configuration for this node."""
return self.get_retry_config() return self._get_retry_config()
@property @property
def node_title(self) -> str: def title(self) -> str:
"""Get the node title.""" """Get the node title."""
return self.get_title() return self._get_title()
@property @property
def node_description(self) -> Optional[str]: def description(self) -> Optional[str]:
"""Get the node description.""" """Get the node description."""
return self.get_description() return self._get_description()
@property @property
def default_value_dict(self) -> dict[str, Any]: def default_value_dict(self) -> dict[str, Any]:
"""Get the default values dictionary for this node.""" """Get the default values dictionary for this node."""
return self.get_default_value_dict() return self._get_default_value_dict()

@ -30,22 +30,22 @@ class CodeNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = CodeNodeData.model_validate(data) self._node_data = CodeNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -50,22 +50,22 @@ class DocumentExtractorNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = DocumentExtractorNodeData.model_validate(data) self._node_data = DocumentExtractorNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -17,22 +17,22 @@ class EndNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = EndNodeData(**data) self._node_data = EndNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -41,22 +41,22 @@ class HttpRequestNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = HttpRequestNodeData.model_validate(data) self._node_data = HttpRequestNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -22,22 +22,22 @@ class IfElseNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = IfElseNodeData.model_validate(data) self._node_data = IfElseNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -69,22 +69,22 @@ class IterationNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = IterationNodeData.model_validate(data) self._node_data = IterationNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod
@ -184,7 +184,7 @@ class IterationNode(BaseNode):
yield IterationRunStartedEvent( yield IterationRunStartedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -195,7 +195,7 @@ class IterationNode(BaseNode):
yield IterationRunNextEvent( yield IterationRunNextEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
index=0, index=0,
pre_iteration_output=None, pre_iteration_output=None,
@ -276,7 +276,7 @@ class IterationNode(BaseNode):
yield IterationRunSucceededEvent( yield IterationRunSucceededEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -301,7 +301,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -461,7 +461,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
parallel_mode_run_id=parallel_mode_run_id, parallel_mode_run_id=parallel_mode_run_id,
start_at=start_at, start_at=start_at,
@ -475,7 +475,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -510,7 +510,7 @@ class IterationNode(BaseNode):
yield IterationRunNextEvent( yield IterationRunNextEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
index=next_index, index=next_index,
parallel_mode_run_id=parallel_mode_run_id, parallel_mode_run_id=parallel_mode_run_id,
@ -531,7 +531,7 @@ class IterationNode(BaseNode):
yield IterationRunNextEvent( yield IterationRunNextEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
index=next_index, index=next_index,
parallel_mode_run_id=parallel_mode_run_id, parallel_mode_run_id=parallel_mode_run_id,
@ -554,7 +554,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
parallel_mode_run_id=parallel_mode_run_id, parallel_mode_run_id=parallel_mode_run_id,
start_at=start_at, start_at=start_at,
@ -568,7 +568,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -607,7 +607,7 @@ class IterationNode(BaseNode):
yield IterationRunNextEvent( yield IterationRunNextEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
index=next_index, index=next_index,
parallel_mode_run_id=parallel_mode_run_id, parallel_mode_run_id=parallel_mode_run_id,
@ -620,7 +620,7 @@ class IterationNode(BaseNode):
yield IterationRunFailedEvent( yield IterationRunFailedEvent(
iteration_id=self.id, iteration_id=self.id,
iteration_node_id=self.node_id, iteration_node_id=self.node_id,
iteration_node_type=self.node_type, iteration_node_type=self.type_,
iteration_node_data=self._node_data, iteration_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,

@ -16,28 +16,28 @@ class IterationStartNode(BaseNode):
_node_type = NodeType.ITERATION_START _node_type = NodeType.ITERATION_START
node_data: IterationStartNodeData _node_data: IterationStartNodeData
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self.node_data = IterationStartNodeData(**data) self._node_data = IterationStartNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self.node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self.node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self.node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self.node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self.node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self.node_data return self._node_data
@classmethod @classmethod
def version(cls) -> str: def version(cls) -> str:

@ -128,22 +128,22 @@ class KnowledgeRetrievalNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = KnowledgeRetrievalNodeData.model_validate(data) self._node_data = KnowledgeRetrievalNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -22,22 +22,22 @@ class ListOperatorNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = ListOperatorNodeData(**data) self._node_data = ListOperatorNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -141,22 +141,22 @@ class LLMNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = LLMNodeData.model_validate(data) self._node_data = LLMNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -21,22 +21,22 @@ class LoopEndNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = LoopEndNodeData(**data) self._node_data = LoopEndNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -56,22 +56,22 @@ class LoopNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = LoopNodeData.model_validate(data) self._node_data = LoopNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod
@ -150,7 +150,7 @@ class LoopNode(BaseNode):
yield LoopRunStartedEvent( yield LoopRunStartedEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -207,7 +207,7 @@ class LoopNode(BaseNode):
yield LoopRunSucceededEvent( yield LoopRunSucceededEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -240,7 +240,7 @@ class LoopNode(BaseNode):
yield LoopRunFailedEvent( yield LoopRunFailedEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -343,7 +343,7 @@ class LoopNode(BaseNode):
yield LoopRunFailedEvent( yield LoopRunFailedEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -374,7 +374,7 @@ class LoopNode(BaseNode):
yield LoopRunFailedEvent( yield LoopRunFailedEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
start_at=start_at, start_at=start_at,
inputs=inputs, inputs=inputs,
@ -423,7 +423,7 @@ class LoopNode(BaseNode):
yield LoopRunNextEvent( yield LoopRunNextEvent(
loop_id=self.id, loop_id=self.id,
loop_node_id=self.node_id, loop_node_id=self.node_id,
loop_node_type=self.node_type, loop_node_type=self.type_,
loop_node_data=self._node_data, loop_node_data=self._node_data,
index=next_index, index=next_index,
pre_loop_output=self._node_data.outputs, pre_loop_output=self._node_data.outputs,

@ -21,22 +21,22 @@ class LoopStartNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = LoopStartNodeData(**data) self._node_data = LoopStartNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -99,22 +99,22 @@ class ParameterExtractorNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = ParameterExtractorNodeData.model_validate(data) self._node_data = ParameterExtractorNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
_model_instance: Optional[ModelInstance] = None _model_instance: Optional[ModelInstance] = None

@ -86,22 +86,22 @@ class QuestionClassifierNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = QuestionClassifierNodeData.model_validate(data) self._node_data = QuestionClassifierNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -18,22 +18,22 @@ class StartNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = StartNodeData(**data) self._node_data = StartNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -21,22 +21,22 @@ class TemplateTransformNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = TemplateTransformNodeData.model_validate(data) self._node_data = TemplateTransformNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -403,22 +403,22 @@ class ToolNode(BaseNode):
return result return result
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@property @property

@ -18,22 +18,22 @@ class VariableAggregatorNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = VariableAssignerNodeData(**data) self._node_data = VariableAssignerNodeData(**data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
@classmethod @classmethod

@ -32,22 +32,22 @@ class VariableAssignerNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = VariableAssignerData.model_validate(data) self._node_data = VariableAssignerData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
def __init__( def __init__(

@ -61,22 +61,22 @@ class VariableAssignerNode(BaseNode):
def init_node_data(self, data: Mapping[str, Any]) -> None: def init_node_data(self, data: Mapping[str, Any]) -> None:
self._node_data = VariableAssignerNodeData.model_validate(data) self._node_data = VariableAssignerNodeData.model_validate(data)
def get_error_strategy(self) -> Optional[ErrorStrategy]: def _get_error_strategy(self) -> Optional[ErrorStrategy]:
return self._node_data.error_strategy return self._node_data.error_strategy
def get_retry_config(self) -> RetryConfig: def _get_retry_config(self) -> RetryConfig:
return self._node_data.retry_config return self._node_data.retry_config
def get_title(self) -> str: def _get_title(self) -> str:
return self._node_data.title return self._node_data.title
def get_description(self) -> Optional[str]: def _get_description(self) -> Optional[str]:
return self._node_data.desc return self._node_data.desc
def get_default_value_dict(self) -> dict[str, Any]: def _get_default_value_dict(self) -> dict[str, Any]:
return self._node_data.default_value_dict return self._node_data.default_value_dict
def get_base_node_data(self) -> BaseNodeData: def _get_base_node_data(self) -> BaseNodeData:
return self._node_data return self._node_data
def _conv_var_updater_factory(self) -> ConversationVariableUpdater: def _conv_var_updater_factory(self) -> ConversationVariableUpdater:

@ -146,7 +146,7 @@ class WorkflowEntry:
graph = Graph.init(graph_config=workflow.graph_dict) graph = Graph.init(graph_config=workflow.graph_dict)
# init workflow run state # init workflow run state
node_instance = node_cls( node = node_cls(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
config=node_config, config=node_config,
graph_init_params=GraphInitParams( graph_init_params=GraphInitParams(
@ -190,17 +190,11 @@ class WorkflowEntry:
try: try:
# run node # run node
generator = node_instance.run() generator = node.run()
except Exception as e: except Exception as e:
logger.exception( logger.exception(f"error while running node, {workflow.id=}, {node.id=}, {node.type_=}, {node.version()=}")
"error while running node_instance, workflow_id=%s, node_id=%s, type=%s, version=%s", raise WorkflowNodeRunFailedError(node=node, error=str(e))
workflow.id, return node, generator
node_instance.id,
node_instance.node_type,
node_instance.version(),
)
raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e))
return node_instance, generator
@classmethod @classmethod
def run_free_node( def run_free_node(
@ -262,7 +256,7 @@ class WorkflowEntry:
node_cls = cast(type[BaseNode], node_cls) node_cls = cast(type[BaseNode], node_cls)
# init workflow run state # init workflow run state
node_instance: BaseNode = node_cls( node: BaseNode = node_cls(
id=str(uuid.uuid4()), id=str(uuid.uuid4()),
config=node_config, config=node_config,
graph_init_params=GraphInitParams( graph_init_params=GraphInitParams(
@ -297,17 +291,12 @@ class WorkflowEntry:
) )
# run node # run node
generator = node_instance.run() generator = node.run()
return node_instance, generator return node, generator
except Exception as e: except Exception as e:
logger.exception( logger.exception(f"error while running node, {node.id=}, {node.type_=}, {node.version()=}")
"error while running node_instance, node_id=%s, type=%s, version=%s", raise WorkflowNodeRunFailedError(node=node, error=str(e))
node_instance.id,
node_instance.node_type,
node_instance.version(),
)
raise WorkflowNodeRunFailedError(node_instance=node_instance, error=str(e))
@staticmethod @staticmethod
def handle_special_values(value: Optional[Mapping[str, Any]]) -> Mapping[str, Any] | None: def handle_special_values(value: Optional[Mapping[str, Any]]) -> Mapping[str, Any] | None:

@ -465,10 +465,10 @@ class WorkflowService:
node_id: str, node_id: str,
) -> WorkflowNodeExecution: ) -> WorkflowNodeExecution:
try: try:
node_instance, generator = invoke_node_fn() node, node_events = invoke_node_fn()
node_run_result: NodeRunResult | None = None node_run_result: NodeRunResult | None = None
for event in generator: for event in node_events:
if isinstance(event, RunCompletedEvent): if isinstance(event, RunCompletedEvent):
node_run_result = event.run_result node_run_result = event.run_result
@ -479,18 +479,18 @@ class WorkflowService:
if not node_run_result: if not node_run_result:
raise ValueError("Node run failed with no run result") raise ValueError("Node run failed with no run result")
# single step debug mode error handling return # single step debug mode error handling return
if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.continue_on_error: if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node.continue_on_error:
node_error_args: dict[str, Any] = { node_error_args: dict[str, Any] = {
"status": WorkflowNodeExecutionStatus.EXCEPTION, "status": WorkflowNodeExecutionStatus.EXCEPTION,
"error": node_run_result.error, "error": node_run_result.error,
"inputs": node_run_result.inputs, "inputs": node_run_result.inputs,
"metadata": {"error_strategy": node_instance.error_strategy}, "metadata": {"error_strategy": node.error_strategy},
} }
if node_instance.error_strategy is ErrorStrategy.DEFAULT_VALUE: if node.error_strategy is ErrorStrategy.DEFAULT_VALUE:
node_run_result = NodeRunResult( node_run_result = NodeRunResult(
**node_error_args, **node_error_args,
outputs={ outputs={
**node_instance.default_value_dict, **node.default_value_dict,
"error_message": node_run_result.error, "error_message": node_run_result.error,
"error_type": node_run_result.error_type, "error_type": node_run_result.error_type,
}, },
@ -509,7 +509,7 @@ class WorkflowService:
) )
error = node_run_result.error if not run_succeeded else None error = node_run_result.error if not run_succeeded else None
except WorkflowNodeRunFailedError as e: except WorkflowNodeRunFailedError as e:
node_instance = e.node_instance node = e.node
run_succeeded = False run_succeeded = False
node_run_result = None node_run_result = None
error = e.error error = e.error
@ -520,8 +520,8 @@ class WorkflowService:
workflow_id="", # This is a single-step execution, so no workflow ID workflow_id="", # This is a single-step execution, so no workflow ID
index=1, index=1,
node_id=node_id, node_id=node_id,
node_type=node_instance.node_type, node_type=node.type_,
title=node_instance.node_title, title=node.title,
elapsed_time=time.perf_counter() - start_at, elapsed_time=time.perf_counter() - start_at,
created_at=datetime.now(UTC).replace(tzinfo=None), created_at=datetime.now(UTC).replace(tzinfo=None),
finished_at=datetime.now(UTC).replace(tzinfo=None), finished_at=datetime.now(UTC).replace(tzinfo=None),

Loading…
Cancel
Save