Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Streaming Pull Usage #1174

Open
ash2703 opened this issue May 17, 2024 · 1 comment
Open

Async Streaming Pull Usage #1174

ash2703 opened this issue May 17, 2024 · 1 comment
Assignees
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.

Comments

@ash2703
Copy link

ash2703 commented May 17, 2024

Has someone used this in peoduction scenarios? I am stuck and unable to pull messages using the async streaming client and any help would be benefecial.

Is wrapping the standard streaming pull in an asyncio executor gonna give me the same behaviour as below client?

This is my current usage:

from google import pubsub_v1
subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
    try:
        print(f"Listening for messages on {self.subscription_path}...")
        request = pubsub_v1.StreamingPullRequest(
            subscription=self.subscription_path,
        )

        # Not sure if there is any other way to do this
        async def request_generator():
            yield request
        print(f"stream: {request}"). # Code gets blocked here


        stream = await self.subscriber.streaming_pull(requests=request_generator())
        print(f"stream: {stream}")
        # Handle the response
        async for response in stream:
            for received_message in response.received_messages:
                print("Received message: ", received_message.message.data.decode('utf-8'))
    except Exception as e:
        raise e
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/python-pubsub API. label May 17, 2024
@sudheer-ag
Copy link

sudheer-ag commented Jun 2, 2024

Client unable to pull messages because the underlying grpc library closing the stream immediately after opening the stream.
https://github.com/grpc/grpc/blob/e4daabc8bcc2a72652a843f1aeabd58eec9331b5/src/python/grpcio/grpc/aio/_call.py#L476
This is due to the single request we send in the request generator. We can solve the issue by sending heartbeat request to the server

from google import pubsub_v1
class AsyncStreamingRequestIterator():

    def __init__(self, initial_request):
        self.request = initial_request
 
    def __aiter__(self):
        return self
 
    async def __anext__(self):
        # Send First Request
        if self.request:
            return_value = self.request
            self.request = None
            return return_value
        await asyncio.sleep(30) # Default 30 Seconds
        return StreamingPullRequest(stream_ack_deadline_seconds=900)

subscriber = pubsub_v1.SubscriberAsyncClient() #skipping the class code but this is the Subscriber client
async def fetch_and_decode_msgs(self):
    try:
        print(f"Listening for messages on {self.subscription_path}...")
        request = pubsub_v1.StreamingPullRequest(
            subscription=self.subscription_path,
        )

        stream = await self.subscriber.streaming_pull(requests=AsyncStreamingRequestIterator(request), timeout=None)
        print(f"stream: {stream}")
        # Handle the response
        async for response in stream:
            for received_message in response.received_messages:
                print("Received message: ", received_message.message.data.decode('utf-8'))
    except Exception as e:
        raise e

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.
Projects
None yet
Development

No branches or pull requests

3 participants