feat(api): implement PydanticModelEncoder for JSON serialization

Use `PydanticModelEncoder` for JSON serialization while persisting
node execution data.
pull/20699/head
QuantumGhost 12 months ago
parent 77ec5f3f68
commit 03f91daa27

@ -19,6 +19,7 @@ from core.workflow.entities.node_execution_entities import (
)
from core.workflow.nodes.enums import NodeType
from core.workflow.repository.workflow_node_execution_repository import OrderConfig, WorkflowNodeExecutionRepository
from libs.jsonutil import PydanticModelEncoder
from models import (
Account,
CreatorUserRole,
@ -161,9 +162,11 @@ class SQLAlchemyWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository)
db_model.node_id = domain_model.node_id
db_model.node_type = domain_model.node_type
db_model.title = domain_model.title
db_model.inputs = json.dumps(domain_model.inputs) if domain_model.inputs else None
db_model.process_data = json.dumps(domain_model.process_data) if domain_model.process_data else None
db_model.outputs = json.dumps(domain_model.outputs) if domain_model.outputs else None
db_model.inputs = json.dumps(domain_model.inputs, cls=PydanticModelEncoder) if domain_model.inputs else None
db_model.process_data = (
json.dumps(domain_model.process_data, cls=PydanticModelEncoder) if domain_model.process_data else None
)
db_model.outputs = json.dumps(domain_model.outputs, cls=PydanticModelEncoder) if domain_model.outputs else None
db_model.status = domain_model.status
db_model.error = domain_model.error
db_model.elapsed_time = domain_model.elapsed_time

@ -0,0 +1,11 @@
import json
from pydantic import BaseModel
class PydanticModelEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, BaseModel):
return o.model_dump()
else:
super().default(o)
Loading…
Cancel
Save