feat: add switch to config celery schedule tasks

pull/19758/head
Junyan Qin 10 months ago
parent a366ce94f9
commit 717ec96d8a
No known key found for this signature in database
GPG Key ID: 22FE3AFADC710CEB

@ -471,6 +471,16 @@ APP_MAX_ACTIVE_REQUESTS=0
# Celery beat configuration # Celery beat configuration
CELERY_BEAT_SCHEDULER_TIME=1 CELERY_BEAT_SCHEDULER_TIME=1
# Celery schedule tasks configuration
ENABLE_CLEAN_EMBEDDING_CACHE_TASK=false
ENABLE_CLEAN_UNUSED_DATASETS_TASK=false
ENABLE_CREATE_TIDB_SERVERLESS_TASK=false
ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK=false
ENABLE_CLEAN_MESSAGES=false
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false
ENABLE_DATASETS_QUEUE_MONITOR=false
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true
# Position configuration # Position configuration
POSITION_TOOL_PINS= POSITION_TOOL_PINS=
POSITION_TOOL_INCLUDES= POSITION_TOOL_INCLUDES=

@ -832,6 +832,41 @@ class CeleryBeatConfig(BaseSettings):
) )
class CeleryScheduleTasksConfig(BaseSettings):
ENABLE_CLEAN_EMBEDDING_CACHE_TASK: bool = Field(
description="Enable clean embedding cache task",
default=False,
)
ENABLE_CLEAN_UNUSED_DATASETS_TASK: bool = Field(
description="Enable clean unused datasets task",
default=False,
)
ENABLE_CREATE_TIDB_SERVERLESS_TASK: bool = Field(
description="Enable create tidb service job task",
default=False,
)
ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK: bool = Field(
description="Enable update tidb service job status task",
default=False,
)
ENABLE_CLEAN_MESSAGES: bool = Field(
description="Enable clean messages task",
default=False,
)
ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field(
description="Enable mail clean document notify task",
default=False,
)
ENABLE_DATASETS_QUEUE_MONITOR: bool = Field(
description="Enable queue monitor task",
default=False,
)
ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: bool = Field(
description="Enable check upgradable plugin task",
default=True,
)
class PositionConfig(BaseSettings): class PositionConfig(BaseSettings):
POSITION_PROVIDER_PINS: str = Field( POSITION_PROVIDER_PINS: str = Field(
description="Comma-separated list of pinned model providers", description="Comma-separated list of pinned model providers",
@ -961,5 +996,6 @@ class FeatureConfig(
# hosted services config # hosted services config
HostedServiceConfig, HostedServiceConfig,
CeleryBeatConfig, CeleryBeatConfig,
CeleryScheduleTasksConfig,
): ):
pass pass

@ -64,55 +64,62 @@ def init_app(app: DifyApp) -> Celery:
celery_app.set_default() celery_app.set_default()
app.extensions["celery"] = celery_app app.extensions["celery"] = celery_app
imports = [ imports = []
"schedule.clean_embedding_cache_task",
"schedule.clean_unused_datasets_task",
"schedule.create_tidb_serverless_task",
"schedule.update_tidb_serverless_status_task",
"schedule.clean_messages",
"schedule.mail_clean_document_notify_task",
"schedule.queue_monitor_task",
"schedule.check_upgradable_plugin_task",
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME day = dify_config.CELERY_BEAT_SCHEDULER_TIME
beat_schedule = {
"clean_embedding_cache_task": { # if you add a new task, please add the switch to CeleryScheduleTasksConfig
beat_schedule = {}
if dify_config.ENABLE_CLEAN_EMBEDDING_CACHE_TASK:
imports.append("schedule.clean_embedding_cache_task")
beat_schedule["clean_embedding_cache_task"] = {
"task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task", "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
"schedule": timedelta(days=day), "schedule": timedelta(days=day),
}, }
"clean_unused_datasets_task": { if dify_config.ENABLE_CLEAN_UNUSED_DATASETS_TASK:
imports.append("schedule.clean_unused_datasets_task")
beat_schedule["clean_unused_datasets_task"] = {
"task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task", "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
"schedule": timedelta(days=day), "schedule": timedelta(days=day),
}, }
"create_tidb_serverless_task": { if dify_config.ENABLE_CREATE_TIDB_SERVERLESS_TASK:
imports.append("schedule.create_tidb_serverless_task")
beat_schedule["create_tidb_serverless_task"] = {
"task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task", "task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
"schedule": crontab(minute="0", hour="*"), "schedule": crontab(minute="0", hour="*"),
}, }
"update_tidb_serverless_status_task": { if dify_config.ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK:
imports.append("schedule.update_tidb_serverless_status_task")
beat_schedule["update_tidb_serverless_status_task"] = {
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task", "task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
"schedule": timedelta(minutes=10), "schedule": timedelta(minutes=10),
}, }
"clean_messages": { if dify_config.ENABLE_CLEAN_MESSAGES:
imports.append("schedule.clean_messages")
beat_schedule["clean_messages"] = {
"task": "schedule.clean_messages.clean_messages", "task": "schedule.clean_messages.clean_messages",
"schedule": timedelta(days=day), "schedule": timedelta(days=day),
}, }
# every Monday if dify_config.ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK:
"mail_clean_document_notify_task": { imports.append("schedule.mail_clean_document_notify_task")
beat_schedule["mail_clean_document_notify_task"] = {
"task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task", "task": "schedule.mail_clean_document_notify_task.mail_clean_document_notify_task",
"schedule": crontab(minute="0", hour="10", day_of_week="1"), "schedule": crontab(minute="0", hour="10", day_of_week="1"),
}, }
"datasets-queue-monitor": { if dify_config.ENABLE_DATASETS_QUEUE_MONITOR:
imports.append("schedule.queue_monitor_task")
beat_schedule["datasets-queue-monitor"] = {
"task": "schedule.queue_monitor_task.queue_monitor_task", "task": "schedule.queue_monitor_task.queue_monitor_task",
"schedule": timedelta( "schedule": timedelta(
minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30 minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30
), ),
}, }
# every 15 minutes if dify_config.ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK:
"check_upgradable_plugin_task": { imports.append("schedule.check_upgradable_plugin_task")
beat_schedule["check_upgradable_plugin_task"] = {
"task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task", "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task",
"schedule": crontab(minute="*/15"), "schedule": crontab(minute="*/15"),
}, }
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
return celery_app return celery_app

@ -15,10 +15,9 @@ def check_upgradable_plugin_task():
click.echo(click.style("Start check upgradable plugin.", fg="green")) click.echo(click.style("Start check upgradable plugin.", fg="green"))
start_at = time.perf_counter() start_at = time.perf_counter()
now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC now_seconds_of_day = time.time() % 86400 - 30 # we assume the tz is UTC
click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green")) click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green"))
# 获取需要在下一个AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL内执行的策略
strategies = ( strategies = (
db.session.query(TenantPluginAutoUpgradeStrategy) db.session.query(TenantPluginAutoUpgradeStrategy)
.filter( .filter(

Loading…
Cancel
Save