feat/datasource
jyong 10 months ago
parent 3d0e288e85
commit b618f3bd9e

@ -141,6 +141,7 @@ class PipelineGenerator(BaseAppGenerator):
document_id=document_id, document_id=document_id,
datasource_type=datasource_type, datasource_type=datasource_type,
datasource_info=json.dumps(datasource_info), datasource_info=json.dumps(datasource_info),
datasource_node_id=start_node_id,
input_data=inputs, input_data=inputs,
pipeline_id=pipeline.id, pipeline_id=pipeline.id,
created_by=user.id, created_by=user.id,

@ -181,7 +181,7 @@ class PluginDatasourceManager(BasePluginClient):
"provider": datasource_provider_id.provider_name, "provider": datasource_provider_id.provider_name,
"datasource": datasource_name, "datasource": datasource_name,
"credentials": credentials, "credentials": credentials,
"page": datasource_parameters, "page": datasource_parameters.model_dump(),
}, },
}, },
headers={ headers={

@ -29,6 +29,7 @@ from core.workflow.utils.variable_template_parser import VariableTemplateParser
from extensions.ext_database import db from extensions.ext_database import db
from factories import file_factory from factories import file_factory
from models.model import UploadFile from models.model import UploadFile
from services.datasource_provider_service import DatasourceProviderService
from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from ...entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey
from .entities import DatasourceNodeData from .entities import DatasourceNodeData
@ -100,13 +101,21 @@ class DatasourceNode(BaseNode[DatasourceNodeData]):
match datasource_type: match datasource_type:
case DatasourceProviderType.ONLINE_DOCUMENT: case DatasourceProviderType.ONLINE_DOCUMENT:
datasource_runtime = cast(OnlineDocumentDatasourcePlugin, datasource_runtime) datasource_runtime = cast(OnlineDocumentDatasourcePlugin, datasource_runtime)
datasource_provider_service = DatasourceProviderService()
credentials = datasource_provider_service.get_real_datasource_credentials(
tenant_id=self.tenant_id,
provider=node_data.provider_name,
plugin_id=node_data.plugin_id,
)
if credentials:
datasource_runtime.runtime.credentials = credentials[0].get("credentials")
online_document_result: Generator[DatasourceMessage, None, None] = ( online_document_result: Generator[DatasourceMessage, None, None] = (
datasource_runtime.get_online_document_page_content( datasource_runtime.get_online_document_page_content(
user_id=self.user_id, user_id=self.user_id,
datasource_parameters=GetOnlineDocumentPageContentRequest( datasource_parameters=GetOnlineDocumentPageContentRequest(
workspace_id=datasource_info.get("workspace_id"), workspace_id=datasource_info.get("workspace_id"),
page_id=datasource_info.get("page").get("page_id"), page_id=datasource_info.get("page").get("page_id"),
type=datasource_info.get("type"), type=datasource_info.get("page").get("type"),
), ),
provider_type=datasource_type, provider_type=datasource_type,
) )

@ -0,0 +1,33 @@
"""add_pipeline_info_8
Revision ID: a1025f709c06
Revises: 70a0fc0c013f
Create Date: 2025-06-19 15:25:41.263120
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a1025f709c06'
down_revision = '70a0fc0c013f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('document_pipeline_execution_logs', schema=None) as batch_op:
batch_op.add_column(sa.Column('datasource_node_id', sa.String(length=255), nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('document_pipeline_execution_logs', schema=None) as batch_op:
batch_op.drop_column('datasource_node_id')
# ### end Alembic commands ###

@ -1267,6 +1267,7 @@ class DocumentPipelineExecutionLog(Base):
document_id = db.Column(StringUUID, nullable=False) document_id = db.Column(StringUUID, nullable=False)
datasource_type = db.Column(db.String(255), nullable=False) datasource_type = db.Column(db.String(255), nullable=False)
datasource_info = db.Column(db.Text, nullable=False) datasource_info = db.Column(db.Text, nullable=False)
datasource_node_id = db.Column(db.String(255), nullable=False)
input_data = db.Column(db.JSON, nullable=False) input_data = db.Column(db.JSON, nullable=False)
created_by = db.Column(StringUUID, nullable=True) created_by = db.Column(StringUUID, nullable=True)
created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp()) created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())

Loading…
Cancel
Save