
refactor: Allow load chunks in VectorRetrival and Bm25Retrival #1636

Open · wants to merge 6 commits into base: master

Conversation

@yiyiyi0817 (Member) commented Feb 21, 2025

Description

The process method has been split so that chunk handling is a separate step: the previous UnstructuredIO path remains the native method, while chunks produced by other chunking tools can be loaded into the retriever directly.

The standard format for chunks from different chunk tools is:

chunks (List[Dict[str, Any]]): A list of chunks, where
    each chunk is a dict containing:
    - "text": The text (str) to be embedded.
    - "metadata": A dictionary with additional metadata for the
        chunk.

I also plan to provide examples of how to load chunks from chunkr and mineru, but they seem to have some bugs at the moment. I will add the examples after those bugs are fixed.
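As a concrete illustration of the standard chunk format described above, here is a minimal sketch. The retriever class and `load_chunks` method names follow this PR and may differ from the final merged API:

```python
# Chunks in the standard format: each is a dict with a "text" string
# to be embedded and a "metadata" dict of additional information.
chunks = [
    {
        "text": "CAMEL is a multi-agent framework.",
        "metadata": {"source": "intro.md", "page": 1},
    },
    {
        "text": "Retrievers support BM25 and vector search.",
        "metadata": {"source": "retrievers.md", "page": 3},
    },
]

# Output from any external chunking tool in this shape could then be
# loaded directly, bypassing the built-in UnstructuredIO pipeline
# (names per this PR, shown here only as a sketch):
# retriever = VectorRetrieval()
# retriever.load_chunks(chunks=chunks)
```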

Checklist

Go over all the following points, and put an x in all the boxes that apply.

  • I have read the CONTRIBUTION guide (required)
  • I have linked this PR to an issue using the Development section on the right sidebar or by adding Fixes #issue-number in the PR description (required)
  • I have checked if any dependencies need to be added or updated in pyproject.toml and poetry.lock
  • I have updated the tests accordingly (required for a bug fix or a new feature)
  • I have updated the documentation if needed
  • I have added examples if this is a new feature

If you are unsure about any of these, don't hesitate to ask. We are here to help!

@yiyiyi0817 yiyiyi0817 linked an issue Feb 21, 2025 that may be closed by this pull request
2 tasks
@yiyiyi0817 yiyiyi0817 self-assigned this Feb 21, 2025
@yiyiyi0817 yiyiyi0817 changed the title Allow load chunks for VectorRetrival and Bm25Retrival Allow load chunks in VectorRetrival and Bm25Retrival Feb 21, 2025
@yiyiyi0817 yiyiyi0817 changed the title Allow load chunks in VectorRetrival and Bm25Retrival feat: Allow load chunks in VectorRetrival and Bm25Retrival Feb 21, 2025
@yiyiyi0817 yiyiyi0817 changed the title feat: Allow load chunks in VectorRetrival and Bm25Retrival refactor: Allow load chunks in VectorRetrival and Bm25Retrival Feb 21, 2025
@Wendong-Fan Wendong-Fan requested review from Asher-hss, raywhoelse and AveryYay and removed request for Asher-hss February 22, 2025 10:36
@Wendong-Fan (Member) left a comment

hey @yiyiyi0817, could you help me understand how a user can input chunks when they run .process? It seems you added a function to handle chunks, but that flexibility was not exposed through .process. Also, for the original issue, I think what we want is to allow users to switch loaders based on their needs instead of hard-coding to unstructured io as it is now; the current implementation doesn't seem to resolve that.

@yiyiyi0817 (Member, Author)

I will change some code based on Wendong's suggestion. Once it is ready for review, I will request reviews again.

@yiyiyi0817 (Member, Author) commented Feb 27, 2025

Hi there, this PR can be reviewed now. @AveryYay @raywhoelse

@yiyiyi0817 (Member, Author) commented Feb 27, 2025

> hey @yiyiyi0817, could you help me understand how a user can input chunks when they run .process? It seems you added a function to handle chunks, but that flexibility was not exposed through .process. Also, for the original issue, I think what we want is to allow users to switch loaders based on their needs instead of hard-coding to unstructured io as it is now; the current implementation doesn't seem to resolve that.

Now the user can bypass the UnstructuredIO import of chunks by calling load_chunks. I didn't make load_chunks a variant of process because process handles URLs directly while load_chunks deals with chunks, so their inputs differ. I tried to provide an example, but I couldn't get mineru to run.

Comment on lines +33 to +49
def from_uio_chunks(cls, uio_chunks, extra_info=None):
    return [
        cls(
            text=str(uio_chunk),
            metadata={
                **{
                    key: value
                    for key, value in (
                        uio_chunk.metadata.to_dict().items()
                    )
                    if key != "orig_elements"
                },
                "extra_info": extra_info or {},
            },
        )
        for uio_chunk in uio_chunks
    ]
Collaborator:

def from_uio_chunks(cls, uio_chunks, extra_info=None):
    chunks = []
    for uio_chunk in uio_chunks:
        try:
            metadata_dict = uio_chunk.metadata.to_dict()
            chunk = cls(
                text=str(uio_chunk),
                metadata={
                    key: value
                    for key, value in metadata_dict.items()
                    if key != "orig_elements"
                },
                extra_info=extra_info or {},
            )
            chunks.append(chunk)
        except AttributeError as e:
            # Log or handle the exception as needed
            logger.error(f"Skipping chunk due to attribute error: {e}")
    return chunks
Add try/except to handle exceptions.

chunks (Optional[List[Chunk]]): A list of chunks.
Raises:
ValueError: If neither `chunks` nor `self.chunks` is available.
"""
Collaborator:

chunks = chunks or self.chunks

tokenized_corpus = []
for chunk in chunks:
    tokenized_corpus.append(chunk.text.split(" "))
Collaborator:

Remove basic punctuation before splitting:

''.join(c if c.isalnum() else ' ' for c in text).split()
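The reviewer's one-liner above can be wrapped in a small helper to make the behavior explicit; the function name here is illustrative, not part of the PR:

```python
def simple_tokenize(text: str) -> list[str]:
    # Replace every non-alphanumeric character with a space, then
    # split on whitespace, so punctuation never sticks to tokens.
    return ''.join(c if c.isalnum() else ' ' for c in text).split()

tokens = simple_tokenize("Hello, world! BM25-style tokenization.")
# → ['Hello', 'world', 'BM25', 'style', 'tokenization']
```

Compared with `chunk.text.split(" ")`, this also collapses repeated whitespace and strips punctuation, which generally improves BM25 term matching.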

Comment on lines +150 to +220
        elif isinstance(content, str):
            content_path_info = content[:100]
            # Check if the content is a URL
            parsed_url = urlparse(content)
            is_url = all([parsed_url.scheme, parsed_url.netloc])
            if is_url or os.path.exists(content):
                elements = (
                    self.uio.parse_file_or_url(
                        input_path=content,
                        metadata_filename=metadata_filename,
                        **kwargs,
                    )
                    or []
                )
            else:
                elements = [
                    self.uio.create_element_from_text(
                        text=content,
                        filename=metadata_filename,
                    )
                ]

        if not elements:
            warnings.warn(
                f"No elements were extracted from the content: {content}"
            )
        else:
            # Chunk the content if required
            uio_chunks = (
                self.uio.chunk_elements(
                    chunk_type=chunk_type,
                    elements=elements,
                    max_characters=max_characters,
                )
                if should_chunk
                else elements
            )
            chunks = Chunk.from_uio_chunks(uio_chunks, extra_info)
    else:
        raise ValueError(f"Unsupported chunk tool type: {chunk_tool_type}")

    self.load_chunks(
        chunks=chunks,
        content_path_info=content_path_info,
        embed_batch=embed_batch,
    )
Collaborator:

1. Add try/except to handle errors.
2. It is recommended to split the content handling into independent methods to improve readability and maintainability. For example:

def _get_content_path_info(self, content: Union[str, Element, IO[bytes]]) -> str:
    if isinstance(content, Element):
        return (
            content.metadata.file_directory[:100]
            if content.metadata.file_directory
            else ""
        )
    elif isinstance(content, IOBase):
        return "From file bytes"
    elif isinstance(content, str):
        return content[:100]
    return ""

def _parse_content(
    self,
    content: Union[str, Element, IO[bytes]],
    metadata_filename: Optional[str],
    **kwargs,
) -> list[Element]:
    uio = UnstructuredIO()

    if isinstance(content, Element):
        return [content]
    elif isinstance(content, IOBase):
        try:
            return (
                uio.parse_bytes(
                    file=content, metadata_filename=metadata_filename, **kwargs
                )
                or []
            )
        finally:
            content.close()
    elif isinstance(content, str):
        parsed_url = urlparse(content)
        is_url = all([parsed_url.scheme, parsed_url.netloc])

        if is_url or os.path.exists(content):
            return (
                uio.parse_file_or_url(
                    input_path=content,
                    metadata_filename=metadata_filename,
                    **kwargs,
                )
                or []
            )
        return [
            uio.create_element_from_text(text=content, filename=metadata_filename)
        ]

    raise ValueError(f"Unsupported content type: {type(content)}")

else:
    # Chunk the content if required
    chunks = (
        self.uio.chunk_elements(
Collaborator:

Is it possible that elements or uio_chunks are large, which may cause high memory usage?
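One way to bound memory here is to stream chunks through fixed-size batches instead of materializing the full list at once. A minimal sketch (the `batched` helper is illustrative, not part of this PR):

```python
from itertools import islice
from typing import Iterable, Iterator, List

def batched(items: Iterable, batch_size: int) -> Iterator[List]:
    # Yield fixed-size batches lazily, so peak memory stays
    # proportional to batch_size rather than the corpus size.
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# Embedding could then consume chunks batch by batch, e.g.:
# for batch in batched(chunk_iter, embed_batch):
#     vectors = embedding_model.embed_list([c["text"] for c in batch])
```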

Projects
Status: Reviewing
Development

Successfully merging this pull request may close these issues.

[Feature Request] Support loader switch in VectorRetrival and AutoRetrival
3 participants