Skip to content

Commit

Permalink
Client change for using request priority to enable flow control
Browse files Browse the repository at this point in the history
In this stage, client needs to do 3 things:
1. Always create flow control callable regardless of client flag
2. Make sure the entire callable behave like no-op if RateLimitInfo is not present.
3. Make sure client runs flow control as long as RateLimitInfo is present, regardless of client flag.

Meanwhile, setting client flag would still set the feature flag.

On server side, we'll compute and return RateLimitInfo if server side sees priority is low.
  • Loading branch information
kongweihan committed Sep 11, 2023
1 parent 7cc8a28 commit 761bab7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
Expand Up @@ -728,19 +728,20 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
.build(),
settings.bulkMutateRowsSettings().getRetryableCodes());

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable =
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> statsHeader =
new StatsHeadersServerStreamingCallable<>(base);

if (settings.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
callable = new RateLimitingServerStreamingCallable(callable);
}
// Always create this callable because flow control will be enabled by the presence of
// RateLimitInfo, not the client flag
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> flowControl =
new RateLimitingServerStreamingCallable(statsHeader);

// Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
// and
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> convertException =
new ConvertExceptionCallable<>(callable);
new ConvertExceptionCallable<>(flowControl);

ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(convertException);
Expand Down
Expand Up @@ -975,6 +975,7 @@ public EnhancedBigtableStubSettings build() {
this.setTransportChannelProvider(channelProviderBuilder.build());
}

// Will be deprecated once we migrate flow control user to use request priority
if (this.bulkMutateRowsSettings().isServerInitiatedFlowControlEnabled()) {
// only set mutate rows feature flag when this feature is enabled
featureFlags.setMutateRowsRateLimit(true);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -64,6 +65,10 @@ class RateLimitingServerStreamingCallable
// as the server side cap
private static final double MAX_FACTOR = 1.3;

// Disabled by default, enabled if RateLimitInfo is present, which is set on server side when
// feature flag is present or low request priority is used.
private final AtomicBoolean rateLimitEnabled = new AtomicBoolean(false);

private final RateLimiter limiter;

private final AtomicReference<Instant> lastQpsChangeTime = new AtomicReference<>(Instant.now());
Expand All @@ -73,40 +78,33 @@ class RateLimitingServerStreamingCallable
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> innerCallable) {
this.limiter = RateLimiter.create(DEFAULT_QPS);
this.innerCallable = Preconditions.checkNotNull(innerCallable, "Inner callable must be set");
logger.info("Rate limiting is enabled with initial QPS of " + limiter.getRate());
logger.info("Rate limiting callable is created with initial QPS of " + limiter.getRate());
}

@Override
public void call(
MutateRowsRequest request,
ResponseObserver<MutateRowsResponse> responseObserver,
ApiCallContext context) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
if (rateLimitEnabled.get()) {
Stopwatch stopwatch = Stopwatch.createStarted();
limiter.acquire();
stopwatch.stop();
if (context.getTracer() instanceof BigtableTracer) {
((BigtableTracer) context.getTracer())
.batchRequestThrottled(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}
RateLimitingResponseObserver innerObserver =
new RateLimitingResponseObserver(limiter, lastQpsChangeTime, responseObserver);
RateLimitingResponseObserver innerObserver = new RateLimitingResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

class RateLimitingResponseObserver extends SafeResponseObserver<MutateRowsResponse> {
private final ResponseObserver<MutateRowsResponse> outerObserver;
private final RateLimiter rateLimiter;

private final AtomicReference<Instant> lastQpsChangeTime;

RateLimitingResponseObserver(
RateLimiter rateLimiter,
AtomicReference<Instant> lastQpsChangeTime,
ResponseObserver<MutateRowsResponse> observer) {
RateLimitingResponseObserver(ResponseObserver<MutateRowsResponse> observer) {
super(observer);
this.outerObserver = observer;
this.rateLimiter = rateLimiter;
this.lastQpsChangeTime = lastQpsChangeTime;
}

@Override
Expand All @@ -116,7 +114,13 @@ protected void onStartImpl(StreamController controller) {

@Override
protected void onResponseImpl(MutateRowsResponse response) {
// Must not limit rate if RateLimitInfo is not present.
// Must limit rate and update QPS if RateLimitInfo is present, regardless of client side flag
// setting.
if (response.hasRateLimitInfo()) {
if (!rateLimitEnabled.getAndSet(true)) {
logger.info("Rate limiting is enabled with QPS of " + limiter.getRate());
}
RateLimitInfo info = response.getRateLimitInfo();
// RateLimitInfo is an optional field. However, proto3 sub-message field always
// have presence even thought it's marked as "optional". Check the factor and
Expand All @@ -126,6 +130,11 @@ protected void onResponseImpl(MutateRowsResponse response) {
info.getFactor(),
Duration.ofSeconds(com.google.protobuf.util.Durations.toSeconds(info.getPeriod())));
}
} else {
// Disable in case customer switched from low to higher priorities.
if (rateLimitEnabled.getAndSet(false)) {
logger.info("Rate limiting is disabled");
}
}
}

Expand Down

0 comments on commit 761bab7

Please sign in to comment.