|
|
|
@ -1,6 +1,8 @@
|
|
|
|
from collections.abc import Mapping
|
|
|
|
from collections.abc import Mapping
|
|
|
|
from typing import Any, Optional, cast
|
|
|
|
from typing import Any, Optional, cast
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
|
|
|
|
|
|
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
|
|
|
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
|
|
|
from core.app.apps.base_app_runner import AppRunner
|
|
|
|
from core.app.apps.base_app_runner import AppRunner
|
|
|
|
from core.app.entities.queue_entities import (
|
|
|
|
from core.app.entities.queue_entities import (
|
|
|
|
@ -66,12 +68,19 @@ from core.workflow.workflow_entry import WorkflowEntry
|
|
|
|
from extensions.ext_database import db
|
|
|
|
from extensions.ext_database import db
|
|
|
|
from models.model import App
|
|
|
|
from models.model import App
|
|
|
|
from models.workflow import Workflow
|
|
|
|
from models.workflow import Workflow
|
|
|
|
|
|
|
|
from services.workflow_draft_variable_service import (
|
|
|
|
|
|
|
|
WorkflowDraftVariableService,
|
|
|
|
|
|
|
|
should_save_output_variables_for_draft,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WorkflowBasedAppRunner(AppRunner):
|
|
|
|
class WorkflowBasedAppRunner(AppRunner):
|
|
|
|
def __init__(self, queue_manager: AppQueueManager):
|
|
|
|
def __init__(self, queue_manager: AppQueueManager):
|
|
|
|
self.queue_manager = queue_manager
|
|
|
|
self.queue_manager = queue_manager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_app_id(self) -> str:
|
|
|
|
|
|
|
|
raise NotImplementedError("not implemented")
|
|
|
|
|
|
|
|
|
|
|
|
def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph:
|
|
|
|
def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph:
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Init graph
|
|
|
|
Init graph
|
|
|
|
@ -376,6 +385,24 @@ class WorkflowBasedAppRunner(AppRunner):
|
|
|
|
in_loop_id=event.in_loop_id,
|
|
|
|
in_loop_id=event.in_loop_id,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# FIXME(QuantumGhost): rely on private state of queue_manager is not ideal.
|
|
|
|
|
|
|
|
should_save = should_save_output_variables_for_draft(
|
|
|
|
|
|
|
|
self.queue_manager._invoke_from,
|
|
|
|
|
|
|
|
loop_id=event.in_loop_id,
|
|
|
|
|
|
|
|
iteration_id=event.in_iteration_id,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
if should_save and outputs is not None:
|
|
|
|
|
|
|
|
with Session(bind=db.engine) as session:
|
|
|
|
|
|
|
|
draft_var_srv = WorkflowDraftVariableService(session)
|
|
|
|
|
|
|
|
draft_var_srv.save_output_variables(
|
|
|
|
|
|
|
|
app_id=self._get_app_id(),
|
|
|
|
|
|
|
|
node_id=event.node_id,
|
|
|
|
|
|
|
|
node_type=event.node_type,
|
|
|
|
|
|
|
|
output=outputs,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
session.commit()
|
|
|
|
|
|
|
|
|
|
|
|
elif isinstance(event, NodeRunFailedEvent):
|
|
|
|
elif isinstance(event, NodeRunFailedEvent):
|
|
|
|
self._publish_event(
|
|
|
|
self._publish_event(
|
|
|
|
QueueNodeFailedEvent(
|
|
|
|
QueueNodeFailedEvent(
|
|
|
|
|