fix: missing first page in to_dataframe #15575
Conversation
Summary of Changes: Hello @daniel-sanche, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request resolves a critical data loss bug in the ReadRowsIterable.to_dataframe() method.
Code Review
The pull request effectively addresses the data loss issue in to_dataframe() by implementing a 'peek' strategy on the first page of the stream. This prevents the self.pages iterator from being partially consumed and lost when the Arrow optimization fails. The introduction of the _empty_dataframe helper method improves maintainability and handles empty streams more robustly. I have suggested a few minor improvements to ensure consistency in schema handling and to prevent potential KeyError exceptions when custom dtypes are provided.
try:
    # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
    # pandas dataframe is about 2x faster. This is because pandas.concat is
    # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
    # usually no-copy.
    record_batches = [
        p.to_arrow() for p in itertools.chain([first_page], pages)
    ]
    table = pyarrow.Table.from_batches(record_batches)
    df = table.to_pandas()
    for column in dtypes:
        df[column] = pandas.Series(df[column], dtype=dtypes[column])
    return df
While the current implementation correctly handles the case where the first page fails to convert to Arrow, it is technically susceptible to data loss if p.to_arrow() succeeds for the first page but fails for a subsequent one (as the pages iterator would have been partially consumed before the except block is reached). Although BigQuery Storage streams are typically homogeneous in format, it is safer to verify the first page individually before committing to the full iteration.
Suggested change, replacing:

try:
    # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
    # pandas dataframe is about 2x faster. This is because pandas.concat is
    # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
    # usually no-copy.
    record_batches = [
        p.to_arrow() for p in itertools.chain([first_page], pages)
    ]
    table = pyarrow.Table.from_batches(record_batches)
    df = table.to_pandas()
    for column in dtypes:
        df[column] = pandas.Series(df[column], dtype=dtypes[column])
    return df

with:

try:
    # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
    # pandas dataframe is about 2x faster.
    first_batch = first_page.to_arrow()
    record_batches = [first_batch] + [
        p.to_arrow() for p in pages
    ]
    table = pyarrow.Table.from_batches(record_batches)
    df = table.to_pandas()
    for column, dtype in dtypes.items():
        if column in df.columns:
            df[column] = pandas.Series(df[column], dtype=dtype)
        else:
            df[column] = pandas.Series([], dtype=dtype)
    return df
Yes, this code assumes that if the first page is in Arrow format, the rest of the stream will be as well. @Linchin let me know if that's a wrong assumption
I think I'll change the code to raise an exception if we do get into this unexpected state though, to be explicit about what we expect
I think it's reasonable to expect the following pages to be arrow too. Thanks for adding the exception, just in case.
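For illustration, here is a minimal sketch of the kind of explicit check being discussed; the helper name and the assumption that to_arrow() raises NotImplementedError for a non-Arrow page are mine, not the exact code merged in this PR.

def _arrow_batches(first_batch, pages):
    # The first page has already been converted successfully, so yield it as-is.
    yield first_batch
    for page in pages:
        try:
            yield page.to_arrow()
        except NotImplementedError as exc:
            # The stream started as Arrow, so every later page should be Arrow
            # too; fail loudly instead of silently dropping the pages that have
            # already been consumed.
            raise ValueError(
                "Received a non-Arrow page in an Arrow stream; "
                "mixed-format streams are not supported."
            ) from exc

Turning the per-page conversion into a generator keeps the fast path lazy while still surfacing a clear error the moment a mixed-format stream appears.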
…torage_v1/reader.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request effectively resolves the bug where the first page of data could be lost when converting a ReadRowsIterable to a pandas DataFrame. The implemented "peek" strategy for the first page, along with the new _empty_dataframe helper method, provides a robust solution for handling both Arrow and Avro streams, as well as empty streams. The addition of comprehensive unit tests, including scenarios for lost records, empty streams, and mid-stream format changes, significantly improves the code's reliability. The minor refactoring in _dtypes_from_avro also contributes to better maintainability. Overall, the changes are well-designed and thoroughly tested.
        df[column] = pandas.Series(df[column], dtype=dtype)
    else:
        df[column] = pandas.Series([], dtype=dtype)
return df
I asked Gemini for a code review, and it suggested simplifying this block to:

if isinstance(self._stream_parser, _ArrowStreamParser):
    self._stream_parser._parse_arrow_schema()
    # Initialize DataFrame directly from the Arrow schema
    df = self._stream_parser._schema.empty_table().to_pandas()
    # Apply dtypes efficiently
    for column, dtype in dtypes.items():
        df[column] = pandas.Series(df.get(column, []), dtype=dtype)
    return df
I ran nox -s unit-3.14 -r -- -k test_to_dataframe_empty_w_dtypes_arrow locally and it passed with this refactor
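For context, the pyarrow pattern in that suggestion can be exercised on its own. This is an illustrative sketch with a made-up schema, not code from the PR, and it assumes a pyarrow release that provides Schema.empty_table().

import pandas
import pyarrow

# Hypothetical schema standing in for the one parsed from the Arrow stream.
schema = pyarrow.schema([("name", pyarrow.string()), ("age", pyarrow.int64())])
dtypes = {"age": "int32"}

# An empty table with the right columns, converted to an empty DataFrame.
df = schema.empty_table().to_pandas()

# df.get(column, []) avoids a KeyError for dtype keys that are missing from
# the schema; they simply become empty Series of the requested dtype.
for column, dtype in dtypes.items():
    df[column] = pandas.Series(df.get(column, []), dtype=dtype)

print(df.dtypes)  # age is int32, name stays object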
if self._stream_parser is None:
    df = pandas.DataFrame(columns=dtypes.keys())
    for col, dtype in dtypes.items():
        df[col] = pandas.Series([], dtype=dtype)
    return df
I asked Gemini for a code review, and it suggested simplifying this block to:

if self._stream_parser is None:
    return pandas.DataFrame(columns=dtypes.keys()).astype(dtypes)
I ran unit tests with this suggestion and they passed
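A quick self-contained check of what that one-liner produces (the dtypes dict below is just an example): an empty DataFrame whose columns already carry the requested dtypes, which is exactly what the longer per-column loop built.

import pandas

dtypes = {"name": "object", "age": "int64"}

# One-liner: empty frame with the requested columns, cast in a single call.
short = pandas.DataFrame(columns=dtypes.keys()).astype(dtypes)

# The longer form it replaces: assign an empty, typed Series per column.
long = pandas.DataFrame(columns=dtypes.keys())
for col, dtype in dtypes.items():
    long[col] = pandas.Series([], dtype=dtype)

# Both frames are empty and carry identical dtypes.
assert short.empty and long.empty
assert (short.dtypes == long.dtypes).all()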
There was a bug in ReadRowsIterable.to_dataframe(), where it would attempt to use .to_arrow() to build a dataframe, before falling back to a generic method if .to_arrow() was unsupported.

This was a problem because both methods read from a single self.pages iterator, so the first attempt would pull a page of data out of the feed, which would then be lost if .to_arrow() was unsupported.

This PR fixes the bug by pulling a single item off of the stream and calling .to_arrow() on it individually, instead of reading off the self.pages generator directly.

Fixes #14900
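For readers skimming the diff, here is a minimal standalone sketch of the peek-then-chain fix described above. The function name is invented, and it assumes page objects that expose to_arrow() (raising NotImplementedError on non-Arrow streams) and a to_dataframe() fallback, so it is an approximation of the approach rather than the exact merged code.

import itertools

import pandas
import pyarrow


def pages_to_dataframe(pages_iterable, dtypes=None):
    # Sketch of the peek strategy: pull exactly one page off the shared
    # iterator instead of letting two code paths consume it independently.
    dtypes = dtypes or {}
    pages = iter(pages_iterable)

    first_page = next(pages, None)
    if first_page is None:
        # Empty stream: nothing was consumed, return an empty typed frame
        # (the PR adds an _empty_dataframe helper for this case).
        return pandas.DataFrame(columns=dtypes.keys()).astype(dtypes)

    try:
        # Probe the Arrow fast path on the peeked page only ...
        first_batch = first_page.to_arrow()
    except NotImplementedError:
        # ... and if it is not an Arrow stream, fall back to the generic path
        # with the peeked page stitched back in front, so no data is lost.
        frames = [
            p.to_dataframe(dtypes)
            for p in itertools.chain([first_page], pages)
        ]
        return pandas.concat(frames, ignore_index=True)

    # Arrow fast path: the first batch is already converted, convert the rest.
    record_batches = [first_batch] + [p.to_arrow() for p in pages]
    df = pyarrow.Table.from_batches(record_batches).to_pandas()
    for column, dtype in dtypes.items():
        df[column] = pandas.Series(df[column], dtype=dtype)
    return df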