Merge pull request #43 from langgenius/main

[pull] main from langgenius:main
pull/21616/head
qiuqiua 11 months ago committed by GitHub
commit 621922cc50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -8,7 +8,7 @@ inputs:
uv-version:
description: UV version to set up
required: true
default: '0.6.14'
default: '~=0.7.11'
uv-lockfile:
description: Path to the UV lockfile to restore cache from
required: true

@ -83,9 +83,15 @@ jobs:
compose-file: |
docker/docker-compose.middleware.yaml
services: |
db
redis
sandbox
ssrf_proxy
- name: setup test config
run: |
cp api/tests/integration_tests/.env.example api/tests/integration_tests/.env
- name: Run Workflow
run: uv run --project api bash dev/pytest/pytest_workflow.sh

@ -0,0 +1,28 @@
name: Deploy RAG Dev
permissions:
contents: read
on:
workflow_run:
workflows: ["Build and Push API & Web"]
branches:
- "deploy/rag-dev"
types:
- completed
jobs:
deploy:
runs-on: ubuntu-latest
if: |
github.event.workflow_run.conclusion == 'success' &&
github.event.workflow_run.head_branch == 'deploy/rag-dev'
steps:
- name: Deploy to server
uses: appleboy/ssh-action@v0.1.8
with:
host: ${{ secrets.RAG_SSH_HOST }}
username: ${{ secrets.SSH_USER }}
key: ${{ secrets.SSH_PRIVATE_KEY }}
script: |
${{ vars.SSH_SCRIPT || secrets.SSH_SCRIPT }}

@ -10,6 +10,7 @@ yq eval '.services["elasticsearch"].ports += ["9200:9200"]' -i docker/docker-com
yq eval '.services.couchbase-server.ports += ["8091-8096:8091-8096"]' -i docker/docker-compose.yaml
yq eval '.services.couchbase-server.ports += ["11210:11210"]' -i docker/docker-compose.yaml
yq eval '.services.tidb.ports += ["4000:4000"]' -i docker/tidb/docker-compose.yaml
yq eval '.services.oceanbase.ports += ["2881:2881"]' -i docker/docker-compose.yaml
yq eval '.services.opengauss.ports += ["6600:6600"]' -i docker/docker-compose.yaml
echo "Ports exposed for sandbox, weaviate, tidb, qdrant, chroma, milvus, pgvector, pgvecto-rs, elasticsearch, couchbase, opengauss"

@ -31,6 +31,13 @@ jobs:
with:
persist-credentials: false
- name: Free Disk Space
uses: endersonmenezes/free-disk-space@v2
with:
remove_dotnet: true
remove_haskell: true
remove_tool_cache: true
- name: Setup UV and Python
uses: ./.github/actions/setup-uv
with:
@ -59,7 +66,7 @@ jobs:
tidb
tiflash
- name: Set up Vector Stores (Weaviate, Qdrant, PGVector, Milvus, PgVecto-RS, Chroma, MyScale, ElasticSearch, Couchbase)
- name: Set up Vector Stores (Weaviate, Qdrant, PGVector, Milvus, PgVecto-RS, Chroma, MyScale, ElasticSearch, Couchbase, OceanBase)
uses: hoverkraft-tech/compose-action@v2.0.2
with:
compose-file: |
@ -75,8 +82,15 @@ jobs:
pgvector
chroma
elasticsearch
oceanbase
- name: setup test config
run: |
echo $(pwd)
ls -lah .
cp api/tests/integration_tests/.env.example api/tests/integration_tests/.env
- name: Check TiDB Ready
- name: Check VDB Ready (TiDB)
run: uv run --project api python api/tests/integration_tests/vdb/tidb_vector/check_tiflash_ready.py
- name: Test Vector Stores

11
.gitignore vendored

@ -179,6 +179,7 @@ docker/volumes/pgvecto_rs/data/*
docker/volumes/couchbase/*
docker/volumes/oceanbase/*
docker/volumes/plugin_daemon/*
docker/volumes/matrixone/*
!docker/volumes/oceanbase/init.d
docker/nginx/conf.d/default.conf
@ -192,12 +193,12 @@ sdks/python-client/dist
sdks/python-client/dify_client.egg-info
.vscode/*
!.vscode/launch.json
!.vscode/launch.json.template
!.vscode/README.md
pyrightconfig.json
api/.vscode
.idea/
.vscode
# pnpm
/.pnpm-store
@ -207,3 +208,9 @@ plugins.jsonl
# mise
mise.toml
# Next.js build output
.next/
# AI Assistant
.roo/

14
.vscode/README.md vendored

@ -0,0 +1,14 @@
# Debugging with VS Code
This `launch.json.template` file provides various debug configurations for the Dify project within VS Code / Cursor. To use these configurations, you should copy the contents of this file into a new file named `launch.json` in the same `.vscode` directory.
## How to Use
1. **Create `launch.json`**: If you don't have one, create a file named `launch.json` inside the `.vscode` directory.
2. **Copy Content**: Copy the entire content from `launch.json.template` into your newly created `launch.json` file.
3. **Select Debug Configuration**: Go to the Run and Debug view in VS Code / Cursor (Ctrl+Shift+D or Cmd+Shift+D).
4. **Start Debugging**: Select the desired configuration from the dropdown menu and click the green play button.
## Tips
- If you need to debug with Edge browser instead of Chrome, modify the `serverReadyAction` configuration in the "Next.js: debug full stack" section, change `"debugWithChrome"` to `"debugWithEdge"` to use Microsoft Edge for debugging.

@ -0,0 +1,68 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Flask API",
"type": "debugpy",
"request": "launch",
"module": "flask",
"env": {
"FLASK_APP": "app.py",
"FLASK_ENV": "development",
"GEVENT_SUPPORT": "True"
},
"args": [
"run",
"--host=0.0.0.0",
"--port=5001",
"--no-debugger",
"--no-reload"
],
"jinja": true,
"justMyCode": true,
"cwd": "${workspaceFolder}/api",
"python": "${workspaceFolder}/api/.venv/bin/python"
},
{
"name": "Python: Celery Worker (Solo)",
"type": "debugpy",
"request": "launch",
"module": "celery",
"env": {
"GEVENT_SUPPORT": "True"
},
"args": [
"-A",
"app.celery",
"worker",
"-P",
"solo",
"-c",
"1",
"-Q",
"dataset,generation,mail,ops_trace",
"--loglevel",
"INFO"
],
"justMyCode": false,
"cwd": "${workspaceFolder}/api",
"python": "${workspaceFolder}/api/.venv/bin/python"
},
{
"name": "Next.js: debug full stack",
"type": "node",
"request": "launch",
"program": "${workspaceFolder}/web/node_modules/next/dist/bin/next",
"runtimeArgs": ["--inspect"],
"skipFiles": ["<node_internals>/**"],
"serverReadyAction": {
"action": "debugWithChrome",
"killOnServerStop": true,
"pattern": "- Local:.+(https?://.+)",
"uriFormat": "%s",
"webRoot": "${workspaceFolder}/web"
},
"cwd": "${workspaceFolder}/web"
}
]
}

@ -226,6 +226,15 @@ Deploy Dify to AWS with [CDK](https://aws.amazon.com/cdk/)
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Using Alibaba Cloud Computing Nest
Quickly deploy Dify to Alibaba cloud with [Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Using Alibaba Cloud Data Management
One-Click deploy Dify to Alibaba Cloud with [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contributing
For those who'd like to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).

@ -209,6 +209,14 @@ docker compose up -d
- [AWS CDK بواسطة @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### استخدام Alibaba Cloud للنشر
[بسرعة نشر Dify إلى سحابة علي بابا مع عش الحوسبة السحابية علي بابا](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### استخدام Alibaba Cloud Data Management للنشر
انشر Dify على علي بابا كلاود بنقرة واحدة باستخدام [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## المساهمة
لأولئك الذين يرغبون في المساهمة، انظر إلى [دليل المساهمة](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md) لدينا.

@ -225,6 +225,15 @@ GitHub-এ ডিফাইকে স্টার দিয়ে রাখুন
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud ব্যবহার করে ডিপ্লয়
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management ব্যবহার করে ডিপ্লয়
[Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contributing
যারা কোড অবদান রাখতে চান, তাদের জন্য আমাদের [অবদান নির্দেশিকা] দেখুন (https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md)।

@ -221,6 +221,15 @@ docker compose up -d
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### 使用 阿里云计算巢 部署
使用 [阿里云计算巢](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88) 将 Dify 一键部署到 阿里云
#### 使用 阿里云数据管理DMS 部署
使用 [阿里云数据管理DMS](https://help.aliyun.com/zh/dms/dify-in-invitational-preview) 将 Dify 一键部署到 阿里云
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=langgenius/dify&type=Date)](https://star-history.com/#langgenius/dify&Date)

@ -221,6 +221,15 @@ Bereitstellung von Dify auf AWS mit [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Ein-Klick-Bereitstellung von Dify in der Alibaba Cloud mit [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contributing
Falls Sie Code beitragen möchten, lesen Sie bitte unseren [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md). Gleichzeitig bitten wir Sie, Dify zu unterstützen, indem Sie es in den sozialen Medien teilen und auf Veranstaltungen und Konferenzen präsentieren.

@ -221,6 +221,15 @@ Despliegue Dify en AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Despliega Dify en Alibaba Cloud con un solo clic con [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contribuir
Para aquellos que deseen contribuir con código, consulten nuestra [Guía de contribución](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).

@ -219,6 +219,15 @@ Déployez Dify sur AWS en utilisant [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK par @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Déployez Dify en un clic sur Alibaba Cloud avec [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contribuer
Pour ceux qui souhaitent contribuer du code, consultez notre [Guide de contribution](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).

@ -155,7 +155,7 @@ DifyはオープンソースのLLMアプリケーション開発プラットフ
[こちら](https://dify.ai)のDify Cloudサービスを利用して、セットアップ不要で試すことができます。サンドボックスプランには、200回のGPT-4呼び出しが無料で含まれています。
- **Dify Community Editionのセルフホスティング</br>**
この[スタートガイド](#quick-start)を使用して、ローカル環境でDifyを簡単に実行できます。
この[スタートガイド](#クイックスタート)を使用して、ローカル環境でDifyを簡単に実行できます。
詳しくは[ドキュメント](https://docs.dify.ai)をご覧ください。
- **企業/組織向けのDify</br>**
@ -220,6 +220,13 @@ docker compose up -d
##### AWS
- [@KevinZhaoによるAWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
[Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/) を利用して、DifyをAlibaba Cloudへワンクリックでデプロイできます
## 貢献
コードに貢献したい方は、[Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md)を参照してください。

@ -219,6 +219,15 @@ wa'logh nIqHom neH ghun deployment toy'wI' [CDK](https://aws.amazon.com/cdk/) lo
##### AWS
- [AWS CDK qachlot @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
[Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contributing
For those who'd like to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).

@ -213,6 +213,15 @@ Dify를 Kubernetes에 배포하고 프리미엄 스케일링 설정을 구성했
##### AWS
- [KevinZhao의 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
[Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)를 통해 원클릭으로 Dify를 Alibaba Cloud에 배포할 수 있습니다
## 기여
코드에 기여하고 싶은 분들은 [기여 가이드](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md)를 참조하세요.

@ -218,6 +218,15 @@ Implante o Dify na AWS usando [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK por @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Implante o Dify na Alibaba Cloud com um clique usando o [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Contribuindo
Para aqueles que desejam contribuir com código, veja nosso [Guia de Contribuição](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).

@ -219,6 +219,15 @@ Uvedite Dify v AWS z uporabo [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK by @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Z enim klikom namestite Dify na Alibaba Cloud z [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Prispevam
Za tiste, ki bi radi prispevali kodo, si oglejte naš vodnik za prispevke . Hkrati vas prosimo, da podprete Dify tako, da ga delite na družbenih medijih ter na dogodkih in konferencah.

@ -212,6 +212,15 @@ Dify'ı bulut platformuna tek tıklamayla dağıtın [terraform](https://www.ter
##### AWS
- [AWS CDK tarafından @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
[Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/) kullanarak Dify'ı tek tıkla Alibaba Cloud'a dağıtın
## Katkıda Bulunma
Kod katkısında bulunmak isteyenler için [Katkı Kılavuzumuza](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md) bakabilirsiniz.

@ -224,6 +224,15 @@ Dify 的所有功能都提供相應的 API因此您可以輕鬆地將 Dify
- [由 @KevinZhao 提供的 AWS CDK](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### 使用 阿里云计算巢進行部署
[阿里云](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### 使用 阿里雲數據管理DMS 進行部署
透過 [阿里雲數據管理DMS](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/),一鍵將 Dify 部署至阿里雲
## 貢獻
對於想要貢獻程式碼的開發者,請參閱我們的[貢獻指南](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md)。

@ -214,6 +214,16 @@ Triển khai Dify trên AWS bằng [CDK](https://aws.amazon.com/cdk/)
##### AWS
- [AWS CDK bởi @KevinZhao](https://github.com/aws-samples/solution-for-deploying-dify-on-aws)
#### Alibaba Cloud
[Alibaba Cloud Computing Nest](https://computenest.console.aliyun.com/service/instance/create/default?type=user&ServiceName=Dify%E7%A4%BE%E5%8C%BA%E7%89%88)
#### Alibaba Cloud Data Management
Triển khai Dify lên Alibaba Cloud chỉ với một cú nhấp chuột bằng [Alibaba Cloud Data Management](https://www.alibabacloud.com/help/en/dms/dify-in-invitational-preview/)
## Đóng góp
Đối với những người muốn đóng góp mã, xem [Hướng dẫn Đóng góp](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md) của chúng tôi.

@ -137,7 +137,7 @@ WEB_API_CORS_ALLOW_ORIGINS=http://127.0.0.1:3000,*
CONSOLE_CORS_ALLOW_ORIGINS=http://127.0.0.1:3000,*
# Vector database configuration
# support: weaviate, qdrant, milvus, myscale, relyt, pgvecto_rs, pgvector, pgvector, chroma, opensearch, tidb_vector, couchbase, vikingdb, upstash, lindorm, oceanbase, opengauss, tablestore
# support: weaviate, qdrant, milvus, myscale, relyt, pgvecto_rs, pgvector, pgvector, chroma, opensearch, tidb_vector, couchbase, vikingdb, upstash, lindorm, oceanbase, opengauss, tablestore, matrixone
VECTOR_STORE=weaviate
# Weaviate configuration
@ -294,6 +294,13 @@ VIKINGDB_SCHEMA=http
VIKINGDB_CONNECTION_TIMEOUT=30
VIKINGDB_SOCKET_TIMEOUT=30
# Matrixone configration
MATRIXONE_HOST=127.0.0.1
MATRIXONE_PORT=6001
MATRIXONE_USER=dump
MATRIXONE_PASSWORD=111
MATRIXONE_DATABASE=dify
# Lindorm configuration
LINDORM_URL=http://ld-*******************-proxy-search-pub.lindorm.aliyuncs.com:30070
LINDORM_USERNAME=admin
@ -332,9 +339,11 @@ PROMPT_GENERATION_MAX_TOKENS=512
CODE_GENERATION_MAX_TOKENS=1024
PLUGIN_BASED_TOKEN_COUNTING_ENABLED=false
# Mail configuration, support: resend, smtp
# Mail configuration, support: resend, smtp, sendgrid
MAIL_TYPE=
# If using SendGrid, use the 'from' field for authentication if necessary.
MAIL_DEFAULT_SEND_FROM=no-reply <no-reply@dify.ai>
# resend configuration
RESEND_API_KEY=
RESEND_API_URL=https://api.resend.com
# smtp configuration
@ -344,7 +353,8 @@ SMTP_USERNAME=123
SMTP_PASSWORD=abc
SMTP_USE_TLS=true
SMTP_OPPORTUNISTIC_TLS=false
# Sendgid configuration
SENDGRID_API_KEY=
# Sentry configuration
SENTRY_DSN=

@ -1,6 +1,4 @@
exclude = [
"migrations/*",
]
exclude = ["migrations/*"]
line-length = 120
[format]
@ -9,14 +7,14 @@ quote-style = "double"
[lint]
preview = false
select = [
"B", # flake8-bugbear rules
"C4", # flake8-comprehensions
"E", # pycodestyle E rules
"F", # pyflakes rules
"FURB", # refurb rules
"I", # isort rules
"N", # pep8-naming
"PT", # flake8-pytest-style rules
"B", # flake8-bugbear rules
"C4", # flake8-comprehensions
"E", # pycodestyle E rules
"F", # pyflakes rules
"FURB", # refurb rules
"I", # isort rules
"N", # pep8-naming
"PT", # flake8-pytest-style rules
"PLC0208", # iteration-over-set
"PLC0414", # useless-import-alias
"PLE0604", # invalid-all-object
@ -24,58 +22,60 @@ select = [
"PLR0402", # manual-from-import
"PLR1711", # useless-return
"PLR1714", # repeated-equality-comparison
"RUF013", # implicit-optional
"RUF019", # unnecessary-key-check
"RUF100", # unused-noqa
"RUF101", # redirected-noqa
"RUF200", # invalid-pyproject-toml
"RUF022", # unsorted-dunder-all
"S506", # unsafe-yaml-load
"SIM", # flake8-simplify rules
"TRY400", # error-instead-of-exception
"TRY401", # verbose-log-message
"UP", # pyupgrade rules
"W191", # tab-indentation
"W605", # invalid-escape-sequence
"RUF013", # implicit-optional
"RUF019", # unnecessary-key-check
"RUF100", # unused-noqa
"RUF101", # redirected-noqa
"RUF200", # invalid-pyproject-toml
"RUF022", # unsorted-dunder-all
"S506", # unsafe-yaml-load
"SIM", # flake8-simplify rules
"TRY400", # error-instead-of-exception
"TRY401", # verbose-log-message
"UP", # pyupgrade rules
"W191", # tab-indentation
"W605", # invalid-escape-sequence
# security related linting rules
# RCE proctection (sort of)
"S102", # exec-builtin, disallow use of `exec`
"S307", # suspicious-eval-usage, disallow use of `eval` and `ast.literal_eval`
"S301", # suspicious-pickle-usage, disallow use of `pickle` and its wrappers.
"S302", # suspicious-marshal-usage, disallow use of `marshal` module
"S311", # suspicious-non-cryptographic-random-usage
]
ignore = [
"E402", # module-import-not-at-top-of-file
"E711", # none-comparison
"E712", # true-false-comparison
"E721", # type-comparison
"E722", # bare-except
"F821", # undefined-name
"F841", # unused-variable
"E402", # module-import-not-at-top-of-file
"E711", # none-comparison
"E712", # true-false-comparison
"E721", # type-comparison
"E722", # bare-except
"F821", # undefined-name
"F841", # unused-variable
"FURB113", # repeated-append
"FURB152", # math-constant
"UP007", # non-pep604-annotation
"UP032", # f-string
"UP045", # non-pep604-annotation-optional
"B005", # strip-with-multi-characters
"B006", # mutable-argument-default
"B007", # unused-loop-control-variable
"B026", # star-arg-unpacking-after-keyword-arg
"B903", # class-as-data-structure
"B904", # raise-without-from-inside-except
"B905", # zip-without-explicit-strict
"N806", # non-lowercase-variable-in-function
"N815", # mixed-case-variable-in-class-scope
"PT011", # pytest-raises-too-broad
"SIM102", # collapsible-if
"SIM103", # needless-bool
"SIM105", # suppressible-exception
"SIM107", # return-in-try-except-finally
"SIM108", # if-else-block-instead-of-if-exp
"SIM113", # enumerate-for-loop
"SIM117", # multiple-with-statements
"SIM210", # if-expr-with-true-false
"UP007", # non-pep604-annotation
"UP032", # f-string
"UP045", # non-pep604-annotation-optional
"B005", # strip-with-multi-characters
"B006", # mutable-argument-default
"B007", # unused-loop-control-variable
"B026", # star-arg-unpacking-after-keyword-arg
"B903", # class-as-data-structure
"B904", # raise-without-from-inside-except
"B905", # zip-without-explicit-strict
"N806", # non-lowercase-variable-in-function
"N815", # mixed-case-variable-in-class-scope
"PT011", # pytest-raises-too-broad
"SIM102", # collapsible-if
"SIM103", # needless-bool
"SIM105", # suppressible-exception
"SIM107", # return-in-try-except-finally
"SIM108", # if-else-block-instead-of-if-exp
"SIM113", # enumerate-for-loop
"SIM117", # multiple-with-statements
"SIM210", # if-expr-with-true-false
"UP038", # deprecated and not recommended by Ruff, https://docs.astral.sh/ruff/rules/non-pep604-isinstance/
]
[lint.per-file-ignores]

@ -4,7 +4,7 @@ FROM python:3.12-slim-bookworm AS base
WORKDIR /app/api
# Install uv
ENV UV_VERSION=0.6.14
ENV UV_VERSION=0.7.11
RUN pip install --no-cache-dir uv==${UV_VERSION}

@ -27,7 +27,7 @@ from models.dataset import Dataset, DatasetCollectionBinding, DatasetMetadata, D
from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation
from models.provider import Provider, ProviderModel
from services.account_service import RegisterService, TenantService
from services.account_service import AccountService, RegisterService, TenantService
from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs
from services.plugin.data_migration import PluginDataMigration
from services.plugin.plugin_migration import PluginMigration
@ -68,6 +68,7 @@ def reset_password(email, new_password, password_confirm):
account.password = base64_password_hashed
account.password_salt = base64_salt
db.session.commit()
AccountService.reset_login_error_rate_limit(email)
click.echo(click.style("Password reset successfully.", fg="green"))
@ -280,6 +281,7 @@ def migrate_knowledge_vector_database():
VectorType.ELASTICSEARCH,
VectorType.OPENGAUSS,
VectorType.TABLESTORE,
VectorType.MATRIXONE,
}
lower_collection_vector_types = {
VectorType.ANALYTICDB,

@ -609,7 +609,7 @@ class MailConfig(BaseSettings):
"""
MAIL_TYPE: Optional[str] = Field(
description="Email service provider type ('smtp' or 'resend'), default to None.",
description="Email service provider type ('smtp' or 'resend' or 'sendGrid), default to None.",
default=None,
)
@ -663,6 +663,11 @@ class MailConfig(BaseSettings):
default=50,
)
SENDGRID_API_KEY: Optional[str] = Field(
description="API key for SendGrid service",
default=None,
)
class RagEtlConfig(BaseSettings):
"""

@ -24,6 +24,7 @@ from .vdb.couchbase_config import CouchbaseConfig
from .vdb.elasticsearch_config import ElasticsearchConfig
from .vdb.huawei_cloud_config import HuaweiCloudConfig
from .vdb.lindorm_config import LindormConfig
from .vdb.matrixone_config import MatrixoneConfig
from .vdb.milvus_config import MilvusConfig
from .vdb.myscale_config import MyScaleConfig
from .vdb.oceanbase_config import OceanBaseVectorConfig
@ -323,5 +324,6 @@ class MiddlewareConfig(
OpenGaussConfig,
TableStoreConfig,
DatasetQueueMonitorConfig,
MatrixoneConfig,
):
pass

@ -0,0 +1,14 @@
from pydantic import BaseModel, Field
class MatrixoneConfig(BaseModel):
"""Matrixone vector database configuration."""
MATRIXONE_HOST: str = Field(default="localhost", description="Host address of the Matrixone server")
MATRIXONE_PORT: int = Field(default=6001, description="Port number of the Matrixone server")
MATRIXONE_USER: str = Field(default="dump", description="Username for authenticating with Matrixone")
MATRIXONE_PASSWORD: str = Field(default="111", description="Password for authenticating with Matrixone")
MATRIXONE_DATABASE: str = Field(default="dify", description="Name of the Matrixone database to connect to")
MATRIXONE_METRIC: str = Field(
default="l2", description="Distance metric type for vector similarity search (cosine or l2)"
)

@ -9,7 +9,7 @@ class PackagingInfo(BaseSettings):
CURRENT_VERSION: str = Field(
description="Dify version",
default="1.4.1",
default="1.5.0",
)
COMMIT_SHA: str = Field(

@ -63,6 +63,7 @@ from .app import (
statistic,
workflow,
workflow_app_log,
workflow_draft_variable,
workflow_run,
workflow_statistic,
)

@ -56,8 +56,7 @@ class InsertExploreAppListApi(Resource):
parser.add_argument("position", type=int, required=True, nullable=False, location="json")
args = parser.parse_args()
with Session(db.engine) as session:
app = session.execute(select(App).filter(App.id == args["app_id"])).scalar_one_or_none()
app = db.session.execute(select(App).filter(App.id == args["app_id"])).scalar_one_or_none()
if not app:
raise NotFound(f"App '{args['app_id']}' is not found")
@ -78,38 +77,38 @@ class InsertExploreAppListApi(Resource):
select(RecommendedApp).filter(RecommendedApp.app_id == args["app_id"])
).scalar_one_or_none()
if not recommended_app:
recommended_app = RecommendedApp(
app_id=app.id,
description=desc,
copyright=copy_right,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
language=args["language"],
category=args["category"],
position=args["position"],
)
db.session.add(recommended_app)
app.is_public = True
db.session.commit()
return {"result": "success"}, 201
else:
recommended_app.description = desc
recommended_app.copyright = copy_right
recommended_app.privacy_policy = privacy_policy
recommended_app.custom_disclaimer = custom_disclaimer
recommended_app.language = args["language"]
recommended_app.category = args["category"]
recommended_app.position = args["position"]
if not recommended_app:
recommended_app = RecommendedApp(
app_id=app.id,
description=desc,
copyright=copy_right,
privacy_policy=privacy_policy,
custom_disclaimer=custom_disclaimer,
language=args["language"],
category=args["category"],
position=args["position"],
)
db.session.add(recommended_app)
app.is_public = True
db.session.commit()
return {"result": "success"}, 201
else:
recommended_app.description = desc
recommended_app.copyright = copy_right
recommended_app.privacy_policy = privacy_policy
recommended_app.custom_disclaimer = custom_disclaimer
recommended_app.language = args["language"]
recommended_app.category = args["category"]
recommended_app.position = args["position"]
app.is_public = True
app.is_public = True
db.session.commit()
db.session.commit()
return {"result": "success"}, 200
return {"result": "success"}, 200
class InsertExploreAppApi(Resource):

@ -208,7 +208,7 @@ class AnnotationBatchImportApi(Resource):
if len(request.files) > 1:
raise TooManyFilesError()
# check file type
if not file.filename.endswith(".csv"):
if not file.filename or not file.filename.lower().endswith(".csv"):
raise ValueError("Invalid file type. Only CSV files are allowed")
return AppAnnotationService.batch_import_app_annotations(app_id, file)

@ -17,6 +17,8 @@ from libs.login import login_required
from models import Account
from models.model import App
from services.app_dsl_service import AppDslService, ImportStatus
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
class AppImportApi(Resource):
@ -60,7 +62,9 @@ class AppImportApi(Resource):
app_id=args.get("app_id"),
)
session.commit()
if result.app_id and FeatureService.get_system_features().webapp_auth.enabled:
# update web app setting as private
EnterpriseService.WebAppAuth.update_app_access_mode(result.app_id, "private")
# Return appropriate status code based on result
status = result.status
if status == ImportStatus.FAILED.value:

@ -1,5 +1,6 @@
import json
import logging
from collections.abc import Sequence
from typing import cast
from flask import abort, request
@ -18,10 +19,12 @@ from controllers.console.app.error import (
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file.models import File
from extensions.ext_database import db
from factories import variable_factory
from factories import file_factory, variable_factory
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
from fields.workflow_run_fields import workflow_run_node_execution_fields
from libs import helper
@ -30,6 +33,7 @@ from libs.login import current_user, login_required
from models import App
from models.account import Account
from models.model import AppMode
from models.workflow import Workflow
from services.app_generate_service import AppGenerateService
from services.errors.app import WorkflowHashNotEqualError
from services.errors.llm import InvokeRateLimitError
@ -38,6 +42,24 @@ from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseE
logger = logging.getLogger(__name__)
# TODO(QuantumGhost): Refactor existing node run API to handle file parameter parsing
# at the controller level rather than in the workflow logic. This would improve separation
# of concerns and make the code more maintainable.
def _parse_file(workflow: Workflow, files: list[dict] | None = None) -> Sequence[File]:
files = files or []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
file_objs: Sequence[File] = []
if file_extra_config is None:
return file_objs
file_objs = file_factory.build_from_mappings(
mappings=files,
tenant_id=workflow.tenant_id,
config=file_extra_config,
)
return file_objs
class DraftWorkflowApi(Resource):
@setup_required
@login_required
@ -402,15 +424,30 @@ class DraftWorkflowNodeRunApi(Resource):
parser = reqparse.RequestParser()
parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json")
parser.add_argument("query", type=str, required=False, location="json", default="")
parser.add_argument("files", type=list, location="json", default=[])
args = parser.parse_args()
inputs = args.get("inputs")
if inputs == None:
user_inputs = args.get("inputs")
if user_inputs is None:
raise ValueError("missing inputs")
workflow_srv = WorkflowService()
# fetch draft workflow by app_model
draft_workflow = workflow_srv.get_draft_workflow(app_model=app_model)
if not draft_workflow:
raise ValueError("Workflow not initialized")
files = _parse_file(draft_workflow, args.get("files"))
workflow_service = WorkflowService()
workflow_node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model, node_id=node_id, user_inputs=inputs, account=current_user
app_model=app_model,
draft_workflow=draft_workflow,
node_id=node_id,
user_inputs=user_inputs,
account=current_user,
query=args.get("query", ""),
files=files,
)
return workflow_node_execution
@ -731,6 +768,27 @@ class WorkflowByIdApi(Resource):
return None, 204
class DraftWorkflowNodeLastRunApi(Resource):
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
@marshal_with(workflow_run_node_execution_fields)
def get(self, app_model: App, node_id: str):
srv = WorkflowService()
workflow = srv.get_draft_workflow(app_model)
if not workflow:
raise NotFound("Workflow not found")
node_exec = srv.get_node_last_run(
app_model=app_model,
workflow=workflow,
node_id=node_id,
)
if node_exec is None:
raise NotFound("last run not found")
return node_exec
api.add_resource(
DraftWorkflowApi,
"/apps/<uuid:app_id>/workflows/draft",
@ -795,3 +853,7 @@ api.add_resource(
WorkflowByIdApi,
"/apps/<uuid:app_id>/workflows/<string:workflow_id>",
)
api.add_resource(
DraftWorkflowNodeLastRunApi,
"/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/last-run",
)

@ -34,6 +34,20 @@ class WorkflowAppLogApi(Resource):
parser.add_argument(
"created_at__after", type=str, location="args", help="Filter logs created after this timestamp"
)
parser.add_argument(
"created_by_end_user_session_id",
type=str,
location="args",
required=False,
default=None,
)
parser.add_argument(
"created_by_account",
type=str,
location="args",
required=False,
default=None,
)
parser.add_argument("page", type=int_range(1, 99999), default=1, location="args")
parser.add_argument("limit", type=int_range(1, 100), default=20, location="args")
args = parser.parse_args()
@ -57,6 +71,8 @@ class WorkflowAppLogApi(Resource):
created_at_after=args.created_at__after,
page=args.page,
limit=args.limit,
created_by_end_user_session_id=args.created_by_end_user_session_id,
created_by_account=args.created_by_account,
)
return workflow_app_log_pagination

@ -0,0 +1,421 @@
import logging
from typing import Any, NoReturn
from flask import Response
from flask_restful import Resource, fields, inputs, marshal, marshal_with, reqparse
from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden
from controllers.console import api
from controllers.console.app.error import (
DraftWorkflowNotExist,
)
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from controllers.web.error import InvalidArgumentError, NotFoundError
from core.variables.segment_group import SegmentGroup
from core.variables.segments import ArrayFileSegment, FileSegment, Segment
from core.variables.types import SegmentType
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from factories.file_factory import build_from_mapping, build_from_mappings
from factories.variable_factory import build_segment_with_type
from libs.login import current_user, login_required
from models import App, AppMode, db
from models.workflow import WorkflowDraftVariable
from services.workflow_draft_variable_service import WorkflowDraftVariableList, WorkflowDraftVariableService
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
def _convert_values_to_json_serializable_object(value: Segment) -> Any:
if isinstance(value, FileSegment):
return value.value.model_dump()
elif isinstance(value, ArrayFileSegment):
return [i.model_dump() for i in value.value]
elif isinstance(value, SegmentGroup):
return [_convert_values_to_json_serializable_object(i) for i in value.value]
else:
return value.value
def _serialize_var_value(variable: WorkflowDraftVariable) -> Any:
value = variable.get_value()
# create a copy of the value to avoid affecting the model cache.
value = value.model_copy(deep=True)
# Refresh the url signature before returning it to client.
if isinstance(value, FileSegment):
file = value.value
file.remote_url = file.generate_url()
elif isinstance(value, ArrayFileSegment):
files = value.value
for file in files:
file.remote_url = file.generate_url()
return _convert_values_to_json_serializable_object(value)
def _create_pagination_parser():
parser = reqparse.RequestParser()
parser.add_argument(
"page",
type=inputs.int_range(1, 100_000),
required=False,
default=1,
location="args",
help="the page of data requested",
)
parser.add_argument("limit", type=inputs.int_range(1, 100), required=False, default=20, location="args")
return parser
_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS = {
"id": fields.String,
"type": fields.String(attribute=lambda model: model.get_variable_type()),
"name": fields.String,
"description": fields.String,
"selector": fields.List(fields.String, attribute=lambda model: model.get_selector()),
"value_type": fields.String,
"edited": fields.Boolean(attribute=lambda model: model.edited),
"visible": fields.Boolean,
}
_WORKFLOW_DRAFT_VARIABLE_FIELDS = dict(
_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS,
value=fields.Raw(attribute=_serialize_var_value),
)
_WORKFLOW_DRAFT_ENV_VARIABLE_FIELDS = {
"id": fields.String,
"type": fields.String(attribute=lambda _: "env"),
"name": fields.String,
"description": fields.String,
"selector": fields.List(fields.String, attribute=lambda model: model.get_selector()),
"value_type": fields.String,
"edited": fields.Boolean(attribute=lambda model: model.edited),
"visible": fields.Boolean,
}
_WORKFLOW_DRAFT_ENV_VARIABLE_LIST_FIELDS = {
"items": fields.List(fields.Nested(_WORKFLOW_DRAFT_ENV_VARIABLE_FIELDS)),
}
def _get_items(var_list: WorkflowDraftVariableList) -> list[WorkflowDraftVariable]:
return var_list.variables
_WORKFLOW_DRAFT_VARIABLE_LIST_WITHOUT_VALUE_FIELDS = {
"items": fields.List(fields.Nested(_WORKFLOW_DRAFT_VARIABLE_WITHOUT_VALUE_FIELDS), attribute=_get_items),
"total": fields.Raw(),
}
_WORKFLOW_DRAFT_VARIABLE_LIST_FIELDS = {
"items": fields.List(fields.Nested(_WORKFLOW_DRAFT_VARIABLE_FIELDS), attribute=_get_items),
}
def _api_prerequisite(f):
"""Common prerequisites for all draft workflow variable APIs.
It ensures the following conditions are satisfied:
- Dify has been property setup.
- The request user has logged in and initialized.
- The requested app is a workflow or a chat flow.
- The request user has the edit permission for the app.
"""
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
def wrapper(*args, **kwargs):
if not current_user.is_editor:
raise Forbidden()
return f(*args, **kwargs)
return wrapper
class WorkflowVariableCollectionApi(Resource):
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_LIST_WITHOUT_VALUE_FIELDS)
def get(self, app_model: App):
"""
Get draft workflow
"""
parser = _create_pagination_parser()
args = parser.parse_args()
# fetch draft workflow by app_model
workflow_service = WorkflowService()
workflow_exist = workflow_service.is_workflow_exist(app_model=app_model)
if not workflow_exist:
raise DraftWorkflowNotExist()
# fetch draft workflow by app_model
with Session(bind=db.engine, expire_on_commit=False) as session:
draft_var_srv = WorkflowDraftVariableService(
session=session,
)
workflow_vars = draft_var_srv.list_variables_without_values(
app_id=app_model.id,
page=args.page,
limit=args.limit,
)
return workflow_vars
@_api_prerequisite
def delete(self, app_model: App):
draft_var_srv = WorkflowDraftVariableService(
session=db.session(),
)
draft_var_srv.delete_workflow_variables(app_model.id)
db.session.commit()
return Response("", 204)
def validate_node_id(node_id: str) -> NoReturn | None:
if node_id in [
CONVERSATION_VARIABLE_NODE_ID,
SYSTEM_VARIABLE_NODE_ID,
]:
# NOTE(QuantumGhost): While we store the system and conversation variables as node variables
# with specific `node_id` in database, we still want to make the API separated. By disallowing
# accessing system and conversation variables in `WorkflowDraftNodeVariableListApi`,
# we mitigate the risk that user of the API depending on the implementation detail of the API.
#
# ref: [Hyrum's Law](https://www.hyrumslaw.com/)
raise InvalidArgumentError(
f"invalid node_id, please use correspond api for conversation and system variables, node_id={node_id}",
)
return None
class NodeVariableCollectionApi(Resource):
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_LIST_FIELDS)
def get(self, app_model: App, node_id: str):
validate_node_id(node_id)
with Session(bind=db.engine, expire_on_commit=False) as session:
draft_var_srv = WorkflowDraftVariableService(
session=session,
)
node_vars = draft_var_srv.list_node_variables(app_model.id, node_id)
return node_vars
@_api_prerequisite
def delete(self, app_model: App, node_id: str):
validate_node_id(node_id)
srv = WorkflowDraftVariableService(db.session())
srv.delete_node_variables(app_model.id, node_id)
db.session.commit()
return Response("", 204)
class VariableApi(Resource):
_PATCH_NAME_FIELD = "name"
_PATCH_VALUE_FIELD = "value"
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_FIELDS)
def get(self, app_model: App, variable_id: str):
draft_var_srv = WorkflowDraftVariableService(
session=db.session(),
)
variable = draft_var_srv.get_variable(variable_id=variable_id)
if variable is None:
raise NotFoundError(description=f"variable not found, id={variable_id}")
if variable.app_id != app_model.id:
raise NotFoundError(description=f"variable not found, id={variable_id}")
return variable
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_FIELDS)
def patch(self, app_model: App, variable_id: str):
# Request payload for file types:
#
# Local File:
#
# {
# "type": "image",
# "transfer_method": "local_file",
# "url": "",
# "upload_file_id": "daded54f-72c7-4f8e-9d18-9b0abdd9f190"
# }
#
# Remote File:
#
#
# {
# "type": "image",
# "transfer_method": "remote_url",
# "url": "http://127.0.0.1:5001/files/1602650a-4fe4-423c-85a2-af76c083e3c4/file-preview?timestamp=1750041099&nonce=...&sign=...=",
# "upload_file_id": "1602650a-4fe4-423c-85a2-af76c083e3c4"
# }
parser = reqparse.RequestParser()
parser.add_argument(self._PATCH_NAME_FIELD, type=str, required=False, nullable=True, location="json")
# Parse 'value' field as-is to maintain its original data structure
parser.add_argument(self._PATCH_VALUE_FIELD, type=lambda x: x, required=False, nullable=True, location="json")
draft_var_srv = WorkflowDraftVariableService(
session=db.session(),
)
args = parser.parse_args(strict=True)
variable = draft_var_srv.get_variable(variable_id=variable_id)
if variable is None:
raise NotFoundError(description=f"variable not found, id={variable_id}")
if variable.app_id != app_model.id:
raise NotFoundError(description=f"variable not found, id={variable_id}")
new_name = args.get(self._PATCH_NAME_FIELD, None)
raw_value = args.get(self._PATCH_VALUE_FIELD, None)
if new_name is None and raw_value is None:
return variable
new_value = None
if raw_value is not None:
if variable.value_type == SegmentType.FILE:
if not isinstance(raw_value, dict):
raise InvalidArgumentError(description=f"expected dict for file, got {type(raw_value)}")
raw_value = build_from_mapping(mapping=raw_value, tenant_id=app_model.tenant_id)
elif variable.value_type == SegmentType.ARRAY_FILE:
if not isinstance(raw_value, list):
raise InvalidArgumentError(description=f"expected list for files, got {type(raw_value)}")
if len(raw_value) > 0 and not isinstance(raw_value[0], dict):
raise InvalidArgumentError(description=f"expected dict for files[0], got {type(raw_value)}")
raw_value = build_from_mappings(mappings=raw_value, tenant_id=app_model.tenant_id)
new_value = build_segment_with_type(variable.value_type, raw_value)
draft_var_srv.update_variable(variable, name=new_name, value=new_value)
db.session.commit()
return variable
@_api_prerequisite
def delete(self, app_model: App, variable_id: str):
draft_var_srv = WorkflowDraftVariableService(
session=db.session(),
)
variable = draft_var_srv.get_variable(variable_id=variable_id)
if variable is None:
raise NotFoundError(description=f"variable not found, id={variable_id}")
if variable.app_id != app_model.id:
raise NotFoundError(description=f"variable not found, id={variable_id}")
draft_var_srv.delete_variable(variable)
db.session.commit()
return Response("", 204)
class VariableResetApi(Resource):
@_api_prerequisite
def put(self, app_model: App, variable_id: str):
draft_var_srv = WorkflowDraftVariableService(
session=db.session(),
)
workflow_srv = WorkflowService()
draft_workflow = workflow_srv.get_draft_workflow(app_model)
if draft_workflow is None:
raise NotFoundError(
f"Draft workflow not found, app_id={app_model.id}",
)
variable = draft_var_srv.get_variable(variable_id=variable_id)
if variable is None:
raise NotFoundError(description=f"variable not found, id={variable_id}")
if variable.app_id != app_model.id:
raise NotFoundError(description=f"variable not found, id={variable_id}")
resetted = draft_var_srv.reset_variable(draft_workflow, variable)
db.session.commit()
if resetted is None:
return Response("", 204)
else:
return marshal(resetted, _WORKFLOW_DRAFT_VARIABLE_FIELDS)
def _get_variable_list(app_model: App, node_id) -> WorkflowDraftVariableList:
with Session(bind=db.engine, expire_on_commit=False) as session:
draft_var_srv = WorkflowDraftVariableService(
session=session,
)
if node_id == CONVERSATION_VARIABLE_NODE_ID:
draft_vars = draft_var_srv.list_conversation_variables(app_model.id)
elif node_id == SYSTEM_VARIABLE_NODE_ID:
draft_vars = draft_var_srv.list_system_variables(app_model.id)
else:
draft_vars = draft_var_srv.list_node_variables(app_id=app_model.id, node_id=node_id)
return draft_vars
class ConversationVariableCollectionApi(Resource):
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_LIST_FIELDS)
def get(self, app_model: App):
# NOTE(QuantumGhost): Prefill conversation variables into the draft variables table
# so their IDs can be returned to the caller.
workflow_srv = WorkflowService()
draft_workflow = workflow_srv.get_draft_workflow(app_model)
if draft_workflow is None:
raise NotFoundError(description=f"draft workflow not found, id={app_model.id}")
draft_var_srv = WorkflowDraftVariableService(db.session())
draft_var_srv.prefill_conversation_variable_default_values(draft_workflow)
db.session.commit()
return _get_variable_list(app_model, CONVERSATION_VARIABLE_NODE_ID)
class SystemVariableCollectionApi(Resource):
@_api_prerequisite
@marshal_with(_WORKFLOW_DRAFT_VARIABLE_LIST_FIELDS)
def get(self, app_model: App):
return _get_variable_list(app_model, SYSTEM_VARIABLE_NODE_ID)
class EnvironmentVariableCollectionApi(Resource):
@_api_prerequisite
def get(self, app_model: App):
"""
Get draft workflow
"""
# fetch draft workflow by app_model
workflow_service = WorkflowService()
workflow = workflow_service.get_draft_workflow(app_model=app_model)
if workflow is None:
raise DraftWorkflowNotExist()
env_vars = workflow.environment_variables
env_vars_list = []
for v in env_vars:
env_vars_list.append(
{
"id": v.id,
"type": "env",
"name": v.name,
"description": v.description,
"selector": v.selector,
"value_type": v.value_type.value,
"value": v.value,
# Do not track edited for env vars.
"edited": False,
"visible": True,
"editable": True,
}
)
return {"items": env_vars_list}
api.add_resource(
WorkflowVariableCollectionApi,
"/apps/<uuid:app_id>/workflows/draft/variables",
)
api.add_resource(NodeVariableCollectionApi, "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/variables")
api.add_resource(VariableApi, "/apps/<uuid:app_id>/workflows/draft/variables/<uuid:variable_id>")
api.add_resource(VariableResetApi, "/apps/<uuid:app_id>/workflows/draft/variables/<uuid:variable_id>/reset")
api.add_resource(ConversationVariableCollectionApi, "/apps/<uuid:app_id>/workflows/draft/conversation-variables")
api.add_resource(SystemVariableCollectionApi, "/apps/<uuid:app_id>/workflows/draft/system-variables")
api.add_resource(EnvironmentVariableCollectionApi, "/apps/<uuid:app_id>/workflows/draft/environment-variables")

@ -8,6 +8,15 @@ from libs.login import current_user
from models import App, AppMode
def _load_app_model(app_id: str) -> Optional[App]:
app_model = (
db.session.query(App)
.filter(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
.first()
)
return app_model
def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[AppMode], None] = None):
def decorator(view_func):
@wraps(view_func)
@ -20,11 +29,7 @@ def get_app_model(view: Optional[Callable] = None, *, mode: Union[AppMode, list[
del kwargs["app_id"]
app_model = (
db.session.query(App)
.filter(App.id == app_id, App.tenant_id == current_user.current_tenant_id, App.status == "normal")
.first()
)
app_model = _load_app_model(app_id)
if not app_model:
raise AppNotFoundError()

@ -119,9 +119,6 @@ class ForgotPasswordResetApi(Resource):
if not reset_data:
raise InvalidTokenError()
# Must use token in reset phase
if reset_data.get("phase", "") != "reset":
raise InvalidTokenError()
# Must use token in reset phase
if reset_data.get("phase", "") != "reset":
raise InvalidTokenError()

@ -686,6 +686,7 @@ class DatasetRetrievalSettingApi(Resource):
| VectorType.TABLESTORE
| VectorType.HUAWEI_CLOUD
| VectorType.TENCENT
| VectorType.MATRIXONE
):
return {
"retrieval_method": [
@ -733,6 +734,7 @@ class DatasetRetrievalSettingMockApi(Resource):
| VectorType.TABLESTORE
| VectorType.TENCENT
| VectorType.HUAWEI_CLOUD
| VectorType.MATRIXONE
):
return {
"retrieval_method": [

@ -5,7 +5,7 @@ from typing import cast
from flask import request
from flask_login import current_user
from flask_restful import Resource, fields, marshal, marshal_with, reqparse
from flask_restful import Resource, marshal, marshal_with, reqparse
from sqlalchemy import asc, desc, select
from werkzeug.exceptions import Forbidden, NotFound
@ -43,7 +43,6 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.plugin.impl.exc import PluginDaemonClientSideError
from core.rag.extractor.entity.extract_setting import ExtractSetting
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.document_fields import (
dataset_and_document_fields,
document_fields,
@ -54,8 +53,6 @@ from libs.login import login_required
from models import Dataset, DatasetProcessRule, Document, DocumentSegment, UploadFile
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig
from tasks.add_document_to_index_task import add_document_to_index_task
from tasks.remove_document_from_index_task import remove_document_from_index_task
class DocumentResource(Resource):
@ -242,12 +239,10 @@ class DatasetDocumentListApi(Resource):
return response
documents_and_batch_fields = {"documents": fields.List(fields.Nested(document_fields)), "batch": fields.String}
@setup_required
@login_required
@account_initialization_required
@marshal_with(documents_and_batch_fields)
@marshal_with(dataset_and_document_fields)
@cloud_edition_billing_resource_check("vector_space")
@cloud_edition_billing_rate_limit_check("knowledge")
def post(self, dataset_id):
@ -293,6 +288,8 @@ class DatasetDocumentListApi(Resource):
try:
documents, batch = DocumentService.save_document_with_dataset_id(dataset, knowledge_config, current_user)
dataset = DatasetService.get_dataset(dataset_id)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
except QuotaExceededError:
@ -300,7 +297,7 @@ class DatasetDocumentListApi(Resource):
except ModelCurrentlyNotSupportError:
raise ProviderModelCurrentlyNotSupportError()
return {"documents": documents, "batch": batch}
return {"dataset": dataset, "documents": documents, "batch": batch}
@setup_required
@login_required
@ -862,77 +859,16 @@ class DocumentStatusApi(DocumentResource):
DatasetService.check_dataset_permission(dataset, current_user)
document_ids = request.args.getlist("document_id")
for document_id in document_ids:
document = self.get_document(dataset_id, document_id)
indexing_cache_key = "document_{}_indexing".format(document.id)
cache_result = redis_client.get(indexing_cache_key)
if cache_result is not None:
raise InvalidActionError(f"Document:{document.name} is being indexed, please try again later")
if action == "enable":
if document.enabled:
continue
document.enabled = True
document.disabled_at = None
document.disabled_by = None
document.updated_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
add_document_to_index_task.delay(document_id)
elif action == "disable":
if not document.completed_at or document.indexing_status != "completed":
raise InvalidActionError(f"Document: {document.name} is not completed.")
if not document.enabled:
continue
document.enabled = False
document.disabled_at = datetime.now(UTC).replace(tzinfo=None)
document.disabled_by = current_user.id
document.updated_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
remove_document_from_index_task.delay(document_id)
elif action == "archive":
if document.archived:
continue
document.archived = True
document.archived_at = datetime.now(UTC).replace(tzinfo=None)
document.archived_by = current_user.id
document.updated_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
if document.enabled:
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
remove_document_from_index_task.delay(document_id)
elif action == "un_archive":
if not document.archived:
continue
document.archived = False
document.archived_at = None
document.archived_by = None
document.updated_at = datetime.now(UTC).replace(tzinfo=None)
db.session.commit()
# Set cache to prevent indexing the same document multiple times
redis_client.setex(indexing_cache_key, 600, 1)
add_document_to_index_task.delay(document_id)
else:
raise InvalidActionError()
try:
DocumentService.batch_update_document_status(dataset, document_ids, action, current_user)
except services.errors.document.DocumentIndexingError as e:
raise InvalidActionError(str(e))
except ValueError as e:
raise InvalidActionError(str(e))
except NotFound as e:
raise NotFound(str(e))
return {"result": "success"}, 200

@ -374,7 +374,7 @@ class DatasetDocumentSegmentBatchImportApi(Resource):
if len(request.files) > 1:
raise TooManyFilesError()
# check file type
if not file.filename.endswith(".csv"):
if not file.filename or not file.filename.lower().endswith(".csv"):
raise ValueError("Invalid file type. Only CSV files are allowed")
try:

@ -59,7 +59,14 @@ class InstalledAppsListApi(Resource):
if FeatureService.get_system_features().webapp_auth.enabled:
user_id = current_user.id
res = []
app_ids = [installed_app["app"].id for installed_app in installed_app_list]
webapp_settings = EnterpriseService.WebAppAuth.batch_get_app_access_mode_by_id(app_ids)
for installed_app in installed_app_list:
webapp_setting = webapp_settings.get(installed_app["app"].id)
if not webapp_setting:
continue
if webapp_setting.access_mode == "sso_verified":
continue
app_code = AppService.get_app_code_by_id(str(installed_app["app"].id))
if EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(
user_id=user_id,

@ -15,7 +15,7 @@ class LoadBalancingCredentialsValidateApi(Resource):
@login_required
@account_initialization_required
def post(self, provider: str):
if not TenantAccountRole.is_privileged_role(current_user.current_tenant.current_role):
if not TenantAccountRole.is_privileged_role(current_user.current_role):
raise Forbidden()
tenant_id = current_user.current_tenant_id
@ -64,7 +64,7 @@ class LoadBalancingConfigCredentialsValidateApi(Resource):
@login_required
@account_initialization_required
def post(self, provider: str, config_id: str):
if not TenantAccountRole.is_privileged_role(current_user.current_tenant.current_role):
if not TenantAccountRole.is_privileged_role(current_user.current_role):
raise Forbidden()
tenant_id = current_user.current_tenant_id

@ -85,6 +85,7 @@ class MemberInviteEmailApi(Resource):
return {
"result": "success",
"invitation_results": invitation_results,
"tenant_id": str(current_user.current_tenant.id),
}, 201
@ -110,7 +111,7 @@ class MemberCancelInviteApi(Resource):
except Exception as e:
raise ValueError(str(e))
return {"result": "success"}, 204
return {"result": "success", "tenant_id": str(current_user.current_tenant.id)}, 200
class MemberUpdateRoleApi(Resource):

@ -13,6 +13,7 @@ from core.model_runtime.utils.encoders import jsonable_encoder
from core.plugin.impl.exc import PluginDaemonClientSideError
from libs.login import login_required
from models.account import TenantPluginPermission
from services.plugin.plugin_parameter_service import PluginParameterService
from services.plugin.plugin_permission_service import PluginPermissionService
from services.plugin.plugin_service import PluginService
@ -497,6 +498,42 @@ class PluginFetchPermissionApi(Resource):
)
class PluginFetchDynamicSelectOptionsApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self):
# check if the user is admin or owner
if not current_user.is_admin_or_owner:
raise Forbidden()
tenant_id = current_user.current_tenant_id
user_id = current_user.id
parser = reqparse.RequestParser()
parser.add_argument("plugin_id", type=str, required=True, location="args")
parser.add_argument("provider", type=str, required=True, location="args")
parser.add_argument("action", type=str, required=True, location="args")
parser.add_argument("parameter", type=str, required=True, location="args")
parser.add_argument("provider_type", type=str, required=True, location="args")
args = parser.parse_args()
try:
options = PluginParameterService.get_dynamic_select_options(
tenant_id,
user_id,
args["plugin_id"],
args["provider"],
args["action"],
args["parameter"],
args["provider_type"],
)
except PluginDaemonClientSideError as e:
raise ValueError(e)
return jsonable_encoder({"options": options})
api.add_resource(PluginDebuggingKeyApi, "/workspaces/current/plugin/debugging-key")
api.add_resource(PluginListApi, "/workspaces/current/plugin/list")
api.add_resource(PluginListLatestVersionsApi, "/workspaces/current/plugin/list/latest-versions")
@ -521,3 +558,5 @@ api.add_resource(PluginFetchMarketplacePkgApi, "/workspaces/current/plugin/marke
api.add_resource(PluginChangePermissionApi, "/workspaces/current/plugin/permission/change")
api.add_resource(PluginFetchPermissionApi, "/workspaces/current/plugin/permission/fetch")
api.add_resource(PluginFetchDynamicSelectOptionsApi, "/workspaces/current/plugin/parameters/dynamic-options")

@ -44,6 +44,17 @@ def only_edition_cloud(view):
return decorated
def only_edition_enterprise(view):
@wraps(view)
def decorated(*args, **kwargs):
if not dify_config.ENTERPRISE_ENABLED:
abort(404)
return view(*args, **kwargs)
return decorated
def only_edition_self_hosted(view):
@wraps(view)
def decorated(*args, **kwargs):

@ -29,7 +29,7 @@ from core.plugin.entities.request import (
RequestRequestUploadFile,
)
from core.tools.entities.tool_entities import ToolProviderType
from libs.helper import compact_generate_response
from libs.helper import length_prefixed_response
from models.account import Account, Tenant
from models.model import EndUser
@ -44,7 +44,7 @@ class PluginInvokeLLMApi(Resource):
response = PluginModelBackwardsInvocation.invoke_llm(user_model.id, tenant_model, payload)
return PluginModelBackwardsInvocation.convert_to_event_stream(response)
return compact_generate_response(generator())
return length_prefixed_response(0xF, generator())
class PluginInvokeTextEmbeddingApi(Resource):
@ -101,7 +101,7 @@ class PluginInvokeTTSApi(Resource):
)
return PluginModelBackwardsInvocation.convert_to_event_stream(response)
return compact_generate_response(generator())
return length_prefixed_response(0xF, generator())
class PluginInvokeSpeech2TextApi(Resource):
@ -162,7 +162,7 @@ class PluginInvokeToolApi(Resource):
),
)
return compact_generate_response(generator())
return length_prefixed_response(0xF, generator())
class PluginInvokeParameterExtractorNodeApi(Resource):
@ -228,7 +228,7 @@ class PluginInvokeAppApi(Resource):
files=payload.files,
)
return compact_generate_response(PluginAppBackwardsInvocation.convert_to_event_stream(response))
return length_prefixed_response(0xF, PluginAppBackwardsInvocation.convert_to_event_stream(response))
class PluginInvokeEncryptApi(Resource):

@ -32,6 +32,7 @@ def get_user(tenant_id: str, user_id: str | None) -> Account | EndUser:
)
session.add(user_model)
session.commit()
session.refresh(user_model)
else:
user_model = AccountService.load_user(user_id)
if not user_model:

@ -47,7 +47,13 @@ class AppInfoApi(Resource):
def get(self, app_model: App):
"""Get app information"""
tags = [tag.name for tag in app_model.tags]
return {"name": app_model.name, "description": app_model.description, "tags": tags, "mode": app_model.mode}
return {
"name": app_model.name,
"description": app_model.description,
"tags": tags,
"mode": app_model.mode,
"author_name": app_model.author_name,
}
api.add_resource(AppParameterApi, "/parameters")

@ -135,6 +135,20 @@ class WorkflowAppLogApi(Resource):
parser.add_argument("status", type=str, choices=["succeeded", "failed", "stopped"], location="args")
parser.add_argument("created_at__before", type=str, location="args")
parser.add_argument("created_at__after", type=str, location="args")
parser.add_argument(
"created_by_end_user_session_id",
type=str,
location="args",
required=False,
default=None,
)
parser.add_argument(
"created_by_account",
type=str,
location="args",
required=False,
default=None,
)
parser.add_argument("page", type=int_range(1, 99999), default=1, location="args")
parser.add_argument("limit", type=int_range(1, 100), default=20, location="args")
args = parser.parse_args()
@ -158,6 +172,8 @@ class WorkflowAppLogApi(Resource):
created_at_after=args.created_at__after,
page=args.page,
limit=args.limit,
created_by_end_user_session_id=args.created_by_end_user_session_id,
created_by_account=args.created_by_account,
)
return workflow_app_log_pagination

@ -4,8 +4,12 @@ from werkzeug.exceptions import Forbidden, NotFound
import services.dataset_service
from controllers.service_api import api
from controllers.service_api.dataset.error import DatasetInUseError, DatasetNameDuplicateError
from controllers.service_api.wraps import DatasetApiResource, validate_dataset_token
from controllers.service_api.dataset.error import DatasetInUseError, DatasetNameDuplicateError, InvalidActionError
from controllers.service_api.wraps import (
DatasetApiResource,
cloud_edition_billing_rate_limit_check,
validate_dataset_token,
)
from core.model_runtime.entities.model_entities import ModelType
from core.plugin.entities.plugin import ModelProviderID
from core.provider_manager import ProviderManager
@ -13,7 +17,7 @@ from fields.dataset_fields import dataset_detail_fields
from fields.tag_fields import tag_fields
from libs.login import current_user
from models.dataset import Dataset, DatasetPermissionEnum
from services.dataset_service import DatasetPermissionService, DatasetService
from services.dataset_service import DatasetPermissionService, DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import RetrievalModel
from services.tag_service import TagService
@ -70,6 +74,7 @@ class DatasetListApi(DatasetApiResource):
response = {"data": data, "has_more": len(datasets) == limit, "limit": limit, "total": total, "page": page}
return response, 200
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id):
"""Resource for creating datasets."""
parser = reqparse.RequestParser()
@ -193,6 +198,7 @@ class DatasetApi(DatasetApiResource):
return data, 200
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def patch(self, _, dataset_id):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
@ -293,6 +299,7 @@ class DatasetApi(DatasetApiResource):
return result_data, 200
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def delete(self, _, dataset_id):
"""
Deletes a dataset given its ID.
@ -322,6 +329,56 @@ class DatasetApi(DatasetApiResource):
raise DatasetInUseError()
class DocumentStatusApi(DatasetApiResource):
"""Resource for batch document status operations."""
def patch(self, tenant_id, dataset_id, action):
"""
Batch update document status.
Args:
tenant_id: tenant id
dataset_id: dataset id
action: action to perform (enable, disable, archive, un_archive)
Returns:
dict: A dictionary with a key 'result' and a value 'success'
int: HTTP status code 200 indicating that the operation was successful.
Raises:
NotFound: If the dataset with the given ID does not exist.
Forbidden: If the user does not have permission.
InvalidActionError: If the action is invalid or cannot be performed.
"""
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
if dataset is None:
raise NotFound("Dataset not found.")
# Check user's permission
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Check dataset model setting
DatasetService.check_dataset_model_setting(dataset)
# Get document IDs from request body
data = request.get_json()
document_ids = data.get("document_ids", [])
try:
DocumentService.batch_update_document_status(dataset, document_ids, action, current_user)
except services.errors.document.DocumentIndexingError as e:
raise InvalidActionError(str(e))
except ValueError as e:
raise InvalidActionError(str(e))
return {"result": "success"}, 200
class DatasetTagsApi(DatasetApiResource):
@validate_dataset_token
@marshal_with(tag_fields)
@ -450,6 +507,7 @@ class DatasetTagsBindingStatusApi(DatasetApiResource):
api.add_resource(DatasetListApi, "/datasets")
api.add_resource(DatasetApi, "/datasets/<uuid:dataset_id>")
api.add_resource(DocumentStatusApi, "/datasets/<uuid:dataset_id>/documents/status/<string:action>")
api.add_resource(DatasetTagsApi, "/datasets/tags")
api.add_resource(DatasetTagBindingApi, "/datasets/tags/binding")
api.add_resource(DatasetTagUnbindingApi, "/datasets/tags/unbinding")

@ -19,7 +19,11 @@ from controllers.service_api.dataset.error import (
ArchivedDocumentImmutableError,
DocumentIndexingError,
)
from controllers.service_api.wraps import DatasetApiResource, cloud_edition_billing_resource_check
from controllers.service_api.wraps import (
DatasetApiResource,
cloud_edition_billing_rate_limit_check,
cloud_edition_billing_resource_check,
)
from core.errors.error import ProviderTokenNotInitError
from extensions.ext_database import db
from fields.document_fields import document_fields, document_status_fields
@ -35,6 +39,7 @@ class DocumentAddByTextApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_resource_check("documents", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
"""Create document by text."""
parser = reqparse.RequestParser()
@ -99,6 +104,7 @@ class DocumentUpdateByTextApi(DatasetApiResource):
"""Resource for update documents."""
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id):
"""Update document by text."""
parser = reqparse.RequestParser()
@ -158,6 +164,7 @@ class DocumentAddByFileApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_resource_check("documents", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
"""Create document by upload file."""
args = {}
@ -175,8 +182,11 @@ class DocumentAddByFileApi(DatasetApiResource):
if not dataset:
raise ValueError("Dataset does not exist.")
if not dataset.indexing_technique and not args.get("indexing_technique"):
indexing_technique = args.get("indexing_technique") or dataset.indexing_technique
if not indexing_technique:
raise ValueError("indexing_technique is required.")
args["indexing_technique"] = indexing_technique
# save file info
file = request.files["file"]
@ -229,6 +239,7 @@ class DocumentUpdateByFileApi(DatasetApiResource):
"""Resource for update documents."""
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id):
"""Update document by upload file."""
args = {}
@ -299,6 +310,7 @@ class DocumentUpdateByFileApi(DatasetApiResource):
class DocumentDeleteApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def delete(self, tenant_id, dataset_id, document_id):
"""Delete document."""
document_id = str(document_id)

@ -1,9 +1,10 @@
from controllers.console.datasets.hit_testing_base import DatasetsHitTestingBase
from controllers.service_api import api
from controllers.service_api.wraps import DatasetApiResource
from controllers.service_api.wraps import DatasetApiResource, cloud_edition_billing_rate_limit_check
class HitTestingApi(DatasetApiResource, DatasetsHitTestingBase):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
dataset_id_str = str(dataset_id)

@ -3,7 +3,7 @@ from flask_restful import marshal, reqparse
from werkzeug.exceptions import NotFound
from controllers.service_api import api
from controllers.service_api.wraps import DatasetApiResource
from controllers.service_api.wraps import DatasetApiResource, cloud_edition_billing_rate_limit_check
from fields.dataset_fields import dataset_metadata_fields
from services.dataset_service import DatasetService
from services.entities.knowledge_entities.knowledge_entities import (
@ -14,6 +14,7 @@ from services.metadata_service import MetadataService
class DatasetMetadataCreateServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
parser = reqparse.RequestParser()
parser.add_argument("type", type=str, required=True, nullable=True, location="json")
@ -39,6 +40,7 @@ class DatasetMetadataCreateServiceApi(DatasetApiResource):
class DatasetMetadataServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def patch(self, tenant_id, dataset_id, metadata_id):
parser = reqparse.RequestParser()
parser.add_argument("name", type=str, required=True, nullable=True, location="json")
@ -54,6 +56,7 @@ class DatasetMetadataServiceApi(DatasetApiResource):
metadata = MetadataService.update_metadata_name(dataset_id_str, metadata_id_str, args.get("name"))
return marshal(metadata, dataset_metadata_fields), 200
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def delete(self, tenant_id, dataset_id, metadata_id):
dataset_id_str = str(dataset_id)
metadata_id_str = str(metadata_id)
@ -73,6 +76,7 @@ class DatasetMetadataBuiltInFieldServiceApi(DatasetApiResource):
class DatasetMetadataBuiltInFieldActionServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, action):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)
@ -88,6 +92,7 @@ class DatasetMetadataBuiltInFieldActionServiceApi(DatasetApiResource):
class DocumentMetadataEditServiceApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id):
dataset_id_str = str(dataset_id)
dataset = DatasetService.get_dataset(dataset_id_str)

@ -8,6 +8,7 @@ from controllers.service_api.app.error import ProviderNotInitializeError
from controllers.service_api.wraps import (
DatasetApiResource,
cloud_edition_billing_knowledge_limit_check,
cloud_edition_billing_rate_limit_check,
cloud_edition_billing_resource_check,
)
from core.errors.error import LLMBadRequestError, ProviderTokenNotInitError
@ -35,6 +36,7 @@ class SegmentApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_knowledge_limit_check("add_segment", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id):
"""Create single segment."""
# check dataset
@ -139,6 +141,7 @@ class SegmentApi(DatasetApiResource):
class DatasetSegmentApi(DatasetApiResource):
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def delete(self, tenant_id, dataset_id, document_id, segment_id):
# check dataset
dataset_id = str(dataset_id)
@ -162,6 +165,7 @@ class DatasetSegmentApi(DatasetApiResource):
return 204
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id, segment_id):
# check dataset
dataset_id = str(dataset_id)
@ -236,6 +240,7 @@ class ChildChunkApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_knowledge_limit_check("add_segment", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def post(self, tenant_id, dataset_id, document_id, segment_id):
"""Create child chunk."""
# check dataset
@ -332,6 +337,7 @@ class DatasetChildChunkApi(DatasetApiResource):
"""Resource for updating child chunks."""
@cloud_edition_billing_knowledge_limit_check("add_segment", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def delete(self, tenant_id, dataset_id, document_id, segment_id, child_chunk_id):
"""Delete child chunk."""
# check dataset
@ -370,6 +376,7 @@ class DatasetChildChunkApi(DatasetApiResource):
@cloud_edition_billing_resource_check("vector_space", "dataset")
@cloud_edition_billing_knowledge_limit_check("add_segment", "dataset")
@cloud_edition_billing_rate_limit_check("knowledge", "dataset")
def patch(self, tenant_id, dataset_id, document_id, segment_id, child_chunk_id):
"""Update child chunk."""
# check dataset

@ -15,4 +15,17 @@ api.add_resource(FileApi, "/files/upload")
api.add_resource(RemoteFileInfoApi, "/remote-files/<path:url>")
api.add_resource(RemoteFileUploadApi, "/remote-files/upload")
from . import app, audio, completion, conversation, feature, message, passport, saved_message, site, workflow
from . import (
app,
audio,
completion,
conversation,
feature,
forgot_password,
login,
message,
passport,
saved_message,
site,
workflow,
)

@ -10,6 +10,8 @@ from libs.passport import PassportService
from models.model import App, AppMode
from services.app_service import AppService
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
class AppParameterApi(WebApiResource):
@ -46,10 +48,22 @@ class AppMeta(WebApiResource):
class AppAccessMode(Resource):
def get(self):
parser = reqparse.RequestParser()
parser.add_argument("appId", type=str, required=True, location="args")
parser.add_argument("appId", type=str, required=False, location="args")
parser.add_argument("appCode", type=str, required=False, location="args")
args = parser.parse_args()
app_id = args["appId"]
features = FeatureService.get_system_features()
if not features.webapp_auth.enabled:
return {"accessMode": "public"}
app_id = args.get("appId")
if args.get("appCode"):
app_code = args["appCode"]
app_id = AppService.get_app_id_by_code(app_code)
if not app_id:
raise ValueError("appId or appCode must be provided")
res = EnterpriseService.WebAppAuth.get_app_access_mode_by_id(app_id)
return {"accessMode": res.access_mode}
@ -75,6 +89,10 @@ class AppWebAuthPermission(Resource):
except Exception as e:
pass
features = FeatureService.get_system_features()
if not features.webapp_auth.enabled:
return {"result": True}
parser = reqparse.RequestParser()
parser.add_argument("appId", type=str, required=True, location="args")
args = parser.parse_args()
@ -82,7 +100,9 @@ class AppWebAuthPermission(Resource):
app_id = args["appId"]
app_code = AppService.get_app_code_by_id(app_id)
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code)
res = True
if WebAppAuthService.is_app_require_permission_check(app_id=app_id):
res = EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(str(user_id), app_code)
return {"result": res}

@ -139,3 +139,13 @@ class InvokeRateLimitError(BaseHTTPException):
error_code = "rate_limit_error"
description = "Rate Limit Error"
code = 429
class NotFoundError(BaseHTTPException):
error_code = "not_found"
code = 404
class InvalidArgumentError(BaseHTTPException):
error_code = "invalid_param"
code = 400

@ -0,0 +1,147 @@
import base64
import secrets
from flask import request
from flask_restful import Resource, reqparse
from sqlalchemy import select
from sqlalchemy.orm import Session
from controllers.console.auth.error import (
EmailCodeError,
EmailPasswordResetLimitError,
InvalidEmailError,
InvalidTokenError,
PasswordMismatchError,
)
from controllers.console.error import AccountNotFound, EmailSendIpLimitError
from controllers.console.wraps import email_password_login_enabled, only_edition_enterprise, setup_required
from controllers.web import api
from extensions.ext_database import db
from libs.helper import email, extract_remote_ip
from libs.password import hash_password, valid_password
from models.account import Account
from services.account_service import AccountService
class ForgotPasswordSendEmailApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
parser.add_argument("language", type=str, required=False, location="json")
args = parser.parse_args()
ip_address = extract_remote_ip(request)
if AccountService.is_email_send_ip_limit(ip_address):
raise EmailSendIpLimitError()
if args["language"] is not None and args["language"] == "zh-Hans":
language = "zh-Hans"
else:
language = "en-US"
with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=args["email"])).scalar_one_or_none()
token = None
if account is None:
raise AccountNotFound()
else:
token = AccountService.send_reset_password_email(account=account, email=args["email"], language=language)
return {"result": "success", "data": token}
class ForgotPasswordCheckApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
parser.add_argument("code", type=str, required=True, location="json")
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
args = parser.parse_args()
user_email = args["email"]
is_forgot_password_error_rate_limit = AccountService.is_forgot_password_error_rate_limit(args["email"])
if is_forgot_password_error_rate_limit:
raise EmailPasswordResetLimitError()
token_data = AccountService.get_reset_password_data(args["token"])
if token_data is None:
raise InvalidTokenError()
if user_email != token_data.get("email"):
raise InvalidEmailError()
if args["code"] != token_data.get("code"):
AccountService.add_forgot_password_error_rate_limit(args["email"])
raise EmailCodeError()
# Verified, revoke the first token
AccountService.revoke_reset_password_token(args["token"])
# Refresh token data by generating a new token
_, new_token = AccountService.generate_reset_password_token(
user_email, code=args["code"], additional_data={"phase": "reset"}
)
AccountService.reset_forgot_password_error_rate_limit(args["email"])
return {"is_valid": True, "email": token_data.get("email"), "token": new_token}
class ForgotPasswordResetApi(Resource):
@only_edition_enterprise
@setup_required
@email_password_login_enabled
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
parser.add_argument("new_password", type=valid_password, required=True, nullable=False, location="json")
parser.add_argument("password_confirm", type=valid_password, required=True, nullable=False, location="json")
args = parser.parse_args()
# Validate passwords match
if args["new_password"] != args["password_confirm"]:
raise PasswordMismatchError()
# Validate token and get reset data
reset_data = AccountService.get_reset_password_data(args["token"])
if not reset_data:
raise InvalidTokenError()
# Must use token in reset phase
if reset_data.get("phase", "") != "reset":
raise InvalidTokenError()
# Revoke token to prevent reuse
AccountService.revoke_reset_password_token(args["token"])
# Generate secure salt and hash password
salt = secrets.token_bytes(16)
password_hashed = hash_password(args["new_password"], salt)
email = reset_data.get("email", "")
with Session(db.engine) as session:
account = session.execute(select(Account).filter_by(email=email)).scalar_one_or_none()
if account:
self._update_existing_account(account, password_hashed, salt, session)
else:
raise AccountNotFound()
return {"result": "success"}
def _update_existing_account(self, account, password_hashed, salt, session):
# Update existing account credentials
account.password = base64.b64encode(password_hashed).decode()
account.password_salt = base64.b64encode(salt).decode()
session.commit()
api.add_resource(ForgotPasswordSendEmailApi, "/forgot-password")
api.add_resource(ForgotPasswordCheckApi, "/forgot-password/validity")
api.add_resource(ForgotPasswordResetApi, "/forgot-password/resets")

@ -1,12 +1,11 @@
from flask import request
from flask_restful import Resource, reqparse
from jwt import InvalidTokenError # type: ignore
from werkzeug.exceptions import BadRequest
import services
from controllers.console.auth.error import EmailCodeError, EmailOrPasswordMismatchError, InvalidEmailError
from controllers.console.error import AccountBannedError, AccountNotFound
from controllers.console.wraps import setup_required
from controllers.console.wraps import only_edition_enterprise, setup_required
from controllers.web import api
from libs.helper import email
from libs.password import valid_password
from services.account_service import AccountService
@ -16,6 +15,8 @@ from services.webapp_auth_service import WebAppAuthService
class LoginApi(Resource):
"""Resource for web app email/password login."""
@setup_required
@only_edition_enterprise
def post(self):
"""Authenticate user and login."""
parser = reqparse.RequestParser()
@ -23,10 +24,6 @@ class LoginApi(Resource):
parser.add_argument("password", type=valid_password, required=True, location="json")
args = parser.parse_args()
app_code = request.headers.get("X-App-Code")
if app_code is None:
raise BadRequest("X-App-Code header is missing.")
try:
account = WebAppAuthService.authenticate(args["email"], args["password"])
except services.errors.account.AccountLoginError:
@ -36,12 +33,8 @@ class LoginApi(Resource):
except services.errors.account.AccountNotFoundError:
raise AccountNotFound()
WebAppAuthService._validate_user_accessibility(account=account, app_code=app_code)
end_user = WebAppAuthService.create_end_user(email=args["email"], app_code=app_code)
token = WebAppAuthService.login(account=account, app_code=app_code, end_user_id=end_user.id)
return {"result": "success", "token": token}
token = WebAppAuthService.login(account=account)
return {"result": "success", "data": {"access_token": token}}
# class LogoutApi(Resource):
@ -56,6 +49,7 @@ class LoginApi(Resource):
class EmailCodeLoginSendEmailApi(Resource):
@setup_required
@only_edition_enterprise
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
@ -78,6 +72,7 @@ class EmailCodeLoginSendEmailApi(Resource):
class EmailCodeLoginApi(Resource):
@setup_required
@only_edition_enterprise
def post(self):
parser = reqparse.RequestParser()
parser.add_argument("email", type=str, required=True, location="json")
@ -86,9 +81,6 @@ class EmailCodeLoginApi(Resource):
args = parser.parse_args()
user_email = args["email"]
app_code = request.headers.get("X-App-Code")
if app_code is None:
raise BadRequest("X-App-Code header is missing.")
token_data = WebAppAuthService.get_email_code_login_data(args["token"])
if token_data is None:
@ -105,16 +97,12 @@ class EmailCodeLoginApi(Resource):
if not account:
raise AccountNotFound()
WebAppAuthService._validate_user_accessibility(account=account, app_code=app_code)
end_user = WebAppAuthService.create_end_user(email=user_email, app_code=app_code)
token = WebAppAuthService.login(account=account, app_code=app_code, end_user_id=end_user.id)
token = WebAppAuthService.login(account=account)
AccountService.reset_login_error_rate_limit(args["email"])
return {"result": "success", "token": token}
return {"result": "success", "data": {"access_token": token}}
# api.add_resource(LoginApi, "/login")
api.add_resource(LoginApi, "/login")
# api.add_resource(LogoutApi, "/logout")
# api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")
# api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")
api.add_resource(EmailCodeLoginSendEmailApi, "/email-code-login")
api.add_resource(EmailCodeLoginApi, "/email-code-login/validity")

@ -1,9 +1,11 @@
import uuid
from datetime import UTC, datetime, timedelta
from flask import request
from flask_restful import Resource
from werkzeug.exceptions import NotFound, Unauthorized
from configs import dify_config
from controllers.web import api
from controllers.web.error import WebAppAuthRequiredError
from extensions.ext_database import db
@ -11,6 +13,7 @@ from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService, WebAppAuthType
class PassportResource(Resource):
@ -20,10 +23,19 @@ class PassportResource(Resource):
system_features = FeatureService.get_system_features()
app_code = request.headers.get("X-App-Code")
user_id = request.args.get("user_id")
web_app_access_token = request.args.get("web_app_access_token")
if app_code is None:
raise Unauthorized("X-App-Code header is missing.")
# exchange token for enterprise logined web user
enterprise_user_decoded = decode_enterprise_webapp_user_id(web_app_access_token)
if enterprise_user_decoded:
# a web user has already logged in, exchange a token for this app without redirecting to the login page
return exchange_token_for_existing_web_user(
app_code=app_code, enterprise_user_decoded=enterprise_user_decoded
)
if system_features.webapp_auth.enabled:
app_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
if not app_settings or not app_settings.access_mode == "public":
@ -84,6 +96,128 @@ class PassportResource(Resource):
api.add_resource(PassportResource, "/passport")
def decode_enterprise_webapp_user_id(jwt_token: str | None):
"""
Decode the enterprise user session from the Authorization header.
"""
if not jwt_token:
return None
decoded = PassportService().verify(jwt_token)
source = decoded.get("token_source")
if not source or source != "webapp_login_token":
raise Unauthorized("Invalid token source. Expected 'webapp_login_token'.")
return decoded
def exchange_token_for_existing_web_user(app_code: str, enterprise_user_decoded: dict):
"""
Exchange a token for an existing web user session.
"""
user_id = enterprise_user_decoded.get("user_id")
end_user_id = enterprise_user_decoded.get("end_user_id")
session_id = enterprise_user_decoded.get("session_id")
user_auth_type = enterprise_user_decoded.get("auth_type")
if not user_auth_type:
raise Unauthorized("Missing auth_type in the token.")
site = db.session.query(Site).filter(Site.code == app_code, Site.status == "normal").first()
if not site:
raise NotFound()
app_model = db.session.query(App).filter(App.id == site.app_id).first()
if not app_model or app_model.status != "normal" or not app_model.enable_site:
raise NotFound()
app_auth_type = WebAppAuthService.get_app_auth_type(app_code=app_code)
if app_auth_type == WebAppAuthType.PUBLIC:
return _exchange_for_public_app_token(app_model, site, enterprise_user_decoded)
elif app_auth_type == WebAppAuthType.EXTERNAL and user_auth_type != "external":
raise WebAppAuthRequiredError("Please login as external user.")
elif app_auth_type == WebAppAuthType.INTERNAL and user_auth_type != "internal":
raise WebAppAuthRequiredError("Please login as internal user.")
end_user = None
if end_user_id:
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
if session_id:
end_user = (
db.session.query(EndUser)
.filter(
EndUser.session_id == session_id,
EndUser.tenant_id == app_model.tenant_id,
EndUser.app_id == app_model.id,
)
.first()
)
if not end_user:
if not session_id:
raise NotFound("Missing session_id for existing web user.")
end_user = EndUser(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="browser",
is_anonymous=True,
session_id=session_id,
)
db.session.add(end_user)
db.session.commit()
exp_dt = datetime.now(UTC) + timedelta(minutes=dify_config.ACCESS_TOKEN_EXPIRE_MINUTES)
exp = int(exp_dt.timestamp())
payload = {
"iss": site.id,
"sub": "Web API Passport",
"app_id": site.app_id,
"app_code": site.code,
"user_id": user_id,
"end_user_id": end_user.id,
"auth_type": user_auth_type,
"granted_at": int(datetime.now(UTC).timestamp()),
"token_source": "webapp",
"exp": exp,
}
token: str = PassportService().issue(payload)
return {
"access_token": token,
}
def _exchange_for_public_app_token(app_model, site, token_decoded):
user_id = token_decoded.get("user_id")
end_user = None
if user_id:
end_user = (
db.session.query(EndUser).filter(EndUser.app_id == app_model.id, EndUser.session_id == user_id).first()
)
if not end_user:
end_user = EndUser(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type="browser",
is_anonymous=True,
session_id=generate_session_id(),
)
db.session.add(end_user)
db.session.commit()
payload = {
"iss": site.app_id,
"sub": "Web API Passport",
"app_id": site.app_id,
"app_code": site.code,
"end_user_id": end_user.id,
}
tk = PassportService().issue(payload)
return {
"access_token": tk,
}
def generate_session_id():
"""
Generate a unique session ID.

@ -1,3 +1,4 @@
from datetime import UTC, datetime
from functools import wraps
from flask import request
@ -8,8 +9,9 @@ from controllers.web.error import WebAppAuthAccessDeniedError, WebAppAuthRequire
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService
from services.enterprise.enterprise_service import EnterpriseService, WebAppSettings
from services.feature_service import FeatureService
from services.webapp_auth_service import WebAppAuthService
def validate_jwt_token(view=None):
@ -45,7 +47,8 @@ def decode_jwt_token():
raise Unauthorized("Invalid Authorization header format. Expected 'Bearer <api-key>' format.")
decoded = PassportService().verify(tk)
app_code = decoded.get("app_code")
app_model = db.session.query(App).filter(App.id == decoded["app_id"]).first()
app_id = decoded.get("app_id")
app_model = db.session.query(App).filter(App.id == app_id).first()
site = db.session.query(Site).filter(Site.code == app_code).first()
if not app_model:
raise NotFound()
@ -53,23 +56,30 @@ def decode_jwt_token():
raise BadRequest("Site URL is no longer valid.")
if app_model.enable_site is False:
raise BadRequest("Site is disabled.")
end_user = db.session.query(EndUser).filter(EndUser.id == decoded["end_user_id"]).first()
end_user_id = decoded.get("end_user_id")
end_user = db.session.query(EndUser).filter(EndUser.id == end_user_id).first()
if not end_user:
raise NotFound()
# for enterprise webapp auth
app_web_auth_enabled = False
webapp_settings = None
if system_features.webapp_auth.enabled:
app_web_auth_enabled = (
EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code).access_mode != "public"
)
webapp_settings = EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=app_code)
if not webapp_settings:
raise NotFound("Web app settings not found.")
app_web_auth_enabled = webapp_settings.access_mode != "public"
_validate_webapp_token(decoded, app_web_auth_enabled, system_features.webapp_auth.enabled)
_validate_user_accessibility(decoded, app_code, app_web_auth_enabled, system_features.webapp_auth.enabled)
_validate_user_accessibility(
decoded, app_code, app_web_auth_enabled, system_features.webapp_auth.enabled, webapp_settings
)
return app_model, end_user
except Unauthorized as e:
if system_features.webapp_auth.enabled:
if not app_code:
raise Unauthorized("Please re-login to access the web app.")
app_web_auth_enabled = (
EnterpriseService.WebAppAuth.get_app_access_mode_by_code(app_code=str(app_code)).access_mode != "public"
)
@ -95,15 +105,41 @@ def _validate_webapp_token(decoded, app_web_auth_enabled: bool, system_webapp_au
raise Unauthorized("webapp token expired.")
def _validate_user_accessibility(decoded, app_code, app_web_auth_enabled: bool, system_webapp_auth_enabled: bool):
def _validate_user_accessibility(
decoded,
app_code,
app_web_auth_enabled: bool,
system_webapp_auth_enabled: bool,
webapp_settings: WebAppSettings | None,
):
if system_webapp_auth_enabled and app_web_auth_enabled:
# Check if the user is allowed to access the web app
user_id = decoded.get("user_id")
if not user_id:
raise WebAppAuthRequiredError()
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code):
raise WebAppAuthAccessDeniedError()
if not webapp_settings:
raise WebAppAuthRequiredError("Web app settings not found.")
if WebAppAuthService.is_app_require_permission_check(access_mode=webapp_settings.access_mode):
if not EnterpriseService.WebAppAuth.is_user_allowed_to_access_webapp(user_id, app_code=app_code):
raise WebAppAuthAccessDeniedError()
auth_type = decoded.get("auth_type")
granted_at = decoded.get("granted_at")
if not auth_type:
raise WebAppAuthAccessDeniedError("Missing auth_type in the token.")
if not granted_at:
raise WebAppAuthAccessDeniedError("Missing granted_at in the token.")
# check if sso has been updated
if auth_type == "external":
last_update_time = EnterpriseService.get_app_sso_settings_last_update_time()
if granted_at and datetime.fromtimestamp(granted_at, tz=UTC) < last_update_time:
raise WebAppAuthAccessDeniedError("SSO settings have been updated. Please re-login.")
elif auth_type == "internal":
last_update_time = EnterpriseService.get_workspace_sso_settings_last_update_time()
if granted_at and datetime.fromtimestamp(granted_at, tz=UTC) < last_update_time:
raise WebAppAuthAccessDeniedError("SSO settings have been updated. Please re-login.")
class WebApiResource(Resource):

@ -138,14 +138,11 @@ class DatasetConfigManager:
if not config.get("dataset_configs"):
config["dataset_configs"] = {"retrieval_model": "single"}
if not config["dataset_configs"].get("datasets"):
config["dataset_configs"]["datasets"] = {"strategy": "router", "datasets": []}
if not isinstance(config["dataset_configs"], dict):
raise ValueError("dataset_configs must be of object type")
if not isinstance(config["dataset_configs"], dict):
raise ValueError("dataset_configs must be of object type")
if not config["dataset_configs"].get("datasets"):
config["dataset_configs"]["datasets"] = {"strategy": "router", "datasets": []}
need_manual_query_datasets = config.get("dataset_configs") and config["dataset_configs"].get(
"datasets", {}

@ -104,6 +104,7 @@ class VariableEntity(BaseModel):
Variable Entity.
"""
# `variable` records the name of the variable in user inputs.
variable: str
label: str
description: str = ""

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Optional, Union, overload
from flask import Flask, copy_current_request_context, current_app, has_request_context
from flask import Flask, current_app
from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker
@ -29,12 +29,14 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, Conversation, EndUser, Message, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
logger = logging.getLogger(__name__)
@ -115,6 +117,11 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
)
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
if file_extra_config:
@ -260,6 +267,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
)
var_loader = DraftVarLoader(
engine=db.engine,
app_id=application_generate_entity.app_config.app_id,
tenant_id=application_generate_entity.app_config.tenant_id,
)
draft_var_srv = WorkflowDraftVariableService(db.session())
draft_var_srv.prefill_conversation_variable_default_values(workflow)
return self._generate(
workflow=workflow,
@ -270,6 +284,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=None,
stream=streaming,
variable_loader=var_loader,
)
def single_loop_generate(
@ -335,6 +350,13 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
)
var_loader = DraftVarLoader(
engine=db.engine,
app_id=application_generate_entity.app_config.app_id,
tenant_id=application_generate_entity.app_config.tenant_id,
)
draft_var_srv = WorkflowDraftVariableService(db.session())
draft_var_srv.prefill_conversation_variable_default_values(workflow)
return self._generate(
workflow=workflow,
@ -345,6 +367,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_node_execution_repository=workflow_node_execution_repository,
conversation=None,
stream=streaming,
variable_loader=var_loader,
)
def _generate(
@ -358,6 +381,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
conversation: Optional[Conversation] = None,
stream: bool = True,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
) -> Mapping[str, Any] | Generator[str | Mapping[str, Any], Any, None]:
"""
Generate App response.
@ -366,6 +390,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:param user: account or end user
:param invoke_from: invoke from source
:param application_generate_entity: application generate entity
:param workflow_execution_repository: repository for workflow execution
:param workflow_node_execution_repository: repository for workflow node execution
:param conversation: conversation
:param stream: is stream
@ -399,20 +424,18 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# new thread with request context and contextvars
context = contextvars.copy_context()
@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
context=context,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
"context": context,
"variable_loader": variable_loader,
},
)
worker_thread.start()
@ -439,6 +462,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
conversation_id: str,
message_id: str,
context: contextvars.Context,
variable_loader: VariableLoader,
) -> None:
"""
Generate worker in a new thread.
@ -449,29 +473,12 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
:param message_id: message ID
:return:
"""
for var, val in context.items():
var.set(val)
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context():
with preserve_flask_contexts(flask_app, context_vars=context):
try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = AdvancedChatAppRunner(
@ -480,6 +487,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
conversation=conversation,
message=message,
dialogue_count=self._dialogue_count,
variable_loader=variable_loader,
)
runner.run()

@ -19,6 +19,7 @@ from core.moderation.base import ModerationError
from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.enums import UserFrom
@ -40,14 +41,17 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
conversation: Conversation,
message: Message,
dialogue_count: int,
variable_loader: VariableLoader,
) -> None:
super().__init__(queue_manager)
super().__init__(queue_manager, variable_loader)
self.application_generate_entity = application_generate_entity
self.conversation = conversation
self.message = message
self._dialogue_count = dialogue_count
def _get_app_id(self) -> str:
return self.application_generate_entity.app_config.app_id
def run(self) -> None:
app_config = self.application_generate_entity.app_config
app_config = cast(AdvancedChatAppConfig, app_config)

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping
from typing import Any, Literal, Union, overload
from flask import Flask, copy_current_request_context, current_app, has_request_context
from flask import Flask, current_app
from pydantic import ValidationError
from configs import dify_config
@ -23,9 +23,9 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError
from core.ops.ops_trace_manager import TraceQueueManager
from extensions.ext_database import db
from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, EndUser
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -123,6 +123,11 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
override_model_config_dict["retriever_resource"] = {"enabled": True}
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args.get("files") or []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
@ -182,20 +187,17 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
# new thread with request context and contextvars
context = contextvars.copy_context()
@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
context=context,
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
conversation_id=conversation.id,
message_id=message.id,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"context": context,
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"conversation_id": conversation.id,
"message_id": message.id,
},
)
worker_thread.start()
@ -229,29 +231,12 @@ class AgentChatAppGenerator(MessageBasedAppGenerator):
:param message_id: message ID
:return:
"""
for var, val in context.items():
var.set(val)
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context():
with preserve_flask_contexts(flask_app, context_vars=context):
try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = AgentChatAppRunner()

@ -1,3 +1,4 @@
import logging
import time
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any, Optional, Union
@ -33,6 +34,8 @@ from models.model import App, AppMode, Message, MessageAnnotation
if TYPE_CHECKING:
from core.file.models import File
_logger = logging.getLogger(__name__)
class AppRunner:
def get_pre_calculate_rest_tokens(
@ -298,7 +301,7 @@ class AppRunner:
)
def _handle_invoke_result_stream(
self, invoke_result: Generator, queue_manager: AppQueueManager, agent: bool
self, invoke_result: Generator[LLMResultChunk, None, None], queue_manager: AppQueueManager, agent: bool
) -> None:
"""
Handle invoke result
@ -317,18 +320,28 @@ class AppRunner:
else:
queue_manager.publish(QueueAgentMessageEvent(chunk=result), PublishFrom.APPLICATION_MANAGER)
text += result.delta.message.content
message = result.delta.message
if isinstance(message.content, str):
text += message.content
elif isinstance(message.content, list):
for content in message.content:
if not isinstance(content, str):
# TODO(QuantumGhost): Add multimodal output support for easy ui.
_logger.warning("received multimodal output, type=%s", type(content))
text += content.data
else:
text += content # failback to str
if not model:
model = result.model
if not prompt_messages:
prompt_messages = result.prompt_messages
prompt_messages = list(result.prompt_messages)
if result.delta.usage:
usage = result.delta.usage
if not usage:
if usage is None:
usage = LLMUsage.empty_usage()
llm_result = LLMResult(

@ -25,7 +25,6 @@ from factories import file_factory
from models.account import Account
from models.model import App, EndUser
from services.conversation_service import ConversationService
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -115,6 +114,11 @@ class ChatAppGenerator(MessageBasedAppGenerator):
override_model_config_dict["retriever_resource"] = {"enabled": True}
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
@ -219,8 +223,6 @@ class ChatAppGenerator(MessageBasedAppGenerator):
# get conversation and message
conversation = self._get_conversation(conversation_id)
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError("Message not exists")
# chatbot app
runner = ChatAppRunner()

@ -48,6 +48,7 @@ from core.workflow.entities.workflow_execution import WorkflowExecution
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecution, WorkflowNodeExecutionStatus
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.workflow_type_encoder import WorkflowRuntimeTypeConverter
from models import (
Account,
CreatorUserRole,
@ -125,7 +126,7 @@ class WorkflowResponseConverter:
id=workflow_execution.id_,
workflow_id=workflow_execution.workflow_id,
status=workflow_execution.status,
outputs=workflow_execution.outputs,
outputs=WorkflowRuntimeTypeConverter().to_json_encodable(workflow_execution.outputs),
error=workflow_execution.error_message,
elapsed_time=workflow_execution.elapsed_time,
total_tokens=workflow_execution.total_tokens,
@ -202,6 +203,8 @@ class WorkflowResponseConverter:
if not workflow_node_execution.finished_at:
return None
json_converter = WorkflowRuntimeTypeConverter()
return NodeFinishStreamResponse(
task_id=task_id,
workflow_run_id=workflow_node_execution.workflow_execution_id,
@ -214,7 +217,7 @@ class WorkflowResponseConverter:
predecessor_node_id=workflow_node_execution.predecessor_node_id,
inputs=workflow_node_execution.inputs,
process_data=workflow_node_execution.process_data,
outputs=workflow_node_execution.outputs,
outputs=json_converter.to_json_encodable(workflow_node_execution.outputs),
status=workflow_node_execution.status,
error=workflow_node_execution.error,
elapsed_time=workflow_node_execution.elapsed_time,
@ -245,6 +248,8 @@ class WorkflowResponseConverter:
if not workflow_node_execution.finished_at:
return None
json_converter = WorkflowRuntimeTypeConverter()
return NodeRetryStreamResponse(
task_id=task_id,
workflow_run_id=workflow_node_execution.workflow_execution_id,
@ -257,7 +262,7 @@ class WorkflowResponseConverter:
predecessor_node_id=workflow_node_execution.predecessor_node_id,
inputs=workflow_node_execution.inputs,
process_data=workflow_node_execution.process_data,
outputs=workflow_node_execution.outputs,
outputs=json_converter.to_json_encodable(workflow_node_execution.outputs),
status=workflow_node_execution.status,
error=workflow_node_execution.error,
elapsed_time=workflow_node_execution.elapsed_time,
@ -376,6 +381,7 @@ class WorkflowResponseConverter:
workflow_execution_id: str,
event: QueueIterationCompletedEvent,
) -> IterationNodeCompletedStreamResponse:
json_converter = WorkflowRuntimeTypeConverter()
return IterationNodeCompletedStreamResponse(
task_id=task_id,
workflow_run_id=workflow_execution_id,
@ -384,7 +390,7 @@ class WorkflowResponseConverter:
node_id=event.node_id,
node_type=event.node_type.value,
title=event.node_data.title,
outputs=event.outputs,
outputs=json_converter.to_json_encodable(event.outputs),
created_at=int(time.time()),
extras={},
inputs=event.inputs or {},
@ -463,7 +469,7 @@ class WorkflowResponseConverter:
node_id=event.node_id,
node_type=event.node_type.value,
title=event.node_data.title,
outputs=event.outputs,
outputs=WorkflowRuntimeTypeConverter().to_json_encodable(event.outputs),
created_at=int(time.time()),
extras={},
inputs=event.inputs or {},

@ -101,6 +101,11 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
)
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
files = args["files"] if args.get("files") else []
file_extra_config = FileUploadConfigManager.convert(override_model_config_dict or app_model_config.to_dict())
if file_extra_config:
@ -196,8 +201,6 @@ class CompletionAppGenerator(MessageBasedAppGenerator):
try:
# get message
message = self._get_message(message_id)
if message is None:
raise MessageNotExistsError()
# chatbot app
runner = CompletionAppRunner()

@ -29,6 +29,7 @@ from models.enums import CreatorUserRole
from models.model import App, AppMode, AppModelConfig, Conversation, EndUser, Message, MessageFile
from services.errors.app_model_config import AppModelConfigBrokenError
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError
logger = logging.getLogger(__name__)
@ -251,7 +252,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
return introduction or ""
def _get_conversation(self, conversation_id: str):
def _get_conversation(self, conversation_id: str) -> Conversation:
"""
Get conversation by conversation id
:param conversation_id: conversation id
@ -260,11 +261,11 @@ class MessageBasedAppGenerator(BaseAppGenerator):
conversation = db.session.query(Conversation).filter(Conversation.id == conversation_id).first()
if not conversation:
raise ConversationNotExistsError()
raise ConversationNotExistsError("Conversation not exists")
return conversation
def _get_message(self, message_id: str) -> Optional[Message]:
def _get_message(self, message_id: str) -> Message:
"""
Get message by message id
:param message_id: message id
@ -272,4 +273,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
"""
message = db.session.query(Message).filter(Message.id == message_id).first()
if message is None:
raise MessageNotExistsError("Message not exists")
return message

@ -5,7 +5,7 @@ import uuid
from collections.abc import Generator, Mapping, Sequence
from typing import Any, Literal, Optional, Union, overload
from flask import Flask, copy_current_request_context, current_app, has_request_context
from flask import Flask, current_app
from pydantic import ValidationError
from sqlalchemy.orm import sessionmaker
@ -27,10 +27,13 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
from core.repositories.sqlalchemy_workflow_execution_repository import SQLAlchemyWorkflowExecutionRepository
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
from extensions.ext_database import db
from factories import file_factory
from libs.flask_utils import preserve_flask_contexts
from models import Account, App, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
from models.enums import WorkflowRunTriggeredFrom
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
logger = logging.getLogger(__name__)
@ -93,6 +96,11 @@ class WorkflowAppGenerator(BaseAppGenerator):
files: Sequence[Mapping[str, Any]] = args.get("files") or []
# parse files
# TODO(QuantumGhost): Move file parsing logic to the API controller layer
# for better separation of concerns.
#
# For implementation reference, see the `_parse_file` function and
# `DraftWorkflowNodeRunApi` class which handle this properly.
file_extra_config = FileUploadConfigManager.convert(workflow.features_dict, is_vision=False)
system_files = file_factory.build_from_mappings(
mappings=files,
@ -185,6 +193,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_node_execution_repository: WorkflowNodeExecutionRepository,
streaming: bool = True,
workflow_thread_pool_id: Optional[str] = None,
variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER,
) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
"""
Generate App response.
@ -194,6 +203,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
:param user: account or end user
:param application_generate_entity: application generate entity
:param invoke_from: invoke from source
:param workflow_execution_repository: repository for workflow execution
:param workflow_node_execution_repository: repository for workflow node execution
:param streaming: is stream
:param workflow_thread_pool_id: workflow thread pool id
@ -209,19 +219,17 @@ class WorkflowAppGenerator(BaseAppGenerator):
# new thread with request context and contextvars
context = contextvars.copy_context()
@copy_current_request_context
def worker_with_context():
# Run the worker within the copied context
return context.run(
self._generate_worker,
flask_app=current_app._get_current_object(), # type: ignore
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
context=context,
workflow_thread_pool_id=workflow_thread_pool_id,
)
worker_thread = threading.Thread(target=worker_with_context)
worker_thread = threading.Thread(
target=self._generate_worker,
kwargs={
"flask_app": current_app._get_current_object(), # type: ignore
"application_generate_entity": application_generate_entity,
"queue_manager": queue_manager,
"context": context,
"workflow_thread_pool_id": workflow_thread_pool_id,
"variable_loader": variable_loader,
},
)
worker_thread.start()
@ -304,6 +312,13 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
)
draft_var_srv = WorkflowDraftVariableService(db.session())
draft_var_srv.prefill_conversation_variable_default_values(workflow)
var_loader = DraftVarLoader(
engine=db.engine,
app_id=application_generate_entity.app_config.app_id,
tenant_id=application_generate_entity.app_config.tenant_id,
)
return self._generate(
app_model=app_model,
@ -314,6 +329,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
)
def single_loop_generate(
@ -380,7 +396,13 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP,
)
draft_var_srv = WorkflowDraftVariableService(db.session())
draft_var_srv.prefill_conversation_variable_default_values(workflow)
var_loader = DraftVarLoader(
engine=db.engine,
app_id=application_generate_entity.app_config.app_id,
tenant_id=application_generate_entity.app_config.tenant_id,
)
return self._generate(
app_model=app_model,
workflow=workflow,
@ -390,6 +412,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
workflow_execution_repository=workflow_execution_repository,
workflow_node_execution_repository=workflow_node_execution_repository,
streaming=streaming,
variable_loader=var_loader,
)
def _generate_worker(
@ -398,6 +421,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
application_generate_entity: WorkflowAppGenerateEntity,
queue_manager: AppQueueManager,
context: contextvars.Context,
variable_loader: VariableLoader,
workflow_thread_pool_id: Optional[str] = None,
) -> None:
"""
@ -408,29 +432,15 @@ class WorkflowAppGenerator(BaseAppGenerator):
:param workflow_thread_pool_id: workflow thread pool id
:return:
"""
for var, val in context.items():
var.set(val)
# FIXME(-LAN-): Save current user before entering new app context
from flask import g
saved_user = None
if has_request_context() and hasattr(g, "_login_user"):
saved_user = g._login_user
with flask_app.app_context():
with preserve_flask_contexts(flask_app, context_vars=context):
try:
# Restore user in new app context
if saved_user is not None:
from flask import g
g._login_user = saved_user
# workflow app
runner = WorkflowAppRunner(
application_generate_entity=application_generate_entity,
queue_manager=queue_manager,
workflow_thread_pool_id=workflow_thread_pool_id,
variable_loader=variable_loader,
)
runner.run()

@ -12,6 +12,7 @@ from core.app.entities.app_invoke_entities import (
from core.workflow.callbacks import WorkflowCallback, WorkflowLoggingCallback
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.enums import SystemVariableKey
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.enums import UserFrom
@ -30,6 +31,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
self,
application_generate_entity: WorkflowAppGenerateEntity,
queue_manager: AppQueueManager,
variable_loader: VariableLoader,
workflow_thread_pool_id: Optional[str] = None,
) -> None:
"""
@ -37,10 +39,13 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
:param queue_manager: application queue manager
:param workflow_thread_pool_id: workflow thread pool id
"""
super().__init__(queue_manager, variable_loader)
self.application_generate_entity = application_generate_entity
self.queue_manager = queue_manager
self.workflow_thread_pool_id = workflow_thread_pool_id
def _get_app_id(self) -> str:
return self.application_generate_entity.app_config.app_id
def run(self) -> None:
"""
Run application

@ -1,6 +1,8 @@
from collections.abc import Mapping
from typing import Any, Optional, cast
from sqlalchemy.orm import Session
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
from core.app.apps.base_app_runner import AppRunner
from core.app.entities.queue_entities import (
@ -33,6 +35,7 @@ from core.workflow.entities.variable_pool import VariablePool
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from core.workflow.graph_engine.entities.event import (
AgentLogEvent,
BaseNodeEvent,
GraphEngineEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
@ -62,15 +65,23 @@ from core.workflow.graph_engine.entities.event import (
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.nodes import NodeType
from core.workflow.nodes.node_mapping import NODE_TYPE_CLASSES_MAPPING
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader, load_into_variable_pool
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.model import App
from models.workflow import Workflow
from services.workflow_draft_variable_service import (
DraftVariableSaver,
)
class WorkflowBasedAppRunner(AppRunner):
def __init__(self, queue_manager: AppQueueManager):
def __init__(self, queue_manager: AppQueueManager, variable_loader: VariableLoader = DUMMY_VARIABLE_LOADER) -> None:
self.queue_manager = queue_manager
self._variable_loader = variable_loader
def _get_app_id(self) -> str:
raise NotImplementedError("not implemented")
def _init_graph(self, graph_config: Mapping[str, Any]) -> Graph:
"""
@ -173,6 +184,13 @@ class WorkflowBasedAppRunner(AppRunner):
except NotImplementedError:
variable_mapping = {}
load_into_variable_pool(
variable_loader=self._variable_loader,
variable_pool=variable_pool,
variable_mapping=variable_mapping,
user_inputs=user_inputs,
)
WorkflowEntry.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
user_inputs=user_inputs,
@ -262,6 +280,12 @@ class WorkflowBasedAppRunner(AppRunner):
)
except NotImplementedError:
variable_mapping = {}
load_into_variable_pool(
self._variable_loader,
variable_pool=variable_pool,
variable_mapping=variable_mapping,
user_inputs=user_inputs,
)
WorkflowEntry.mapping_user_inputs_to_variable_pool(
variable_mapping=variable_mapping,
@ -376,6 +400,8 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id,
)
)
self._save_draft_var_for_event(event)
elif isinstance(event, NodeRunFailedEvent):
self._publish_event(
QueueNodeFailedEvent(
@ -438,6 +464,8 @@ class WorkflowBasedAppRunner(AppRunner):
in_loop_id=event.in_loop_id,
)
)
self._save_draft_var_for_event(event)
elif isinstance(event, NodeInIterationFailedEvent):
self._publish_event(
QueueNodeInIterationFailedEvent(
@ -690,3 +718,30 @@ class WorkflowBasedAppRunner(AppRunner):
def _publish_event(self, event: AppQueueEvent) -> None:
self.queue_manager.publish(event, PublishFrom.APPLICATION_MANAGER)
def _save_draft_var_for_event(self, event: BaseNodeEvent):
run_result = event.route_node_state.node_run_result
if run_result is None:
return
process_data = run_result.process_data
outputs = run_result.outputs
with Session(bind=db.engine) as session, session.begin():
draft_var_saver = DraftVariableSaver(
session=session,
app_id=self._get_app_id(),
node_id=event.node_id,
node_type=event.node_type,
# FIXME(QuantumGhost): rely on private state of queue_manager is not ideal.
invoke_from=self.queue_manager._invoke_from,
node_execution_id=event.id,
enclosing_node_id=event.in_loop_id or event.in_iteration_id or None,
)
draft_var_saver.save(process_data=process_data, outputs=outputs)
def _remove_first_element_from_variable_string(key: str) -> str:
"""
Remove the first element from the prefix.
"""
prefix, remaining = key.split(".", maxsplit=1)
return remaining

@ -17,9 +17,24 @@ class InvokeFrom(Enum):
Invoke From.
"""
# SERVICE_API indicates that this invocation is from an API call to Dify app.
#
# Description of service api in Dify docs:
# https://docs.dify.ai/en/guides/application-publishing/developing-with-apis
SERVICE_API = "service-api"
# WEB_APP indicates that this invocation is from
# the web app of the workflow (or chatflow).
#
# Description of web app in Dify docs:
# https://docs.dify.ai/en/guides/application-publishing/launch-your-webapp-quickly/README
WEB_APP = "web-app"
# EXPLORE indicates that this invocation is from
# the workflow (or chatflow) explore page.
EXPLORE = "explore"
# DEBUGGER indicates that this invocation is from
# the workflow (or chatflow) edit page.
DEBUGGER = "debugger"
@classmethod

@ -48,6 +48,7 @@ from core.model_manager import ModelInstance
from core.model_runtime.entities.llm_entities import LLMResult, LLMResultChunk, LLMResultChunkDelta, LLMUsage
from core.model_runtime.entities.message_entities import (
AssistantPromptMessage,
TextPromptMessageContent,
)
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.ops.entities.trace_entity import TraceTaskName
@ -309,6 +310,23 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
delta_text = chunk.delta.message.content
if delta_text is None:
continue
if isinstance(chunk.delta.message.content, list):
delta_text = ""
for content in chunk.delta.message.content:
logger.debug(
"The content type %s in LLM chunk delta message content.: %r", type(content), content
)
if isinstance(content, TextPromptMessageContent):
delta_text += content.data
elif isinstance(content, str):
delta_text += content # failback to str
else:
logger.warning(
"Unsupported content type %s in LLM chunk delta message content.: %r",
type(content),
content,
)
continue
if not self._task_state.llm_result.prompt_messages:
self._task_state.llm_result.prompt_messages = chunk.prompt_messages

@ -15,6 +15,11 @@ class CommonParameterType(StrEnum):
MODEL_SELECTOR = "model-selector"
TOOLS_SELECTOR = "array[tools]"
# Dynamic select parameter
# Once you are not sure about the available options until authorization is done
# eg: Select a Slack channel from a Slack workspace
DYNAMIC_SELECT = "dynamic-select"
# TOOL_SELECTOR = "tool-selector"

@ -1 +1,11 @@
from typing import Any
# TODO(QuantumGhost): Refactor variable type identification. Instead of directly
# comparing `dify_model_identity` with constants throughout the codebase, extract
# this logic into a dedicated function. This would encapsulate the implementation
# details of how different variable types are identified.
FILE_MODEL_IDENTITY = "__dify__file__"
def maybe_file_object(o: Any) -> bool:
return isinstance(o, dict) and o.get("dify_model_identity") == FILE_MODEL_IDENTITY

@ -1,5 +1,5 @@
import logging
import random
import secrets
from typing import cast
from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
@ -38,7 +38,7 @@ def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEnt
if len(text_chunks) == 0:
return True
text_chunk = random.choice(text_chunks)
text_chunk = secrets.choice(text_chunks)
try:
model_provider_factory = ModelProviderFactory(tenant_id)

@ -534,7 +534,7 @@ class IndexingRunner:
# chunk nodes by chunk size
indexing_start_at = time.perf_counter()
tokens = 0
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
# create keyword index
create_keyword_thread = threading.Thread(
target=self._process_keyword_index,
@ -572,7 +572,7 @@ class IndexingRunner:
for future in futures:
tokens += future.result()
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:
if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX and dataset.indexing_technique == "economy":
create_keyword_thread.join()
indexing_end_at = time.perf_counter()

@ -542,8 +542,6 @@ class LBModelManager:
return config
return None
def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
"""
Cooldown model load balancing config

@ -98,6 +98,7 @@ class WeaveConfig(BaseTracingConfig):
entity: str | None = None
project: str
endpoint: str = "https://trace.wandb.ai"
host: str | None = None
@field_validator("endpoint")
@classmethod
@ -109,6 +110,14 @@ class WeaveConfig(BaseTracingConfig):
return v
@field_validator("host")
@classmethod
def validate_host(cls, v, info: ValidationInfo):
if v is not None and v != "":
if not v.startswith(("https://", "http://")):
raise ValueError("host must start with https:// or http://")
return v
OPS_FILE_PATH = "ops_trace/"
OPS_TRACE_FAILED_KEY = "FAILED_OPS_TRACE"

@ -81,7 +81,7 @@ class OpsTraceProviderConfigMap(dict[str, dict[str, Any]]):
return {
"config_class": WeaveConfig,
"secret_keys": ["api_key"],
"other_keys": ["project", "entity", "endpoint"],
"other_keys": ["project", "entity", "endpoint", "host"],
"trace_instance": WeaveDataTrace,
}
@ -251,7 +251,7 @@ class OpsTraceManager:
provider_config_map[tracing_provider]["trace_instance"],
provider_config_map[tracing_provider]["config_class"],
)
decrypt_trace_config_key = str(decrypt_trace_config)
decrypt_trace_config_key = json.dumps(decrypt_trace_config, sort_keys=True)
tracing_instance = cls.ops_trace_instances_cache.get(decrypt_trace_config_key)
if tracing_instance is None:
# create new tracing_instance and update the cache if it absent

@ -40,9 +40,14 @@ class WeaveDataTrace(BaseTraceInstance):
self.weave_api_key = weave_config.api_key
self.project_name = weave_config.project
self.entity = weave_config.entity
self.host = weave_config.host
# Login with API key first, including host if provided
if self.host:
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True, host=self.host)
else:
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
# Login with API key first
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
if not login_status:
logger.error("Failed to login to Weights & Biases with the provided API key")
raise ValueError("Weave login failed")
@ -386,7 +391,11 @@ class WeaveDataTrace(BaseTraceInstance):
def api_check(self):
try:
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
if self.host:
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True, host=self.host)
else:
login_status = wandb.login(key=self.weave_api_key, verify=True, relogin=True)
if not login_status:
raise ValueError("Weave login failed")
else:

@ -11,14 +11,12 @@ class BaseBackwardsInvocation:
try:
for chunk in response:
if isinstance(chunk, BaseModel | dict):
yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n"
elif isinstance(chunk, str):
yield f"event: {chunk}\n\n".encode()
yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode()
except Exception as e:
error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json()
yield f"{error_message}\n\n".encode()
yield error_message.encode()
else:
yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode() + b"\n\n"
yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode()
T = TypeVar("T", bound=dict | Mapping | str | bool | int | BaseModel)

@ -21,7 +21,7 @@ from core.plugin.entities.request import (
)
from core.tools.entities.tool_entities import ToolProviderType
from core.tools.utils.model_invocation_utils import ModelInvocationUtils
from core.workflow.nodes.llm.node import LLMNode
from core.workflow.nodes.llm import llm_utils
from models.account import Tenant
@ -55,7 +55,7 @@ class PluginModelBackwardsInvocation(BaseBackwardsInvocation):
def handle() -> Generator[LLMResultChunk, None, None]:
for chunk in response:
if chunk.delta.usage:
LLMNode.deduct_llm_quota(
llm_utils.deduct_llm_quota(
tenant_id=tenant.id, model_instance=model_instance, usage=chunk.delta.usage
)
chunk.prompt_messages = []
@ -64,7 +64,7 @@ class PluginModelBackwardsInvocation(BaseBackwardsInvocation):
return handle()
else:
if response.usage:
LLMNode.deduct_llm_quota(tenant_id=tenant.id, model_instance=model_instance, usage=response.usage)
llm_utils.deduct_llm_quota(tenant_id=tenant.id, model_instance=model_instance, usage=response.usage)
def handle_non_streaming(response: LLMResult) -> Generator[LLMResultChunk, None, None]:
yield LLMResultChunk(

@ -35,6 +35,7 @@ class PluginParameterType(enum.StrEnum):
APP_SELECTOR = CommonParameterType.APP_SELECTOR.value
MODEL_SELECTOR = CommonParameterType.MODEL_SELECTOR.value
TOOLS_SELECTOR = CommonParameterType.TOOLS_SELECTOR.value
DYNAMIC_SELECT = CommonParameterType.DYNAMIC_SELECT.value
# deprecated, should not use.
SYSTEM_FILES = CommonParameterType.SYSTEM_FILES.value

@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from datetime import datetime
from enum import StrEnum
from typing import Any, Generic, Optional, TypeVar
@ -9,6 +9,7 @@ from core.agent.plugin_entities import AgentProviderEntityWithPlugin
from core.model_runtime.entities.model_entities import AIModelEntity
from core.model_runtime.entities.provider_entities import ProviderEntity
from core.plugin.entities.base import BasePluginEntity
from core.plugin.entities.parameters import PluginParameterOption
from core.plugin.entities.plugin import PluginDeclaration, PluginEntity
from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_entities import ToolProviderEntityWithPlugin
@ -156,9 +157,23 @@ class PluginInstallTaskStartResponse(BaseModel):
task_id: str = Field(description="The ID of the install task.")
class PluginUploadResponse(BaseModel):
class PluginVerification(BaseModel):
"""
Verification of the plugin.
"""
class AuthorizedCategory(StrEnum):
Langgenius = "langgenius"
Partner = "partner"
Community = "community"
authorized_category: AuthorizedCategory = Field(description="The authorized category of the plugin.")
class PluginDecodeResponse(BaseModel):
unique_identifier: str = Field(description="The unique identifier of the plugin.")
manifest: PluginDeclaration
verification: Optional[PluginVerification] = Field(default=None, description="Basic verification information")
class PluginOAuthAuthorizationUrlResponse(BaseModel):
@ -172,3 +187,7 @@ class PluginOAuthCredentialsResponse(BaseModel):
class PluginListResponse(BaseModel):
list: list[PluginEntity]
total: int
class PluginDynamicSelectOptionsResponse(BaseModel):
options: Sequence[PluginParameterOption] = Field(description="The options of the dynamic select.")

@ -0,0 +1,45 @@
from collections.abc import Mapping
from typing import Any
from core.plugin.entities.plugin import GenericProviderID
from core.plugin.entities.plugin_daemon import PluginDynamicSelectOptionsResponse
from core.plugin.impl.base import BasePluginClient
class DynamicSelectClient(BasePluginClient):
def fetch_dynamic_select_options(
self,
tenant_id: str,
user_id: str,
plugin_id: str,
provider: str,
action: str,
credentials: Mapping[str, Any],
parameter: str,
) -> PluginDynamicSelectOptionsResponse:
"""
Fetch dynamic select options for a plugin parameter.
"""
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/dynamic_select/fetch_parameter_options",
PluginDynamicSelectOptionsResponse,
data={
"user_id": user_id,
"data": {
"provider": GenericProviderID(provider).provider_name,
"credentials": credentials,
"provider_action": action,
"parameter": parameter,
},
},
headers={
"X-Plugin-ID": plugin_id,
"Content-Type": "application/json",
},
)
for options in response:
return options
raise ValueError("Plugin service returned no options")

@ -1,3 +1,4 @@
import binascii
from collections.abc import Mapping
from typing import Any
@ -16,7 +17,7 @@ class OAuthHandler(BasePluginClient):
provider: str,
system_credentials: Mapping[str, Any],
) -> PluginOAuthAuthorizationUrlResponse:
return self._request_with_plugin_daemon_response(
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/oauth/get_authorization_url",
PluginOAuthAuthorizationUrlResponse,
@ -32,6 +33,9 @@ class OAuthHandler(BasePluginClient):
"Content-Type": "application/json",
},
)
for resp in response:
return resp
raise ValueError("No response received from plugin daemon for authorization URL request.")
def get_credentials(
self,
@ -49,7 +53,7 @@ class OAuthHandler(BasePluginClient):
# encode request to raw http request
raw_request_bytes = self._convert_request_to_raw_data(request)
return self._request_with_plugin_daemon_response(
response = self._request_with_plugin_daemon_response_stream(
"POST",
f"plugin/{tenant_id}/dispatch/oauth/get_credentials",
PluginOAuthCredentialsResponse,
@ -58,7 +62,8 @@ class OAuthHandler(BasePluginClient):
"data": {
"provider": provider,
"system_credentials": system_credentials,
"raw_request_bytes": raw_request_bytes,
# for json serialization
"raw_http_request": binascii.hexlify(raw_request_bytes).decode(),
},
},
headers={
@ -66,6 +71,9 @@ class OAuthHandler(BasePluginClient):
"Content-Type": "application/json",
},
)
for resp in response:
return resp
raise ValueError("No response received from plugin daemon for authorization URL request.")
def _convert_request_to_raw_data(self, request: Request) -> bytes:
"""
@ -79,7 +87,7 @@ class OAuthHandler(BasePluginClient):
"""
# Start with the request line
method = request.method
path = request.path
path = request.full_path
protocol = request.headers.get("HTTP_VERSION", "HTTP/1.1")
raw_data = f"{method} {path} {protocol}\r\n".encode()

@ -10,10 +10,10 @@ from core.plugin.entities.plugin import (
PluginInstallationSource,
)
from core.plugin.entities.plugin_daemon import (
PluginDecodeResponse,
PluginInstallTask,
PluginInstallTaskStartResponse,
PluginListResponse,
PluginUploadResponse,
)
from core.plugin.impl.base import BasePluginClient
@ -53,7 +53,7 @@ class PluginInstaller(BasePluginClient):
tenant_id: str,
pkg: bytes,
verify_signature: bool = False,
) -> PluginUploadResponse:
) -> PluginDecodeResponse:
"""
Upload a plugin package and return the plugin unique identifier.
"""
@ -68,7 +68,7 @@ class PluginInstaller(BasePluginClient):
return self._request_with_plugin_daemon_response(
"POST",
f"plugin/{tenant_id}/management/install/upload/package",
PluginUploadResponse,
PluginDecodeResponse,
files=body,
data=data,
)
@ -176,6 +176,18 @@ class PluginInstaller(BasePluginClient):
params={"plugin_unique_identifier": plugin_unique_identifier},
)
def decode_plugin_from_identifier(self, tenant_id: str, plugin_unique_identifier: str) -> PluginDecodeResponse:
"""
Decode a plugin from an identifier.
"""
return self._request_with_plugin_daemon_response(
"GET",
f"plugin/{tenant_id}/management/decode/from_identifier",
PluginDecodeResponse,
data={"plugin_unique_identifier": plugin_unique_identifier},
headers={"Content-Type": "application/json"},
)
def fetch_plugin_installation_by_ids(
self, tenant_id: str, plugin_ids: Sequence[str]
) -> Sequence[PluginInstallation]:

@ -0,0 +1,233 @@
import json
import logging
import uuid
from functools import wraps
from typing import Any, Optional
from mo_vector.client import MoVectorClient # type: ignore
from pydantic import BaseModel, model_validator
from configs import dify_config
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.embedding.embedding_base import Embeddings
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
logger = logging.getLogger(__name__)
class MatrixoneConfig(BaseModel):
host: str = "localhost"
port: int = 6001
user: str = "dump"
password: str = "111"
database: str = "dify"
metric: str = "l2"
@model_validator(mode="before")
@classmethod
def validate_config(cls, values: dict) -> dict:
if not values["host"]:
raise ValueError("config host is required")
if not values["port"]:
raise ValueError("config port is required")
if not values["user"]:
raise ValueError("config user is required")
if not values["password"]:
raise ValueError("config password is required")
if not values["database"]:
raise ValueError("config database is required")
return values
def ensure_client(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if self.client is None:
self.client = self._get_client(None, False)
return func(self, *args, **kwargs)
return wrapper
class MatrixoneVector(BaseVector):
"""
Matrixone vector storage implementation.
"""
def __init__(self, collection_name: str, config: MatrixoneConfig):
super().__init__(collection_name)
self.config = config
self.collection_name = collection_name.lower()
self.client = None
@property
def collection_name(self):
return self._collection_name
@collection_name.setter
def collection_name(self, value):
self._collection_name = value
def get_type(self) -> str:
return VectorType.MATRIXONE
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
if self.client is None:
self.client = self._get_client(len(embeddings[0]), True)
return self.add_texts(texts, embeddings)
def _get_client(self, dimension: Optional[int] = None, create_table: bool = False) -> MoVectorClient:
"""
Create a new client for the collection.
The collection will be created if it doesn't exist.
"""
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
client = MoVectorClient(
connection_string=f"mysql+pymysql://{self.config.user}:{self.config.password}@{self.config.host}:{self.config.port}/{self.config.database}",
table_name=self.collection_name,
vector_dimension=dimension,
create_table=create_table,
)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return client
try:
client.create_full_text_index()
except Exception as e:
logger.exception("Failed to create full text index")
redis_client.set(collection_exist_cache_key, 1, ex=3600)
return client
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
if self.client is None:
self.client = self._get_client(len(embeddings[0]), True)
assert self.client is not None
ids = []
for _, doc in enumerate(documents):
if doc.metadata is not None:
doc_id = doc.metadata.get("doc_id", str(uuid.uuid4()))
ids.append(doc_id)
self.client.insert(
texts=[doc.page_content for doc in documents],
embeddings=embeddings,
metadatas=[doc.metadata for doc in documents],
ids=ids,
)
return ids
@ensure_client
def text_exists(self, id: str) -> bool:
assert self.client is not None
result = self.client.get(ids=[id])
return len(result) > 0
@ensure_client
def delete_by_ids(self, ids: list[str]) -> None:
assert self.client is not None
if not ids:
return
self.client.delete(ids=ids)
@ensure_client
def get_ids_by_metadata_field(self, key: str, value: str):
assert self.client is not None
results = self.client.query_by_metadata(filter={key: value})
return [result.id for result in results]
@ensure_client
def delete_by_metadata_field(self, key: str, value: str) -> None:
assert self.client is not None
self.client.delete(filter={key: value})
@ensure_client
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
assert self.client is not None
top_k = kwargs.get("top_k", 5)
document_ids_filter = kwargs.get("document_ids_filter")
filter = None
if document_ids_filter:
filter = {"document_id": {"$in": document_ids_filter}}
results = self.client.query(
query_vector=query_vector,
k=top_k,
filter=filter,
)
docs = []
# TODO: add the score threshold to the query
for result in results:
metadata = result.metadata
docs.append(
Document(
page_content=result.document,
metadata=metadata,
)
)
return docs
@ensure_client
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
assert self.client is not None
top_k = kwargs.get("top_k", 5)
document_ids_filter = kwargs.get("document_ids_filter")
filter = None
if document_ids_filter:
filter = {"document_id": {"$in": document_ids_filter}}
score_threshold = float(kwargs.get("score_threshold", 0.0))
results = self.client.full_text_query(
keywords=[query],
k=top_k,
filter=filter,
)
docs = []
for result in results:
metadata = result.metadata
if isinstance(metadata, str):
import json
metadata = json.loads(metadata)
score = 1 - result.distance
if score >= score_threshold:
metadata["score"] = score
docs.append(
Document(
page_content=result.document,
metadata=metadata,
)
)
return docs
@ensure_client
def delete(self) -> None:
assert self.client is not None
self.client.delete()
class MatrixoneVectorFactory(AbstractVectorFactory):
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> MatrixoneVector:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.MATRIXONE, collection_name))
config = MatrixoneConfig(
host=dify_config.MATRIXONE_HOST or "localhost",
port=dify_config.MATRIXONE_PORT or 6001,
user=dify_config.MATRIXONE_USER or "dump",
password=dify_config.MATRIXONE_PASSWORD or "111",
database=dify_config.MATRIXONE_DATABASE or "dify",
metric=dify_config.MATRIXONE_METRIC or "l2",
)
return MatrixoneVector(collection_name=collection_name, config=config)

@ -80,6 +80,23 @@ class OceanBaseVector(BaseVector):
self.delete()
vals = []
params = self._client.perform_raw_text_sql("SHOW PARAMETERS LIKE '%ob_vector_memory_limit_percentage%'")
for row in params:
val = int(row[6])
vals.append(val)
if len(vals) == 0:
raise ValueError("ob_vector_memory_limit_percentage not found in parameters.")
if any(val == 0 for val in vals):
try:
self._client.perform_raw_text_sql("ALTER SYSTEM SET ob_vector_memory_limit_percentage = 30")
except Exception as e:
raise Exception(
"Failed to set ob_vector_memory_limit_percentage. "
+ "Maybe the database user has insufficient privilege.",
e,
)
cols = [
Column("id", String(36), primary_key=True, autoincrement=False),
Column("vector", VECTOR(self._vec_dim)),
@ -110,22 +127,6 @@ class OceanBaseVector(BaseVector):
+ "to support fulltext index and vector index in the same table",
e,
)
vals = []
params = self._client.perform_raw_text_sql("SHOW PARAMETERS LIKE '%ob_vector_memory_limit_percentage%'")
for row in params:
val = int(row[6])
vals.append(val)
if len(vals) == 0:
raise ValueError("ob_vector_memory_limit_percentage not found in parameters.")
if any(val == 0 for val in vals):
try:
self._client.perform_raw_text_sql("ALTER SYSTEM SET ob_vector_memory_limit_percentage = 30")
except Exception as e:
raise Exception(
"Failed to set ob_vector_memory_limit_percentage. "
+ "Maybe the database user has insufficient privilege.",
e,
)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
def _check_hybrid_search_support(self) -> bool:

@ -184,7 +184,16 @@ class OpenSearchVector(BaseVector):
}
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
query["query"] = {"terms": {"metadata.document_id": document_ids_filter}}
query["query"] = {
"script_score": {
"query": {"bool": {"filter": [{"terms": {Field.DOCUMENT_ID.value: document_ids_filter}}]}},
"script": {
"source": "knn_score",
"lang": "knn",
"params": {"field": Field.VECTOR.value, "query_value": query_vector, "space_type": "l2"},
},
}
}
try:
response = self._client.search(index=self._collection_name.lower(), body=query)
@ -209,10 +218,10 @@ class OpenSearchVector(BaseVector):
return docs
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
full_text_query = {"query": {"match": {Field.CONTENT_KEY.value: query}}}
full_text_query = {"query": {"bool": {"must": [{"match": {Field.CONTENT_KEY.value: query}}]}}}
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
full_text_query["query"]["terms"] = {"metadata.document_id": document_ids_filter}
full_text_query["query"]["bool"]["filter"] = [{"terms": {"metadata.document_id": document_ids_filter}}]
response = self._client.search(index=self._collection_name.lower(), body=full_text_query)
@ -255,7 +264,8 @@ class OpenSearchVector(BaseVector):
Field.METADATA_KEY.value: {
"type": "object",
"properties": {
"doc_id": {"type": "keyword"} # Map doc_id to keyword type
"doc_id": {"type": "keyword"}, # Map doc_id to keyword type
"document_id": {"type": "keyword"},
},
},
}

@ -303,7 +303,6 @@ class OracleVector(BaseVector):
return docs
else:
return [Document(page_content="", metadata={})]
return []
def delete(self) -> None:
with self._get_connection() as conn:

@ -164,6 +164,10 @@ class Vector:
from core.rag.datasource.vdb.huawei.huawei_cloud_vector import HuaweiCloudVectorFactory
return HuaweiCloudVectorFactory
case VectorType.MATRIXONE:
from core.rag.datasource.vdb.matrixone.matrixone_vector import MatrixoneVectorFactory
return MatrixoneVectorFactory
case _:
raise ValueError(f"Vector store {vector_type} is not supported.")

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save