|
|
|
|
@ -7,6 +7,7 @@ import tempfile
|
|
|
|
|
from collections.abc import Mapping, Sequence
|
|
|
|
|
from typing import Any, cast
|
|
|
|
|
|
|
|
|
|
import chardet
|
|
|
|
|
import docx
|
|
|
|
|
import pandas as pd
|
|
|
|
|
import pypandoc # type: ignore
|
|
|
|
|
@ -180,26 +181,64 @@ def _extract_text_by_file_extension(*, file_content: bytes, file_extension: str)
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_plain_text(file_content: bytes) -> str:
    """Decode raw file bytes into text, auto-detecting the encoding.

    Uses chardet to guess the byte encoding, falling back to UTF-8 when
    detection returns no encoding or the detected codec is unusable.

    Args:
        file_content: Raw bytes of the file to decode.

    Returns:
        The decoded text (undecodable bytes are dropped via errors="ignore").

    Raises:
        TextExtractionError: If the content cannot be decoded even with the
            UTF-8 last-resort attempt.
    """
    try:
        # Detect encoding using chardet
        result = chardet.detect(file_content)
        encoding = result["encoding"]

        # Fallback to utf-8 if detection fails
        if not encoding:
            encoding = "utf-8"

        return file_content.decode(encoding, errors="ignore")
    except (UnicodeDecodeError, LookupError) as e:
        # LookupError covers chardet reporting an encoding name Python's
        # codec registry doesn't know. If decoding fails, try with utf-8
        # as last resort before giving up.
        try:
            return file_content.decode("utf-8", errors="ignore")
        except UnicodeDecodeError:
            raise TextExtractionError(f"Failed to decode plain text file: {e}") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_json(file_content: bytes) -> str:
    """Parse JSON bytes and return a pretty-printed JSON string.

    Decodes the bytes using a chardet-detected encoding (UTF-8 fallback),
    parses the JSON, and re-serializes it with 2-space indentation while
    preserving non-ASCII characters.

    Args:
        file_content: Raw bytes of the JSON file.

    Returns:
        The re-serialized, indented JSON text.

    Raises:
        TextExtractionError: If the content cannot be decoded or is not
            valid JSON, even after the UTF-8 last-resort attempt.
    """
    try:
        # Detect encoding using chardet
        result = chardet.detect(file_content)
        encoding = result["encoding"]

        # Fallback to utf-8 if detection fails
        if not encoding:
            encoding = "utf-8"

        json_data = json.loads(file_content.decode(encoding, errors="ignore"))
        return json.dumps(json_data, indent=2, ensure_ascii=False)
    except (UnicodeDecodeError, LookupError, json.JSONDecodeError) as e:
        # If decoding fails, try with utf-8 as last resort
        try:
            json_data = json.loads(file_content.decode("utf-8", errors="ignore"))
            return json.dumps(json_data, indent=2, ensure_ascii=False)
        except (UnicodeDecodeError, json.JSONDecodeError):
            raise TextExtractionError(f"Failed to decode or parse JSON file: {e}") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_yaml(file_content: bytes) -> str:
    """Extract the content from yaml file.

    Decodes the bytes using a chardet-detected encoding (UTF-8 fallback),
    safe-loads all YAML documents, and re-serializes them with unicode
    preserved and key order unchanged.

    Args:
        file_content: Raw bytes of the YAML file.

    Returns:
        The re-serialized YAML text for all documents in the stream.

    Raises:
        TextExtractionError: If the content cannot be decoded or is not
            valid YAML, even after the UTF-8 last-resort attempt.
    """
    try:
        # Detect encoding using chardet
        result = chardet.detect(file_content)
        encoding = result["encoding"]

        # Fallback to utf-8 if detection fails
        if not encoding:
            encoding = "utf-8"

        # safe_load_all is lazy; parse errors surface during dump_all below,
        # which is why yaml.YAMLError is handled here.
        yaml_data = yaml.safe_load_all(file_content.decode(encoding, errors="ignore"))
        return cast(str, yaml.dump_all(yaml_data, allow_unicode=True, sort_keys=False))
    except (UnicodeDecodeError, LookupError, yaml.YAMLError) as e:
        # If decoding fails, try with utf-8 as last resort
        try:
            yaml_data = yaml.safe_load_all(file_content.decode("utf-8", errors="ignore"))
            return cast(str, yaml.dump_all(yaml_data, allow_unicode=True, sort_keys=False))
        except (UnicodeDecodeError, yaml.YAMLError):
            raise TextExtractionError(f"Failed to decode or parse YAML file: {e}") from e
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_pdf(file_content: bytes) -> str:
|
|
|
|
|
@ -338,7 +377,20 @@ def _extract_text_from_file(file: File):
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_csv(file_content: bytes) -> str:
|
|
|
|
|
try:
|
|
|
|
|
csv_file = io.StringIO(file_content.decode("utf-8", "ignore"))
|
|
|
|
|
# Detect encoding using chardet
|
|
|
|
|
result = chardet.detect(file_content)
|
|
|
|
|
encoding = result["encoding"]
|
|
|
|
|
|
|
|
|
|
# Fallback to utf-8 if detection fails
|
|
|
|
|
if not encoding:
|
|
|
|
|
encoding = "utf-8"
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
csv_file = io.StringIO(file_content.decode(encoding, errors="ignore"))
|
|
|
|
|
except (UnicodeDecodeError, LookupError):
|
|
|
|
|
# If decoding fails, try with utf-8 as last resort
|
|
|
|
|
csv_file = io.StringIO(file_content.decode("utf-8", errors="ignore"))
|
|
|
|
|
|
|
|
|
|
csv_reader = csv.reader(csv_file)
|
|
|
|
|
rows = list(csv_reader)
|
|
|
|
|
|
|
|
|
|
|