Skip to content

Commit

Permalink
refactor to make it closer to synchronous execution for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
kiraksi committed Mar 4, 2024
1 parent 4e78888 commit d1a65bf
Showing 1 changed file with 22 additions and 48 deletions.
70 changes: 22 additions & 48 deletions google/cloud/bigquery/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery import table
import asyncio
from google.api_core import gapic_v1, retry_async

class AsyncClient(Client):
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -110,44 +109,27 @@ async def do_query():
request_body["requestId"] = _job_helpers.make_job_id()
span_attributes = {"path": path}

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
client._call_api,
default_retry=retry_async.AsyncRetry(
initial=0.1,
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
core_exceptions.ServiceUnavailable,
),
deadline=60.0,
),
default_timeout=60.0,
client_info=DEFAULT_CLIENT_INFO,
)

# For easier testing, handle the retries ourselves.
# if retry is not None:
# response = retry(client._call_api)(
# retry=None, # We're calling the retry decorator ourselves.
# span_name="BigQuery.query",
# span_attributes=span_attributes,
# method="POST",
# path=path,
# data=request_body,
# timeout=api_timeout,
# )
# else:
response = await rpc(
retry=None,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)
if retry is not None:
response = retry(client._call_api)(
retry=None, # We're calling the retry decorator ourselves.
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)
else:
response = client._call_api(
retry=None,
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)

# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
# to fetch, there will be a job ID for jobs.getQueryResults.
Expand Down Expand Up @@ -186,12 +168,11 @@ async def do_query():
project=query_results.project,
num_dml_affected_rows=query_results.num_dml_affected_rows,
)


if job_retry is not None:
return job_retry(do_query)()
else:
return do_query()
return await do_query()

async def async_wait_or_cancel(
job: job.QueryJob,
Expand All @@ -215,11 +196,4 @@ async def async_wait_or_cancel(
except Exception:
# Don't eat the original exception if cancel fails.
pass
raise


DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
"3.17.2"
)

__all__ = ("AsyncClient",)
raise

0 comments on commit d1a65bf

Please sign in to comment.