|
|
|
|
@ -1,7 +1,7 @@
|
|
|
|
|
"""Abstract interface for document loader implementations."""
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
from openpyxl.reader.excel import load_workbook
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
|
|
|
|
from core.rag.extractor.extractor_base import BaseExtractor
|
|
|
|
|
from core.rag.models.document import Document
|
|
|
|
|
@ -27,24 +27,21 @@ class ExcelExtractor(BaseExtractor):
|
|
|
|
|
self._autodetect_encoding = autodetect_encoding
|
|
|
|
|
|
|
|
|
|
def extract(self) -> list[Document]:
|
|
|
|
|
"""Load from file path."""
|
|
|
|
|
"""Load from file path using Pandas."""
|
|
|
|
|
data = []
|
|
|
|
|
wb = load_workbook(filename=self._file_path, read_only=True)
|
|
|
|
|
# loop over all sheets
|
|
|
|
|
for sheet in wb:
|
|
|
|
|
keys = []
|
|
|
|
|
if 'A1:A1' == sheet.calculate_dimension():
|
|
|
|
|
sheet.reset_dimensions()
|
|
|
|
|
for row in sheet.iter_rows(values_only=True):
|
|
|
|
|
if all(v is None for v in row):
|
|
|
|
|
continue
|
|
|
|
|
if keys == []:
|
|
|
|
|
keys = list(map(str, row))
|
|
|
|
|
else:
|
|
|
|
|
row_dict = dict(zip(keys, list(map(str, row))))
|
|
|
|
|
row_dict = {k: v for k, v in row_dict.items() if v}
|
|
|
|
|
item = ''.join(f'{k}:{v};' for k, v in row_dict.items())
|
|
|
|
|
document = Document(page_content=item, metadata={'source': self._file_path})
|
|
|
|
|
data.append(document)
|
|
|
|
|
|
|
|
|
|
# 使用 Pandas 读取 Excel 文件的每个工作表
|
|
|
|
|
xls = pd.ExcelFile(self._file_path)
|
|
|
|
|
for sheet_name in xls.sheet_names:
|
|
|
|
|
df = pd.read_excel(xls, sheet_name=sheet_name)
|
|
|
|
|
|
|
|
|
|
# filter out rows with all NaN values
|
|
|
|
|
df.dropna(how='all', inplace=True)
|
|
|
|
|
|
|
|
|
|
# transform each row into a Document
|
|
|
|
|
for _, row in df.iterrows():
|
|
|
|
|
item = ';'.join(f'{k}:{v}' for k, v in row.items() if pd.notna(v))
|
|
|
|
|
document = Document(page_content=item, metadata={'source': self._file_path})
|
|
|
|
|
data.append(document)
|
|
|
|
|
|
|
|
|
|
return data
|
|
|
|
|
|