From 298c7bff56518c620782a0eadea203d0d1b8cae3 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 11 Aug 2021 20:29:29 +0000 Subject: [PATCH 1/4] feat: add batch throttled time to tracer --- .../com/google/api/gax/batching/Batcher.java | 4 ++ .../google/api/gax/batching/BatcherImpl.java | 50 +++++++++++++++++-- .../com/google/api/gax/tracing/ApiTracer.java | 7 +++ .../google/api/gax/tracing/BaseApiTracer.java | 5 ++ .../api/gax/batching/BatcherImplTest.java | 33 ++++++++++-- 5 files changed, 90 insertions(+), 9 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax/src/main/java/com/google/api/gax/batching/Batcher.java index d01f79f4a..8ac78ef2f 100644 --- a/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -32,6 +32,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; import com.google.api.core.InternalExtensionOnly; +import com.google.api.gax.rpc.ApiCallContext; /** * Represents a batching context where individual elements will be accumulated and flushed in a @@ -49,6 +50,9 @@ @InternalExtensionOnly public interface Batcher extends AutoCloseable { + /** {@link ApiCallContext.Key} for tracking batch total throttled time */ + ApiCallContext.Key THROTTLED_TIME_KEY = ApiCallContext.Key.create("total_throttled_time"); + /** * Queues the passed in element to be sent at some point in the future. * diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index aeeab82b2..f244248c8 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -40,9 +40,11 @@ import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.batching.FlowController.FlowControlRuntimeException; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; @@ -93,6 +95,7 @@ public class BatcherImpl private SettableApiFuture closeFuture; private final BatcherStats batcherStats = new BatcherStats(); private final FlowController flowController; + private ApiCallContext callContext; /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -108,7 +111,7 @@ public BatcherImpl( BatchingSettings batchingSettings, ScheduledExecutorService executor) { - this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null); + this(batchingDescriptor, unaryCallable, prototype, batchingSettings, executor, null, null); } /** @@ -128,6 +131,35 @@ public BatcherImpl( ScheduledExecutorService executor, @Nullable FlowController flowController) { + this( + batchingDescriptor, + unaryCallable, + prototype, + batchingSettings, + executor, + flowController, + null); + } + + /** + * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements + * into wrappers request and response + * @param unaryCallable a {@link UnaryCallable} object + * @param prototype a {@link RequestT} object + * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds + * @param flowController a {@link FlowController} for throttling requests. If it's null, create a + * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. + * @param callContext a {@link ApiCallContext} object that'll be merged in unaryCallable + */ + public BatcherImpl( + BatchingDescriptor batchingDescriptor, + UnaryCallable unaryCallable, + RequestT prototype, + BatchingSettings batchingSettings, + ScheduledExecutorService executor, + @Nullable FlowController flowController, + @Nullable ApiCallContext callContext) { + this.batchingDescriptor = Preconditions.checkNotNull(batchingDescriptor, "batching descriptor cannot be null"); this.unaryCallable = Preconditions.checkNotNull(unaryCallable, "callable cannot be null"); @@ -168,6 +200,7 @@ public BatcherImpl( scheduledFuture = Futures.immediateCancelledFuture(); } currentBatcherReference = new BatcherReference(this); + this.callContext = callContext; } /** {@inheritDoc} */ @@ -192,16 +225,18 @@ public ApiFuture add(ElementT element) { // class, which made it seem unnecessary to have blocking and non-blocking semaphore // implementations. Some refactoring may be needed for the optimized implementation. So we'll // defer it till we decide on if refactoring FlowController is necessary. + Stopwatch stopwatch = Stopwatch.createStarted(); try { flowController.reserve(1, batchingDescriptor.countBytes(element)); } catch (FlowControlException e) { // This exception will only be thrown if the FlowController is set to ThrowException behavior throw FlowControlRuntimeException.fromFlowControlException(e); } + long throttledTimeMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); SettableApiFuture result = SettableApiFuture.create(); synchronized (elementLock) { - currentOpenBatch.add(element, result); + currentOpenBatch.add(element, result, throttledTimeMs); } if (currentOpenBatch.hasAnyThresholdReached()) { @@ -230,8 +265,13 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } + if (callContext != null) { + callContext = + callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); + unaryCallable.withDefaultCallContext(callContext); + } final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build()); + unaryCallable.futureCall(accumulatedBatch.builder.build(), callContext); numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( @@ -367,6 +407,7 @@ private static class Batch { private long elementCounter = 0; private long byteCounter = 0; + private long totalThrottledTimeMs = 0; private Batch( RequestT prototype, @@ -383,11 +424,12 @@ private Batch( this.batcherStats = batcherStats; } - void add(ElementT element, SettableApiFuture result) { + void add(ElementT element, SettableApiFuture result, long throttledTimeMs) { builder.add(element); entries.add(BatchEntry.create(element, result)); elementCounter++; byteCounter += descriptor.countBytes(element); + totalThrottledTimeMs += throttledTimeMs; } void onBatchSuccess(ResponseT response) { diff --git a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index bc329630e..25d8a58e7 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -143,6 +143,13 @@ public interface ApiTracer { */ void batchRequestSent(long elementCount, long requestSize); + /** + * Adds an annotation of total throttled time of a batch. + * + * @param throttledTimeMs total throttled time of this batch. + */ + void batchRequestThrottled(long throttledTimeMs); + /** * A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a * {@link Scope} removes any context that the underlying implementation might've set in {@link diff --git a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java index 4ff8e901f..dbbedce58 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java @@ -135,4 +135,9 @@ public void requestSent() { public void batchRequestSent(long elementCount, long requestSize) { // noop } + + @Override + public void batchRequestThrottled(long throttledTimeMs) { + // noop + } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 10ccb1453..31aff895b 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -33,6 +33,7 @@ import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -75,6 +76,8 @@ import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.threeten.bp.Duration; @RunWith(JUnit4.class) @@ -132,9 +135,10 @@ public void testSendOutstanding() { SQUARER_BATCHING_DESC_V2, new LabeledIntSquarerCallable() { @Override - public ApiFuture> futureCall(LabeledIntList request) { + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { callableCounter.incrementAndGet(); - return super.futureCall(request); + return super.futureCall(request, context); } }, labeledIntList, @@ -838,8 +842,23 @@ public void testThrottlingBlocking() throws Exception { .setMaxOutstandingElementCount(1L) .build()); ExecutorService executor = Executors.newSingleThreadExecutor(); + + ApiCallContext callContext = Mockito.mock(ApiCallContext.class); + ArgumentCaptor> key = + ArgumentCaptor.forClass(ApiCallContext.Key.class); + ArgumentCaptor value = ArgumentCaptor.forClass(Long.class); + when(callContext.withOption(key.capture(), value.capture())).thenReturn(callContext); + long throttledTime = 10; + try (final Batcher batcher = - createDefaultBatcherImpl(settings, flowController)) { + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, + callLabeledIntSquarer, + labeledIntList, + settings, + EXECUTOR, + flowController, + callContext)) { flowController.reserve(1, 1); Future future = executor.submit( @@ -850,7 +869,7 @@ public void run() { } }); try { - future.get(10, TimeUnit.MILLISECONDS); + future.get(throttledTime, TimeUnit.MILLISECONDS); assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail(); } catch (TimeoutException e) { // expected @@ -861,6 +880,9 @@ public void run() { } catch (TimeoutException e) { assertWithMessage("adding elements to batcher should not be blocked").fail(); } + // Verify that throttled time is recorded in ApiCallContext + assertThat(key.getValue()).isSameInstanceAs(Batcher.THROTTLED_TIME_KEY); + assertThat(value.getValue()).isAtLeast(throttledTime); } finally { executor.shutdownNow(); } @@ -913,6 +935,7 @@ private BatcherImpl> createDefau labeledIntList, settings, EXECUTOR, - flowController); + flowController, + null); } } From fa3ef6f66d1f1bc86317f61b2b4c9ea28e3cdcfb Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 18 Aug 2021 18:39:35 +0000 Subject: [PATCH 2/4] deflake test --- .../test/java/com/google/api/gax/batching/BatcherImplTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 31aff895b..04967dcf1 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -869,7 +869,8 @@ public void run() { } }); try { - future.get(throttledTime, TimeUnit.MILLISECONDS); + future.get(10, TimeUnit.MILLISECONDS); + Thread.sleep(throttledTime); assertWithMessage("adding elements to batcher should be blocked by FlowControlled").fail(); } catch (TimeoutException e) { // expected From 357ac178aa20fcb1d99660db1933b89a260f370b Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 25 Oct 2021 10:33:40 -0400 Subject: [PATCH 3/4] Deprecate constructors and update call context --- .../com/google/api/gax/batching/BatcherImpl.java | 13 +++++++++---- .../java/com/google/api/gax/tracing/ApiTracer.java | 7 ------- .../com/google/api/gax/tracing/BaseApiTracer.java | 5 ----- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index f244248c8..151d0807b 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -95,7 +95,7 @@ public class BatcherImpl private SettableApiFuture closeFuture; private final BatcherStats batcherStats = new BatcherStats(); private final FlowController flowController; - private ApiCallContext callContext; + private final ApiCallContext callContext; /** * @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements @@ -103,7 +103,9 @@ public class BatcherImpl * @param unaryCallable a {@link UnaryCallable} object * @param prototype a {@link RequestT} object * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds + * @deprecated Please use the constructor with FlowController and ApiCallContext instead */ + @Deprecated public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, @@ -122,7 +124,9 @@ public BatcherImpl( * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds * @param flowController a {@link FlowController} for throttling requests. If it's null, create a * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. + * @deprecated Please use the constructor with ApiCallContext instead */ + @Deprecated public BatcherImpl( BatchingDescriptor batchingDescriptor, UnaryCallable unaryCallable, @@ -265,13 +269,14 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } + // This check is for the old constructors that don't have a call context + ApiCallContext callContextWithOption = null; if (callContext != null) { - callContext = + callContextWithOption = callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); - unaryCallable.withDefaultCallContext(callContext); } final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build(), callContext); + unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( diff --git a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java index 25d8a58e7..bc329630e 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/ApiTracer.java @@ -143,13 +143,6 @@ public interface ApiTracer { */ void batchRequestSent(long elementCount, long requestSize); - /** - * Adds an annotation of total throttled time of a batch. - * - * @param throttledTimeMs total throttled time of this batch. - */ - void batchRequestThrottled(long throttledTimeMs); - /** * A context class to be used with {@link #inScope()} and a try-with-resources block. Closing a * {@link Scope} removes any context that the underlying implementation might've set in {@link diff --git a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java index dbbedce58..4ff8e901f 100644 --- a/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java +++ b/gax/src/main/java/com/google/api/gax/tracing/BaseApiTracer.java @@ -135,9 +135,4 @@ public void requestSent() { public void batchRequestSent(long elementCount, long requestSize) { // noop } - - @Override - public void batchRequestThrottled(long throttledTimeMs) { - // noop - } } From 9eb045e0d0d9230e8b7d97b3b40245eeaeba3220 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 25 Oct 2021 10:52:28 -0400 Subject: [PATCH 4/4] update docs --- .../main/java/com/google/api/gax/batching/BatcherImpl.java | 7 ++++--- .../java/com/google/api/gax/batching/BatcherImplTest.java | 3 +-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 151d0807b..743d25c57 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -103,7 +103,8 @@ public class BatcherImpl * @param unaryCallable a {@link UnaryCallable} object * @param prototype a {@link RequestT} object * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds - * @deprecated Please use the constructor with FlowController and ApiCallContext instead + * @deprecated Please instantiate the Batcher with {@link FlowController} and {@link + * ApiCallContext} */ @Deprecated public BatcherImpl( @@ -124,7 +125,7 @@ public BatcherImpl( * @param batchingSettings a {@link BatchingSettings} with configuration of thresholds * @param flowController a {@link FlowController} for throttling requests. If it's null, create a * {@link FlowController} object from {@link BatchingSettings#getFlowControlSettings()}. - * @deprecated Please use the constructor with ApiCallContext instead + * @deprecated Please instantiate the Batcher with {@link ApiCallContext} */ @Deprecated public BatcherImpl( @@ -269,7 +270,7 @@ public void sendOutstanding() { currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats); } - // This check is for the old constructors that don't have a call context + // This check is for old clients that instantiated the batcher without ApiCallContext ApiCallContext callContextWithOption = null; if (callContext != null) { callContextWithOption = diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 04967dcf1..c3a1ce0bb 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -936,7 +936,6 @@ private BatcherImpl> createDefau labeledIntList, settings, EXECUTOR, - flowController, - null); + flowController); } }