|
|
|
|
@ -2,6 +2,7 @@
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import re
|
|
|
|
|
from typing import Any, Optional
|
|
|
|
|
|
|
|
|
|
from core.model_manager import ModelInstance
|
|
|
|
|
@ -57,6 +58,9 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter):
|
|
|
|
|
# Regex special characters for detection
|
|
|
|
|
_regex_chars = ["(", ")", "[", "]", "{", "}", "*", "+", "?", "|", "\\", ".", "^", "$"]
|
|
|
|
|
|
|
|
|
|
def __init__(self, fixed_separator: str = "\n\n", separators: Optional[list[str]] = None, **kwargs: Any):
|
|
|
|
|
"""Create a new TextSplitter."""
|
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
|
@ -66,7 +70,23 @@ class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter)
|
|
|
|
|
def split_text(self, text: str) -> list[str]:
|
|
|
|
|
"""Split incoming text and return chunks."""
|
|
|
|
|
if self._fixed_separator:
|
|
|
|
|
chunks = text.split(self._fixed_separator)
|
|
|
|
|
# Check if the separator contains regex special characters
|
|
|
|
|
is_regex = any(char in self._fixed_separator for char in self._regex_chars)
|
|
|
|
|
|
|
|
|
|
if is_regex:
|
|
|
|
|
# For regex separators, use finditer to find all matches and split manually
|
|
|
|
|
chunks = self._split_with_regex_manual(text, self._fixed_separator)
|
|
|
|
|
# Handle large chunks at sentence boundaries while preserving regex structure
|
|
|
|
|
final_chunks = []
|
|
|
|
|
for chunk in chunks:
|
|
|
|
|
if len(chunk) > self._chunk_size:
|
|
|
|
|
final_chunks.extend(self._split_large_regex_chunk(chunk))
|
|
|
|
|
else:
|
|
|
|
|
final_chunks.append(chunk)
|
|
|
|
|
return final_chunks
|
|
|
|
|
else:
|
|
|
|
|
# Use regular string splitting for simple separators
|
|
|
|
|
chunks = text.split(self._fixed_separator)
|
|
|
|
|
else:
|
|
|
|
|
chunks = [text]
|
|
|
|
|
|
|
|
|
|
@ -124,8 +144,14 @@ class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter)
|
|
|
|
|
if not new_separators:
|
|
|
|
|
final_chunks.append(s)
|
|
|
|
|
else:
|
|
|
|
|
other_info = self._split_text(s, new_separators)
|
|
|
|
|
final_chunks.extend(other_info)
|
|
|
|
|
# For regex separators, use custom splitting to preserve structure
|
|
|
|
|
is_regex = any(char in self._fixed_separator for char in self._regex_chars)
|
|
|
|
|
if is_regex:
|
|
|
|
|
other_info = self._split_large_regex_chunk(s)
|
|
|
|
|
final_chunks.extend(other_info)
|
|
|
|
|
else:
|
|
|
|
|
other_info = self._split_text(s, new_separators)
|
|
|
|
|
final_chunks.extend(other_info)
|
|
|
|
|
|
|
|
|
|
if _good_splits:
|
|
|
|
|
merged_text = self._merge_splits(_good_splits, _separator, _good_splits_lengths)
|
|
|
|
|
@ -154,3 +180,75 @@ class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter)
|
|
|
|
|
final_chunks.append(current_part)
|
|
|
|
|
|
|
|
|
|
return final_chunks
|
|
|
|
|
|
|
|
|
|
def _split_with_regex_manual(self, text: str, pattern: str) -> list[str]:
|
|
|
|
|
"""Manually split text using regex pattern by finding all matches."""
|
|
|
|
|
# Find all matches
|
|
|
|
|
matches = list(re.finditer(pattern, text))
|
|
|
|
|
|
|
|
|
|
if not matches:
|
|
|
|
|
return [text]
|
|
|
|
|
|
|
|
|
|
chunks = []
|
|
|
|
|
last_end = 0
|
|
|
|
|
|
|
|
|
|
for i, match in enumerate(matches):
|
|
|
|
|
# Get the matched separator (e.g., "一、", "二、")
|
|
|
|
|
separator = match.group(0)
|
|
|
|
|
|
|
|
|
|
# Find the end of this section (next match or end of text)
|
|
|
|
|
next_start = len(text)
|
|
|
|
|
if i + 1 < len(matches):
|
|
|
|
|
next_start = matches[i + 1].start()
|
|
|
|
|
|
|
|
|
|
# Create a chunk that includes the separator and all content up to next separator
|
|
|
|
|
chunk_content = text[match.start() : next_start].strip()
|
|
|
|
|
if chunk_content:
|
|
|
|
|
chunks.append(chunk_content)
|
|
|
|
|
|
|
|
|
|
last_end = next_start
|
|
|
|
|
|
|
|
|
|
# Add any remaining text after the last match
|
|
|
|
|
if last_end < len(text):
|
|
|
|
|
remaining = text[last_end:].strip()
|
|
|
|
|
if remaining:
|
|
|
|
|
chunks.append(remaining)
|
|
|
|
|
|
|
|
|
|
return chunks
|
|
|
|
|
|
|
|
|
|
def _split_large_regex_chunk(self, chunk: str) -> list[str]:
|
|
|
|
|
"""Split large regex chunks at sentence boundaries while preserving structure."""
|
|
|
|
|
# Split at sentence boundaries (。!?.!?)
|
|
|
|
|
sentence_pattern = r"([。!?.!?])"
|
|
|
|
|
sentences = re.split(sentence_pattern, chunk)
|
|
|
|
|
|
|
|
|
|
# Rejoin sentences with their punctuation
|
|
|
|
|
sentences = ["".join(sentences[i : i + 2]) for i in range(0, len(sentences) - 1, 2)]
|
|
|
|
|
if len(sentences) % 2 == 1:
|
|
|
|
|
sentences.append(sentences[-1])
|
|
|
|
|
|
|
|
|
|
# Filter out empty sentences
|
|
|
|
|
sentences = [s.strip() for s in sentences if s.strip()]
|
|
|
|
|
|
|
|
|
|
# Group sentences into chunks that fit within chunk_size
|
|
|
|
|
chunks = []
|
|
|
|
|
current_chunk = ""
|
|
|
|
|
current_length = 0
|
|
|
|
|
|
|
|
|
|
for sentence in sentences:
|
|
|
|
|
sentence_length = len(sentence)
|
|
|
|
|
|
|
|
|
|
# If adding this sentence would exceed chunk_size, start a new chunk
|
|
|
|
|
if current_length + sentence_length > self._chunk_size and current_chunk:
|
|
|
|
|
chunks.append(current_chunk.strip())
|
|
|
|
|
current_chunk = sentence
|
|
|
|
|
current_length = sentence_length
|
|
|
|
|
else:
|
|
|
|
|
current_chunk += sentence
|
|
|
|
|
current_length += sentence_length
|
|
|
|
|
|
|
|
|
|
# Add the last chunk if it exists
|
|
|
|
|
if current_chunk:
|
|
|
|
|
chunks.append(current_chunk.strip())
|
|
|
|
|
|
|
|
|
|
return chunks
|
|
|
|
|
|