|
|
|
|
@ -1,18 +1,21 @@
|
|
|
|
|
import json
|
|
|
|
|
import time
|
|
|
|
|
import logging
|
|
|
|
|
from collections.abc import Callable, Generator, Sequence
|
|
|
|
|
from datetime import UTC, datetime
|
|
|
|
|
from inspect import isgenerator
|
|
|
|
|
from typing import Any, Optional
|
|
|
|
|
from uuid import uuid4
|
|
|
|
|
|
|
|
|
|
import time
|
|
|
|
|
from sqlalchemy import select
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
|
|
|
|
from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
|
|
|
|
|
from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
|
|
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
|
|
|
from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
|
|
|
|
|
from core.variables import Variable
|
|
|
|
|
from core.workflow.entities.node_entities import NodeRunResult
|
|
|
|
|
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
|
|
|
|
|
from core.workflow.entities.node_execution_entities import NodeExecution, NodeExecutionStatus
|
|
|
|
|
from core.workflow.errors import WorkflowNodeRunFailedError
|
|
|
|
|
from core.workflow.graph_engine.entities.event import InNodeEvent
|
|
|
|
|
@ -35,10 +38,10 @@ from models.workflow import (
|
|
|
|
|
WorkflowNodeExecutionTriggeredFrom,
|
|
|
|
|
WorkflowType,
|
|
|
|
|
)
|
|
|
|
|
from services.errors.app import WorkflowHashNotEqualError
|
|
|
|
|
from services.errors.app import IsDraftWorkflowError, WorkflowHashNotEqualError
|
|
|
|
|
from services.workflow.workflow_converter import WorkflowConverter
|
|
|
|
|
|
|
|
|
|
from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
|
|
|
|
|
from .workflow_draft_variable_service import WorkflowDraftVariableService, should_save_output_variables_for_draft
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WorkflowService:
|
|
|
|
|
@ -89,6 +92,21 @@ class WorkflowService:
|
|
|
|
|
# return draft workflow
|
|
|
|
|
return workflow
|
|
|
|
|
|
|
|
|
|
def get_published_workflow_by_id(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
|
|
|
|
# fetch published workflow by workflow_id
|
|
|
|
|
workflow = (
|
|
|
|
|
db.session.query(Workflow)
|
|
|
|
|
.filter(
|
|
|
|
|
Workflow.tenant_id == app_model.tenant_id,
|
|
|
|
|
Workflow.app_id == app_model.id,
|
|
|
|
|
Workflow.id == workflow_id,
|
|
|
|
|
)
|
|
|
|
|
.first()
|
|
|
|
|
)
|
|
|
|
|
if workflow.version == Workflow.VERSION_DRAFT:
|
|
|
|
|
raise IsDraftWorkflowError(f"Workflow is draft version, id={workflow_id}")
|
|
|
|
|
return workflow
|
|
|
|
|
|
|
|
|
|
def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
|
|
|
|
|
"""
|
|
|
|
|
Get published workflow
|
|
|
|
|
@ -227,7 +245,7 @@ class WorkflowService:
|
|
|
|
|
tenant_id=app_model.tenant_id,
|
|
|
|
|
app_id=app_model.id,
|
|
|
|
|
type=draft_workflow.type,
|
|
|
|
|
version=str(datetime.now(UTC).replace(tzinfo=None)),
|
|
|
|
|
version=Workflow.version_from_datetime(datetime.now(UTC).replace(tzinfo=None)),
|
|
|
|
|
graph=draft_workflow.graph,
|
|
|
|
|
features=draft_workflow.features,
|
|
|
|
|
created_by=account.id,
|
|
|
|
|
@ -291,8 +309,17 @@ class WorkflowService:
|
|
|
|
|
if not draft_workflow:
|
|
|
|
|
raise ValueError("Workflow not initialized")
|
|
|
|
|
|
|
|
|
|
# conv_vars = common_helpers.get_conversation_variables()
|
|
|
|
|
|
|
|
|
|
# run draft workflow node
|
|
|
|
|
start_at = time.perf_counter()
|
|
|
|
|
with Session(bind=db.engine) as session:
|
|
|
|
|
# TODO(QunatumGhost): inject conversation variables
|
|
|
|
|
# to variable pool.
|
|
|
|
|
draft_var_srv = WorkflowDraftVariableService(session)
|
|
|
|
|
|
|
|
|
|
conv_vars_list = draft_var_srv.list_conversation_variables(app_id=app_model.id)
|
|
|
|
|
conv_var_mapping = {v.name: v.get_value().value for v in conv_vars_list.variables}
|
|
|
|
|
|
|
|
|
|
node_execution = self._handle_node_run_result(
|
|
|
|
|
invoke_node_fn=lambda: WorkflowEntry.single_step_run(
|
|
|
|
|
@ -300,6 +327,7 @@ class WorkflowService:
|
|
|
|
|
node_id=node_id,
|
|
|
|
|
user_inputs=user_inputs,
|
|
|
|
|
user_id=account.id,
|
|
|
|
|
conversation_variables=conv_var_mapping,
|
|
|
|
|
),
|
|
|
|
|
start_at=start_at,
|
|
|
|
|
node_id=node_id,
|
|
|
|
|
@ -319,6 +347,27 @@ class WorkflowService:
|
|
|
|
|
|
|
|
|
|
# Convert node_execution to WorkflowNodeExecution after save
|
|
|
|
|
workflow_node_execution = repository.to_db_model(node_execution)
|
|
|
|
|
output = workflow_node_execution.outputs_dict or {}
|
|
|
|
|
|
|
|
|
|
exec_metadata = workflow_node_execution.execution_metadata_dict or {}
|
|
|
|
|
|
|
|
|
|
should_save = should_save_output_variables_for_draft(
|
|
|
|
|
invoke_from=InvokeFrom.DEBUGGER,
|
|
|
|
|
loop_id=exec_metadata.get(NodeRunMetadataKey.LOOP_ID, None),
|
|
|
|
|
iteration_id=exec_metadata.get(NodeRunMetadataKey.ITERATION_ID, None),
|
|
|
|
|
)
|
|
|
|
|
if not should_save:
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
# TODO(QuantumGhost): single step does not include loop_id or iteration_id in execution_metadata.
|
|
|
|
|
with Session(bind=db.engine) as session:
|
|
|
|
|
draft_var_srv = WorkflowDraftVariableService(session)
|
|
|
|
|
draft_var_srv.save_output_variables(
|
|
|
|
|
app_id=app_model.id,
|
|
|
|
|
node_id=workflow_node_execution.node_id,
|
|
|
|
|
node_type=NodeType(workflow_node_execution.node_type),
|
|
|
|
|
output=output,
|
|
|
|
|
)
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
return workflow_node_execution
|
|
|
|
|
|
|
|
|
|
@ -353,6 +402,7 @@ class WorkflowService:
|
|
|
|
|
) -> NodeExecution:
|
|
|
|
|
try:
|
|
|
|
|
node_instance, generator = invoke_node_fn()
|
|
|
|
|
generator = _inspect_generator(generator)
|
|
|
|
|
|
|
|
|
|
node_run_result: NodeRunResult | None = None
|
|
|
|
|
for event in generator:
|
|
|
|
|
@ -559,3 +609,19 @@ class WorkflowService:
|
|
|
|
|
|
|
|
|
|
session.delete(workflow)
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _inspect_generator(gen: Generator[Any] | Any) -> Any:
|
|
|
|
|
if not isgenerator(gen):
|
|
|
|
|
return gen
|
|
|
|
|
|
|
|
|
|
def wrapper():
|
|
|
|
|
for item in gen:
|
|
|
|
|
logging.getLogger(__name__).info(
|
|
|
|
|
"received generator item, type=%s, value=%s",
|
|
|
|
|
type(item),
|
|
|
|
|
item,
|
|
|
|
|
)
|
|
|
|
|
yield item
|
|
|
|
|
|
|
|
|
|
return wrapper()
|
|
|
|
|
|