Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(db_api): support stale reads #584

Merged
merged 50 commits into from Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4d7684e
feat(db_api): add an ability to set ReadOnly/ReadWrite connection mode
Aug 2, 2021
990fe3e
update the error message
Aug 2, 2021
2381c6d
update another error message
Aug 2, 2021
9cec921
don't check error messages with regexes
Aug 2, 2021
d6783b8
don't commit/rollback ReadOnly transactions
Aug 5, 2021
131a6fc
clear the transaction
Aug 5, 2021
e1a3db2
add read-only transactions data visibility test
Aug 5, 2021
4bd65e7
Apply suggestions from code review
Aug 5, 2021
4186609
add conditions for edge cases
Aug 6, 2021
0f3e716
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Aug 6, 2021
8332d49
don't calc checksum for read-only transactions
Aug 6, 2021
61fc8ad
Merge branch 'master' into read_only_transactions
Aug 6, 2021
0be92af
use Snapshot for reads
Aug 10, 2021
3a72694
update docstrings
Aug 10, 2021
00887ff
Merge branch 'master' into read_only_transactions
Aug 10, 2021
b8eefed
Merge branch 'master' into read_only_transactions
Aug 11, 2021
de0e47e
use multi-use snapshots in !autocommit mode
Aug 11, 2021
9f1896a
return the read_only docstring back, erase excess unit test
Aug 11, 2021
4422ad5
Merge branch 'master' into read_only_transactions
larkee Aug 16, 2021
f11ea8c
erase excess ifs
Aug 16, 2021
9bafc76
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Aug 16, 2021
783951f
Merge branch 'master' into read_only_transactions
Aug 20, 2021
d807a2d
add additional check into the snapshot_checkout() method
Aug 23, 2021
76e7caf
Merge branch 'master' into read_only_transactions
Aug 26, 2021
c61f212
add new style system test
Sep 13, 2021
9118987
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Sep 13, 2021
70fc3ac
resolve conflict
Sep 13, 2021
22e5e73
don't use error message regexes
Sep 13, 2021
1cdccbe
erase excess import
Sep 13, 2021
9bb0ebc
Merge branch 'master' into read_only_transactions
Sep 14, 2021
ac8c4b2
refactor
Sep 14, 2021
ae9cd00
Merge branch 'main' into read_only_transactions
larkee Sep 16, 2021
6973748
add unit test to check that read-only transactions are not retried
Sep 16, 2021
5dab169
Merge branch 'read_only_transactions' of https://github.com/q-logic/p…
Sep 16, 2021
0cb831f
Merge branch 'read_only_transactions' into stale_reads
Sep 20, 2021
f18e102
feat(db_api): support stale reads
Sep 21, 2021
a40b48a
Merge branch 'master' into stale_reads
Oct 7, 2021
b22c31e
Apply suggestions from code review
Oct 11, 2021
6a3fedb
move `or None` into the property
Oct 11, 2021
b896a40
Merge branch 'stale_reads' of https://github.com/q-logic/python-spann…
Oct 11, 2021
ba714f1
Merge branch 'main' into stale_reads
Oct 20, 2021
0ce4b7c
Merge branch 'main' into stale_reads
Oct 22, 2021
3c422b4
Merge branch 'main' into stale_reads
larkee Nov 1, 2021
6ad2c65
fix the check
Nov 1, 2021
45fdf8e
lint fix
Nov 1, 2021
e50875b
lint fix
Nov 10, 2021
56333b9
Merge branch 'main' into stale_reads
larkee Nov 11, 2021
7a97493
Merge branch 'main' into stale_reads
larkee Nov 11, 2021
fb7858e
Merge branch 'main' into stale_reads
Nov 12, 2021
1315ad8
Merge branch 'main' into stale_reads
larkee Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 40 additions & 1 deletion google/cloud/spanner_dbapi/connection.py
Expand Up @@ -87,6 +87,7 @@ def __init__(self, instance, database, read_only=False):
# connection close
self._own_pool = True
self._read_only = read_only
self._staleness = None

@property
def autocommit(self):
Expand Down Expand Up @@ -165,6 +166,42 @@ def read_only(self, value):
)
self._read_only = value

@property
def staleness(self):
    """Current read staleness option value of this `Connection`.

    Returns:
        dict: Staleness type and value. An empty dict is returned when
            no staleness option is set.
    """
    return self._staleness or {}

@staleness.setter
def staleness(self, value):
    """Read staleness option setter.

    Args:
        value (dict): Staleness type and value. Must contain exactly one
            of the keys ``read_timestamp``, ``min_read_timestamp``,
            ``max_staleness`` or ``exact_staleness`` and no other keys.
            ``None`` resets the option.

    Raises:
        ValueError: If a transaction is in progress, or if `value` is
            not ``None`` and is not made of exactly one supported
            staleness option.
    """
    if self.inside_transaction:
        raise ValueError(
            "`staleness` option can't be changed while a transaction is in progress. "
            "Commit or rollback the current transaction and try again."
        )

    possible_opts = (
        "read_timestamp",
        "min_read_timestamp",
        "max_staleness",
        "exact_staleness",
    )
    if value is not None:
        recognised = sum(opt in value for opt in possible_opts)
        # The stored value is unpacked as keyword arguments when a
        # snapshot is created, so an unexpected key has to be refused
        # here instead of failing later at query time.
        unexpected = [key for key in value if key not in possible_opts]
        if recognised != 1 or unexpected:
            raise ValueError(
                "Expected one of the following staleness options: "
                "read_timestamp, min_read_timestamp, max_staleness, exact_staleness."
            )

    self._staleness = value

def _session_checkout(self):
"""Get a Cloud Spanner session from the pool.

Expand Down Expand Up @@ -284,7 +321,9 @@ def snapshot_checkout(self):
"""
if self.read_only and not self.autocommit:
if not self._snapshot:
self._snapshot = Snapshot(self._session_checkout(), multi_use=True)
self._snapshot = Snapshot(
self._session_checkout(), multi_use=True, **self.staleness
)
self._snapshot.begin()

return self._snapshot
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/spanner_dbapi/cursor.py
Expand Up @@ -426,7 +426,9 @@ def _handle_DQL(self, sql, params):
)
else:
# execute with single-use snapshot
with self.connection.database.snapshot() as snapshot:
with self.connection.database.snapshot(
**self.connection.staleness
) as snapshot:
self._handle_DQL_with_snapshot(snapshot, sql, params)

def __enter__(self):
Expand Down
34 changes: 33 additions & 1 deletion tests/system/test_dbapi.py
Expand Up @@ -12,13 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import hashlib
import pickle
import pkg_resources
import pytest

from google.cloud import spanner_v1
from google.cloud.spanner_dbapi.connection import connect, Connection
from google.cloud._helpers import UTC
from google.cloud.spanner_dbapi.connection import connect
from google.cloud.spanner_dbapi.connection import Connection
from google.cloud.spanner_dbapi.exceptions import ProgrammingError
from google.cloud.spanner_v1 import JsonObject
from . import _helpers
Expand Down Expand Up @@ -429,3 +432,32 @@ def test_read_only(shared_instance, dbapi_database):

cur.execute("SELECT * FROM contacts")
conn.commit()


def test_staleness(shared_instance, dbapi_database):
    """Check the DB API `staleness` option.

    A row is inserted and then read back twice over a read-only
    connection: first with `read_timestamp` set to a moment captured
    before the insert (no rows expected), then with the staleness
    option reset (one row expected).
    """
    conn = Connection(shared_instance, dbapi_database)
    try:
        cursor = conn.cursor()

        # Timezone-aware "now", captured before the row exists.
        before_insert = datetime.datetime.now(tz=UTC)

        cursor.execute(
            """
INSERT INTO contacts (contact_id, first_name, last_name, email)
VALUES (1, 'first-name', 'last-name', 'test.email@example.com')
    """
        )
        conn.commit()

        conn.read_only = True
        conn.staleness = {"read_timestamp": before_insert}
        cursor.execute("SELECT * FROM contacts")
        conn.commit()
        assert len(cursor.fetchall()) == 0

        conn.staleness = None
        cursor.execute("SELECT * FROM contacts")
        conn.commit()
        assert len(cursor.fetchall()) == 1
    finally:
        # Close the connection even when a query or an assertion fails.
        conn.close()
146 changes: 131 additions & 15 deletions tests/unit/spanner_dbapi/test_connection.py
Expand Up @@ -14,6 +14,7 @@

"""Cloud Spanner DB-API Connection class unit tests."""

import datetime
import mock
import unittest
import warnings
Expand Down Expand Up @@ -133,7 +134,11 @@ def test_read_only_not_retried(self):
connection.retry_transaction = mock.Mock()

cursor = connection.cursor()
cursor._itr = mock.Mock(__next__=mock.Mock(side_effect=Aborted("Aborted"),))
cursor._itr = mock.Mock(
__next__=mock.Mock(
side_effect=Aborted("Aborted"),
)
)

cursor.fetchone()
cursor.fetchall()
Expand Down Expand Up @@ -573,7 +578,10 @@ def test_retry_aborted_retry(self, mock_client):
connection.retry_transaction()

run_mock.assert_has_calls(
(mock.call(statement, retried=True), mock.call(statement, retried=True),)
(
mock.call(statement, retried=True),
mock.call(statement, retried=True),
)
)

def test_retry_transaction_raise_max_internal_retries(self):
Expand Down Expand Up @@ -630,7 +638,10 @@ def test_retry_aborted_retry_without_delay(self, mock_client):
connection.retry_transaction()

run_mock.assert_has_calls(
(mock.call(statement, retried=True), mock.call(statement, retried=True),)
(
mock.call(statement, retried=True),
mock.call(statement, retried=True),
)
)

def test_retry_transaction_w_multiple_statement(self):
Expand Down Expand Up @@ -688,9 +699,6 @@ def test_retry_transaction_w_empty_response(self):
run_mock.assert_called_with(statement, retried=True)

def test_validate_ok(self):
def exit_func(self, exc_type, exc_value, traceback):
pass

connection = self._make_connection()

# mock snapshot context manager
Expand All @@ -699,7 +707,7 @@ def exit_func(self, exc_type, exc_value, traceback):

snapshot_ctx = mock.Mock()
snapshot_ctx.__enter__ = mock.Mock(return_value=snapshot_obj)
snapshot_ctx.__exit__ = exit_func
snapshot_ctx.__exit__ = exit_ctx_func
snapshot_method = mock.Mock(return_value=snapshot_ctx)

connection.database.snapshot = snapshot_method
Expand All @@ -710,9 +718,6 @@ def exit_func(self, exc_type, exc_value, traceback):
def test_validate_fail(self):
from google.cloud.spanner_dbapi.exceptions import OperationalError

def exit_func(self, exc_type, exc_value, traceback):
pass

connection = self._make_connection()

# mock snapshot context manager
Expand All @@ -721,7 +726,7 @@ def exit_func(self, exc_type, exc_value, traceback):

snapshot_ctx = mock.Mock()
snapshot_ctx.__enter__ = mock.Mock(return_value=snapshot_obj)
snapshot_ctx.__exit__ = exit_func
snapshot_ctx.__exit__ = exit_ctx_func
snapshot_method = mock.Mock(return_value=snapshot_ctx)

connection.database.snapshot = snapshot_method
Expand All @@ -734,9 +739,6 @@ def exit_func(self, exc_type, exc_value, traceback):
def test_validate_error(self):
from google.cloud.exceptions import NotFound

def exit_func(self, exc_type, exc_value, traceback):
pass

connection = self._make_connection()

# mock snapshot context manager
Expand All @@ -745,7 +747,7 @@ def exit_func(self, exc_type, exc_value, traceback):

snapshot_ctx = mock.Mock()
snapshot_ctx.__enter__ = mock.Mock(return_value=snapshot_obj)
snapshot_ctx.__exit__ = exit_func
snapshot_ctx.__exit__ = exit_ctx_func
snapshot_method = mock.Mock(return_value=snapshot_ctx)

connection.database.snapshot = snapshot_method
Expand All @@ -763,3 +765,117 @@ def test_validate_closed(self):

with self.assertRaises(InterfaceError):
connection.validate()

def test_staleness_invalid_value(self):
    """The `staleness` setter must refuse dicts without a supported option."""
    conn = self._make_connection()

    rejected_values = (
        {"something": 4},  # incorrect staleness type
        {},  # no expected staleness types
    )
    for bad_value in rejected_values:
        with self.assertRaises(ValueError):
            conn.staleness = bad_value

def test_staleness_inside_transaction(self):
    """
    The `staleness` option must not be changeable
    while a transaction is in progress.
    """
    conn = self._make_connection()
    # a transaction that is neither committed nor rolled back
    conn._transaction = mock.Mock(committed=False, rolled_back=False)
    new_staleness = {"read_timestamp": datetime.datetime(2021, 9, 21)}

    with self.assertRaises(ValueError):
        conn.staleness = new_staleness

def test_staleness_multi_use(self):
    """
    The `staleness` option must be forwarded
    to the `Snapshot()` constructor.

    READ_ONLY, NOT AUTOCOMMIT
    """
    read_ts = datetime.datetime(2021, 9, 20)
    expected_kwargs = {"multi_use": True, "read_timestamp": read_ts}

    conn = self._make_connection()
    conn._session = "session"
    conn.read_only = True
    conn.staleness = {"read_timestamp": read_ts}

    snapshot_path = "google.cloud.spanner_dbapi.connection.Snapshot"
    with mock.patch(snapshot_path) as snapshot_cls:
        conn.snapshot_checkout()

    snapshot_cls.assert_called_with("session", **expected_kwargs)

def test_staleness_single_use_autocommit(self):
    """
    The `staleness` option must be forwarded
    to the snapshot context manager.

    NOT READ_ONLY, AUTOCOMMIT
    """
    read_ts = datetime.datetime(2021, 9, 20)

    conn = self._make_connection()
    conn._session_checkout = mock.MagicMock(autospec=True)

    conn.autocommit = True
    conn.staleness = {"read_timestamp": read_ts}

    # stand-in for the snapshot yielded by the context manager
    fake_snapshot = mock.Mock()
    fake_snapshot.execute_sql = mock.Mock(return_value=[1])

    # stand-in for the context manager returned by `database.snapshot()`
    fake_ctx = mock.Mock()
    fake_ctx.__enter__ = mock.Mock(return_value=fake_snapshot)
    fake_ctx.__exit__ = exit_ctx_func

    conn.database.snapshot = mock.Mock(return_value=fake_ctx)

    cur = conn.cursor()
    cur.execute("SELECT 1")

    conn.database.snapshot.assert_called_with(read_timestamp=read_ts)

def test_staleness_single_use_readonly_autocommit(self):
    """
    The `staleness` option must be forwarded to the snapshot
    context manager when the connection is in `autocommit` mode.

    READ_ONLY, AUTOCOMMIT
    """
    read_ts = datetime.datetime(2021, 9, 20)

    conn = self._make_connection()
    conn.autocommit = True
    conn.read_only = True
    conn._session_checkout = mock.MagicMock(autospec=True)

    conn.staleness = {"read_timestamp": read_ts}

    # stand-in for the snapshot yielded by the context manager
    fake_snapshot = mock.Mock()
    fake_snapshot.execute_sql = mock.Mock(return_value=[1])

    # stand-in for the context manager returned by `database.snapshot()`
    fake_ctx = mock.Mock()
    fake_ctx.__enter__ = mock.Mock(return_value=fake_snapshot)
    fake_ctx.__exit__ = exit_ctx_func

    conn.database.snapshot = mock.Mock(return_value=fake_ctx)

    cur = conn.cursor()
    cur.execute("SELECT 1")

    conn.database.snapshot.assert_called_with(read_timestamp=read_ts)


def exit_ctx_func(self, exc_type, exc_value, traceback):
    """No-op stand-in for a context manager's ``__exit__`` method."""
    return None