diff --git a/api/core/workflow/entities/workflow_suspension.py b/api/core/workflow/entities/workflow_suspension.py new file mode 100644 index 0000000000..9304e6cadd --- /dev/null +++ b/api/core/workflow/entities/workflow_suspension.py @@ -0,0 +1,28 @@ +from enum import StrEnum +from uuid import UUID + +from pydantic import BaseModel, Field + +from libs.uuid_utils import uuidv7 + + +class StateVersion(StrEnum): + # `V1` is `GraphRuntimeState` serialized as JSON by dumping with Pydantic. + V1 = "v1" + + +class WorkflowSuspension(BaseModel): + id: UUID = Field(default_factory=uuidv7) + + # Correspond to WorkflowExecution.id_ + execution_id: str + + workflow_id: str + + continuation_node_id: str + + state: str + + state_version: StateVersion = StateVersion.V1 + + inputs: str diff --git a/api/migrations/versions/2025_07_17_2020-1091956b9ee0_add_workflowsuspension_model_add_.py b/api/migrations/versions/2025_07_17_2020-1091956b9ee0_add_workflowsuspension_model_add_.py new file mode 100644 index 0000000000..49182e6474 --- /dev/null +++ b/api/migrations/versions/2025_07_17_2020-1091956b9ee0_add_workflowsuspension_model_add_.py @@ -0,0 +1,51 @@ +"""Add WorkflowSuspension model, add suspension_id to WorkflowRun + +Revision ID: 1091956b9ee0 +Revises: 1c9ba48be8e4 +Create Date: 2025-07-17 20:20:43.710683 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1091956b9ee0' +down_revision = '1c9ba48be8e4' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('workflow_suspensions', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('workflow_id', models.types.StringUUID(), nullable=False), + sa.Column('workflow_run_id', models.types.StringUUID(), nullable=False), + sa.Column('resumed_at', sa.DateTime(), nullable=True), + sa.Column('continuation_node_id', sa.String(length=255), nullable=False), + sa.Column('state_version', sa.String(length=20), nullable=False), + sa.Column('state', sa.Text(), nullable=False), + sa.Column('inputs', sa.Text(), nullable=True), + sa.Column('form_code', sa.String(length=32), nullable=False), + sa.PrimaryKeyConstraint('id', name=op.f('workflow_suspensions_pkey')), + sa.UniqueConstraint('form_code', name=op.f('workflow_suspensions_form_code_key')) + ) + with op.batch_alter_table('workflow_runs', schema=None) as batch_op: + batch_op.add_column(sa.Column('suspension_id', models.types.StringUUID(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_runs', schema=None) as batch_op: + batch_op.drop_column('suspension_id') + + op.drop_table('workflow_suspensions') + # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index 124fb3bb4c..8c153af741 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -14,10 +14,12 @@ from core.file.models import File from core.variables import utils as variable_utils from core.variables.variables import FloatVariable, IntegerVariable, StringVariable from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID, SYSTEM_VARIABLE_NODE_ID +from core.workflow.entities.workflow_execution import WorkflowExecutionStatus +from core.workflow.entities.workflow_suspension import StateVersion from core.workflow.nodes.enums import NodeType from factories.variable_factory import TypeMismatchError, build_segment_with_type from libs.datetime_utils import naive_utc_now -from libs.helper import extract_tenant_id +from libs.helper import extract_tenant_id, generate_string from ._workflow_exc import NodeNotFoundError, WorkflowDataError @@ -508,7 +510,10 @@ class WorkflowRun(Base): version: Mapped[str] = mapped_column(db.String(255)) graph: Mapped[Optional[str]] = mapped_column(db.Text) inputs: Mapped[Optional[str]] = mapped_column(db.Text) - status: Mapped[str] = mapped_column(db.String(255)) # running, succeeded, failed, stopped, partial-succeeded + status: Mapped[str] = mapped_column( + EnumText(WorkflowExecutionStatus, length=255), + nullable=False, + ) # running, succeeded, failed, stopped, partial-succeeded outputs: Mapped[Optional[str]] = mapped_column(sa.Text, default="{}") error: Mapped[Optional[str]] = mapped_column(db.Text) elapsed_time: Mapped[float] = mapped_column(db.Float, nullable=False, server_default=sa.text("0")) @@ -520,6 +525,10 @@ class WorkflowRun(Base): finished_at: Mapped[Optional[datetime]] = mapped_column(db.DateTime) exceptions_count: Mapped[int] = mapped_column(db.Integer, server_default=db.text("0"), nullable=True) + # Represents the suspension details of a suspended workflow. + # This field is non-null when `status == SUSPENDED` and null otherwise. + suspension_id: Mapped[StringUUID] = mapped_column(StringUUID, nullable=True) + @property def created_by_account(self): created_by_role = CreatorUserRole(self.created_by_role) @@ -907,10 +916,6 @@ class ConversationVariable(Base): _EDITABLE_SYSTEM_VARIABLE = frozenset(["query", "files"]) -def _naive_utc_datetime(): - return naive_utc_now() - - class WorkflowDraftVariable(Base): """`WorkflowDraftVariable` record variables and outputs generated during debugging worfklow or chatflow. @@ -941,14 +946,14 @@ class WorkflowDraftVariable(Base): created_at: Mapped[datetime] = mapped_column( db.DateTime, nullable=False, - default=_naive_utc_datetime, + default=naive_utc_now, server_default=func.current_timestamp(), ) updated_at: Mapped[datetime] = mapped_column( db.DateTime, nullable=False, - default=_naive_utc_datetime, + default=naive_utc_now, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), ) @@ -1173,8 +1178,8 @@ class WorkflowDraftVariable(Base): description: str = "", ) -> "WorkflowDraftVariable": variable = WorkflowDraftVariable() - variable.created_at = _naive_utc_datetime() - variable.updated_at = _naive_utc_datetime() + variable.created_at = naive_utc_now() + variable.updated_at = naive_utc_now() variable.description = description variable.app_id = app_id variable.node_id = node_id @@ -1254,3 +1259,120 @@ class WorkflowDraftVariable(Base): def is_system_variable_editable(name: str) -> bool: return name in _EDITABLE_SYSTEM_VARIABLE + + +_SUSPENSION_FORM_CODE_LENGTH = 22 + + +def _generate_suspension_form_code(): + return generate_string(_SUSPENSION_FORM_CODE_LENGTH) + + +class WorkflowSuspension(Base): + __tablename__ = "workflow_suspensions" + + # id is the unique identifier of a suspension + id: Mapped[str] = mapped_column( + StringUUID, + primary_key=True, + # NOTE: The server default acts as a fallback mechanism. + # The application generates the ID for new `WorkflowSuspension` records + # to streamline the insertion process and minimize database roundtrips. + server_default=db.text("uuidv7()"), + ) + + created_at: Mapped[datetime] = mapped_column( + sa.DateTime, + nullable=False, + default=naive_utc_now, + server_default=func.current_timestamp(), + ) + + updated_at: Mapped[datetime] = mapped_column( + sa.DateTime, + nullable=False, + default=naive_utc_now, + server_default=func.current_timestamp(), + onupdate=func.current_timestamp(), + ) + + # `tenant_id` identifies the tenant associated with this suspension, + # corresponding to the `id` field in the `Tenant` model. + tenant_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + ) + + # `app_id` represents the application identifier associated with this state. + # It corresponds to the `id` field in the `App` model. + # + # While this field is technically redundant (as the corresponding app can be + # determined by querying the `Workflow`), it is retained to simplify data + # cleanup and management processes. + app_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + ) + + # `workflow_id` represents the unique identifier of the workflow associated with this suspension. + # It corresponds to the `id` field in the `Workflow` model. + # + # Since an application can have multiple versions of a workflow, each with its own unique ID, + # the `app_id` alone is insufficient to determine which workflow version should be loaded + # when resuming a suspended workflow. + workflow_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + ) + + # `workflow_run_id` represents the identifier of the execution of workflow, + # correspond to the `id` field of `WorkflowNodeExecutionModel`. + workflow_run_id: Mapped[str] = mapped_column( + StringUUID, + nullable=False, + ) + + # `resumed_at` records the timestamp when the suspended workflow was resumed. + # It is set to `NULL` if the workflow has not been resumed. + resumed_at: Mapped[Optional[datetime]] = mapped_column( + sa.DateTime, + nullable=True, + default=sa.null, + ) + + # `continuation_node_id` specifies the next node to execute when the workflow resumes. + # + # Although this information is embedded within the `state` field, it is extracted + # into a separate field to facilitate debugging and data analysis. + continuation_node_id: Mapped[str] = mapped_column( + sa.String(length=255), + nullable=False, + ) + + # The version of the serialized execution state data. Currently, the only supported value is `v1`. + state_version: Mapped[StateVersion] = mapped_column( + EnumText(StateVersion), + nullable=False, + ) + + # `state` contains the serialized runtime state of the `GraphEngine`, + # capturing the workflow's execution context at the time of suspension. + # + # The value of `state` is a JSON-formatted string representing a JSON object (e.g., `{}`). + state: Mapped[str] = mapped_column(sa.Text, nullable=False) + + # The inputs provided by the user when resuming the suspended workflow. + # These inputs are serialized as a JSON-formatted string (e.g., `{}`). + # + # This field is `NULL` if no inputs were submitted by the user. + inputs: Mapped[str] = mapped_column(sa.Text, nullable=True) + + form_code: Mapped[str] = mapped_column( + # A 32-character string can store a base64-encoded value with 192 bits of entropy + # or a base62-encoded value with over 180 bits of entropy, providing sufficient + # uniqueness for most use cases. + sa.String(32), + nullable=False, + unique=True, + default=_generate_suspension_form_code, + )