Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Send streaming pull flow control settings to server #267

Merged
merged 9 commits into from Jun 24, 2020
Expand Up @@ -24,6 +24,7 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
Expand Down Expand Up @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final ScheduledExecutorService systemExecutor;
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
private final Waiter ackOperationsWaiter = new Waiter();
Expand All @@ -93,6 +96,7 @@ public StreamingSubscriberConnection(
Distribution ackLatencyDistribution,
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -112,6 +116,7 @@ public StreamingSubscriberConnection(
executor,
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
}

@Override
Expand Down Expand Up @@ -209,6 +214,8 @@ private void initialize() {
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount())
.setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes())
.build());

/**
Expand Down
Expand Up @@ -332,6 +332,7 @@ private void startStreamingConnections() {
ackLatencyDistribution,
subStub,
i,
flowControlSettings,
flowController,
executor,
alarmsExecutor,
Expand Down