Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resume read stream on Unknown transport-layer exception #263

Merged
merged 1 commit into from Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion google/cloud/bigquery_storage_v1/reader.py
Expand Up @@ -39,7 +39,12 @@
pyarrow = None


_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)
_STREAM_RESUMPTION_EXCEPTIONS = (
google.api_core.exceptions.ServiceUnavailable,
# Caused by transport-level error. No status code was received.
# https://github.com/googleapis/python-bigquery-storage/issues/262
google.api_core.exceptions.Unknown,
)

# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
# Unfortunately, this condition is surfaced to the caller as an internal error
Expand Down
27 changes: 21 additions & 6 deletions tests/unit/test_reader_v1.py
Expand Up @@ -103,6 +103,12 @@ def _pages_w_unavailable(pages):
raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")


def _pages_w_unknown(pages):
for page in pages:
yield page
raise google.api_core.exceptions.Unknown("No status received")


def _avro_blocks_w_deadline(avro_blocks):
for block in avro_blocks:
yield block
Expand Down Expand Up @@ -237,14 +243,19 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client):
]
avro_blocks_1 = _pages_w_unavailable(_bq_to_avro_blocks(bq_blocks_1, avro_schema))
bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
avro_blocks_2 = _pages_w_resumable_internal_error(
_bq_to_avro_blocks(bq_blocks_2, avro_schema)
)
bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema)

mock_gapic_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3)
bq_blocks_3 = [[{"int_col": -1}, {"int_col": -2}], [{"int_col": -4}]]
avro_blocks_3 = _pages_w_unknown(_bq_to_avro_blocks(bq_blocks_3, avro_schema))
bq_blocks_4 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_4 = _bq_to_avro_blocks(bq_blocks_4, avro_schema)

mock_gapic_client.read_rows.side_effect = (
avro_blocks_2,
avro_blocks_3,
avro_blocks_4,
)

reader = class_under_test(
avro_blocks_1,
Expand All @@ -260,16 +271,20 @@ def test_rows_w_reconnect(class_under_test, mock_gapic_client):
itertools.chain.from_iterable(bq_blocks_1),
itertools.chain.from_iterable(bq_blocks_2),
itertools.chain.from_iterable(bq_blocks_3),
itertools.chain.from_iterable(bq_blocks_4),
)
)

assert tuple(got) == expected
mock_gapic_client.read_rows.assert_any_call(
read_stream="teststream", offset=4, metadata={"test-key": "test-value"}
)
mock_gapic_client.read_rows.assert_called_with(
mock_gapic_client.read_rows.assert_any_call(
read_stream="teststream", offset=7, metadata={"test-key": "test-value"}
)
mock_gapic_client.read_rows.assert_called_with(
read_stream="teststream", offset=10, metadata={"test-key": "test-value"}
)


def test_rows_w_reconnect_by_page(class_under_test, mock_gapic_client):
Expand Down