feat: Initializer of admin, anonymous account and workflows. (#6)

* feat: Tools from metrics 0-14

* feat: Tools from metrics 15-30

* feat: Tools from metrics 31-41. Fill with '.*' when param is empty.

* feat: Initialize the administrator account.

* feat: Initializer of anonymous and workflows.
pull/17608/head
MisluNotFound 1 year ago committed by GitHub
parent afac446656
commit b15a8af990
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -30,6 +30,8 @@ def create_app() -> DifyApp:
start_time = time.perf_counter()
app = create_flask_app_with_configs()
initialize_extensions(app)
from initializer import run_initializers
run_initializers(app)
end_time = time.perf_counter()
if dify_config.DEBUG:
logging.info(f"Finished create_app ({round((end_time - start_time) * 1000, 2)} ms)")

@ -14,4 +14,12 @@ class APOConfig(BaseSettings):
APO_VM_URL: str = Field(
description="apo vm url",
default="http://localhost:8080",
)
INITIAL_LANGUAGE: str = Field(
description="Initial workflows' language",
default="en-US"
)
WORKFLOW_DIR: str = Field(
description="Directory of workflows yaml file.",
default="./workflows"
)

@ -0,0 +1,48 @@
import json
from collections.abc import Generator
from typing import Any, Optional
import requests
from configs import dify_config
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from libs.apo_utils import APOUtils
class ContainerCpuThrottleContainerdSecondsTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: Optional[str] = None,
app_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> Generator[ToolInvokeMessage, None, None]:
cadvisor_job_name = tool_parameters.get('cadvisor_job_name', '.*')
namespace = tool_parameters.get('namespace', '.*')
pod = tool_parameters.get('pod', '.*')
start_time = tool_parameters.get("startTime")
end_time = tool_parameters.get("endTime")
params = {
'metricName': '基础设施情况 - 容器CPU - 容器CPU节流时间 - Containerd',
'params': {
'cadvisor_job_name': cadvisor_job_name,
'namespace': namespace,
'pod': pod
},
'startTime': start_time,
'endTime': end_time,
'step': APOUtils.get_step(start_time, end_time),
}
resp = requests.post(dify_config.APO_BACKEND_URL + '/api/metric/query', json=params)
list = resp.json()['result']
list = json.dumps({
'type': 'metric',
'display': True,
'unit': list['unit'],
'data': {
'timeseries': list['timeseries']
}
})
yield self.create_text_message(list)

@ -0,0 +1,75 @@
identity:
name: 容器CPU节流时长(使用Containerd容器运行时,按容器和Pod统计)
author: APO
label:
en_US: Container CPU throttling time (Containerd runtime, aggregated by container and Pod)
zh_Hans: 容器CPU节流时长(使用Containerd容器运行时,按容器和Pod统计)
description:
human:
en_US: Container CPU throttling time (Containerd runtime, aggregated by container and Pod)
zh_Hans: 容器CPU节流时长(使用Containerd容器运行时,按容器和Pod统计)
llm: Container CPU throttling time (Containerd runtime, aggregated by container and Pod)
display:
type: metric
title: 基础设施情况 - 容器CPU - 容器CPU节流时间 - Containerd
unit: "s"
parameters:
- name: cadvisor_job_name
type: string
required: False
label:
en_US: cAdvisor job name
zh_Hans: cAdvisor任务名称
human_description:
en_US: cAdvisor job name
zh_Hans: cAdvisor任务名称
llm_description: cAdvisor job name
form: llm
- name: namespace
type: string
required: False
label:
en_US: Namespace
zh_Hans: 命名空间
human_description:
en_US: Namespace
zh_Hans: 命名空间
llm_description: Namespace
form: llm
- name: pod
type: string
required: False
label:
en_US: Pod name
zh_Hans: Pod名称
human_description:
en_US: Pod name
zh_Hans: Pod名称
llm_description: Pod name
form: llm
- name: startTime
type: number
required: true
label:
en_US: startTime
zh_Hans: startTime
pt_BR: startTime
human_description:
en_US: Data query start time
zh_Hans: 开始时间 (微秒)
pt_BR: Data query start time
llm_description: Data query start time
form: llm
- name: endTime
type: number
required: true
label:
en_US: endTime
zh_Hans: endTime
pt_BR: endTime
human_description:
en_US: Data query end time
zh_Hans: 结束时间 (微秒)
pt_BR: Data query end time
llm_description: Data query start time
form: llm

@ -0,0 +1,44 @@
import json
from collections.abc import Generator
from typing import Any, Optional
import requests
from configs import dify_config
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from libs.apo_utils import APOUtils
class LinuxNetworkDroppedPacketsReceiveTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: Optional[str] = None,
app_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> Generator[ToolInvokeMessage, None, None]:
cluster = tool_parameters.get('cluster', '.*')
start_time = tool_parameters.get("startTime")
end_time = tool_parameters.get("endTime")
params = {
'metricName': '集群总览 - 总览 - 网络饱和 - 丢包数 - Linux Packets dropped (receive)',
'params': {
'cluster': cluster
},
'startTime': start_time,
'endTime': end_time,
'step': APOUtils.get_step(start_time, end_time),
}
resp = requests.post(dify_config.APO_BACKEND_URL + '/api/metric/query', json=params)
list = resp.json()['result']
list = json.dumps({
'type': 'metric',
'display': True,
'unit': list['unit'],
'data': {
'timeseries': list['timeseries']
}
})
yield self.create_text_message(list)

@ -0,0 +1,53 @@
identity:
name: Linux集群网络接收丢包数
author: APO
label:
en_US: Linux cluster network receive packet drops
zh_Hans: Linux集群网络接收丢包数
description:
human:
en_US: Linux cluster network receive packet drops
zh_Hans: Linux集群网络接收丢包数
llm: Linux cluster network receive packet drops
display:
type: metric
title: 集群总览 - 总览 - 网络饱和 - 丢包数 - Linux Packets dropped (receive)
unit: "short"
parameters:
- name: cluster
type: string
required: False
label:
en_US: Cluster name
zh_Hans: 集群名称
human_description:
en_US: Cluster name
zh_Hans: 集群名称
llm_description: Cluster name
form: llm
- name: startTime
type: number
required: true
label:
en_US: startTime
zh_Hans: startTime
pt_BR: startTime
human_description:
en_US: Data query start time
zh_Hans: 开始时间 (微秒)
pt_BR: Data query start time
llm_description: Data query start time
form: llm
- name: endTime
type: number
required: true
label:
en_US: endTime
zh_Hans: endTime
pt_BR: endTime
human_description:
en_US: Data query end time
zh_Hans: 结束时间 (微秒)
pt_BR: Data query end time
llm_description: Data query start time
form: llm

@ -0,0 +1,44 @@
import json
from collections.abc import Generator
from typing import Any, Optional
import requests
from configs import dify_config
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from libs.apo_utils import APOUtils
class LinuxNetworkDroppedPacketsTransmitTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: Optional[str] = None,
app_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> Generator[ToolInvokeMessage, None, None]:
cluster = tool_parameters.get('cluster', '.*')
start_time = tool_parameters.get("startTime")
end_time = tool_parameters.get("endTime")
params = {
'metricName': '集群总览 - 总览 - 网络饱和 - 丢包数 - Linux Packets dropped (transmit)',
'params': {
'cluster': cluster
},
'startTime': start_time,
'endTime': end_time,
'step': APOUtils.get_step(start_time, end_time),
}
resp = requests.post(dify_config.APO_BACKEND_URL + '/api/metric/query', json=params)
list = resp.json()['result']
list = json.dumps({
'type': 'metric',
'display': True,
'unit': list['unit'],
'data': {
'timeseries': list['timeseries']
}
})
yield self.create_text_message(list)

@ -0,0 +1,53 @@
identity:
name: Linux集群网络发送丢包数
author: APO
label:
en_US: Linux cluster network transmit packet drops
zh_Hans: Linux集群网络发送丢包数
description:
human:
en_US: Linux cluster network transmit packet drops
zh_Hans: Linux集群网络发送丢包数
llm: Linux cluster network transmit packet drops
display:
type: metric
title: 集群总览 - 总览 - 网络饱和 - 丢包数 - Linux Packets dropped (transmit)
unit: "short"
parameters:
- name: cluster
type: string
required: False
label:
en_US: Cluster name
zh_Hans: 集群名称
human_description:
en_US: Cluster name
zh_Hans: 集群名称
llm_description: Cluster name
form: llm
- name: startTime
type: number
required: true
label:
en_US: startTime
zh_Hans: startTime
pt_BR: startTime
human_description:
en_US: Data query start time
zh_Hans: 开始时间 (微秒)
pt_BR: Data query start time
llm_description: Data query start time
form: llm
- name: endTime
type: number
required: true
label:
en_US: endTime
zh_Hans: endTime
pt_BR: endTime
human_description:
en_US: Data query end time
zh_Hans: 结束时间 (微秒)
pt_BR: Data query end time
llm_description: Data query start time
form: llm

@ -0,0 +1,44 @@
import json
from collections.abc import Generator
from typing import Any, Optional
import requests
from configs import dify_config
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from libs.apo_utils import APOUtils
class NodeCpuUtilizationLinuxTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: Optional[str] = None,
app_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> Generator[ToolInvokeMessage, None, None]:
cluster = tool_parameters.get('cluster', '.*')
start_time = tool_parameters.get("startTime")
end_time = tool_parameters.get("endTime")
params = {
'metricName': '集群总览 - 节点资源使用 - 节点CPU使用率 - Linux',
'params': {
'cluster': cluster
},
'startTime': start_time,
'endTime': end_time,
'step': APOUtils.get_step(start_time, end_time),
}
resp = requests.post(dify_config.APO_BACKEND_URL + '/api/metric/query', json=params)
list = resp.json()['result']
list = json.dumps({
'type': 'metric',
'display': True,
'unit': list['unit'],
'data': {
'timeseries': list['timeseries']
}
})
yield self.create_text_message(list)

@ -0,0 +1,53 @@
identity:
name: Linux节点CPU使用率
author: APO
label:
en_US: Linux node CPU utilization rate
zh_Hans: Linux节点CPU使用率
description:
human:
en_US: Linux node CPU utilization rate
zh_Hans: Linux节点CPU使用率
llm: Linux node CPU utilization rate
display:
type: metric
title: 集群总览 - 节点资源使用 - 节点CPU使用率 - Linux
unit: "percentunit"
parameters:
- name: cluster
type: string
required: False
label:
en_US: Cluster name
zh_Hans: 集群名称
human_description:
en_US: Cluster name
zh_Hans: 集群名称
llm_description: Cluster name
form: llm
- name: startTime
type: number
required: true
label:
en_US: startTime
zh_Hans: startTime
pt_BR: startTime
human_description:
en_US: Data query start time
zh_Hans: 开始时间 (微秒)
pt_BR: Data query start time
llm_description: Data query start time
form: llm
- name: endTime
type: number
required: true
label:
en_US: endTime
zh_Hans: endTime
pt_BR: endTime
human_description:
en_US: Data query end time
zh_Hans: 结束时间 (微秒)
pt_BR: Data query end time
llm_description: Data query start time
form: llm

@ -0,0 +1,44 @@
import json
from collections.abc import Generator
from typing import Any, Optional
import requests
from configs import dify_config
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from libs.apo_utils import APOUtils
class NodeMemoryUsageLinuxTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
conversation_id: Optional[str] = None,
app_id: Optional[str] = None,
message_id: Optional[str] = None,
) -> Generator[ToolInvokeMessage, None, None]:
cluster = tool_parameters.get('cluster', '.*')
start_time = tool_parameters.get("startTime")
end_time = tool_parameters.get("endTime")
params = {
'metricName': '集群总览 - 节点资源使用 - 节点内存使用量 - Linux',
'params': {
'cluster': cluster
},
'startTime': start_time,
'endTime': end_time,
'step': APOUtils.get_step(start_time, end_time),
}
resp = requests.post(dify_config.APO_BACKEND_URL + '/api/metric/query', json=params)
list = resp.json()['result']
list = json.dumps({
'type': 'metric',
'display': True,
'unit': list['unit'],
'data': {
'timeseries': list['timeseries']
}
})
yield self.create_text_message(list)

@ -0,0 +1,53 @@
identity:
name: Linux节点内存使用字节数
author: APO
label:
en_US: Linux node memory usage in bytes
zh_Hans: Linux节点内存使用字节数
description:
human:
en_US: Linux node memory usage in bytes
zh_Hans: Linux节点内存使用字节数
llm: Linux node memory usage in bytes
display:
type: metric
title: 集群总览 - 节点资源使用 - 节点内存使用量 - Linux
unit: "bytes"
parameters:
- name: cluster
type: string
required: False
label:
en_US: Cluster name
zh_Hans: 集群名称
human_description:
en_US: Cluster name
zh_Hans: 集群名称
llm_description: Cluster name
form: llm
- name: startTime
type: number
required: true
label:
en_US: startTime
zh_Hans: startTime
pt_BR: startTime
human_description:
en_US: Data query start time
zh_Hans: 开始时间 (微秒)
pt_BR: Data query start time
llm_description: Data query start time
form: llm
- name: endTime
type: number
required: true
label:
en_US: endTime
zh_Hans: endTime
pt_BR: endTime
human_description:
en_US: Data query end time
zh_Hans: 结束时间 (微秒)
pt_BR: Data query end time
llm_description: Data query start time
form: llm

@ -0,0 +1,11 @@
from flask import Flask
from . import (
account,
workflow
)
from .decorator import _initializers
def run_initializers(app: Flask):
with app.app_context():
for func, _ in sorted(_initializers, key=lambda x: x[1]):
func()

@ -0,0 +1,28 @@
from models import Account
from extensions.ext_database import db
from services.account_service import RegisterService, AccountService, TenantService
from .decorator import initializer
from .admin import get_admin
@initializer(priority=1)
def init_admin_account():
if db.session.query(Account).filter_by(name="admin").first():
return
registerService = RegisterService()
registerService.setup("admin@apo.com", "admin", "APO2024@admin", "")
@initializer(priority=2)
def init_anonymous_account():
if db.session.query(Account).filter_by(name="anonymous").first():
return
accountService = AccountService()
anonymous = accountService.create_account("anonymous@apo.com", "anonymous", "en-US", "APO2024@anonymous", True)
admin = get_admin()
tenantService = TenantService()
tenant = tenantService.get_current_tenant_by_account(admin)
tenantService.create_tenant_member(tenant, anonymous)

@ -0,0 +1,11 @@
from extensions.ext_database import db
from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
def get_admin() -> Account:
admin = db.session.query(Account).filter_by(name="admin").first()
tenant_account_join = db.session.query(TenantAccountJoin).filter_by(account_id=admin.id, role=TenantAccountRole.OWNER).first()
tenant = db.session.query(Tenant).filter_by(id=tenant_account_join.tenant_id).first()
admin.current_tenant = tenant
admin.current_tenant_id = tenant.id
return admin

@ -0,0 +1,9 @@
from typing import Callable, List, Tuple
_initializers: List[Tuple[Callable, int]] = []
def initializer(priority: int = 10) -> Callable:
def decorator(func: Callable) -> Callable:
_initializers.append((func, priority))
return func
return decorator

@ -0,0 +1,203 @@
import os
import yaml
import logging
from sqlalchemy.exc import SQLAlchemyError
from services.app_dsl_service import AppDslService
from services.workflow_service import WorkflowService
from extensions.ext_database import db
from .decorator import initializer
from .admin import get_admin
from models import App, ApiToken, Workflow, InstalledApp
from contexts import tenant_id
from configs.app_config import APOConfig
from typing import Union
@initializer(priority=3)
def init_workflow():
apo_config = APOConfig()
initial_language = apo_config.INITIAL_LANGUAGE
workflow_dir = apo_config.WORKFLOW_DIR
sub_dir = ''
if initial_language == 'en-US':
sub_dir = 'en'
elif initial_language == 'zh-Hans':
sub_dir = 'zh'
workflows = []
dir = f'{workflow_dir}/{sub_dir}'
if not os.path.isdir(dir):
raise ValueError(f"Invalid directory: {dir}")
for file_entry in os.scandir(dir):
if not file_entry.name.endswith('.yaml') and not file_entry.name.endswith('.yml') or file_entry.name.startswith('.'):
continue
try:
with open(file_entry.path, 'r', encoding='utf-8') as file:
content = file.read()
workflows.append(content)
except Exception as e:
logging.ERROR(f"Failed to read file: {file_entry.path}")
admin = get_admin()
try:
original_token = tenant_id.set(admin.current_tenant_id)
for w in workflows:
result = _check_workflow_to_update(db.session, w, admin)
if result is False:
continue
import_service = AppDslService(db.session)
workflow_service = WorkflowService()
'''Import an app or update existing app'''
imp = import_service.import_app(
account=admin,
import_mode="yaml-content",
yaml_content=w,
app_id=result if result else None
)
app_model = (
db.session.query(App)
.filter(
App.id == imp.app_id,
App.tenant_id == admin.current_tenant_id,
App.status == "normal"
)
.first()
)
workflow_service.publish_workflow(app_model=app_model, account=admin)
if result is None:
_generate_api_key(db.session, imp.app_id, admin)
db.session.commit()
_adjust_workflows(initial_language)
except Exception as e:
db.session.rollback()
raise
finally:
tenant_id.reset(original_token)
def _check_workflow_to_update(session, content, account) -> Union[str, bool, None]:
"""Check if the workflow needs to be updated or created.
- None: Need to create a new App.
- str(app_id): Need to update the existing App.
- False: No need to update.
"""
try:
content_dict = yaml.safe_load(content)
app_name = content_dict.get('app', {}).get('name')
graph = content_dict.get('workflow', {}).get('graph')
features = content_dict.get('workflow', {}).get('features')
app_model = (
session.query(App)
.filter(
App.name == app_name,
App.tenant_id == account.current_tenant_id,
App.status == "normal",
)
.first()
)
if not app_model:
return None
workflow = (
session.query(Workflow)
.filter(
Workflow.graph == graph,
Workflow.features == features,
)
.first()
)
if workflow:
return False
else:
return app_model.id
except Exception as e:
logging.error(f"Failed to check workflow: {str(e)}")
return False
def _generate_api_key(session, app_id, account, key=None):
if not app_id or not account:
return
key = key or ApiToken.generate_api_key('app-', 24)
api_token = ApiToken(
app_id=app_id,
tenant_id=account.current_tenant_id,
token=key,
type='app'
)
session.add(api_token)
def _adjust_workflows(language):
to_adjust = {
"zh-Hans": {
"告警有效性分析": {
"app_id": "dcfeddd2-d6e7-4dc4-a284-e48ab56bf6af",
"api_token": "app-x0mOJKUvhr35BOISSeNmsfXj"
},
"告警简单根因分析": {
"app_id": "a2d4d3aa-3401-4393-859e-df051bdd5cd1"
}
},
"en-US": {
"alert validity confirmation": {
"app_id": "dcfeddd2-d6e7-4dc4-a284-e48ab56bf6af",
"api_token": "app-x0mOJKUvhr35BOISSeNmsfXj"
},
"alert simple root cause analysis": {
"app_id": "a2d4d3aa-3401-4393-859e-df051bdd5cd1"
}
}
}
try:
config = to_adjust.get(language)
for app_name, field_config in config.items():
app = db.session.query(App).filter_by(name=app_name).first()
new_app_id = field_config["app_id"]
if not app:
continue
if app.id == new_app_id:
continue
origin_app_id = app.id
update_mappings = [
(ApiToken, {"app_id": new_app_id}),
(Workflow, {"app_id": new_app_id}),
(InstalledApp, {"app_id": new_app_id})
]
for model, update_values in update_mappings:
db.session.query(model)\
.filter_by(app_id=origin_app_id)\
.update(update_values, synchronize_session=False)
app.id = new_app_id
db.session.merge(app)
if "api_token" in field_config:
new_token = field_config["api_token"]
db.session.query(ApiToken)\
.filter_by(app_id=new_app_id)\
.update({"token": new_token}, synchronize_session=False)
db.session.commit()
print(f"Successfully updated {app_name} (ID: {origin_app_id}{new_app_id})")
except SQLAlchemyError as e:
db.session.rollback()
logging.ERROR(f"Database error occurred: {str(e)}")
raise
except Exception as e:
db.session.rollback()
logging.ERROR(f"Unexpected error: {str(e)}")
raise
Loading…
Cancel
Save