Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Session leaks #957

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9d83ef2
feat: session leak changes
surbhigarg92 May 8, 2023
4d16548
Merge remote-tracking branch 'upstream/main' into session_leaks
surbhigarg92 Jun 7, 2023
920eb55
unit tests and logging
surbhigarg92 Jun 13, 2023
aff17b1
revert test changes
surbhigarg92 Jun 13, 2023
f271251
tests
surbhigarg92 Jun 14, 2023
799589e
revert noxfile chanegs
surbhigarg92 Jun 14, 2023
38f71b8
tests
surbhigarg92 Jun 15, 2023
54e1717
session traces
surbhigarg92 Jun 15, 2023
7ed07c6
tests cases
surbhigarg92 Jun 15, 2023
57309de
formatting
surbhigarg92 Jun 15, 2023
c38b15d
documentation
surbhigarg92 Jun 15, 2023
d678841
Review comments
surbhigarg92 Jun 20, 2023
286e1f4
fix system tests
surbhigarg92 Jun 20, 2023
be6eee3
review comments
surbhigarg92 Jun 21, 2023
86a2552
review comments
surbhigarg92 Jun 21, 2023
c33f930
review comments
surbhigarg92 Jun 22, 2023
cb748d8
lint
surbhigarg92 Jun 22, 2023
1b72281
unit test
surbhigarg92 Jun 22, 2023
ac9e0e4
review comments
surbhigarg92 Jul 3, 2023
ff8674d
review comments
surbhigarg92 Jul 3, 2023
5a09450
review commemts
surbhigarg92 Jul 4, 2023
a75f147
review comments
surbhigarg92 Jul 5, 2023
9540c0a
fix: long running error message
surbhigarg92 Jul 24, 2023
c419204
Merge branch 'main' into session_leaks
surbhigarg92 Jul 24, 2023
b735e5e
Merge branch 'main' into session_leaks
surbhigarg92 Jul 25, 2023
2e489c4
Merge branch 'main' into session_leaks
surbhigarg92 Jul 28, 2023
1dccb4f
Merge branch 'main' into session_leaks
asthamohta Sep 19, 2023
0e04dde
Merge branch 'main' into session_leaks
asthamohta Sep 21, 2023
ee5b3a3
race condition-session already returned to pool
surbhigarg92 Nov 6, 2023
f8c258d
merge
surbhigarg92 Nov 6, 2023
9331df2
lint
surbhigarg92 Nov 6, 2023
33988c9
Merge branch 'googleapis:main' into session_leaks
surbhigarg92 Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions google/cloud/spanner_dbapi/connection.py
Expand Up @@ -582,9 +582,18 @@ def connect(
raise ValueError("project in url does not match client object project")

instance = client.instance(instance_id)
conn = Connection(
instance, instance.database(database_id, pool=pool) if database_id else None
)

if database_id:
database = instance.database(
database_id,
pool=pool,
close_inactive_transactions=False,
)
database._pool.logging_enabled = False
else:
database = None

conn = Connection(instance, database)
if pool is not None:
conn._own_pool = False

Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/_helpers.py
Expand Up @@ -38,6 +38,12 @@
+ "numeric has a whole component with precision {}"
)

LONG_RUNNING_TRANSACTION_ERR_MSG = "Transaction has been closed as it was running for more than 60 minutes. If transaction is expected to run long, run as batch or partitioned DML."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I thought we were removing the exact 60 minute thing here?


# Constants
DELETE_LONG_RUNNING_TRANSACTION_FREQUENCY_SEC = 120
DELETE_LONG_RUNNING_TRANSACTION_THRESHOLD_SEC = 3600
Comment on lines +44 to +45
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a reason for not preferring minutes? From a readability point of view, minutes will be more consistent across languages. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python sleep library expects seconds to be passed and does not support this. Also its helpful in overriding these properties to a smaller value for tests.

Seconds looks file to me.



def _try_to_coerce_bytes(bytestring):
"""Try to coerce a byte string into the right thing based on Python
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner_v1/batch.py
Expand Up @@ -29,6 +29,7 @@
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
from google.cloud.spanner_v1._helpers import LONG_RUNNING_TRANSACTION_ERR_MSG
from google.api_core.exceptions import InternalServerError


Expand Down Expand Up @@ -144,6 +145,8 @@ def _check_state(self):
"""
if self.committed is not None:
raise ValueError("Batch already committed")
if self._session is None:
raise ValueError(LONG_RUNNING_TRANSACTION_ERR_MSG)

def commit(self, return_commit_stats=False, request_options=None):
"""Commit mutations to the database.
Expand Down
45 changes: 34 additions & 11 deletions google/cloud/spanner_v1/database.py
Expand Up @@ -113,6 +113,11 @@ class Database(object):
is `True` to log commit statistics. If not passed, a logger
will be created when needed that will log the commit statistics
to stdout.

:type close_inactive_transactions: boolean
:param close_inactive_transactions: (Optional) If set to True, the database will automatically close inactive transactions that have been running for longer than 60 minutes which may cause session leaks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same here for the 60 minutes

By default, this is set to False.

:type encryption_config:
:class:`~google.cloud.spanner_admin_database_v1.types.EncryptionConfig`
or :class:`~google.cloud.spanner_admin_database_v1.types.RestoreDatabaseEncryptionConfig`
Expand Down Expand Up @@ -142,6 +147,7 @@ def __init__(
ddl_statements=(),
pool=None,
logger=None,
close_inactive_transactions=False,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
Expand All @@ -160,6 +166,7 @@ def __init__(
self._default_leader = None
self.log_commit_stats = False
self._logger = logger
self._close_inactive_transactions = close_inactive_transactions
self._encryption_config = encryption_config
self._database_dialect = database_dialect
self._database_role = database_role
Expand Down Expand Up @@ -366,7 +373,7 @@ def enable_drop_protection(self, value):
def logger(self):
"""Logger used by the database.

The default logger will log commit stats at the log level INFO using
The default logger will log at the log level INFO using
`sys.stderr`.

:rtype: :class:`logging.Logger` or `None`
Expand All @@ -381,6 +388,14 @@ def logger(self):
self._logger.addHandler(ch)
return self._logger

@property
def close_inactive_transactions(self):
"""Whether the database has has closing inactive transactions enabled. Default: False.
:rtype: bool
:returns: True if closing inactive transactions is enabled, else False.
"""
return self._close_inactive_transactions

@property
def spanner_api(self):
"""Helper for session-related API calls."""
Expand Down Expand Up @@ -647,7 +662,7 @@ def execute_partitioned_dml(
)

def execute_pdml():
with SessionCheckout(self._pool) as session:
with SessionCheckout(self._pool, isLongRunning=True) as session:
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved

txn = api.begin_transaction(
session=session.name, options=txn_options, metadata=metadata
Expand Down Expand Up @@ -1020,6 +1035,7 @@ def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
batch = self._batch = Batch(session)
self._session._transaction = batch
if self._request_options.transaction_tag:
batch.transaction_tag = self._request_options.transaction_tag
return batch
Expand All @@ -1038,7 +1054,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"CommitStats: {}".format(self._batch.commit_stats),
extra={"commit_stats": self._batch.commit_stats},
)
self._database._pool.put(self._session)
if self._batch._session is not None:
self._database._pool.put(self._session)
self._session._transaction = None
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query - I think we are doing this to flush the transaction context which we newly created. Is this the best place to flush. Are there any other place where existing properties are flushed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



class SnapshotCheckout(object):
Expand All @@ -1062,22 +1080,27 @@ class SnapshotCheckout(object):
def __init__(self, database, **kw):
self._database = database
self._session = None
self._snapshot = None
self._kw = kw

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
return Snapshot(session, **self._kw)
self._snapshot = Snapshot(session, **self._kw)
self._session._transaction = self._snapshot
return self._snapshot

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database._pool.put(self._session)
if self._snapshot._session is not None:
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
if isinstance(exc_val, NotFound):
# If NotFound exception occurs inside the with block
# then we validate if the session still exists.
if not self._session.exists():
self._session = self._database._pool._new_session()
self._session.create()
self._database._pool.put(self._session)
self._session._transaction = None


class BatchSnapshot(object):
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/spanner_v1/instance.py
Expand Up @@ -429,6 +429,7 @@ def database(
ddl_statements=(),
pool=None,
logger=None,
close_inactive_transactions=False,
encryption_config=None,
database_dialect=DatabaseDialect.DATABASE_DIALECT_UNSPECIFIED,
database_role=None,
Expand All @@ -453,6 +454,10 @@ def database(
will be created when needed that will log the commit statistics
to stdout.

:type close_inactive_transactions: boolean
:param close_inactive_transactions: (Optional) Represents whether the database
has close inactive transactions enabled or not. Default is False

:type encryption_config:
:class:`~google.cloud.spanner_admin_database_v1.types.EncryptionConfig`
or :class:`~google.cloud.spanner_admin_database_v1.types.RestoreDatabaseEncryptionConfig`
Expand Down Expand Up @@ -481,6 +486,7 @@ def database(
ddl_statements=ddl_statements,
pool=pool,
logger=logger,
close_inactive_transactions=close_inactive_transactions,
encryption_config=encryption_config,
database_dialect=database_dialect,
database_role=database_role,
Expand Down