PublisherClient creation isn't thread safe and hangs if called by concurrent threads #1101

Open
matt-mcallister opened this issue Feb 27, 2024 · 2 comments
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.

Comments

@matt-mcallister

Environment details

  • OS type and version: Linux - docker container using python:3.10.13-bookworm as the base image
  • Python version: 3.10.13
  • pip version: 23.0.1
  • google-cloud-pubsub version: 2.19.7

Steps to reproduce

Calling PublisherClient.from_service_account_info() from multiple concurrent threads causes the program to hang.

Code example

import concurrent.futures
from google.cloud.pubsub_v1 import PublisherClient

creds = {
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "client_email": "",
    "client_id": "",
    "client_x509_cert_url": "",
    "private_key_id": "",
    "project_id": "",
    "token_uri": "https://oauth2.googleapis.com/token",
    "type": "service_account",
    "private_key": ""
}

def run(publisher_number):
    PublisherClient.from_service_account_info(creds)
    print(f'Created publisher {publisher_number}')

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    total_publishers = 10
    futures = []
    for i in range(0, total_publishers):
        futures.append(executor.submit(run, i))

    print(f"Submitted all {total_publishers} to thread executor")
    for f in futures:
        f.result()

Output

$ python test.py
Submitted all 10 to thread executor
^C^\Quit (core dumped)

Stack trace

The program hangs, so there is no Python traceback. Below is the pystack output from the core dump taken after sending SIGQUIT:

$ pystack core core.72129
Using executable found in the core file: /home/temp/bin/python

Core file information:
state: R zombie: True niceness: 0
pid: 72129 ppid: 71086 sid: 71086
uid: 10001 gid: 30001 pgrp: 72129
executable: python arguments: python test.py

The process died due receiving signal SIGQUIT
Traceback for thread 72131 [Has the GIL] (most recent call last):
    (Python) File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 953, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/home/test.py", line 48, in run
        PublisherClient.from_service_account_info(creds)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 172, in from_service_account_info
        return cls(*args, **kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/cloud/pubsub_v1/publisher/client.py", line 139, in __init__
        super().__init__(**kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 712, in __init__
        self._transport = Transport(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 165, in __init__
        self._grpc_channel = type(self).create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 222, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(

Traceback for thread 72130 [] (most recent call last):
    (Python) File "/usr/local/lib/python3.10/threading.py", line 973, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
    (Python) File "/usr/local/lib/python3.10/threading.py", line 953, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 83, in _worker
        work_item.run()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/home/test.py", line 48, in run
        PublisherClient.from_service_account_info(creds)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 172, in from_service_account_info
        return cls(*args, **kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/cloud/pubsub_v1/publisher/client.py", line 139, in __init__
        super().__init__(**kwargs)
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 712, in __init__
        self._transport = Transport(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 165, in __init__
        self._grpc_channel = type(self).create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/transports/grpc.py", line 222, in create_channel
        return grpc_helpers.create_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
        return grpc.secure_channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/__init__.py", line 2119, in secure_channel
        return _channel.Channel(
    (Python) File "/home/temp/lib/python3.10/site-packages/grpc/_channel.py", line 2046, in __init__
        self._channel = cygrpc.Channel(
    (Python) File "/usr/local/lib/python3.10/pkgutil.py", line 639, in get_data
        return loader.get_data(resource_name)
    (Python) File "<frozen importlib._bootstrap_external>", line 1073, in get_data

Traceback for thread 72129 [] (most recent call last):
    (Python) File "/home/test.py", line 60, in <module>
        f.result()
    (Python) File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 453, in result
        self._condition.wait(timeout)
    (Python) File "/usr/local/lib/python3.10/threading.py", line 320, in wait
        waiter.acquire()
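
For anyone who wants thread stacks from a hang like this without producing a core dump, the standard-library faulthandler module can write them after a timeout. The following is a minimal sketch, not something the reporter used: `run_with_watchdog` and its `dump_path` parameter are hypothetical names chosen for illustration, and the timeout is arbitrary.

```python
import faulthandler
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_watchdog(fn: Callable[[], T], timeout: float, dump_path: str) -> T:
    """Run fn(); if it is still running after `timeout` seconds, write the
    Python stack of every thread to `dump_path` (hypothetical helper)."""
    with open(dump_path, "w") as dump_file:
        # faulthandler writes to the file descriptor from a watchdog thread,
        # so the file has to stay open until the dump is cancelled.
        faulthandler.dump_traceback_later(timeout, file=dump_file, exit=False)
        try:
            return fn()
        finally:
            # Cancel the pending dump once fn() has returned or raised.
            faulthandler.cancel_dump_traceback_later()
```

If the wrapped function is the reproduction script's main body, the dump file should show where each worker thread is blocked at the moment the timeout fires. Note that with `exit=False` the process keeps hanging after the dump is written.
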
@pedrorjbr

pedrorjbr commented Apr 17, 2024

I think I am running into the same problem:
https://stackoverflow.com/questions/78337450/publishing-message-to-gcp-pubsub-from-a-fastapi-app
Is there any workaround?

Because I am using the PublisherClient from Cloud Run, I can't find a way to reuse the same PublisherClient.
This seems like such a basic use case.
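
One possible workaround, not confirmed by the maintainers in this thread, is to avoid concurrent construction altogether by creating a single client lazily behind a lock and sharing it between threads. The sketch below rests on two assumptions: that serializing construction avoids the hang, and that a PublisherClient can safely be shared once it has been built. `SharedClient` is a hypothetical helper, not part of google-cloud-pubsub.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class SharedClient(Generic[T]):
    """Build one client lazily and hand the same instance to every caller."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._lock = threading.Lock()
        self._client: Optional[T] = None

    def get(self) -> T:
        # Only one thread at a time can be inside this block, so the factory
        # never runs concurrently and runs at most once on success.
        with self._lock:
            if self._client is None:
                self._client = self._factory()
            return self._client


# Hypothetical usage with the library from this issue (assumption, untested):
#   publisher = SharedClient(
#       lambda: PublisherClient.from_service_account_info(creds)
#   )
#   client = publisher.get()  # call from any request handler or thread
```

In a web application, a module-level `SharedClient` instance would let every request handler call `get()` and receive the same client, which is one way to get reuse when the framework controls the threads.
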

@florianorpeliere

Hello,

I have exactly the same problem. I use the PublisherClient in an Apache Beam (Dataflow) job, and I noticed threads blocking each other when they create clients.

I have a fairly simple way to reproduce it:

import threading
from google.cloud import pubsub_v1

def pubsub():
    for i in [1, 2, 3, 4, 5]:
        print(i)
        pubsub_v1.PublisherClient()


t1 = threading.Thread(target=pubsub)
t2 = threading.Thread(target=pubsub)
t1.start()
t2.start()
t1.join()
t2.join()

Producing:

$ python block.py
1
1
^C^C^C^C
[1]    63732 terminated  python3 block.py

OS: macOS 14.4.1 (23E224) - M2 Pro
Python: Python 3.12.1
google-cloud-pubsub: 2.21.1
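
To make a reproduction like the one above report the hang instead of requiring Ctrl-C, the worker threads can be joined with a deadline. This is only a sketch: `find_stuck_threads` is a hypothetical helper, and the thread count and timeout are arbitrary choices.

```python
import threading
import time
from typing import Callable, List


def find_stuck_threads(
    target: Callable[[], None], n_threads: int, timeout: float
) -> List[str]:
    """Run `target` in n_threads threads and return the names of the
    threads that are still alive once `timeout` seconds have passed."""
    threads = [
        # Daemon threads are used so that stuck workers are not joined
        # when the interpreter shuts down.
        threading.Thread(target=target, name=f"worker-{i}", daemon=True)
        for i in range(n_threads)
    ]
    for t in threads:
        t.start()
    deadline = time.monotonic() + timeout
    for t in threads:
        # Every join shares a single overall deadline.
        t.join(max(0.0, deadline - time.monotonic()))
    return [t.name for t in threads if t.is_alive()]
```

Calling `find_stuck_threads(pubsub, 2, 30.0)` with the `pubsub` function from the snippet above would return an empty list if both threads finish, and the names of the blocked workers otherwise.
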

I hope this helps.
If it would be better to open a separate issue, I am happy to do so.

Thank you in advance for your help.
