Skip to content

Commit

Permalink
Remove async retry from api call for now, remove rendudant comments, …
Browse files Browse the repository at this point in the history
…add in AsyncRetry and aiohttp
  • Loading branch information
kiraksi committed Mar 6, 2024
1 parent 9379285 commit 82db165
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 50 deletions.
70 changes: 34 additions & 36 deletions google/cloud/bigquery/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@
)
from google.api_core import retry_async as retries
import asyncio
import google.auth.transport._aiohttp_requests


class AsyncClient():
class AsyncClient:
def __init__(self, *args, **kwargs):
self._client = Client(*args, **kwargs)


async def query_and_wait(
self,
query,
Expand All @@ -29,14 +27,14 @@ async def query_and_wait(
job_retry: retries.AsyncRetry = DEFAULT_ASYNC_JOB_RETRY,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> RowIterator:

) -> RowIterator:
if project is None:
project = self._client.project

if location is None:
location = self._client.location

# for some reason these cannot find the function call
# if job_config is not None:
# self._client._verify_job_config_type(job_config, QueryJobConfig)

Expand All @@ -62,7 +60,7 @@ async def query_and_wait(
)


async def async_query_and_wait(
async def async_query_and_wait(
client: "Client",
query: str,
*,
Expand All @@ -76,23 +74,24 @@ async def async_query_and_wait(
page_size: Optional[int] = None,
max_results: Optional[int] = None,
) -> table.RowIterator:

# Some API parameters aren't supported by the jobs.query API. In these
# cases, fallback to a jobs.insert call.
if not _job_helpers._supported_by_jobs_query(job_config):
return await async_wait_or_cancel(
asyncio.to_thread(_job_helpers.query_jobs_insert( # throw in a background thread
client=client,
query=query,
job_id=None,
job_id_prefix=None,
job_config=job_config,
location=location,
project=project,
retry=retry,
timeout=api_timeout,
job_retry=job_retry,
)),
asyncio.to_thread(
_job_helpers.query_jobs_insert(
client=client,
query=query,
job_id=None,
job_id_prefix=None,
job_config=job_config,
location=location,
project=project,
retry=retry,
timeout=api_timeout,
job_retry=job_retry,
)
),
api_timeout=api_timeout,
wait_timeout=wait_timeout,
retry=retry,
Expand All @@ -113,21 +112,20 @@ async def async_query_and_wait(
if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"


request_body["requestId"] = _job_helpers.make_job_id()
span_attributes = {"path": path}

# For easier testing, handle the retries ourselves.
if retry is not None:
response = retry(client._call_api)( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth)
retry=None, # We're calling the retry decorator ourselves, async_retries
response = client._call_api( # ASYNCHRONOUS HTTP CALLS aiohttp (optional of google-auth), add back retry()
retry=None, # We're calling the retry decorator ourselves, async_retries, need to implement after making HTTP calls async
span_name="BigQuery.query",
span_attributes=span_attributes,
method="POST",
path=path,
data=request_body,
timeout=api_timeout,
)

else:
response = client._call_api(
retry=None,
Expand All @@ -141,9 +139,7 @@ async def async_query_and_wait(

# Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
# to fetch, there will be a job ID for jobs.getQueryResults.
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
await response
)
query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(response)
page_token = query_results.page_token
more_pages = page_token is not None

Expand All @@ -161,7 +157,7 @@ async def async_query_and_wait(
max_results=max_results,
)

result = table.RowIterator( # async of RowIterator? async version without all the pandas stuff
result = table.RowIterator( # async of RowIterator? async version without all the pandas stuff
client=client,
api_request=functools.partial(client._call_api, retry, timeout=api_timeout),
path=None,
Expand All @@ -177,12 +173,12 @@ async def async_query_and_wait(
num_dml_affected_rows=query_results.num_dml_affected_rows,
)


if job_retry is not None:
return job_retry(result) # AsyncRetries, new default objects, default_job_retry_async, default_retry_async
return job_retry(result)
else:
return result


async def async_wait_or_cancel(
job: job.QueryJob,
api_timeout: Optional[float],
Expand All @@ -192,17 +188,19 @@ async def async_wait_or_cancel(
max_results: Optional[int],
) -> table.RowIterator:
try:
return asyncio.to_thread(job.result( # run in a background thread
page_size=page_size,
max_results=max_results,
retry=retry,
timeout=wait_timeout,
))
return asyncio.to_thread(
job.result( # run in a background thread
page_size=page_size,
max_results=max_results,
retry=retry,
timeout=wait_timeout,
)
)
except Exception:
# Attempt to cancel the job since we can't return the results.
try:
job.cancel(retry=retry, timeout=api_timeout)
except Exception:
# Don't eat the original exception if cancel fails.
pass
raise
raise
9 changes: 6 additions & 3 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ def _job_should_retry(exc):
The default job retry object.
"""

DEFAULT_ASYNC_RETRY = retry_async.AsyncRetry(predicate=_should_retry, deadline=_DEFAULT_RETRY_DEADLINE) # deadline is deprecated
DEFAULT_ASYNC_RETRY = retry_async.AsyncRetry(
predicate=_should_retry, deadline=_DEFAULT_RETRY_DEADLINE
) # deadline is deprecated

DEFAULT_ASYNC_JOB_RETRY = retry_async.AsyncRetry(
predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE # deadline is deprecated
)
predicate=_job_should_retry,
deadline=_DEFAULT_JOB_DEADLINE, # deadline is deprecated
)
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# NOTE: Maintainers, please do not require google-cloud-core>=2.x.x
# Until this issue is closed
# https://github.com/googleapis/google-cloud-python/issues/10566
"google-auth >= 2.14.1, <3.0.0dev",
"google-cloud-core >= 1.6.0, <3.0.0dev",
"google-resumable-media >= 0.6.0, < 3.0dev",
"packaging >= 20.0.0",
Expand Down Expand Up @@ -83,9 +84,9 @@
"proto-plus >= 1.15.0, <2.0.0dev",
"protobuf>=3.19.5,<5.0.0dev,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", # For the legacy proto-based types.
],
"google-auth": [
"aiohttp",
]
"aiohttp": [
"google-auth[aiohttp]",
],
}

all_extras = []
Expand Down
1 change: 1 addition & 0 deletions testing/constraints-3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
db-dtypes==0.3.0
geopandas==0.9.0
google-api-core==1.31.5
google-auth==2.14.1
google-cloud-bigquery-storage==2.6.0
google-cloud-core==1.6.0
google-resumable-media==0.6.0
Expand Down
1 change: 1 addition & 0 deletions testing/constraints-3.9.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
#
# NOTE: Not comprehensive yet, will eventually be maintained semi-automatically by
# the renovate bot.
aiohttp==3.6.2
grpcio==1.47.0
pyarrow>=4.0.0

0 comments on commit 82db165

Please sign in to comment.