feat(api): Add model for workflow suspension

pull/22621/head
QuantumGhost 7 months ago
parent 74981a65c6
commit 838630c39e

@ -0,0 +1,28 @@
from enum import StrEnum
from uuid import UUID
from pydantic import BaseModel, Field
from libs.uuid_utils import uuidv7
class StateVersion(StrEnum):
# `V1` is `GraphRuntimeState` serialized as JSON by dumping with Pydantic.
V1 = "v1"
class WorkflowSuspension(BaseModel):
id: UUID = Field(default_factory=uuidv7)
# Correspond to WorkflowExecution.id_
execution_id: str
workflow_id: str
continuation_node_id: str
state: str
state_version: StateVersion = StateVersion.V1
inputs: str

@ -0,0 +1,51 @@
"""Add WorkflowSuspension model, add suspension_id to WorkflowRun
Revision ID: 1091956b9ee0
Revises: 1c9ba48be8e4
Create Date: 2025-07-17 20:20:43.710683
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1091956b9ee0'
down_revision = '1c9ba48be8e4'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('workflow_suspensions',
sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('tenant_id', models.types.StringUUID(), nullable=False),
sa.Column('app_id', models.types.StringUUID(), nullable=False),
sa.Column('workflow_id', models.types.StringUUID(), nullable=False),
sa.Column('workflow_run_id', models.types.StringUUID(), nullable=False),
sa.Column('resumed_at', sa.DateTime(), nullable=True),
sa.Column('continuation_node_id', sa.String(length=255), nullable=False),
sa.Column('state_version', sa.String(length=20), nullable=False),
sa.Column('state', sa.Text(), nullable=False),
sa.Column('inputs', sa.Text(), nullable=True),
sa.Column('form_code', sa.String(length=32), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('workflow_suspensions_pkey')),
sa.UniqueConstraint('form_code', name=op.f('workflow_suspensions_form_code_key'))
)
with op.batch_alter_table('workflow_runs', schema=None) as batch_op:
batch_op.add_column(sa.Column('suspension_id', models.types.StringUUID(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('workflow_runs', schema=None) as batch_op:
batch_op.drop_column('suspension_id')
op.drop_table('workflow_suspensions')
# ### end Alembic commands ###

@ -14,10 +14,12 @@ from core.file.models import File
from core.variables import utils as variable_utils
from core.variables.variables import FloatVariable, IntegerVariable, StringVariable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID
from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
from core.workflow.entities.workflow_suspension import StateVersion
from core.workflow.nodes.enums import NodeType
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from libs.datetime_utils import naive_utc_now
from libs.helper import extract_tenant_id
from libs.helper import extract_tenant_id, generate_string
from ._workflow_exc import NodeNotFoundError, WorkflowDataError
@ -508,7 +510,10 @@ class WorkflowRun(Base):
version: Mapped[str] = mapped_column(db.String(255))
graph: Mapped[Optional[str]] = mapped_column(db.Text)
inputs: Mapped[Optional[str]] = mapped_column(db.Text)
status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded
status: Mapped[str] = mapped_column(
EnumText(WorkflowExecutionStatus, length=255),
nullable=False,
) # running, succeeded, failed, stopped, partial-succeeded
outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}")
error: Mapped[Optional[str]] = mapped_column(db.Text)
elapsed_time: Mapped[float] = mapped_column(db.Float, nullable=False, server_default=sa.text("0"))
@ -520,6 +525,10 @@ class WorkflowRun(Base):
finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime)
exceptions_count: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True)
# Represents the suspension details of a suspended workflow.
# This field is non-null when `status == SUSPENDED` and null otherwise.
suspension_id: Mapped[StringUUID] = mapped_column(StringUUID, nullable=True)
@property
def created_by_account(self):
created_by_role = CreatorUserRole(self.created_by_role)
@ -907,10 +916,6 @@ class ConversationVariable(Base):
_EDITABLE_SYSTEM_VARIABLE = frozenset(["query", "files"])
def _naive_utc_datetime():
return naive_utc_now()
class WorkflowDraftVariable(Base):
"""`WorkflowDraftVariable` record variables and outputs generated during
debugging worfklow or chatflow.
@ -941,14 +946,14 @@ class WorkflowDraftVariable(Base):
created_at: Mapped[datetime] = mapped_column(
db.DateTime,
nullable=False,
default=_naive_utc_datetime,
default=naive_utc_now,
server_default=func.current_timestamp(),
)
updated_at: Mapped[datetime] = mapped_column(
db.DateTime,
nullable=False,
default=_naive_utc_datetime,
default=naive_utc_now,
server_default=func.current_timestamp(),
onupdate=func.current_timestamp(),
)
@ -1173,8 +1178,8 @@ class WorkflowDraftVariable(Base):
description: str = "",
) -> "WorkflowDraftVariable":
variable = WorkflowDraftVariable()
variable.created_at = _naive_utc_datetime()
variable.updated_at = _naive_utc_datetime()
variable.created_at = naive_utc_now()
variable.updated_at = naive_utc_now()
variable.description = description
variable.app_id = app_id
variable.node_id = node_id
@ -1254,3 +1259,120 @@ class WorkflowDraftVariable(Base):
def is_system_variable_editable(name: str) -> bool:
return name in _EDITABLE_SYSTEM_VARIABLE
_SUSPENSION_FORM_CODE_LENGTH = 22
def _generate_suspension_form_code():
return generate_string(_SUSPENSION_FORM_CODE_LENGTH)
class WorkflowSuspension(Base):
__tablename__ = "workflow_suspensions"
# id is the unique identifier of a suspension
id: Mapped[str] = mapped_column(
StringUUID,
primary_key=True,
# NOTE: The server default acts as a fallback mechanism.
# The application generates the ID for new `WorkflowSuspension` records
# to streamline the insertion process and minimize database roundtrips.
server_default=db.text("uuidv7()"),
)
created_at: Mapped[datetime] = mapped_column(
sa.DateTime,
nullable=False,
default=naive_utc_now,
server_default=func.current_timestamp(),
)
updated_at: Mapped[datetime] = mapped_column(
sa.DateTime,
nullable=False,
default=naive_utc_now,
server_default=func.current_timestamp(),
onupdate=func.current_timestamp(),
)
# `tenant_id` identifies the tenant associated with this suspension,
# corresponding to the `id` field in the `Tenant` model.
tenant_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
)
# `app_id` represents the application identifier associated with this state.
# It corresponds to the `id` field in the `App` model.
#
# While this field is technically redundant (as the corresponding app can be
# determined by querying the `Workflow`), it is retained to simplify data
# cleanup and management processes.
app_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
)
# `workflow_id` represents the unique identifier of the workflow associated with this suspension.
# It corresponds to the `id` field in the `Workflow` model.
#
# Since an application can have multiple versions of a workflow, each with its own unique ID,
# the `app_id` alone is insufficient to determine which workflow version should be loaded
# when resuming a suspended workflow.
workflow_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
)
# `workflow_run_id` represents the identifier of the execution of workflow,
# correspond to the `id` field of `WorkflowNodeExecutionModel`.
workflow_run_id: Mapped[str] = mapped_column(
StringUUID,
nullable=False,
)
# `resumed_at` records the timestamp when the suspended workflow was resumed.
# It is set to `NULL` if the workflow has not been resumed.
resumed_at: Mapped[Optional[datetime]] = mapped_column(
sa.DateTime,
nullable=True,
default=sa.null,
)
# `continuation_node_id` specifies the next node to execute when the workflow resumes.
#
# Although this information is embedded within the `state` field, it is extracted
# into a separate field to facilitate debugging and data analysis.
continuation_node_id: Mapped[str] = mapped_column(
sa.String(length=255),
nullable=False,
)
# The version of the serialized execution state data. Currently, the only supported value is `v1`.
state_version: Mapped[StateVersion] = mapped_column(
EnumText(StateVersion),
nullable=False,
)
# `state` contains the serialized runtime state of the `GraphEngine`,
# capturing the workflow's execution context at the time of suspension.
#
# The value of `state` is a JSON-formatted string representing a JSON object (e.g., `{}`).
state: Mapped[str] = mapped_column(sa.Text, nullable=False)
# The inputs provided by the user when resuming the suspended workflow.
# These inputs are serialized as a JSON-formatted string (e.g., `{}`).
#
# This field is `NULL` if no inputs were submitted by the user.
inputs: Mapped[str] = mapped_column(sa.Text, nullable=True)
form_code: Mapped[str] = mapped_column(
# A 32-character string can store a base64-encoded value with 192 bits of entropy
# or a base62-encoded value with over 180 bits of entropy, providing sufficient
# uniqueness for most use cases.
sa.String(32),
nullable=False,
unique=True,
default=_generate_suspension_form_code,
)

Loading…
Cancel
Save