feat(auto-upgrade): celery scheduled task
parent
bddb5d1493
commit
ea7acb3d2b
@ -0,0 +1,124 @@
|
||||
import time
|
||||
|
||||
import click
|
||||
|
||||
import app
|
||||
from core.helper import marketplace
|
||||
from core.plugin.entities.plugin import PluginInstallationSource
|
||||
from core.plugin.impl.plugin import PluginInstaller
|
||||
from extensions.ext_database import db
|
||||
from models.account import TenantPluginAutoUpgradeStrategy
|
||||
|
||||
AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes
|
||||
|
||||
RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3
|
||||
|
||||
|
||||
@app.celery.task(queue="dataset")
|
||||
def check_upgradable_plugin_task():
|
||||
click.echo(click.style("Start check upgradable plugin.", fg="green"))
|
||||
start_at = time.perf_counter()
|
||||
|
||||
now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC
|
||||
|
||||
# get strategies that set to be performed in the next AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL
|
||||
strategies = (
|
||||
db.session.query(TenantPluginAutoUpgradeStrategy)
|
||||
.filter(
|
||||
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day >= now_seconds_of_day,
|
||||
TenantPluginAutoUpgradeStrategy.upgrade_time_of_day
|
||||
< now_seconds_of_day + AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL,
|
||||
TenantPluginAutoUpgradeStrategy.strategy_setting
|
||||
!= TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED,
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
manager = PluginInstaller()
|
||||
|
||||
for strategy in strategies:
|
||||
try:
|
||||
tenant_id = strategy.tenant_id
|
||||
strategy_setting = strategy.strategy_setting
|
||||
upgrade_mode = strategy.upgrade_mode
|
||||
exclude_plugins = strategy.exclude_plugins
|
||||
include_plugins = strategy.include_plugins
|
||||
|
||||
if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED:
|
||||
continue
|
||||
|
||||
# get plugins that need to be checked
|
||||
plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier
|
||||
|
||||
if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins:
|
||||
all_plugins = manager.list_plugins(tenant_id)
|
||||
|
||||
for plugin in all_plugins:
|
||||
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins:
|
||||
plugin_ids.append((plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier))
|
||||
|
||||
elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE:
|
||||
# get all plugins and remove the exclude plugins
|
||||
all_plugins = manager.list_plugins(tenant_id)
|
||||
plugin_ids = [
|
||||
(plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
|
||||
for plugin in all_plugins
|
||||
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins
|
||||
]
|
||||
|
||||
if not plugin_ids:
|
||||
continue
|
||||
|
||||
# fetch latest versions from marketplace
|
||||
manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids)
|
||||
|
||||
for manifest in manifests:
|
||||
for plugin_id, version, original_unique_identifier in plugin_ids:
|
||||
current_version = version
|
||||
latest_version = manifest.latest_version
|
||||
|
||||
# @yeuoly review here
|
||||
def fix_only_checker(latest_version, current_version):
|
||||
latest_version_tuple = tuple(int(val) for val in latest_version.split("."))
|
||||
current_version_tuple = tuple(int(val) for val in current_version.split("."))
|
||||
|
||||
if (
|
||||
latest_version_tuple[0] == current_version_tuple[0]
|
||||
and latest_version_tuple[1] == current_version_tuple[1]
|
||||
):
|
||||
return latest_version_tuple[2] != current_version_tuple[2]
|
||||
return False
|
||||
|
||||
version_checker = {
|
||||
TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version,
|
||||
current_version: latest_version != current_version,
|
||||
TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker,
|
||||
}
|
||||
|
||||
if version_checker[strategy_setting](latest_version, current_version):
|
||||
# execute upgrade
|
||||
new_unique_identifier = manifest.latest_package_identifier
|
||||
|
||||
marketplace.record_install_plugin_event(new_unique_identifier)
|
||||
click.echo(click.style("Upgrade plugin: {}".format(new_unique_identifier), fg="green"))
|
||||
task_start_resp = manager.upgrade_plugin(
|
||||
tenant_id,
|
||||
original_unique_identifier,
|
||||
new_unique_identifier,
|
||||
PluginInstallationSource.Marketplace,
|
||||
{
|
||||
"plugin_unique_identifier": new_unique_identifier,
|
||||
},
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red"))
|
||||
continue
|
||||
|
||||
end_at = time.perf_counter()
|
||||
click.echo(
|
||||
click.style(
|
||||
"Checked upgradable plugin success latency: {}".format(end_at - start_at),
|
||||
fg="green",
|
||||
)
|
||||
)
|
||||
Loading…
Reference in New Issue