From 2b50adb51a92857a605eff049ad580eed31544f4 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 24 Mar 2020 17:16:49 -0400 Subject: [PATCH 1/9] feat: Add flow control support to publisher --- .../com/google/cloud/pubsub/v1/Publisher.java | 175 +++++++++++++++- .../pubsub/v1/SequentialExecutorService.java | 4 + .../cloud/pubsub/v1/PublisherImplTest.java | 190 ++++++++++++++++++ 3 files changed, 366 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index e28427eea..45455fa2b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -25,6 +25,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.BackgroundResourceAggregation; import com.google.api.gax.core.CredentialsProvider; @@ -55,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -108,6 +111,8 @@ public class Publisher { private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; + private MessageFlowController flowController = null; + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -122,6 +127,16 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); + if (flowControl != null + && flowControl.getLimitExceededBehavior() != FlowController.LimitExceededBehavior.Ignore) { + this.flowController = + new MessageFlowController( + flowControl.getMaxOutstandingElementCount(), + flowControl.getMaxOutstandingRequestBytes(), + flowControl.getLimitExceededBehavior()); + } + this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; @@ -221,6 +236,19 @@ public ApiFuture publish(PubsubMessage message) { final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); + + if (flowController != null) { + try { + flowController.acquire(outstandingPublish.messageSize); + } catch (Exception e) { + if (!orderingKey.isEmpty()) { + sequentialExecutor.stopPublish(orderingKey); + } + outstandingPublish.publishResult.setException(e); + return outstandingPublish.publishResult; + } + } + List batchesToSend; messagesBatchLock.lock(); try { @@ -454,7 +482,7 @@ public ApiFuture call() { ApiFutures.addCallback(future, futureCallback, directExecutor()); } - private static final class OutstandingBatch { + private final class OutstandingBatch { final List outstandingPublishes; final long creationTime; int attempt; @@ -484,6 +512,9 @@ private List getMessages() { private void onFailure(Throwable t) { for (OutstandingPublish outstandingPublish : outstandingPublishes) { + if (flowController != null) { + flowController.release(outstandingPublish.messageSize); + } outstandingPublish.publishResult.setException(t); } } @@ -491,7 +522,11 @@ private void onFailure(Throwable t) { private void onSuccess(Iterable results) { Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { - messagesResultsIt.next().publishResult.set(messageId); + OutstandingPublish nextPublish = messagesResultsIt.next(); + if (flowController != null) { + flowController.release(nextPublish.messageSize); + } + nextPublish.publishResult.set(messageId); } } } @@ -602,6 +637,10 @@ public static final class Builder { .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) + .build()) .build(); static final RetrySettings DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder() @@ -759,7 +798,137 @@ public Publisher build() throws IOException { } } - private static class MessagesBatch { + private static class MessageFlowController { + private final Lock lock; + private final Long messageLimit; + private final Long byteLimit; + private final FlowController.LimitExceededBehavior limitBehavior; + + private Long outstandingMessages; + private Long outstandingBytes; + private LinkedList awaitingMessageAcquires; + private LinkedList awaitingBytesAcquires; + + MessageFlowController( + Long messageLimit, Long byteLimit, FlowController.LimitExceededBehavior limitBehavior) { + this.messageLimit = messageLimit; + this.byteLimit = byteLimit; + this.limitBehavior = limitBehavior; + this.lock = new ReentrantLock(); + + this.outstandingMessages = 0L; + this.outstandingBytes = 0L; + + this.awaitingMessageAcquires = new LinkedList(); + this.awaitingBytesAcquires = new LinkedList(); + } + + void acquire(long messageSize) throws FlowController.FlowControlException { + lock.lock(); + try { + if (outstandingMessages >= messageLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit); + } + if (outstandingBytes + messageSize >= byteLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); + } + + // We can acquire or we should wait until we can acquire. + // Start by acquiring a slot for a message. + CountDownLatch messageWaiter = null; + while (outstandingMessages >= messageLimit) { + if (messageWaiter == null) { + // This message gets added to the back of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.addLast(messageWaiter); + } else { + // This message already in line stays at the head of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.removeFirst(); + awaitingMessageAcquires.addFirst(messageWaiter); + } + lock.unlock(); + try { + messageWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + ++outstandingMessages; + if (messageWaiter != null) { + awaitingMessageAcquires.removeFirst(); + } + + // There may be some surplus messages left; let the next message waiting for a token have + // one. + if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) { + awaitingMessageAcquires.getFirst().countDown(); + } + + // Now acquire space for bytes. + CountDownLatch bytesWaiter = null; + Long bytesRemaining = messageSize; + while (outstandingBytes + bytesRemaining >= byteLimit) { + // Take what is available. + Long available = byteLimit - outstandingBytes; + bytesRemaining -= available; + outstandingBytes = byteLimit; + if (bytesWaiter == null) { + // This message gets added to the back of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.addLast(bytesWaiter); + } else { + // This message already in line stays at the head of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.removeFirst(); + awaitingBytesAcquires.addFirst(bytesWaiter); + } + lock.unlock(); + try { + bytesWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + + outstandingBytes += bytesRemaining; + if (bytesWaiter != null) { + awaitingBytesAcquires.removeFirst(); + } + // There may be some surplus bytes left; let the next message waiting for bytes have some. + if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) { + awaitingBytesAcquires.getFirst().countDown(); + } + } finally { + lock.unlock(); + } + } + + private void notifyNextAcquires() { + if (!awaitingMessageAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst(); + awaitingAcquire.countDown(); + } + if (!awaitingBytesAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst(); + awaitingAcquire.countDown(); + } + } + + void release(long messageSize) { + lock.lock(); + --outstandingMessages; + outstandingBytes -= messageSize; + notifyNextAcquires(); + lock.unlock(); + } + } + + private class MessagesBatch { private List messages; private int batchedBytes; private String orderingKey; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 712f51eb5..292921850 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -247,6 +247,10 @@ void resumePublish(String key) { keysWithErrors.remove(key); } + void stopPublish(String key) { + keysWithErrors.add(key); + } + /** Cancels every task in the queue associated with {@code key}. */ private void cancelQueuedTasks(final String key, Throwable e) { keysWithErrors.add(key); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index b1109c8fc..ea9e9850f 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -25,6 +25,8 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -43,7 +45,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; @@ -923,6 +928,191 @@ public void testShutDown() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } + @Test + public void testPublishFlowControl_throwException() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAAAAAAAAA"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message succeeds. + ApiFuture publishFuture2 = sendTestMessage(publisher, "AAAA"); + + // Sending a third message fails because of the outstanding message. + ApiFuture publishFuture3 = sendTestMessage(publisher, "AA"); + try { + publishFuture3.get(); + fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class); + } + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + assertEquals("1", publishFuture2.get()); + + // Sending another message succeeds. + ApiFuture publishFuture4 = sendTestMessage(publisher, "AAAA"); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("2")); + assertEquals("2", publishFuture4.get()); + } + + @Test + public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = + sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message for the same ordering key fails because the first one failed. + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); + try { + publishFuture2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + + @Test + public void testPublishFlowControl_block() throws Exception { + final Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(2L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + Executor responseExecutor = Executors.newScheduledThreadPool(10); + final CountDownLatch sendResponse1 = new CountDownLatch(1); + final CountDownLatch sentResponse1 = new CountDownLatch(1); + final CountDownLatch sendResponse2 = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1")); + sentResponse1.countDown(); + sendResponse2.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("2")); + } catch (Exception e) { + } + } + }); + + // Sending two messages succeeds. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AA"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "AA"); + + // Sending a third message blocks because messages are outstanding. + final CountDownLatch publish3Completed = new CountDownLatch(1); + final CountDownLatch sentResponse3 = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + ApiFuture publishFuture3 = sendTestMessage(publisher, "AAAAAA"); + publish3Completed.countDown(); + } + }); + + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.countDown(); + sentResponse1.await(); + sendResponse2.countDown(); + } catch (Exception e) { + } + } + }); + + // Sending a fourth message blocks because although only one message has been sent, + // the third message claimed the tokens for outstanding bytes. + final CountDownLatch publish4Completed = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + publish3Completed.await(); + ApiFuture publishFuture4 = sendTestMessage(publisher, "A"); + publish4Completed.countDown(); + } catch (Exception e) { + } + } + }); + + publish3Completed.await(); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); + sentResponse3.countDown(); + + publish4Completed.await(); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) From 6bb47bc432bb3a0bbce9483e98d4aa27fd46fe8b Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 25 Mar 2020 11:41:39 -0400 Subject: [PATCH 2/9] make suggested fixes --- .../java/com/google/cloud/pubsub/v1/Publisher.java | 8 +++----- .../com/google/cloud/pubsub/v1/PublisherImplTest.java | 10 +++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 45455fa2b..659d84bb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -240,7 +240,7 @@ public ApiFuture publish(PubsubMessage message) { if (flowController != null) { try { flowController.acquire(outstandingPublish.messageSize); - } catch (Exception e) { + } catch (FlowController.FlowControlException e) { if (!orderingKey.isEmpty()) { sequentialExecutor.stopPublish(orderingKey); } @@ -846,8 +846,7 @@ void acquire(long messageSize) throws FlowController.FlowControlException { } else { // This message already in line stays at the head of the line. messageWaiter = new CountDownLatch(1); - awaitingMessageAcquires.removeFirst(); - awaitingMessageAcquires.addFirst(messageWaiter); + awaitingMessageAcquires.set(0, messageWaiter); } lock.unlock(); try { @@ -883,8 +882,7 @@ void acquire(long messageSize) throws FlowController.FlowControlException { } else { // This message already in line stays at the head of the line. bytesWaiter = new CountDownLatch(1); - awaitingBytesAcquires.removeFirst(); - awaitingBytesAcquires.addFirst(bytesWaiter); + awaitingBytesAcquires.set(0, bytesWaiter); } lock.unlock(); try { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index ea9e9850f..5ee5fdcfc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -1042,7 +1042,7 @@ public void testPublishFlowControl_block() throws Exception { .build(); Executor responseExecutor = Executors.newScheduledThreadPool(10); final CountDownLatch sendResponse1 = new CountDownLatch(1); - final CountDownLatch sentResponse1 = new CountDownLatch(1); + final CountDownLatch response1Sent = new CountDownLatch(1); final CountDownLatch sendResponse2 = new CountDownLatch(1); responseExecutor.execute( new Runnable() { @@ -1052,7 +1052,7 @@ public void run() { sendResponse1.await(); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("1")); - sentResponse1.countDown(); + response1Sent.countDown(); sendResponse2.await(); testPublisherServiceImpl.addPublishResponse( PublishResponse.newBuilder().addMessageIds("2")); @@ -1067,7 +1067,7 @@ public void run() { // Sending a third message blocks because messages are outstanding. final CountDownLatch publish3Completed = new CountDownLatch(1); - final CountDownLatch sentResponse3 = new CountDownLatch(1); + final CountDownLatch response3Sent = new CountDownLatch(1); responseExecutor.execute( new Runnable() { @Override @@ -1083,7 +1083,7 @@ public void run() { public void run() { try { sendResponse1.countDown(); - sentResponse1.await(); + response1Sent.await(); sendResponse2.countDown(); } catch (Exception e) { } @@ -1108,7 +1108,7 @@ public void run() { publish3Completed.await(); testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); - sentResponse3.countDown(); + response3Sent.countDown(); publish4Completed.await(); } From 9c113c3e32c28cf0d1de8aad3409b5c509fb1ada Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 2 May 2020 13:25:32 -0400 Subject: [PATCH 3/9] chore: Remove note that ordering keys requires enablements. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 282df369f..2c3fa86a1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -305,7 +305,7 @@ public void run() { * * @param key The key for which to resume publishing. */ - @BetaApi("Ordering is not yet fully supported and requires special project enablements.") + @BetaApi public void resumePublish(String key) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); sequentialExecutor.resumePublish(key); @@ -765,7 +765,7 @@ public Builder setRetrySettings(RetrySettings retrySettings) { } /** Sets the message ordering option. */ - @BetaApi("Ordering is not yet fully supported and requires special project enablements.") + @BetaApi public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { this.enableMessageOrdering = enableMessageOrdering; return this; From 6947740747aa76784967fc1cb93a129a9408e1fe Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 20:58:18 -0400 Subject: [PATCH 4/9] feat: Add support for server-side flow control --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 7 +++++++ .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 1 + 2 files changed, 8 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 047e1ba75..0fa671190 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -25,6 +25,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; + private final FlowControlSettings flowControlSettings; + private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); private final Waiter ackOperationsWaiter = new Waiter(); @@ -93,6 +96,7 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, + FlowControlSettings flowControlSettings, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -112,6 +116,7 @@ public StreamingSubscriberConnection( executor, systemExecutor, clock); + this.flowControlSettings = flowControlSettings; } @Override @@ -209,6 +214,8 @@ private void initialize() { .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) .setClientId(clientId) + .setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount()) + .setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes()) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 0054408ee..bd30bb112 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -332,6 +332,7 @@ private void startStreamingConnections() { ackLatencyDistribution, subStub, i, + flowControlSettings, flowController, executor, alarmsExecutor, From fb08aa3732f12b7ba69b399a3ac450ac78ba27ae Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sat, 20 Jun 2020 21:00:05 -0400 Subject: [PATCH 5/9] Revert "chore: Remove note that ordering keys requires enablements." This reverts commit 9c113c3e32c28cf0d1de8aad3409b5c509fb1ada. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2c3fa86a1..282df369f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -305,7 +305,7 @@ public void run() { * * @param key The key for which to resume publishing. */ - @BetaApi + @BetaApi("Ordering is not yet fully supported and requires special project enablements.") public void resumePublish(String key) { Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher."); sequentialExecutor.resumePublish(key); @@ -765,7 +765,7 @@ public Builder setRetrySettings(RetrySettings retrySettings) { } /** Sets the message ordering option. */ - @BetaApi + @BetaApi("Ordering is not yet fully supported and requires special project enablements.") public Builder setEnableMessageOrdering(boolean enableMessageOrdering) { this.enableMessageOrdering = enableMessageOrdering; return this; From c9c2028f5d208ad957898e35d43578cd50439d33 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 24 Jun 2020 00:41:01 -0400 Subject: [PATCH 6/9] fix: Fix import order --- .../google/cloud/pubsub/v1/StreamingSubscriberConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 0fa671190..885554f74 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -24,8 +24,8 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcStatusCode; From 78ab82d5a457be7e41f00bb63643c3678dc68ec7 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Thu, 23 Jul 2020 17:16:27 -0400 Subject: [PATCH 7/9] fix: Make error message more clear about where ordering must be enabled when publishing. --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 282df369f..b781a2e3d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -233,7 +233,9 @@ public ApiFuture publish(PubsubMessage message) { final String orderingKey = message.getOrderingKey(); Preconditions.checkState( orderingKey.isEmpty() || enableMessageOrdering, - "Cannot publish a message with an ordering key when message ordering is not enabled."); + "Cannot publish a message with an ordering key when message ordering is not enabled in the " + + "Publisher client. Please create a Publisher client with " + + "setEnableMessageOrdering(true) in the builder."); final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); From 8b734677bd48d1db50b849c3f649492467255940 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Sun, 27 Sep 2020 12:38:35 -0400 Subject: [PATCH 8/9] fix: Ensure that messages that are in pending batches for an ordering key are canceled when a previous publish for the ordering keys fails. --- .../com/google/cloud/pubsub/v1/Publisher.java | 20 ++++++++ .../pubsub/v1/SequentialExecutorService.java | 4 ++ .../cloud/pubsub/v1/PublisherImplTest.java | 51 +++++++++++++++++++ 3 files changed, 75 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 6a9f68659..5779b1fe7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -256,6 +256,11 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { + if (sequentialExecutor.keyHasError(orderingKey)) { + outstandingPublish.publishResult.setException( + SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); + return outstandingPublish.publishResult; + } MessagesBatch messagesBatch = messagesBatches.get(orderingKey); if (messagesBatch == null) { messagesBatch = new MessagesBatch(batchingSettings, orderingKey); @@ -462,6 +467,21 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { try { + if (outstandingBatch.orderingKey != null && !outstandingBatch.orderingKey.isEmpty()) { + messagesBatchLock.lock(); + try { + MessagesBatch messagesBatch = messagesBatches.get(outstandingBatch.orderingKey); + if (messagesBatch != null) { + for (OutstandingPublish outstanding : messagesBatch.messages) { + outstanding.publishResult.setException( + SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); + } + messagesBatches.remove(outstandingBatch.orderingKey); + } + } finally { + messagesBatchLock.unlock(); + } + } outstandingBatch.onFailure(t); } finally { messagesWaiter.incrementPendingCount(-outstandingBatch.size()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 292921850..4866e6be4 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -243,6 +243,10 @@ public void cancel(Throwable e) { return future; } + boolean keyHasError(String key) { + return keysWithErrors.contains(key); + } + void resumePublish(String key) { keysWithErrors.remove(key); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index d7687ae07..e5a785aed 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -566,6 +566,57 @@ public void testResumePublish() throws Exception { shutdownTestPublisher(publisher); } + @Test + public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(500)) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Send two messages that will fulfill the first batch, which will return a failure. + testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a"); + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a"); + + // A third message will fail because the first attempt to publish failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a"); + + try { + publishFuture1.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture2.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + } + + try { + publishFuture3.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + + // A subsequent attempt fails immediately. + ApiFuture publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a"); + try { + publishFuture4.get(); + fail("Should have failed."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + private ApiFuture sendTestMessageWithOrderingKey( Publisher publisher, String data, String orderingKey) { return publisher.publish( From 8be37d79643ba2b420fd0d88426eb421fb33fc48 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 29 Sep 2020 15:24:27 -0400 Subject: [PATCH 9/9] fix: Only check keyHasError if ordering keys is non-empty --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 5779b1fe7..07a550496 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -256,7 +256,7 @@ public ApiFuture publish(PubsubMessage message) { List batchesToSend; messagesBatchLock.lock(); try { - if (sequentialExecutor.keyHasError(orderingKey)) { + if (!orderingKey.isEmpty() && sequentialExecutor.keyHasError(orderingKey)) { outstandingPublish.publishResult.setException( SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION); return outstandingPublish.publishResult;