From 6308a0e24f280a4e1f935700513d517f7a7871bc Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Fri, 30 Oct 2020 17:52:33 -0400 Subject: [PATCH 1/5] Enable server side flow control by default with the option to turn it off This change enables sending flow control settings automatically to the server. If FlowControlSettings.maxOutstandingElementCount > 0 or FlowControlSettings.maxOutstandingRequestBytes > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and Subscriber.Builder.setUseLegacyFlowControl() method is provided for users who would like to opt-out of this feature in case they encouter issues with server side flow control. --- .../pubsub/v1/StreamingSubscriberConnection.java | 7 +++++-- .../java/com/google/cloud/pubsub/v1/Subscriber.java | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f4e330ef1..cfa292095 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final MessageDispatcher messageDispatcher; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); @@ -98,6 +99,7 @@ public StreamingSubscriberConnection( SubscriberStub stub, int channelAffinity, FlowControlSettings flowControlSettings, + boolean useLegacyFlowControl, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -119,6 +121,7 @@ public StreamingSubscriberConnection( systemExecutor, clock); this.flowControlSettings = flowControlSettings; + this.useLegacyFlowControl = useLegacyFlowControl; } @Override @@ -217,9 +220,9 @@ private void initialize() { .setStreamAckDeadlineSeconds(60) .setClientId(clientId) .setMaxOutstandingMessages( - valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) .setMaxOutstandingBytes( - valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) + this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 35c50fdb6..217136b00 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final Duration maxAckExtensionPeriod; private final Duration maxDurationPerAckExtension; // The ExecutorProvider used to generate executors for processing messages. @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; + useLegacyFlowControl = builder.useLegacyFlowControl; subscriptionName = builder.subscriptionName; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; @@ -336,6 +338,7 @@ private void startStreamingConnections() { subStub, i, flowControlSettings, + useLegacyFlowControl, flowController, executor, alarmsExecutor, @@ -420,6 +423,7 @@ public static final class Builder { private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; private Duration maxDurationPerAckExtension = Duration.ofMillis(0); + private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1000L) @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } + /** + * Disables enforcing flow control settings at the Cloud PubSub server + * and uses the less accurate method of only enforcing flow control at the client side. + */ + public Builder setUseLegacyFlowControl(boolean value) { + this.useLegacyFlowControl = value; + return this; + } + /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * From cfe826dc762642d503130864957ee8e1c162affc Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Fri, 30 Oct 2020 17:52:33 -0400 Subject: [PATCH 2/5] feat: Enable server side flow control by default with the option to turn it off This change enables sending flow control settings automatically to the server. If FlowControlSettings.maxOutstandingElementCount > 0 or FlowControlSettings.maxOutstandingRequestBytes > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and Subscriber.Builder.setUseLegacyFlowControl() method is provided for users who would like to opt-out of this feature in case they encounter issues with server side flow control. --- .../pubsub/v1/StreamingSubscriberConnection.java | 7 +++++-- .../java/com/google/cloud/pubsub/v1/Subscriber.java | 13 +++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index f4e330ef1..cfa292095 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final MessageDispatcher messageDispatcher; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); @@ -98,6 +99,7 @@ public StreamingSubscriberConnection( SubscriberStub stub, int channelAffinity, FlowControlSettings flowControlSettings, + boolean useLegacyFlowControl, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -119,6 +121,7 @@ public StreamingSubscriberConnection( systemExecutor, clock); this.flowControlSettings = flowControlSettings; + this.useLegacyFlowControl = useLegacyFlowControl; } @Override @@ -217,9 +220,9 @@ private void initialize() { .setStreamAckDeadlineSeconds(60) .setClientId(clientId) .setMaxOutstandingMessages( - valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) .setMaxOutstandingBytes( - valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) + this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 35c50fdb6..217136b00 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; + private final boolean useLegacyFlowControl; private final Duration maxAckExtensionPeriod; private final Duration maxDurationPerAckExtension; // The ExecutorProvider used to generate executors for processing messages. @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private Subscriber(Builder builder) { receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; + useLegacyFlowControl = builder.useLegacyFlowControl; subscriptionName = builder.subscriptionName; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; @@ -336,6 +338,7 @@ private void startStreamingConnections() { subStub, i, flowControlSettings, + useLegacyFlowControl, flowController, executor, alarmsExecutor, @@ -420,6 +423,7 @@ public static final class Builder { private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; private Duration maxDurationPerAckExtension = Duration.ofMillis(0); + private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1000L) @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { return this; } + /** + * Disables enforcing flow control settings at the Cloud PubSub server + * and uses the less accurate method of only enforcing flow control at the client side. + */ + public Builder setUseLegacyFlowControl(boolean value) { + this.useLegacyFlowControl = value; + return this; + } + /** * Set the maximum period a message ack deadline will be extended. Defaults to one hour. * From fd3dde1a7c8a44c7936b90996bf3edfe6aa220c5 Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Thu, 5 Nov 2020 15:53:19 -0500 Subject: [PATCH 3/5] Update google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java Co-authored-by: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com> --- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 217136b00..6d5946276 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -509,9 +509,9 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { } /** - * Disables enforcing flow control settings at the Cloud PubSub server - * and uses the less accurate method of only enforcing flow control at the client side. - */ + * Disables enforcing flow control settings at the Cloud PubSub server and uses the less + * accurate method of only enforcing flow control at the client side. + */ public Builder setUseLegacyFlowControl(boolean value) { this.useLegacyFlowControl = value; return this; From a400b70150700fe12c4e72e6c8fb7a6eb80ef8fd Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Thu, 5 Nov 2020 15:53:27 -0500 Subject: [PATCH 4/5] Update google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java Co-authored-by: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com> --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index cfa292095..d8cf9c350 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -220,8 +220,12 @@ private void initialize() { .setStreamAckDeadlineSeconds(60) .setClientId(clientId) .setMaxOutstandingMessages( - this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) - .setMaxOutstandingBytes( + this.useLegacyFlowControl + ? 0 + : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + this.useLegacyFlowControl + ? 0 + : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); From e20f65fd831c732df6a24e022fd51687588ada43 Mon Sep 17 00:00:00 2001 From: fayssalmartanigcp <73672393+fayssalmartanigcp@users.noreply.github.com> Date: Thu, 5 Nov 2020 15:55:46 -0500 Subject: [PATCH 5/5] Update StreamingSubscriberConnection.java --- .../google/cloud/pubsub/v1/StreamingSubscriberConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index d8cf9c350..1587afb91 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -223,10 +223,10 @@ private void initialize() { this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + .setMaxOutstandingBytes( this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) - this.useLegacyFlowControl ? 0 : valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); /**