feat: AsyncClient for async query_and_wait for BQ jobs #1839

Draft: wants to merge 6 commits into base: main
260 changes: 260 additions & 0 deletions google/cloud/bigquery/async_client.py
@@ -0,0 +1,260 @@
import asyncio
import functools
import os
from typing import Dict, Optional

from google.api_core import retry_async as retries
from google.api_core.page_iterator_async import AsyncIterator
from google.auth.transport import _aiohttp_requests  # for the TODO aiohttp path below

import google.cloud.bigquery.query
from google.cloud.bigquery import _job_helpers, job
from google.cloud.bigquery.client import *
from google.cloud.bigquery.client import (
    _add_server_timeout_header,
    _extract_job_reference,
)
from google.cloud.bigquery.opentelemetry_tracing import async_create_span
from google.cloud.bigquery.retry import (
    DEFAULT_ASYNC_JOB_RETRY,
    DEFAULT_ASYNC_RETRY,
    DEFAULT_TIMEOUT,
)
from google.cloud.bigquery.table import *

# This code is experimental


class AsyncClient:
    def __init__(self, *args, **kwargs):
        self._client = Client(*args, **kwargs)

    async def query_and_wait(
        self,
        query,
        *,
        job_config: Optional[QueryJobConfig] = None,
        location: Optional[str] = None,
        project: Optional[str] = None,
        api_timeout: TimeoutType = DEFAULT_TIMEOUT,
        wait_timeout: TimeoutType = None,
        retry: retries.AsyncRetry = DEFAULT_ASYNC_RETRY,
        job_retry: retries.AsyncRetry = DEFAULT_ASYNC_JOB_RETRY,
        page_size: Optional[int] = None,
        max_results: Optional[int] = None,
    ) -> RowIterator:
        if project is None:
            project = self._client.project

        if location is None:
            location = self._client.location

        # TODO: _verify_job_config_type is not resolvable from this module;
        # re-enable once the helper is importable here.
        # if job_config is not None:
        #     self._client._verify_job_config_type(job_config, QueryJobConfig)

        job_config = _job_helpers.job_config_with_defaults(
            job_config, self._client._default_query_job_config
        )

        return await async_query_and_wait(
            self,
            query,
            job_config=job_config,
            location=location,
            project=project,
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            job_retry=job_retry,
            page_size=page_size,
            max_results=max_results,
        )

    async def _call_api(
        self,
        retry: Optional[retries.AsyncRetry] = None,
        span_name: Optional[str] = None,
        span_attributes: Optional[Dict] = None,
        job_ref=None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        kwargs = _add_server_timeout_header(headers, kwargs)

        # TODO: issue the request with aiohttp (via google-auth) instead of
        # the blocking connection, e.g.:
        # async with _aiohttp_requests.Request(**kwargs) as response:
        #     response.raise_for_status()
        #     response = await response.json()  # or response.text()

        # The underlying connection is still synchronous, so this call blocks
        # the event loop until the HTTP request completes.
        async_call = functools.partial(self._client._connection.api_request, **kwargs)

        if retry:
            # NOTE: AsyncRetry expects a coroutine function; enabling this is
            # only safe once the HTTP call above is actually async.
            async_call = retry(async_call)

        if span_name is not None:
            async with async_create_span(
                name=span_name,
                attributes=span_attributes,
                client=self._client,
                job_ref=job_ref,
            ):
                return async_call()

        return async_call()


async def async_query_and_wait(
    client: "AsyncClient",
    query: str,
    *,
    job_config: Optional[job.QueryJobConfig],
    location: Optional[str],
    project: str,
    api_timeout: Optional[float] = None,
    wait_timeout: Optional[float] = None,
    retry: Optional[retries.AsyncRetry],
    job_retry: Optional[retries.AsyncRetry],
    page_size: Optional[int] = None,
    max_results: Optional[int] = None,
) -> RowIterator:
    if not _job_helpers._supported_by_jobs_query(job_config):
        # jobs.insert is synchronous, so run it in a worker thread and wait
        # for the resulting job.
        query_job = await asyncio.to_thread(
            _job_helpers.query_jobs_insert,
            client=client._client,
            query=query,
            job_id=None,
            job_id_prefix=None,
            job_config=job_config,
            location=location,
            project=project,
            retry=retry,
            timeout=api_timeout,
            job_retry=job_retry,
        )
        return await async_wait_or_cancel(
            query_job,
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    path = _job_helpers._to_query_path(project)
    request_body = _job_helpers._to_query_request(
        query=query, job_config=job_config, location=location, timeout=api_timeout
    )

    if page_size is not None and max_results is not None:
        request_body["maxResults"] = min(page_size, max_results)
    elif page_size is not None or max_results is not None:
        request_body["maxResults"] = page_size or max_results

    if os.getenv("QUERY_PREVIEW_ENABLED", "").casefold() == "true":
        request_body["jobCreationMode"] = "JOB_CREATION_OPTIONAL"

    request_body["requestId"] = _job_helpers.make_job_id()
    span_attributes = {"path": path}

    # TODO: once the HTTP layer is async (aiohttp via google-auth), apply the
    # AsyncRetry decorator here. Until then both retry branches were
    # identical, so a single unretried call is made.
    response = await client._call_api(
        retry=None,
        span_name="BigQuery.query",
        span_attributes=span_attributes,
        method="POST",
        path=path,
        data=request_body,
        timeout=api_timeout,
    )

    # Even if we run with JOB_CREATION_OPTIONAL, if there are more pages
    # to fetch, there will be a job ID for jobs.getQueryResults.
    query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(response)
    page_token = query_results.page_token
    more_pages = page_token is not None

    if more_pages or not query_results.complete:
        # TODO(swast): Avoid a call to jobs.get in some cases (few
        # remaining pages) by waiting for the query to finish and calling
        # client._list_rows_from_query_results directly. Need to update
        # RowIterator to fetch destination table via the job ID if needed.
        # _to_query_job is an in-memory conversion, so no thread hop is
        # needed; return here so the result is not discarded below.
        return await async_wait_or_cancel(
            _job_helpers._to_query_job(client._client, query, job_config, response),
            api_timeout=api_timeout,
            wait_timeout=wait_timeout,
            retry=retry,
            page_size=page_size,
            max_results=max_results,
        )

    def api_request(*args, **kwargs):
        return client._call_api(
            span_name="BigQuery.query",
            span_attributes=span_attributes,
            *args,
            timeout=api_timeout,
            **kwargs,
        )

    # Placeholder for an async flavor of RowIterator, without the
    # pandas-dependent helpers.
    result = AsyncRowIterator(
        client=client._client,
        api_request=api_request,
        path=None,
        schema=query_results.schema,
        max_results=max_results,
        page_size=page_size,
        total_rows=query_results.total_rows,
        first_page_response=response,
        location=query_results.location,
        job_id=query_results.job_id,
        query_id=query_results.query_id,
        project=query_results.project,
        num_dml_affected_rows=query_results.num_dml_affected_rows,
    )

    # TODO: job_retry should wrap the query call, not the finished result.
    if job_retry is not None:
        return job_retry(result)
    return result


async def async_wait_or_cancel(
    job: job.QueryJob,
    api_timeout: Optional[float],
    wait_timeout: Optional[float],
    retry: Optional[retries.AsyncRetry],
    page_size: Optional[int],
    max_results: Optional[int],
) -> RowIterator:
    try:
        # job.result blocks on the finished query, so run it in a worker
        # thread and await the result.
        return await asyncio.to_thread(
            job.result,
            page_size=page_size,
            max_results=max_results,
            retry=retry,
            timeout=wait_timeout,
        )
    except Exception:
        # Attempt to cancel the job since we can't return the results.
        try:
            await asyncio.to_thread(job.cancel, retry=retry, timeout=api_timeout)
        except Exception:
            # Don't eat the original exception if cancel fails.
            pass
        raise


class AsyncRowIterator(AsyncIterator):
    # Placeholder: switch the base class to AsyncHTTPIterator once
    # google-api-core ships it (see the commented import in table.py).
    pass
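
For reference, a minimal usage sketch of the surface this diff adds. The module path google.cloud.bigquery.async_client and the trivial query are assumptions for illustration; the client is experimental and the API may change.

import asyncio

from google.cloud.bigquery.async_client import AsyncClient


async def main():
    # AsyncClient wraps the synchronous Client; constructor args pass through.
    client = AsyncClient()
    rows = await client.query_and_wait("SELECT 1 AS x", max_results=10)
    # Iteration is shown as in the sync client; note that AsyncRowIterator
    # is still a placeholder in this draft.
    for row in rows:
        print(row)


asyncio.run(main())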
33 changes: 32 additions & 1 deletion google/cloud/bigquery/opentelemetry_tracing.py
@@ -13,7 +13,7 @@
# limitations under the License.

import logging
-from contextlib import contextmanager
+from contextlib import contextmanager, asynccontextmanager
from google.api_core.exceptions import GoogleAPICallError  # type: ignore

logger = logging.getLogger(__name__)
@@ -86,6 +86,37 @@ def create_span(name, attributes=None, client=None, job_ref=None):
raise


@asynccontextmanager
async def async_create_span(name, attributes=None, client=None, job_ref=None):
    """Asynchronous context manager for creating and exporting OpenTelemetry spans."""
    global _warned_telemetry
    final_attributes = _get_final_span_attributes(attributes, client, job_ref)

    if not HAS_OPENTELEMETRY:
        if not _warned_telemetry:
            logger.debug(
                "This service is instrumented using OpenTelemetry. "
                "OpenTelemetry or one of its components could not be imported; "
                "please add compatible versions of opentelemetry-api and "
                "opentelemetry-instrumentation packages in order to get BigQuery "
                "Tracing data."
            )
            _warned_telemetry = True
        yield None
        return
    tracer = trace.get_tracer(__name__)

    # start_as_current_span is a synchronous context manager, even when used
    # from async code.
    with tracer.start_as_current_span(name=name, attributes=final_attributes) as span:
        try:
            yield span
        except GoogleAPICallError as error:
            if error.code is not None:
                span.set_status(Status(http_status_to_status_code(error.code)))
            raise


def _get_final_span_attributes(attributes=None, client=None, job_ref=None):
"""Compiles attributes from: client, job_ref, user-provided attributes.

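
A sketch of how the new async span helper would be used at a call site, mirroring the sync create_span pattern. The do_request coroutine and the attribute values are hypothetical.

async def traced_request(client, do_request):
    # async_create_span yields None when OpenTelemetry is not installed,
    # so the body must not assume a real span object.
    async with async_create_span(
        name="BigQuery.query", attributes={"path": "/queries"}, client=client
    ) as span:
        return await do_request()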
11 changes: 10 additions & 1 deletion google/cloud/bigquery/retry.py
@@ -13,7 +13,7 @@
# limitations under the License.

from google.api_core import exceptions
-from google.api_core import retry
+from google.api_core import retry, retry_async
from google.auth import exceptions as auth_exceptions  # type: ignore
import requests.exceptions

@@ -90,3 +90,12 @@ def _job_should_retry(exc):
"""
The default job retry object.
"""

DEFAULT_ASYNC_RETRY = retry_async.AsyncRetry(
    predicate=_should_retry, timeout=_DEFAULT_RETRY_DEADLINE  # formerly ``deadline``, now deprecated
)
"""The default async retry object."""

DEFAULT_ASYNC_JOB_RETRY = retry_async.AsyncRetry(
    predicate=_job_should_retry,
    timeout=_DEFAULT_JOB_DEADLINE,  # formerly ``deadline``, now deprecated
)
"""The default async job retry object."""
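
For context, a minimal sketch of how an AsyncRetry decorates a coroutine function. flaky_call is hypothetical, and the predicate and timeout values are illustrative only.

from google.api_core import exceptions
from google.api_core import retry_async
from google.api_core.retry import if_exception_type


@retry_async.AsyncRetry(
    predicate=if_exception_type(exceptions.ServiceUnavailable),  # retry on HTTP 503
    timeout=600.0,  # overall time budget in seconds
)
async def flaky_call():
    """Hypothetical coroutine that is re-awaited on 503s until the timeout."""
    ...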
6 changes: 6 additions & 0 deletions google/cloud/bigquery/table.py
@@ -57,6 +57,8 @@
import google.api_core.exceptions
from google.api_core.page_iterator import HTTPIterator

# TODO: from google.api_core.page_iterator_async import AsyncHTTPIterator
# (enable once google-api-core supports it)

import google.cloud._helpers # type: ignore
from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pandas_helpers
@@ -2444,6 +2446,10 @@ def to_geodataframe(
)


# class AsyncRowIterator(AsyncHTTPIterator):
# pass


class _EmptyRowIterator(RowIterator):
"""An empty row iterator.

12 changes: 6 additions & 6 deletions noxfile.py
@@ -80,8 +80,8 @@ def default(session, install_extras=True):
        constraints_path,
    )

-    if install_extras and session.python in ["3.11", "3.12"]:
-        install_target = ".[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
+    if install_extras and session.python in ["3.12"]:
+        install_target = ".[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]"
    elif install_extras:
        install_target = ".[all]"
    else:
@@ -188,8 +188,8 @@ def system(session):
    # Data Catalog needed for the column ACL test with a real Policy Tag.
    session.install("google-cloud-datacatalog", "-c", constraints_path)

-    if session.python in ["3.11", "3.12"]:
-        extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
+    if session.python in ["3.12"]:
+        extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]"  # geopandas supports up to Python 3.11
    else:
        extras = "[all]"
    session.install("-e", f".{extras}", "-c", constraints_path)
@@ -254,8 +254,8 @@ def snippets(session):
session.install("google-cloud-storage", "-c", constraints_path)
session.install("grpcio", "-c", constraints_path)

if session.python in ["3.11", "3.12"]:
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry]"
if session.python in ["3.12"]:
extras = "[bqstorage,ipywidgets,pandas,tqdm,opentelemetry,aiohttp]"
else:
extras = "[all]"
session.install("-e", f".{extras}", "-c", constraints_path)
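
Note: with this gating, the aiohttp extra is exercised only on the Python 3.12 sessions (e.g. nox -s unit-3.12, assuming the repo's usual parameterized session names); other interpreters fall back to the existing [all] extras.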
3 changes: 3 additions & 0 deletions setup.py
@@ -84,6 +84,9 @@
"proto-plus >= 1.15.0, <2.0.0dev",
"protobuf>=3.19.5,<5.0.0dev,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", # For the legacy proto-based types.
],
"aiohttp": [
"google-auth[aiohttp]",
],
}

all_extras = []
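
Once published, the new extra would be pulled in with pip install google-cloud-bigquery[aiohttp] (assuming the released package name), which installs google-auth together with its aiohttp transport.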
1 change: 1 addition & 0 deletions testing/constraints-3.9.txt
@@ -4,5 +4,6 @@
#
# NOTE: Not comprehensive yet, will eventually be maintained semi-automatically by
# the renovate bot.
aiohttp==3.6.2
grpcio==1.47.0
pyarrow>=4.0.0