diff --git a/api/core/workflow/nodes/datasource/datasource_node.py b/api/core/workflow/nodes/datasource/datasource_node.py index 1ba9cc2645..77eba1a7ce 100644 --- a/api/core/workflow/nodes/datasource/datasource_node.py +++ b/api/core/workflow/nodes/datasource/datasource_node.py @@ -9,8 +9,10 @@ from core.datasource.entities.datasource_entities import ( DatasourceParameter, DatasourceProviderType, GetOnlineDocumentPageContentRequest, + OnlineDriveDownloadFileRequest, ) from core.datasource.online_document.online_document_plugin import OnlineDocumentDatasourcePlugin +from core.datasource.online_drive.online_drive_plugin import OnlineDriveDatasourcePlugin from core.datasource.utils.message_transformer import DatasourceFileMessageTransformer from core.file import File from core.file.enums import FileTransferMethod, FileType @@ -125,7 +127,31 @@ class DatasourceNode(BaseNode[DatasourceNodeData]): parameters_for_log=parameters_for_log, datasource_info=datasource_info, ) - + case DatasourceProviderType.ONLINE_DRIVE: + datasource_runtime = cast(OnlineDriveDatasourcePlugin, datasource_runtime) + datasource_provider_service = DatasourceProviderService() + credentials = datasource_provider_service.get_real_datasource_credentials( + tenant_id=self.tenant_id, + provider=node_data.provider_name, + plugin_id=node_data.plugin_id, + ) + if credentials: + datasource_runtime.runtime.credentials = credentials[0].get("credentials") + online_drive_result: Generator[DatasourceMessage, None, None] = ( + datasource_runtime.online_drive_download_file( + user_id=self.user_id, + request=OnlineDriveDownloadFileRequest( + key=datasource_info.get("key"), + bucket=datasource_info.get("bucket"), + ), + provider_type=datasource_type, + ) + ) + yield from self._transform_message( + messages=online_drive_result, + parameters_for_log=parameters_for_log, + datasource_info=datasource_info, + ) case DatasourceProviderType.WEBSITE_CRAWL: yield RunCompletedEvent( run_result=NodeRunResult( diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index a2551f043d..9fe09744a8 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -17,9 +17,12 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.datasource.entities.datasource_entities import ( DatasourceProviderType, OnlineDocumentPagesMessage, + OnlineDriveBrowseFilesRequest, + OnlineDriveBrowseFilesResponse, WebsiteCrawlMessage, ) from core.datasource.online_document.online_document_plugin import OnlineDocumentDatasourcePlugin +from core.datasource.online_drive.online_drive_plugin import OnlineDriveDatasourcePlugin from core.datasource.website_crawl.website_crawl_plugin import WebsiteCrawlDatasourcePlugin from core.rag.entities.event import BaseDatasourceEvent, DatasourceCompletedEvent, DatasourceProcessingEvent from core.repositories.sqlalchemy_workflow_node_execution_repository import SQLAlchemyWorkflowNodeExecutionRepository @@ -554,10 +557,33 @@ class RagPipelineService: end_time = time.time() online_document_event = DatasourceCompletedEvent( data=message.result, - time_consuming=round(end_time - start_time, 2) + time_consuming=round(end_time - start_time, 2), + total=None, + completed=None, ) yield online_document_event.model_dump() - + case DatasourceProviderType.ONLINE_DRIVE: + datasource_runtime = cast(OnlineDriveDatasourcePlugin, datasource_runtime) + online_drive_result: Generator[OnlineDriveBrowseFilesResponse, None, None] = datasource_runtime.online_drive_browse_files( + user_id=account.id, + request=OnlineDriveBrowseFilesRequest( + bucket=user_inputs.get("bucket"), + prefix=user_inputs.get("prefix"), + max_keys=user_inputs.get("max_keys", 20), + start_after=user_inputs.get("start_after"), + ), + provider_type=datasource_runtime.datasource_provider_type(), + ) + start_time = time.time() + for message in online_drive_result: + end_time = time.time() + online_drive_event = DatasourceCompletedEvent( + data=message.result, + time_consuming=round(end_time - start_time, 2), + total=None, + completed=None, + ) + yield online_drive_event.model_dump() case DatasourceProviderType.WEBSITE_CRAWL: datasource_runtime = cast(WebsiteCrawlDatasourcePlugin, datasource_runtime) website_crawl_result: Generator[WebsiteCrawlMessage, None, None] = datasource_runtime.get_website_crawl(