Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable server side flow control by default with the option to turn it off #426

Merged
merged 6 commits into from Nov 10, 2020
Expand Up @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
Expand All @@ -98,6 +99,7 @@ public StreamingSubscriberConnection(
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
boolean useLegacyFlowControl,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -119,6 +121,7 @@ public StreamingSubscriberConnection(
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
this.useLegacyFlowControl = useLegacyFlowControl;
}

@Override
Expand Down Expand Up @@ -217,9 +220,13 @@ private void initialize() {
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(
valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
.setMaxOutstandingBytes(
valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
.build());

/**
Expand Down
Expand Up @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac

private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
// The ExecutorProvider used to generate executors for processing messages.
Expand All @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
subscriptionName = builder.subscriptionName;

maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
Expand Down Expand Up @@ -336,6 +338,7 @@ private void startStreamingConnections() {
subStub,
i,
flowControlSettings,
useLegacyFlowControl,
flowController,
executor,
alarmsExecutor,
Expand Down Expand Up @@ -420,6 +423,7 @@ public static final class Builder {
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
Expand Down Expand Up @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Disables enforcing flow control settings at the Cloud PubSub server and uses the less
* accurate method of only enforcing flow control at the client side.
*/
public Builder setUseLegacyFlowControl(boolean value) {
this.useLegacyFlowControl = value;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down