|
|
|
|
@ -3,6 +3,7 @@ import os
|
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from openpyxl import load_workbook
|
|
|
|
|
|
|
|
|
|
from core.rag.extractor.extractor_base import BaseExtractor
|
|
|
|
|
from core.rag.models.document import Document
|
|
|
|
|
@ -28,26 +29,48 @@ class ExcelExtractor(BaseExtractor):
|
|
|
|
|
self._autodetect_encoding = autodetect_encoding
|
|
|
|
|
|
|
|
|
|
def extract(self) -> list[Document]:
|
|
|
|
|
""" Load from Excel file in xls or xlsx format using Pandas."""
|
|
|
|
|
""" Load from Excel file in xls or xlsx format using Pandas and openpyxl."""
|
|
|
|
|
documents = []
|
|
|
|
|
# Determine the file extension
|
|
|
|
|
file_extension = os.path.splitext(self._file_path)[-1].lower()
|
|
|
|
|
# Read each worksheet of an Excel file using Pandas
|
|
|
|
|
|
|
|
|
|
if file_extension == '.xlsx':
|
|
|
|
|
excel_file = pd.ExcelFile(self._file_path, engine='openpyxl')
|
|
|
|
|
wb = load_workbook(self._file_path, data_only=True)
|
|
|
|
|
for sheet_name in wb.sheetnames:
|
|
|
|
|
sheet = wb[sheet_name]
|
|
|
|
|
data = sheet.values
|
|
|
|
|
cols = next(data)
|
|
|
|
|
df = pd.DataFrame(data, columns=cols)
|
|
|
|
|
|
|
|
|
|
df.dropna(how='all', inplace=True)
|
|
|
|
|
|
|
|
|
|
for index, row in df.iterrows():
|
|
|
|
|
page_content = []
|
|
|
|
|
for col_index, (k, v) in enumerate(row.items()):
|
|
|
|
|
if pd.notna(v):
|
|
|
|
|
cell = sheet.cell(row=index + 2,
|
|
|
|
|
column=col_index + 1) # +2 to account for header and 1-based index
|
|
|
|
|
if cell.hyperlink:
|
|
|
|
|
value = f"[{v}]({cell.hyperlink.target})"
|
|
|
|
|
page_content.append(f'"{k}":"{value}"')
|
|
|
|
|
else:
|
|
|
|
|
page_content.append(f'"{k}":"{v}"')
|
|
|
|
|
documents.append(Document(page_content=';'.join(page_content),
|
|
|
|
|
metadata={'source': self._file_path}))
|
|
|
|
|
|
|
|
|
|
elif file_extension == '.xls':
|
|
|
|
|
excel_file = pd.ExcelFile(self._file_path, engine='xlrd')
|
|
|
|
|
for sheet_name in excel_file.sheet_names:
|
|
|
|
|
df = excel_file.parse(sheet_name=sheet_name)
|
|
|
|
|
df.dropna(how='all', inplace=True)
|
|
|
|
|
|
|
|
|
|
for _, row in df.iterrows():
|
|
|
|
|
page_content = []
|
|
|
|
|
for k, v in row.items():
|
|
|
|
|
if pd.notna(v):
|
|
|
|
|
page_content.append(f'"{k}":"{v}"')
|
|
|
|
|
documents.append(Document(page_content=';'.join(page_content),
|
|
|
|
|
metadata={'source': self._file_path}))
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError(f"Unsupported file extension: {file_extension}")
|
|
|
|
|
for sheet_name in excel_file.sheet_names:
|
|
|
|
|
df: pd.DataFrame = excel_file.parse(sheet_name=sheet_name)
|
|
|
|
|
|
|
|
|
|
# filter out rows with all NaN values
|
|
|
|
|
df.dropna(how='all', inplace=True)
|
|
|
|
|
|
|
|
|
|
# transform each row into a Document
|
|
|
|
|
documents += [Document(page_content=';'.join(f'"{k}":"{v}"' for k, v in row.items() if pd.notna(v)),
|
|
|
|
|
metadata={'source': self._file_path},
|
|
|
|
|
) for _, row in df.iterrows()]
|
|
|
|
|
|
|
|
|
|
return documents
|
|
|
|
|
|