fix: apply timeout to all resumable upload requests #1070

Merged · 5 commits · Nov 24, 2021

Changes from all commits
186 changes: 111 additions & 75 deletions google/cloud/bigquery/client.py
@@ -31,9 +31,10 @@
import typing
from typing import (
Any,
BinaryIO,
Dict,
IO,
Iterable,
Mapping,
List,
Optional,
Sequence,
@@ -112,10 +113,15 @@
pyarrow = _helpers.PYARROW_VERSIONS.try_import()

TimeoutType = Union[float, None]
ResumableTimeoutType = Union[
None, float, Tuple[float, float]
] # for resumable media methods

if typing.TYPE_CHECKING: # pragma: NO COVER
# os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
import pandas # type: ignore
import requests # required by api-core

_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
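For orientation, the new alias simply widens the set of accepted timeout values; a quick illustration (the variable names below are ours, not part of the change):

```python
from typing import Tuple, Union

ResumableTimeoutType = Union[None, float, Tuple[float, float]]

# The three shapes a resumable media method now accepts:
wait_forever: ResumableTimeoutType = None            # no deadline at all
flat_deadline: ResumableTimeoutType = 60.0           # seconds, applied per request
split_deadline: ResumableTimeoutType = (5.0, 30.0)   # (connect_timeout, read_timeout)
```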
@@ -2348,7 +2354,7 @@ def load_table_from_uri(

def load_table_from_file(
self,
file_obj: BinaryIO,
file_obj: IO[bytes],
destination: Union[Table, TableReference, TableListItem, str],
rewind: bool = False,
size: int = None,
@@ -2358,50 +2364,50 @@ def load_table_from_file(
location: str = None,
project: str = None,
job_config: LoadJobConfig = None,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of this table from a file-like object.

Similar to :meth:`load_table_from_uri`, this method creates, starts and
returns a :class:`~google.cloud.bigquery.job.LoadJob`.

Args:
file_obj (file): A file handle opened in binary mode for reading.
destination (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableReference, \
google.cloud.bigquery.table.TableListItem, \
str, \
]):
file_obj:
A file handle opened in binary mode for reading.
destination:
Table into which data is to be loaded. If a string is passed
in, this method attempts to create a table reference from a
string using
:func:`google.cloud.bigquery.table.TableReference.from_string`.

Keyword Arguments:
rewind (Optional[bool]):
rewind:
If True, seek to the beginning of the file handle before
reading the file.
size (Optional[int]):
size:
The number of bytes to read from the file handle. If size is
``None`` or large, resumable upload will be used. Otherwise,
multipart upload will be used.
num_retries (Optional[int]): Number of upload retries. Defaults to 6.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries. Defaults to 6.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated job ID.
This parameter will be ignored if a ``job_id`` is also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
ID of the project in which to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job.
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request
may be repeated several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Contributor:

I'm curious, does the same apply to our other API requests? I know we use the requests library for those too, but via google-cloud-core.

Contributor Author:

From what I can see, it does. The timeout argument gets passed down to google.cloud._http.JSONConnection, which accepts a two-tuple.

I think we did not advertise this second option in BigQuery and just went with Optional[Union[int, float]] to avoid leaking transport implementation details, but I'm not 100% sure.

Contributor:

google-cloud-core definitely exposes both float and tuple for timeout.

Note that its current docs link (under cloud.google.com) doesn't show those details.
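For reference, a minimal sketch of how `requests` interprets the two forms; the URL is just a placeholder:

```python
import requests

# A single float bounds both the connection attempt and each socket read:
requests.get("https://example.com", timeout=60.0)

# A two-tuple splits the budget: 5 seconds to establish the connection,
# 30 seconds for each read from the socket.
requests.get("https://example.com", timeout=(5.0, 30.0))
```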


Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
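A usage sketch of the widened parameter (not part of the diff; the project, dataset, and table identifiers are hypothetical):

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV)

with open("rows.csv", "rb") as source_file:
    # Omitting ``size`` lets a large file take the resumable path, where
    # each chunk request now honors the timeout below.
    job = client.load_table_from_file(
        source_file,
        "my-project.my_dataset.my_table",  # hypothetical destination
        job_config=job_config,
        timeout=(5.0, 60.0),  # 5 s to connect, 60 s per read
    )

job.result()  # block until the load job completes
```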
@@ -2453,7 +2459,7 @@ def load_table_from_file(

def load_table_from_dataframe(
self,
dataframe,
dataframe: "pandas.DataFrame",
destination: Union[Table, TableReference, str],
num_retries: int = _DEFAULT_NUM_RETRIES,
job_id: str = None,
@@ -2462,7 +2468,7 @@
project: str = None,
job_config: LoadJobConfig = None,
parquet_compression: str = "snappy",
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of a table from a pandas DataFrame.

@@ -2481,9 +2487,9 @@
https://github.com/googleapis/python-bigquery/issues/19

Args:
dataframe (pandas.DataFrame):
dataframe:
A :class:`~pandas.DataFrame` containing the data to load.
destination (google.cloud.bigquery.table.TableReference):
destination:
The destination table to use for loading the data. If it is an
existing table, the schema of the :class:`~pandas.DataFrame`
must match the schema of the destination table. If the table
@@ -2495,19 +2501,19 @@
:func:`google.cloud.bigquery.table.TableReference.from_string`.

Keyword Arguments:
num_retries (Optional[int]): Number of upload retries.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated
job ID. This parameter will be ignored if a ``job_id`` is
also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
ID of the project in which to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job.

To override the default pandas data type conversions, supply
@@ -2524,7 +2530,7 @@
:attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
parquet_compression:
[Beta] The compression method to use when serializing
``dataframe`` to an intermediate parquet file.

@@ -2537,9 +2543,13 @@
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
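The DataFrame path accepts the same timeout shapes; a sketch assuming pandas and pyarrow are installed, with a hypothetical destination:

```python
import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"name": ["alpha", "beta"], "score": [1.0, 2.0]})

job = client.load_table_from_dataframe(
    dataframe,
    "my-project.my_dataset.my_table",  # hypothetical destination
    parquet_compression="gzip",        # forwarded to DataFrame.to_parquet()
    timeout=(5.0, 60.0),               # (connect_timeout, read_timeout)
)
job.result()
```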
@@ -2717,7 +2727,7 @@ def load_table_from_json(
location: str = None,
project: str = None,
job_config: LoadJobConfig = None,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of a table from a JSON string or dict.

@@ -2741,36 +2751,35 @@
client = bigquery.Client()
client.load_table_from_file(data_as_file, ...)

destination (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableReference, \
google.cloud.bigquery.table.TableListItem, \
str, \
]):
destination:
Table into which data is to be loaded. If a string is passed
in, this method attempts to create a table reference from a
string using
:func:`google.cloud.bigquery.table.TableReference.from_string`.

Keyword Arguments:
num_retries (Optional[int]): Number of upload retries.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated job ID.
This parameter will be ignored if a ``job_id`` is also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
ID of the project in which to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job. The ``source_format``
setting is always set to
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.

Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
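And the JSON variant, which needs no local file; identifiers are again hypothetical:

```python
from google.cloud import bigquery

client = bigquery.Client()
json_rows = [
    {"name": "alpha", "score": 1.0},
    {"name": "beta", "score": 2.0},
]

job = client.load_table_from_json(
    json_rows,
    "my-project.my_dataset.my_table",  # hypothetical destination
    timeout=(5.0, 60.0),  # with this fix, also applied to each resumable chunk
)
job.result()
```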
@@ -2819,60 +2828,77 @@ def load_table_from_json(
)

def _do_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
):
self,
stream: IO[bytes],
metadata: Mapping[str, str],
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
) -> "requests.Response":
"""Perform a resumable upload.

Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.

metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.

num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.

project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

project:
ID of the project in which to run the upload. Defaults
to the client's project.

Returns:
requests.Response:
The "200 OK" response object returned after the final chunk
is uploaded.
The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries, timeout, project=project
)

while not upload.finished:
response = upload.transmit_next_chunk(transport)
response = upload.transmit_next_chunk(transport, timeout=timeout)

return response
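The behavioral change here is that `timeout=` is now forwarded to `transmit_next_chunk`. A test-style sketch of that contract, not taken from the PR's test suite:

```python
from unittest import mock

upload = mock.Mock()
# ``finished`` is read once per loop iteration: False, then True.
type(upload).finished = mock.PropertyMock(side_effect=[False, True])
transport = mock.Mock()
timeout = (5.0, 60.0)

# Mirror the fixed loop from _do_resumable_upload:
while not upload.finished:
    response = upload.transmit_next_chunk(transport, timeout=timeout)

upload.transmit_next_chunk.assert_called_once_with(transport, timeout=timeout)
```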

def _initiate_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
self,
stream: IO[bytes],
metadata: Mapping[str, str],
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
):
"""Initiate a resumable upload.

Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.

metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.

num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.

project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

project:
ID of the project in which to run the upload. Defaults
to the client's project.

@@ -2921,29 +2947,39 @@ def _initiate_resumable_upload(
return upload, transport

def _do_multipart_upload(
self, stream, metadata, size, num_retries, timeout, project=None
self,
stream: IO[bytes],
metadata: Mapping[str, str],
size: int,
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
):
"""Perform a multipart upload.

Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.

metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.

size (int):
size:
The number of bytes to be uploaded (which will be read
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).

num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.

project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.

project:
ID of the project in which to run the upload. Defaults
to the client's project.
