diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 6279b1ad36..d52d6a75a8 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -72,6 +72,7 @@ def init_app(app: DifyApp) -> Celery: "schedule.clean_messages", "schedule.mail_clean_document_notify_task", "schedule.queue_monitor_task", + "schedule.check_upgradable_plugin_task", ] day = dify_config.CELERY_BEAT_SCHEDULER_TIME beat_schedule = { @@ -106,6 +107,11 @@ def init_app(app: DifyApp) -> Celery: minutes=dify_config.QUEUE_MONITOR_INTERVAL if dify_config.QUEUE_MONITOR_INTERVAL else 30 ), }, + # every 15 minutes + "check_upgradable_plugin_task": { + "task": "schedule.check_upgradable_plugin_task.check_upgradable_plugin_task", + "schedule": timedelta(minutes=15), + }, } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) diff --git a/api/schedule/check_upgradable_plugin_task.py b/api/schedule/check_upgradable_plugin_task.py new file mode 100644 index 0000000000..95e9d6d16a --- /dev/null +++ b/api/schedule/check_upgradable_plugin_task.py @@ -0,0 +1,124 @@ +import time + +import click + +import app +from core.helper import marketplace +from core.plugin.entities.plugin import PluginInstallationSource +from core.plugin.impl.plugin import PluginInstaller +from extensions.ext_database import db +from models.account import TenantPluginAutoUpgradeStrategy + +AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes + +RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3 + + +@app.celery.task(queue="dataset") +def check_upgradable_plugin_task(): + click.echo(click.style("Start check upgradable plugin.", fg="green")) + start_at = time.perf_counter() + + now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC + + # get strategies that set to be performed in the next AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL + strategies = ( + db.session.query(TenantPluginAutoUpgradeStrategy) + .filter( + TenantPluginAutoUpgradeStrategy.upgrade_time_of_day >= now_seconds_of_day, + TenantPluginAutoUpgradeStrategy.upgrade_time_of_day + < now_seconds_of_day + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL, + TenantPluginAutoUpgradeStrategy.strategy_setting + != TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED, + ) + .all() + ) + + manager = PluginInstaller() + + for strategy in strategies: + try: + tenant_id = strategy.tenant_id + strategy_setting = strategy.strategy_setting + upgrade_mode = strategy.upgrade_mode + exclude_plugins = strategy.exclude_plugins + include_plugins = strategy.include_plugins + + if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED: + continue + + # get plugins that need to be checked + plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier + + if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins: + all_plugins = manager.list_plugins(tenant_id) + + for plugin in all_plugins: + if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins: + plugin_ids.append((plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)) + + elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE: + # get all plugins and remove the exclude plugins + all_plugins = manager.list_plugins(tenant_id) + plugin_ids = [ + (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier) + for plugin in all_plugins + if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins + ] + + if not plugin_ids: + continue + + # fetch latest versions from marketplace + manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids) + + for manifest in manifests: + for plugin_id, version, original_unique_identifier in plugin_ids: + current_version = version + latest_version = manifest.latest_version + + # @yeuoly review here + def fix_only_checker(latest_version, current_version): + latest_version_tuple = tuple(int(val) for val in latest_version.split(".")) + current_version_tuple = tuple(int(val) for val in current_version.split(".")) + + if ( + latest_version_tuple[0] == current_version_tuple[0] + and latest_version_tuple[1] == current_version_tuple[1] + ): + return latest_version_tuple[2] != current_version_tuple[2] + return False + + version_checker = { + TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version, + current_version: latest_version != current_version, + TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker, + } + + if version_checker[strategy_setting](latest_version, current_version): + # execute upgrade + new_unique_identifier = manifest.latest_package_identifier + + marketplace.record_install_plugin_event(new_unique_identifier) + click.echo(click.style("Upgrade plugin: {}".format(new_unique_identifier), fg="green")) + task_start_resp = manager.upgrade_plugin( + tenant_id, + original_unique_identifier, + new_unique_identifier, + PluginInstallationSource.Marketplace, + { + "plugin_unique_identifier": new_unique_identifier, + }, + ) + + except Exception as e: + click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red")) + continue + + end_at = time.perf_counter() + click.echo( + click.style( + "Checked upgradable plugin success latency: {}".format(end_at - start_at), + fg="green", + ) + )