diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index a3da5c1b49..5292cb5505 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -175,6 +175,11 @@ class PluginConfig(BaseSettings): default=15728640 * 12, ) + KAGENTS_URL: HttpUrl = Field( + description="KAGENTS JAVA API URL", + default=HttpUrl("http://192.168.8.41:10002"), + ) + class MarketplaceConfig(BaseSettings): """ diff --git a/api/controllers/console/app/error.py b/api/controllers/console/app/error.py index 1559f82d6e..8430c56bc4 100644 --- a/api/controllers/console/app/error.py +++ b/api/controllers/console/app/error.py @@ -1,6 +1,12 @@ from libs.exception import BaseHTTPException +class TenantNotFoundError(BaseHTTPException): + error_code = "tenant_not_found" + description = "tenant not found." + code = 401 + + class AppNotFoundError(BaseHTTPException): error_code = "app_not_found" description = "App not found." diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index cbbdd324ba..b48d1e3506 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -1,3 +1,4 @@ +import datetime import json import logging from typing import cast @@ -15,11 +16,12 @@ from controllers.console.app.error import ( DraftWorkflowNotExist, DraftWorkflowNotSync, ) -from controllers.console.app.wraps import get_app_model +from controllers.console.app.wraps import get_app_model, get_tenant_id from controllers.console.wraps import account_initialization_required, setup_required from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom +from core.tools.entities.api_entities import ToolApiEntity, ToolProviderApiEntity from extensions.ext_database import db from factories import variable_factory from fields.workflow_fields import workflow_fields, workflow_pagination_fields @@ -29,10 +31,12 @@ from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required from models import App from models.account import Account -from models.model import AppMode +from models.model import ApiToken, AppMode from services.app_generate_service import AppGenerateService +from services.app_service import AppService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError +from services.tools.builtin_tools_manage_service import BuiltinToolManageService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService logger = logging.getLogger(__name__) @@ -138,6 +142,99 @@ class DraftWorkflowApi(Resource): } +class SyncWorkflowApi(Resource): + @get_tenant_id + def post(self, tenant_id): + """ + Sync workflow from kagents + """ + parser = reqparse.RequestParser() + parser.add_argument("conversation_id", type=uuid_value, location="json") + parser.add_argument("token", type=str, location="json") + parser.add_argument("planer_info", type=json, location="json") + + # mandatory filed + args = parser.parse_args() + args["mode"] = "workflow" + args["icon"] = "😀" + args["icon_background"] = "#FFEAD5" + args["name"] = "autoGenAgent " + str(datetime.datetime.now().replace(microsecond=0)) + planer_info = args["planer_info"] + + try: + app_service = AppService() + builtin_service = BuiltinToolManageService() + workflow_service = WorkflowService() + + app_model = app_service.create_app(tenant_id, args, current_user) + + tools_to_use: list[ToolApiEntity] = [] + providers_to_use: list[ToolProviderApiEntity] = [] + tools_id_from_planer = [str] + + installed_tools_list: list[ToolProviderApiEntity] = builtin_service.list_builtin_tools( + current_user.id, + current_user.current_tenant_id, + ) + + for item in planer_info: + tools_id_from_planer.append(item.get("agentId")) + + for _, tools_with_provider in enumerate(installed_tools_list): + for tool in tools_with_provider.tools: + if tool.name in tools_id_from_planer: + tools_to_use.append(tool) + providers_to_use.append(tools_with_provider) + + draft = workflow_service.generate_kagents_workflow(tools_to_use, providers_to_use) + + workflow = workflow_service.sync_draft_workflow( + app_model=app_model, + graph=draft, + features={"retriever_resource": {"enabled": True}}, + unique_hash="", + account=current_user, + environment_variables=[], + conversation_variables=[], + ) + + with Session(db.engine) as session: + workflow = workflow_service.publish_workflow( + session=session, + app_model=app_model, + account=current_user, + marked_name="", + marked_comment="", + ) + + app_model.workflow_id = workflow.id + db.session.commit() + + workflow_created_at = TimestampField().format(workflow.created_at) + + session.commit() + + key = ApiToken.generate_api_key('app-', 24) + api_token = ApiToken() + api_token.app_id = app_model.workflow_id + api_token.tenant_id = current_user.current_tenant_id + api_token.token = key + api_token.type = self.resource_type + db.session.add(api_token) + db.session.commit() + + return {"result": "success", + "api_key": key, + "workflow_id": app_model.workflow_id, + "workflow_created_at": workflow_created_at, + "workflow_name": args["name"]} + + + except Exception: + logging.exception("Unhandled exception") + raise InternalServerError() + + class AdvancedChatDraftWorkflowRunApi(Resource): @setup_required @login_required @@ -731,6 +828,10 @@ class WorkflowByIdApi(Resource): return None, 204 +api.add_resource( + SyncWorkflowApi, + "/apps/workflows/sync", +) api.add_resource( DraftWorkflowApi, "/apps//workflows/draft", diff --git a/api/controllers/console/app/wraps.py b/api/controllers/console/app/wraps.py index 9ad8c15847..e211cf6f52 100644 --- a/api/controllers/console/app/wraps.py +++ b/api/controllers/console/app/wraps.py @@ -2,7 +2,7 @@ from collections.abc import Callable from functools import wraps from typing import Optional, Union -from controllers.console.app.error import AppNotFoundError +from controllers.console.app.error import AppNotFoundError, TenantNotFoundError from extensions.ext_database import db from libs.login import current_user from models import App, AppMode @@ -53,3 +53,22 @@ def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[ return decorator else: return decorator(view) + + +def get_tenant_id(view: Optional[Callable] = None): + def decorator(view_func): + @wraps(view_func) + def decorated_view(*args, **kwargs): + current_tenant_id = current_user.current_tenant_id + if not current_tenant_id: + raise TenantNotFoundError() + + kwargs["tenant_id"] = current_tenant_id + return view_func(*args, **kwargs) + + return decorated_view + + if view is None: + return decorator + else: + return decorator(view) diff --git a/api/services/kagents_service.py b/api/services/kagents_service.py new file mode 100644 index 0000000000..b93d74609e --- /dev/null +++ b/api/services/kagents_service.py @@ -0,0 +1,46 @@ +import logging +from typing import Any, Optional + +import requests +from yarl import URL + +from configs import dify_config + + +# leave it here for later use +class KagentsService: + """ + Service for interacting with Kagents API. + """ + + _ENDPOINT_SEGMENTS = ("kagent", "chat", "conversationRecordDetail") + + def __init__(self, token) -> None: + self._token: URL = token + self._base_url: URL = URL(str(dify_config.KAGENTS_URL)) + self._session = requests.Session() + self._session.headers.update( + {"Content-Type": "application/json", "Accept": "*/*"} + ) + + def query_conversation_by_id( + self, token: str, conversation_id: str + ) -> Optional[list[dict[str, Any]]]: + endpoint = self._base_url.with_path("/".join(self._ENDPOINT_SEGMENTS)) + headers = {"token": self._token} + + try: + resp = self._session.post( + str(endpoint), headers=headers, json={"conversationId": conversation_id} + ) + resp.raise_for_status() + body = resp.json().get("responseBody", []) + except (requests.HTTPError, ValueError) as exc: + logging.exception("Error fetching conversation %s", conversation_id) + return None + + for entry in body: + if plan_info := entry.get("result").get("plan_info"): + return plan_info + + return None diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index bc213ccce6..e2b7ffe088 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1,16 +1,19 @@ +import copy import json import time from collections.abc import Callable, Generator, Sequence from datetime import UTC, datetime -from typing import Any, Optional +from typing import Any, Literal, Optional, Union from uuid import uuid4 +from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.orm import Session from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository +from core.tools.entities.api_entities import ToolApiEntity, ToolProviderApiEntity from core.variables import Variable from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus @@ -40,6 +43,220 @@ from services.workflow.workflow_converter import WorkflowConverter from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError +class OutputSelector(BaseModel): + variable: str + value_selector: list[str] + + +class Kvariable(BaseModel): + variable: str + label: str + type: str + max_length: int + required: Union[str, bool] + options: list[str] + + +class NodeData(BaseModel): + type: str + title: str + desc: str + variables: Optional[list[Kvariable]] = [] + selected: bool = False + + +class GeneralNodeData(NodeData): + tool_parameters: Optional[dict] = {} + tool_configurations: Optional[dict] = {} + provider_id: Optional[str] = None + provider_type: Optional[str] = None + provider_name: Optional[str] = None + tool_name: Optional[str] = None + tool_label: Optional[str] = None + tool_description: Optional[str] = None + is_team_authorization: Optional[bool] = None + output_schema: Optional[dict] = None + paramSchemas: Optional[list[dict]] = None + params: Optional[dict[str, str]] = None + outputs: Optional[list[OutputSelector]] = None + + +class Position(BaseModel): + x: Union[int, float] + y: Union[int, float] + + +class Node(BaseModel): + id: str + type: Literal["custom"] + data: GeneralNodeData + position: Position + targetPosition: Literal["left"] + sourcePosition: Literal["right"] + positionAbsolute: Position + width: int + height: int + selected: bool + + +class EdgeData(BaseModel): + sourceType: str + targetType: str + isInLoop: bool + + +class Edge(BaseModel): + id: str + type: Literal["custom"] + source: str + target: str + sourceHandle: str + targetHandle: str + data: EdgeData + zIndex: int = 0 + + +class Viewport(BaseModel): + x: float + y: float + zoom: float + + +class AutoGenWorkflow(BaseModel): + nodes: list[Node] + edges: list[Edge] + viewport: Viewport + + +WORKFLOW_INIT_DICT = {"viewport": {"x": 420, "y": 220, "zoom": 0.5}} + +NODE_TEMPLATE = Node( + id="0", + type="custom", + data=GeneralNodeData( + type="", + title="", + desc="", + variables=[Kvariable(variable="url", label="url", type="paragraph", max_length=482, required="", options=[])], + selected=False, + ), + position=Position(x=0, y=0), + targetPosition="left", + sourcePosition="right", + positionAbsolute=Position(x=0, y=0), + width=244, + height=90, + selected=False, +) + +EDGE_TEMPLATE = Edge( + id="template-edge-id", + type="custom", + source="source-id", + target="target-id", + sourceHandle="source", + targetHandle="target", + data=EdgeData(sourceType="start", targetType="tool", isInLoop=False), + zIndex=0, +) + +def generate_nodes(tools_to_use: list[ToolApiEntity], providers_to_use: list[ToolProviderApiEntity], draft): + position_x = 50 + position_y = 50 + now = int(time.time() * 1000) + + # Start node + start_node = copy.deepcopy(NODE_TEMPLATE) + start_node.id = str(now) + start_node.data = GeneralNodeData( + type="start", + title="Start", + desc="", + variables=[ + Kvariable(variable="text", label="text", type="paragraph", max_length=1000, required=True, options=[]) + ], + selected=True, + ) + time.sleep(0.15) + start_node.position = Position(x=position_x, y=position_y) + start_node.positionAbsolute = Position(x=position_x, y=position_y) + draft.setdefault("nodes", []).append(start_node.model_dump()) + + pre_id = start_node.id + for index, payload in enumerate(tools_to_use): + node = copy.deepcopy(NODE_TEMPLATE) + node.id = str(int(time.time() * 1000) + index) + provider = providers_to_use[index] + params = {item.name: "" for item in payload.parameters} + tool_parameters = { + item.name: {"type": "mixed", "value": f"{{{{#{pre_id}.text#}}}}"} for item in payload.parameters + } + param_schemas = [p.model_dump(mode="json") for p in payload.parameters] + + node.data = GeneralNodeData( + type="tool", + title=payload.label.zh_Hans, + desc=payload.description.zh_Hans, + provider_id=provider.id, + provider_type=provider.type, + provider_name=provider.name, + tool_name=payload.name, + tool_parameters=tool_parameters, + tool_label=payload.label.zh_Hans, + tool_description=payload.description.zh_Hans, + is_team_authorization=provider.is_team_authorization, + output_schema=payload.output_schema, + paramSchemas=param_schemas, + params=params, + ) + node.position = Position(x=position_x + (index + 1) * 300, y=position_y) + node.positionAbsolute = node.position + draft.setdefault("nodes", []).append(node.model_dump()) + pre_id = node.id + time.sleep(0.15) + + # End node + end_node = copy.deepcopy(NODE_TEMPLATE) + end_node.id = str(int(time.time() * 1000) + 100) + end_node.data = GeneralNodeData(type="end", + title="End", + desc="", + outputs=[OutputSelector( + variable="text", + value_selector=[pre_id, "text" ] + )], + selected=False) + end_node.position = Position(x=position_x + (len(tools_to_use) + 1) * 300, y=position_y + 100) + end_node.positionAbsolute = end_node.position + draft.setdefault("nodes", []).append(end_node.model_dump()) + + +def generate_edges(plan_info, draft): + nodes = draft.get("nodes", []) + edges = [] + + for i in range(len(nodes) - 1): + src = nodes[i]["id"] + tgt = nodes[i + 1]["id"] + if i == 0: + src_type = "start" + else: + src_type = "tool" + if i + 1 == len(nodes) - 1: + tgt_type = "end" + else: + tgt_type = "tool" + + edge = copy.deepcopy(EDGE_TEMPLATE) + edge.id = f"{src}-source-{tgt}-target" + edge.source = src + edge.target = tgt + edge.data = EdgeData(sourceType=src_type, targetType=tgt_type, isInLoop=False) + edges.append(edge.model_dump()) + + draft.setdefault("edges", []).extend(edges) + + class WorkflowService: """ Workflow Service @@ -531,3 +748,11 @@ class WorkflowService: session.delete(workflow) return True + + def generate_kagents_workflow( + self, tools_to_use: list[ToolApiEntity], providers_to_use: list[ToolProviderApiEntity] + ): + draft = copy.deepcopy(WORKFLOW_INIT_DICT) + generate_nodes(tools_to_use, providers_to_use, draft) + generate_edges(tools_to_use, draft) + return draft diff --git a/docker/docker-compose.middleware.yaml b/docker/docker-compose.middleware.yaml index 4081bfd818..e708b098fd 100644 --- a/docker/docker-compose.middleware.yaml +++ b/docker/docker-compose.middleware.yaml @@ -94,7 +94,7 @@ services: PLUGIN_REMOTE_INSTALLING_HOST: ${PLUGIN_DEBUGGING_HOST:-0.0.0.0} PLUGIN_REMOTE_INSTALLING_PORT: ${PLUGIN_DEBUGGING_PORT:-5003} PLUGIN_WORKING_PATH: ${PLUGIN_WORKING_PATH:-/app/storage/cwd} - FORCE_VERIFYING_SIGNATURE: ${FORCE_VERIFYING_SIGNATURE:-true} + FORCE_VERIFYING_SIGNATURE: false PYTHON_ENV_INIT_TIMEOUT: ${PLUGIN_PYTHON_ENV_INIT_TIMEOUT:-120} PLUGIN_MAX_EXECUTION_TIMEOUT: ${PLUGIN_MAX_EXECUTION_TIMEOUT:-600} PIP_MIRROR_URL: ${PIP_MIRROR_URL:-}