From 17875bf7113c2da26fb598e06234f3cb301823ba Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 22 Apr 2024 16:22:15 -0400 Subject: [PATCH 1/7] chore: start teasing apart grpc write path to allow removal of WriteFlusher abstraction WriteFlusher has grown unwieldy to be able to properly handle the kinds of uploads that are performed while being able to perform appropriate finegrained response validation. This is the first change in a series to tease WriteFlusher apart and create an UnbufferedWritableByteChannel for each kind of upload where all logic for that upload can be encapsulated. --- ...icUnbufferedDirectWritableByteChannel.java | 283 ++++++++++++++++++ ...apicWritableByteChannelSessionBuilder.java | 39 ++- ...apicUnbufferedWritableByteChannelTest.java | 13 +- 3 files changed, 322 insertions(+), 13 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java new file mode 100644 index 000000000..2ae762916 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java @@ -0,0 +1,283 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.LongConsumer; +import org.checkerframework.checker.nullness.qual.NonNull; + +final class GapicUnbufferedDirectWritableByteChannel implements UnbufferedWritableByteChannel { + + private final SettableApiFuture resultFuture; + private final ChunkSegmenter chunkSegmenter; + private final ClientStreamingCallable write; + + private final WriteCtx writeCtx; + + private final Observer responseObserver; + private volatile ApiStreamObserver stream; + + private boolean open = true; + private boolean first = true; + private boolean finished = false; + + GapicUnbufferedDirectWritableByteChannel( + SettableApiFuture resultFuture, + ChunkSegmenter chunkSegmenter, + ClientStreamingCallable write, + SimpleWriteObjectRequestBuilderFactory requestFactory) { + String bucketName = requestFactory.bucketName(); + this.resultFuture = resultFuture; + this.chunkSegmenter = chunkSegmenter; + + GrpcCallContext internalContext = + contextWithBucketName(bucketName, GrpcCallContext.createDefault()); + this.write = write.withDefaultCallContext(internalContext); + + this.writeCtx = new WriteCtx<>(requestFactory); + this.responseObserver = new Observer(writeCtx.getConfirmedBytes()::set, resultFuture::set); + } + + @Override + public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + return internalWrite(srcs, srcsOffset, srcsLength, false); + } + + @Override + public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + long write = internalWrite(srcs, srcsOffset, srcsLength, true); + close(); + return write; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + ApiStreamObserver openedStream = openedStream(); + if (!finished) { + WriteObjectRequest message = finishMessage(); + try { + openedStream.onNext(message); + openedStream.onCompleted(); + finished = true; + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } else { + try { + openedStream.onCompleted(); + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } + open = false; + responseObserver.await(); + } + + private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize) + throws ClosedChannelException { + if (!open) { + throw new ClosedChannelException(); + } + + ChunkSegment[] data = 
chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + + List messages = new ArrayList<>(); + + ApiStreamObserver openedStream = openedStream(); + int bytesConsumed = 0; + for (ChunkSegment datum : data) { + Crc32cLengthKnown crc32c = datum.getCrc32c(); + ByteString b = datum.getB(); + int contentSize = b.size(); + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + Crc32cLengthKnown cumulative = + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + WriteObjectRequest.Builder builder = + writeCtx + .newRequestBuilder() + .setWriteOffset(offset) + .setChecksummedData(checksummedData.build()); + if (!datum.isOnlyFullBlocks()) { + builder.setFinishWrite(true); + if (cumulative != null) { + builder.setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + } + finished = true; + } + + WriteObjectRequest build = possiblyPairDownRequest(builder, first).build(); + first = false; + messages.add(build); + bytesConsumed += contentSize; + } + if (finalize && !finished) { + messages.add(finishMessage()); + finished = true; + } + + try { + for (WriteObjectRequest message : messages) { + openedStream.onNext(message); + } + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + + return bytesConsumed; + } + + @NonNull + private WriteObjectRequest finishMessage() { + long offset = writeCtx.getTotalSentBytes().get(); + Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); + + WriteObjectRequest.Builder b = + writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + if (crc32cValue != null) { + b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); + } + WriteObjectRequest message = possiblyPairDownRequest(b, first).build(); + return message; + } + + private ApiStreamObserver openedStream() { + if (stream == null) { + synchronized (this) { + if (stream == null) { + stream = write.clientStreamingCall(responseObserver); + } + } + } + return stream; + } + + /** + * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, + * this utility method centralizes the logic necessary to clear those fields for use by subsequent + * messages. 
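+     *
+     * <p>Illustrative sketch (mirrors the conditions implemented below):
+     *
+     * <pre>{@code
+     * // first message of the stream, writeOffset == 0  -> sent as-is
+     * // writeOffset > 0                                 -> write_object_spec cleared
+     * // writeOffset > 0 && !finish_write                -> object_checksums cleared as well
+     * }</pre>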
+ */ + private static WriteObjectRequest.Builder possiblyPairDownRequest( + WriteObjectRequest.Builder b, boolean firstMessageOfStream) { + if (firstMessageOfStream && b.getWriteOffset() == 0) { + return b; + } + if (b.getWriteOffset() > 0) { + b.clearWriteObjectSpec(); + } + + if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { + b.clearObjectChecksums(); + } + return b; + } + + static class Observer implements ApiStreamObserver { + + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + + private final SettableApiFuture invocationHandle; + private volatile WriteObjectResponse last; + + Observer(LongConsumer sizeCallback, Consumer completeCallback) { + this.sizeCallback = sizeCallback; + this.completeCallback = completeCallback; + this.invocationHandle = SettableApiFuture.create(); + } + + @Override + public void onNext(WriteObjectResponse value) { + // incremental update + if (value.hasPersistedSize()) { + sizeCallback.accept(value.getPersistedSize()); + } else if (value.hasResource()) { + sizeCallback.accept(value.getResource().getSize()); + } + last = value; + } + + /** + * observed exceptions so far + * + *
    + *
+     * <ol>
+     *   <li>{@link com.google.api.gax.rpc.OutOfRangeException}
+     *   <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
+     *   <li>{@link io.grpc.StatusRuntimeException}
+     * </ol>
+ */ + @Override + public void onError(Throwable t) { + invocationHandle.setException(t); + } + + @Override + public void onCompleted() { + if (last != null && last.hasResource()) { + completeCallback.accept(last); + } + invocationHandle.set(null); + } + + void await() { + try { + invocationHandle.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 5aa47eb6f..728e3fb2a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -39,6 +39,7 @@ import java.nio.ByteBuffer; import java.util.function.BiFunction; import java.util.function.Function; +import org.checkerframework.checker.nullness.qual.NonNull; final class GapicWritableByteChannelSessionBuilder { @@ -113,6 +114,17 @@ JournalingResumableUploadBuilder journaling() { return new JournalingResumableUploadBuilder(); } + private @NonNull ChunkSegmenter getChunkSegmenter() { + // it is theoretically possible that the setter methods for the following variables could + // be called again between when this method is invoked and the resulting function is invoked. + // To ensure we are using the specified values at the point in time they are bound to the + // function read them into local variables which will be closed over rather than the class + // fields. + ByteStringStrategy boundStrategy = byteStringStrategy; + Hasher boundHasher = hasher; + return new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE); + } + /** * When constructing any of our channel sessions, there is always a {@link * GapicUnbufferedWritableByteChannel} at the bottom of it. 
This method creates a BiFunction which @@ -141,6 +153,15 @@ JournalingResumableUploadBuilder journaling() { flusherFactory); } + private static + BiFunction, UnbufferedWritableByteChannel> + lift( + BiFunction< + StartT, SettableApiFuture, UnbufferedWritableByteChannel> + func) { + return func; + } + final class DirectUploadBuilder { /** @@ -189,9 +210,12 @@ UnbufferedDirectUploadBuilder setRequest(WriteObjectRequest req) { UnbufferedWritableByteChannelSession build() { return new UnbufferedWriteSession<>( ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")), - bindFunction( - WriteFlushStrategy.fsyncOnClose(write), - WriteObjectRequestBuilderFactory::simple) + lift((WriteObjectRequest start, SettableApiFuture resultFuture) -> + new GapicUnbufferedDirectWritableByteChannel( + resultFuture, + getChunkSegmenter(), + write, + WriteObjectRequestBuilderFactory.simple(start))) .andThen(StorageByteChannels.writable()::createSynchronized)); } } @@ -214,9 +238,12 @@ BufferedDirectUploadBuilder setRequest(WriteObjectRequest req) { BufferedWritableByteChannelSession build() { return new BufferedWriteSession<>( ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")), - bindFunction( - WriteFlushStrategy.fsyncOnClose(write), - WriteObjectRequestBuilderFactory::simple) + lift((WriteObjectRequest start, SettableApiFuture resultFuture) -> + new GapicUnbufferedDirectWritableByteChannel( + resultFuture, + getChunkSegmenter(), + write, + WriteObjectRequestBuilderFactory.simple(start))) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java index 3054616cd..f154646a5 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java @@ -25,6 +25,7 @@ import com.google.api.gax.rpc.DataLossException; import com.google.api.gax.rpc.PermissionDeniedException; import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteFlushStrategy.Flusher; import com.google.common.collect.ImmutableList; @@ -149,7 +150,8 @@ public void directUpload() throws IOException, InterruptedException, ExecutionEx WriteObjectResponse resp = resp5; WriteObjectRequest base = WriteObjectRequest.newBuilder().setWriteObjectSpec(spec).build(); - WriteObjectRequestBuilderFactory reqFactory = WriteObjectRequestBuilderFactory.simple(base); + SimpleWriteObjectRequestBuilderFactory reqFactory = + WriteObjectRequestBuilderFactory.simple(base); StorageImplBase service = new DirectWriteService( @@ -157,12 +159,9 @@ public void directUpload() throws IOException, InterruptedException, ExecutionEx try (FakeServer fake = FakeServer.of(service); StorageClient sc = StorageClient.create(fake.storageSettings())) { SettableApiFuture result = SettableApiFuture.create(); - try (GapicUnbufferedWritableByteChannel c = - new GapicUnbufferedWritableByteChannel<>( - result, - segmenter, - reqFactory, - WriteFlushStrategy.fsyncOnClose(sc.writeObjectCallable()))) { + try 
(GapicUnbufferedDirectWritableByteChannel c = + new GapicUnbufferedDirectWritableByteChannel( + result, segmenter, sc.writeObjectCallable(), reqFactory)) { c.write(ByteBuffer.wrap(bytes)); } assertThat(result.get()).isEqualTo(resp); From 70d7084cafa333f6bff1c16a9e9873510bc5c980 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 22 Apr 2024 18:47:36 -0400 Subject: [PATCH 2/7] chore: pt.2 chunked resumable upload Make dedicated WritableByteChannel to handle chunked resumable uploads. --- ...edChunkedResumableWritableByteChannel.java | 303 ++++++++++++++++++ ...apicWritableByteChannelSessionBuilder.java | 48 ++- ...apicUnbufferedWritableByteChannelTest.java | 105 +++--- 3 files changed, 389 insertions(+), 67 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java new file mode 100644 index 000000000..37d92c4ec --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java @@ -0,0 +1,303 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.LongConsumer; +import java.util.function.Supplier; +import org.checkerframework.checker.nullness.qual.NonNull; + +final class GapicUnbufferedChunkedResumableWritableByteChannel + implements UnbufferedWritableByteChannel { + + private final SettableApiFuture resultFuture; + private final ChunkSegmenter chunkSegmenter; + private final ClientStreamingCallable write; + + private final String bucketName; + private final WriteCtx writeCtx; + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final Supplier baseContextSupplier; + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + + private boolean open = true; + private boolean finished = false; + + GapicUnbufferedChunkedResumableWritableByteChannel( + SettableApiFuture resultFuture, + @NonNull ChunkSegmenter chunkSegmenter, + ClientStreamingCallable write, + ResumableWrite requestFactory, + RetryingDependencies deps, + ResultRetryAlgorithm alg, + Supplier baseContextSupplier) { + this.resultFuture = resultFuture; + this.chunkSegmenter = chunkSegmenter; + this.write = write; + this.bucketName = requestFactory.bucketName(); + this.writeCtx = new WriteCtx<>(requestFactory); + this.deps = deps; + this.alg = alg; + this.baseContextSupplier = baseContextSupplier; + this.sizeCallback = writeCtx.getConfirmedBytes()::set; + this.completeCallback = resultFuture::set; + } + + @Override + public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + return internalWrite(srcs, srcsOffset, srcsLength, false); + } + + @Override + public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + long write = internalWrite(srcs, srcsOffset, srcsLength, true); + close(); + return write; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + if (open && !finished) { + WriteObjectRequest message = finishMessage(true); + try { + flush(ImmutableList.of(message)); + finished = true; + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } + open = false; + } + + private long internalWrite(ByteBuffer[] srcs, int 
srcsOffset, int srcsLength, boolean finalize) + throws ClosedChannelException { + if (!open) { + throw new ClosedChannelException(); + } + + ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + + List messages = new ArrayList<>(); + + boolean first = true; + int bytesConsumed = 0; + for (ChunkSegment datum : data) { + Crc32cLengthKnown crc32c = datum.getCrc32c(); + ByteString b = datum.getB(); + int contentSize = b.size(); + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + Crc32cLengthKnown cumulative = + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + WriteObjectRequest.Builder builder = + writeCtx + .newRequestBuilder() + .setWriteOffset(offset) + .setChecksummedData(checksummedData.build()); + if (!datum.isOnlyFullBlocks()) { + builder.setFinishWrite(true); + if (cumulative != null) { + builder.setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + } + finished = true; + } + + WriteObjectRequest build = possiblyPairDownRequest(builder, first).build(); + first = false; + messages.add(build); + bytesConsumed += contentSize; + } + if (finalize && !finished) { + messages.add(finishMessage(first)); + finished = true; + } + + try { + flush(messages); + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + + return bytesConsumed; + } + + @NonNull + private WriteObjectRequest finishMessage(boolean first) { + long offset = writeCtx.getTotalSentBytes().get(); + Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); + + WriteObjectRequest.Builder b = + writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + if (crc32cValue != null) { + b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); + } + WriteObjectRequest message = possiblyPairDownRequest(b, first).build(); + return message; + } + + private void flush(@NonNull List segments) { + GrpcCallContext internalContext = contextWithBucketName(bucketName, baseContextSupplier.get()); + ClientStreamingCallable callable = + write.withDefaultCallContext(internalContext); + + Retrying.run( + deps, + alg, + () -> { + Observer observer = new Observer(sizeCallback, completeCallback); + ApiStreamObserver write = callable.clientStreamingCall(observer); + + for (WriteObjectRequest message : segments) { + write.onNext(message); + } + write.onCompleted(); + observer.await(); + return null; + }, + Decoder.identity()); + } + + /** + * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, + * this utility method centralizes the logic necessary to clear those fields for use by subsequent + * messages. 
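+     *
+     * <p>Illustrative sketch (mirrors the conditions implemented below; unlike the direct-upload
+     * variant, resumable messages also drop the upload id after the first message):
+     *
+     * <pre>{@code
+     * // first message of the stream, writeOffset == 0  -> sent as-is
+     * // not the first message of the stream             -> upload_id cleared
+     * // writeOffset > 0                                 -> write_object_spec cleared
+     * // writeOffset > 0 && !finish_write                -> object_checksums cleared as well
+     * }</pre>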
+ */ + private static WriteObjectRequest.Builder possiblyPairDownRequest( + WriteObjectRequest.Builder b, boolean firstMessageOfStream) { + if (firstMessageOfStream && b.getWriteOffset() == 0) { + return b; + } + + if (!firstMessageOfStream) { + b.clearUploadId(); + } + + if (b.getWriteOffset() > 0) { + b.clearWriteObjectSpec(); + } + + if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { + b.clearObjectChecksums(); + } + return b; + } + + @VisibleForTesting + WriteCtx getWriteCtx() { + return writeCtx; + } + + static class Observer implements ApiStreamObserver { + + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + + private final SettableApiFuture invocationHandle; + private volatile WriteObjectResponse last; + + Observer(LongConsumer sizeCallback, Consumer completeCallback) { + this.sizeCallback = sizeCallback; + this.completeCallback = completeCallback; + this.invocationHandle = SettableApiFuture.create(); + } + + @Override + public void onNext(WriteObjectResponse value) { + // incremental update + if (value.hasPersistedSize()) { + sizeCallback.accept(value.getPersistedSize()); + } else if (value.hasResource()) { + sizeCallback.accept(value.getResource().getSize()); + } + last = value; + } + + /** + * observed exceptions so far + * + *
    + *
+     * <ol>
+     *   <li>{@link com.google.api.gax.rpc.OutOfRangeException}
+     *   <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
+     *   <li>{@link io.grpc.StatusRuntimeException}
+     * </ol>
+ */ + @Override + public void onError(Throwable t) { + invocationHandle.setException(t); + } + + @Override + public void onCompleted() { + if (last != null && last.hasResource()) { + completeCallback.accept(last); + } + invocationHandle.set(null); + } + + void await() { + try { + invocationHandle.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 728e3fb2a..26bc48193 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -323,12 +323,24 @@ UnbufferedResumableUploadBuilder setStartAsync(ApiFuture start) UnbufferedWritableByteChannelSession build() { return new UnbufferedWriteSession<>( requireNonNull(start, "start must be non null"), - bindFunction( - fsyncEvery - ? WriteFlushStrategy.fsyncEveryFlush( - write, deps, alg, Retrying::newCallContext) - : WriteFlushStrategy.fsyncOnClose(write), - ResumableWrite::identity) + lift((ResumableWrite start, SettableApiFuture result) -> { + if (fsyncEvery) { + return new GapicUnbufferedChunkedResumableWritableByteChannel( + result, + getChunkSegmenter(), + write, + ResumableWrite.identity(start), + deps, + alg, + Retrying::newCallContext); + } else { + return new GapicUnbufferedWritableByteChannel<>( + result, + getChunkSegmenter(), + ResumableWrite.identity(start), + WriteFlushStrategy.fsyncOnClose(write)); + } + }) .andThen(StorageByteChannels.writable()::createSynchronized)); } } @@ -355,12 +367,24 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture start) { BufferedWritableByteChannelSession build() { return new BufferedWriteSession<>( requireNonNull(start, "start must be non null"), - bindFunction( - fsyncEvery - ? 
WriteFlushStrategy.fsyncEveryFlush( - write, deps, alg, Retrying::newCallContext) - : WriteFlushStrategy.fsyncOnClose(write), - ResumableWrite::identity) + lift((ResumableWrite start, SettableApiFuture result) -> { + if (fsyncEvery) { + return new GapicUnbufferedChunkedResumableWritableByteChannel( + result, + getChunkSegmenter(), + write, + ResumableWrite.identity(start), + deps, + alg, + Retrying::newCallContext); + } else { + return new GapicUnbufferedWritableByteChannel<>( + result, + getChunkSegmenter(), + ResumableWrite.identity(start), + WriteFlushStrategy.fsyncOnClose(write)); + } + }) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java index f154646a5..edbc8006d 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java @@ -27,7 +27,6 @@ import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.WriteCtx.SimpleWriteObjectRequestBuilderFactory; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; -import com.google.cloud.storage.WriteFlushStrategy.Flusher; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -52,13 +51,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.logging.Logger; import java.util.stream.Collector; import java.util.stream.Collectors; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; public final class ITGapicUnbufferedWritableByteChannelTest { @@ -125,7 +121,7 @@ public final class ITGapicUnbufferedWritableByteChannelTest { private static final WriteObjectResponse resp5 = WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build(); - private static final WriteObjectRequestBuilderFactory reqFactory = + private static final ResumableWrite reqFactory = new ResumableWrite(startReq, startResp, TestUtils.onlyUploadId()); @Test @@ -182,16 +178,15 @@ public void resumableUpload() throws IOException, InterruptedException, Executio try (FakeServer fake = FakeServer.of(service); StorageClient sc = StorageClient.create(fake.storageSettings())) { SettableApiFuture result = SettableApiFuture.create(); - GapicUnbufferedWritableByteChannel c = - new GapicUnbufferedWritableByteChannel<>( + GapicUnbufferedChunkedResumableWritableByteChannel c = + new GapicUnbufferedChunkedResumableWritableByteChannel( result, segmenter, + sc.writeObjectCallable(), reqFactory, - WriteFlushStrategy.fsyncEveryFlush( - sc.writeObjectCallable(), - RetryingDependencies.attemptOnce(), - Retrying.neverRetry(), - Retrying::newCallContext)); + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + Retrying::newCallContext); ArrayList debugMessages = new ArrayList<>(); try { ImmutableList buffers = TestUtils.subDivide(bytes, 10); @@ -266,21 +261,20 @@ 
public void resumableUpload_chunkAutomaticRetry() try (FakeServer fake = FakeServer.of(service); StorageClient sc = StorageClient.create(fake.storageSettings())) { SettableApiFuture result = SettableApiFuture.create(); - try (GapicUnbufferedWritableByteChannel c = - new GapicUnbufferedWritableByteChannel<>( + try (GapicUnbufferedChunkedResumableWritableByteChannel c = + new GapicUnbufferedChunkedResumableWritableByteChannel( result, segmenter, + sc.writeObjectCallable(), reqFactory, - WriteFlushStrategy.fsyncEveryFlush( - sc.writeObjectCallable(), - TestUtils.defaultRetryingDeps(), - new BasicResultRetryAlgorithm() { - @Override - public boolean shouldRetry(Throwable t, Object ignore) { - return TestUtils.findThrowable(DataLossException.class, t) != null; - } - }, - Retrying::newCallContext))) { + TestUtils.defaultRetryingDeps(), + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry(Throwable t, Object ignore) { + return TestUtils.findThrowable(DataLossException.class, t) != null; + } + }, + Retrying::newCallContext)) { writeCtx = c.getWriteCtx(); ImmutableList buffers = TestUtils.subDivide(bytes, 10); c.write(buffers.get(0)); @@ -308,37 +302,38 @@ public boolean shouldRetry(Throwable t, Object ignore) { @Test public void resumableUpload_finalizeWhenWriteAndCloseCalledEvenWhenQuantumAligned() - throws IOException { - SettableApiFuture result = SettableApiFuture.create(); - - AtomicReference> actualFlush = new AtomicReference<>(); - WriteObjectRequest closeRequestSentinel = - WriteObjectRequest.newBuilder().setUploadId("sentinel").build(); - AtomicReference actualClose = new AtomicReference<>(closeRequestSentinel); - GapicUnbufferedWritableByteChannel c = - new GapicUnbufferedWritableByteChannel<>( - result, - segmenter, - reqFactory, - (bucketName, committedTotalBytesCallback, onSuccessCallback) -> - new Flusher() { - @Override - public void flush(@NonNull List segments) { - actualFlush.compareAndSet(null, segments); - } - - @Override - public void close(@Nullable WriteObjectRequest req) { - actualClose.compareAndSet(closeRequestSentinel, req); - } - }); - - long written = c.writeAndClose(ByteBuffer.wrap(bytes)); - - assertThat(written).isEqualTo(40); - assertThat(actualFlush.get()).isEqualTo(ImmutableList.of(req1, req2, req3, req4, req5)); - // calling close is okay, as long as the provided request is null - assertThat(actualClose.get()).isAnyOf(closeRequestSentinel, null); + throws IOException, InterruptedException, ExecutionException { + ImmutableMap, WriteObjectResponse> writes = + ImmutableMap., WriteObjectResponse>builder() + .put( + ImmutableList.of( + req1, + req2.toBuilder().clearUploadId().build(), + req3.toBuilder().clearUploadId().build(), + req4.toBuilder().clearUploadId().build(), + req5.toBuilder().clearUploadId().build()), + resp5) + .build(); + StorageImplBase service = new DirectWriteService(writes); + try (FakeServer fake = FakeServer.of(service); + StorageClient sc = StorageClient.create(fake.storageSettings())) { + SettableApiFuture result = SettableApiFuture.create(); + GapicUnbufferedChunkedResumableWritableByteChannel c = + new GapicUnbufferedChunkedResumableWritableByteChannel( + result, + segmenter, + sc.writeObjectCallable(), + reqFactory, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + Retrying::newCallContext); + try { + int written = c.writeAndClose(ByteBuffer.wrap(bytes)); + assertThat(written).isEqualTo(bytes.length); + } catch (PermissionDeniedException ignore) { + } + assertThat(result.get()).isEqualTo(resp5); + } } 
static class DirectWriteService extends StorageImplBase { From 3c739aa557a03b9da873d0e2d46e7de78478f768 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 22 Apr 2024 19:26:09 -0400 Subject: [PATCH 3/7] chore: pt.3 streamed resumable upload Make dedicated WritableByteChannel to handle streamed resumable uploads. --- ...zeOnCloseResumableWritableByteChannel.java | 283 ++++++++++++++++++ ...apicWritableByteChannelSessionBuilder.java | 44 +-- 2 files changed, 287 insertions(+), 40 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java new file mode 100644 index 000000000..f9f8befb5 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java @@ -0,0 +1,283 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.LongConsumer; +import org.checkerframework.checker.nullness.qual.NonNull; + +final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel + implements UnbufferedWritableByteChannel { + + private final SettableApiFuture resultFuture; + private final ChunkSegmenter chunkSegmenter; + private final ClientStreamingCallable write; + + private final WriteCtx writeCtx; + + private final Observer responseObserver; + private volatile ApiStreamObserver stream; + + private boolean open = true; + private boolean first = true; + private boolean finished = false; + + GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + SettableApiFuture resultFuture, + ChunkSegmenter chunkSegmenter, + ClientStreamingCallable write, + ResumableWrite requestFactory) { + String 
bucketName = requestFactory.bucketName(); + this.resultFuture = resultFuture; + this.chunkSegmenter = chunkSegmenter; + + GrpcCallContext internalContext = + contextWithBucketName(bucketName, GrpcCallContext.createDefault()); + this.write = write.withDefaultCallContext(internalContext); + + this.writeCtx = new WriteCtx<>(requestFactory); + this.responseObserver = new Observer(writeCtx.getConfirmedBytes()::set, resultFuture::set); + } + + @Override + public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + return internalWrite(srcs, srcsOffset, srcsLength, false); + } + + @Override + public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + long write = internalWrite(srcs, srcsOffset, srcsLength, true); + close(); + return write; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + ApiStreamObserver openedStream = openedStream(); + if (!finished) { + WriteObjectRequest message = finishMessage(); + try { + openedStream.onNext(message); + openedStream.onCompleted(); + finished = true; + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } else { + try { + openedStream.onCompleted(); + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } + open = false; + responseObserver.await(); + } + + private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize) + throws ClosedChannelException { + if (!open) { + throw new ClosedChannelException(); + } + + ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + + List messages = new ArrayList<>(); + + ApiStreamObserver openedStream = openedStream(); + int bytesConsumed = 0; + for (ChunkSegment datum : data) { + Crc32cLengthKnown crc32c = datum.getCrc32c(); + ByteString b = datum.getB(); + int contentSize = b.size(); + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + Crc32cLengthKnown cumulative = + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + WriteObjectRequest.Builder builder = + writeCtx + .newRequestBuilder() + .setWriteOffset(offset) + .setChecksummedData(checksummedData.build()); + if (!datum.isOnlyFullBlocks()) { + builder.setFinishWrite(true); + if (cumulative != null) { + builder.setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + } + finished = true; + } + + WriteObjectRequest build = possiblyPairDownRequest(builder, first).build(); + first = false; + messages.add(build); + bytesConsumed += contentSize; + } + if (finalize && !finished) { + messages.add(finishMessage()); + finished = true; + } + + try { + for (WriteObjectRequest message : messages) { + openedStream.onNext(message); + } + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + + return bytesConsumed; + } + + @NonNull + private WriteObjectRequest finishMessage() { + long offset = writeCtx.getTotalSentBytes().get(); + Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); + + WriteObjectRequest.Builder b = + writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + if (crc32cValue != null) { + 
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); + } + WriteObjectRequest message = b.build(); + return message; + } + + private ApiStreamObserver openedStream() { + if (stream == null) { + synchronized (this) { + if (stream == null) { + stream = write.clientStreamingCall(responseObserver); + } + } + } + return stream; + } + + /** + * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, + * this utility method centralizes the logic necessary to clear those fields for use by subsequent + * messages. + */ + private static WriteObjectRequest.Builder possiblyPairDownRequest( + WriteObjectRequest.Builder b, boolean firstMessageOfStream) { + if (firstMessageOfStream && b.getWriteOffset() == 0) { + return b; + } + if (b.getWriteOffset() > 0) { + b.clearWriteObjectSpec(); + } + + if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { + b.clearObjectChecksums(); + } + return b; + } + + static class Observer implements ApiStreamObserver { + + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + + private final SettableApiFuture invocationHandle; + private volatile WriteObjectResponse last; + + Observer(LongConsumer sizeCallback, Consumer completeCallback) { + this.sizeCallback = sizeCallback; + this.completeCallback = completeCallback; + this.invocationHandle = SettableApiFuture.create(); + } + + @Override + public void onNext(WriteObjectResponse value) { + // incremental update + if (value.hasPersistedSize()) { + sizeCallback.accept(value.getPersistedSize()); + } else if (value.hasResource()) { + sizeCallback.accept(value.getResource().getSize()); + } + last = value; + } + + /** + * observed exceptions so far + * + *
    + *
+     * <ol>
+     *   <li>{@link com.google.api.gax.rpc.OutOfRangeException}
+     *   <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
+     *   <li>{@link io.grpc.StatusRuntimeException}
+     * </ol>
+ */ + @Override + public void onError(Throwable t) { + invocationHandle.setException(t); + } + + @Override + public void onCompleted() { + if (last != null && last.hasResource()) { + completeCallback.accept(last); + } + invocationHandle.set(null); + } + + void await() { + try { + invocationHandle.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 26bc48193..1c6ad18c0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -30,7 +30,6 @@ import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; -import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory; import com.google.storage.v2.QueryWriteStatusRequest; import com.google.storage.v2.QueryWriteStatusResponse; import com.google.storage.v2.ServiceConstants.Values; @@ -38,7 +37,6 @@ import com.google.storage.v2.WriteObjectResponse; import java.nio.ByteBuffer; import java.util.function.BiFunction; -import java.util.function.Function; import org.checkerframework.checker.nullness.qual.NonNull; final class GapicWritableByteChannelSessionBuilder { @@ -125,34 +123,6 @@ JournalingResumableUploadBuilder journaling() { return new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE); } - /** - * When constructing any of our channel sessions, there is always a {@link - * GapicUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction which - * will instantiate the {@link GapicUnbufferedWritableByteChannel} when provided with a {@code - * StartT} value and a {@code SettableApiFuture}. - * - *

As part of providing the function, the provided parameters {@code FlusherFactory} and {@code - * f} are "bound" into the returned function. In conjunction with the configured fields of this - * class a new instance of {@link GapicUnbufferedWritableByteChannel} can be constructed. - */ - private - BiFunction, UnbufferedWritableByteChannel> - bindFunction(FlusherFactory flusherFactory, Function f) { - // it is theoretically possible that the setter methods for the following variables could - // be called again between when this method is invoked and the resulting function is invoked. - // To ensure we are using the specified values at the point in time they are bound to the - // function read them into local variables which will be closed over rather than the class - // fields. - ByteStringStrategy boundStrategy = byteStringStrategy; - Hasher boundHasher = hasher; - return (start, resultFuture) -> - new GapicUnbufferedWritableByteChannel<>( - resultFuture, - new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), - f.apply(start), - flusherFactory); - } - private static BiFunction, UnbufferedWritableByteChannel> lift( @@ -334,11 +304,8 @@ UnbufferedWritableByteChannelSession build() { alg, Retrying::newCallContext); } else { - return new GapicUnbufferedWritableByteChannel<>( - result, - getChunkSegmenter(), - ResumableWrite.identity(start), - WriteFlushStrategy.fsyncOnClose(write)); + return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + result, getChunkSegmenter(), write, ResumableWrite.identity(start)); } }) .andThen(StorageByteChannels.writable()::createSynchronized)); @@ -378,11 +345,8 @@ BufferedWritableByteChannelSession build() { alg, Retrying::newCallContext); } else { - return new GapicUnbufferedWritableByteChannel<>( - result, - getChunkSegmenter(), - ResumableWrite.identity(start), - WriteFlushStrategy.fsyncOnClose(write)); + return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( + result, getChunkSegmenter(), write, ResumableWrite.identity(start)); } }) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) From 77f99925514eaca2ff776235fc8ea1de8eddadea Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 22 Apr 2024 21:10:43 -0400 Subject: [PATCH 4/7] chore: delete GapicUnbufferedWritableByteChannel --- .../GapicUnbufferedWritableByteChannel.java | 180 ------------------ ...HttpWritableByteChannelSessionBuilder.java | 11 -- 2 files changed, 191 deletions(-) delete mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java deleted file mode 100644 index 32d5eb9dc..000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.storage; - -import com.google.api.core.SettableApiFuture; -import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; -import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; -import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; -import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory; -import com.google.cloud.storage.WriteFlushStrategy.Flusher; -import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import com.google.storage.v2.ChecksummedData; -import com.google.storage.v2.ObjectChecksums; -import com.google.storage.v2.WriteObjectRequest; -import com.google.storage.v2.WriteObjectResponse; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.List; -import org.checkerframework.checker.nullness.qual.NonNull; - -final class GapicUnbufferedWritableByteChannel< - RequestFactoryT extends WriteObjectRequestBuilderFactory> - implements UnbufferedWritableByteChannel { - - private final SettableApiFuture resultFuture; - private final ChunkSegmenter chunkSegmenter; - - private final WriteCtx writeCtx; - private final Flusher flusher; - - private boolean open = true; - private boolean finished = false; - - GapicUnbufferedWritableByteChannel( - SettableApiFuture resultFuture, - ChunkSegmenter chunkSegmenter, - RequestFactoryT requestFactory, - FlusherFactory flusherFactory) { - this.resultFuture = resultFuture; - this.chunkSegmenter = chunkSegmenter; - - this.writeCtx = new WriteCtx<>(requestFactory); - this.flusher = - flusherFactory.newFlusher( - requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set); - } - - @Override - public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { - return internalWrite(srcs, srcsOffset, srcsLength, false); - } - - @Override - public long writeAndClose(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { - long write = internalWrite(srcs, srcsOffset, srcsLength, true); - close(); - return write; - } - - @Override - public boolean isOpen() { - return open; - } - - @Override - public void close() throws IOException { - if (!finished) { - WriteObjectRequest message = finishMessage(); - try { - flusher.close(message); - finished = true; - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; - } - } else { - try { - flusher.close(null); - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; - } - } - open = false; - } - - @VisibleForTesting - WriteCtx getWriteCtx() { - return writeCtx; - } - - private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize) - throws ClosedChannelException { - if (!open) { - throw new ClosedChannelException(); - } - - ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); - - List messages = new ArrayList<>(); - - int bytesConsumed = 0; - for (ChunkSegment datum : data) { - Crc32cLengthKnown crc32c = datum.getCrc32c(); - ByteString b = datum.getB(); - int contentSize = b.size(); - long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); - Crc32cLengthKnown cumulative = - writeCtx - .getCumulativeCrc32c() - .accumulateAndGet(crc32c, 
chunkSegmenter.getHasher()::nullSafeConcat); - ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); - if (crc32c != null) { - checksummedData.setCrc32C(crc32c.getValue()); - } - WriteObjectRequest.Builder builder = - writeCtx - .newRequestBuilder() - .setWriteOffset(offset) - .setChecksummedData(checksummedData.build()); - if (!datum.isOnlyFullBlocks()) { - builder.setFinishWrite(true); - if (cumulative != null) { - builder.setObjectChecksums( - ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); - } - finished = true; - } - - WriteObjectRequest build = builder.build(); - messages.add(build); - bytesConsumed += contentSize; - } - if (finalize && !finished) { - messages.add(finishMessage()); - finished = true; - } - - try { - flusher.flush(messages); - } catch (RuntimeException e) { - resultFuture.setException(e); - throw e; - } - - return bytesConsumed; - } - - @NonNull - private WriteObjectRequest finishMessage() { - long offset = writeCtx.getTotalSentBytes().get(); - Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); - - WriteObjectRequest.Builder b = - writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); - if (crc32cValue != null) { - b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); - } - WriteObjectRequest message = b.build(); - return message; - } -} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java index 19abf0928..9626560e6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java @@ -113,17 +113,6 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) { return new BufferedResumableUploadBuilder(bufferHandle); } - /** - * When constructing any of our channel sessions, there is always a {@link - * GapicUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction - * which will instantiate the {@link GapicUnbufferedWritableByteChannel} when provided with a - * {@code StartT} value and a {@code SettableApiFuture}. - * - *

As part of providing the function, the provided parameters {@code FlusherFactory} and - * {@code f} are "bound" into the returned function. In conjunction with the configured fields - * of this class a new instance of {@link GapicUnbufferedWritableByteChannel} can be - * constructed. - */ private BiFunction< JsonResumableWrite, SettableApiFuture, UnbufferedWritableByteChannel> bindFunction() { From 7b97234ad58f3e0065f74bc1cbe1b25c8eb9549c Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 22 Apr 2024 21:13:57 -0400 Subject: [PATCH 5/7] chore: cleanup WriteFlushStrategy --- .../cloud/storage/WriteFlushStrategy.java | 260 ------------------ .../cloud/storage/WriteFlushStrategyTest.java | 134 --------- 2 files changed, 394 deletions(-) delete mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/WriteFlushStrategyTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java index 0a191c7da..520a8c2d0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java @@ -21,15 +21,12 @@ import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; -import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BidiWriteObjectResponse; -import com.google.storage.v2.WriteObjectRequest; -import com.google.storage.v2.WriteObjectResponse; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -53,28 +50,6 @@ final class WriteFlushStrategy { private WriteFlushStrategy() {} - /** - * Create a {@link Flusher} which will "fsync" every time {@link Flusher#flush(List)} is called - * along with {@link Flusher#close(WriteObjectRequest)}. - */ - static FlusherFactory fsyncEveryFlush( - ClientStreamingCallable write, - RetryingDependencies deps, - ResultRetryAlgorithm alg, - Supplier baseContextSupplier) { - return (String bucketName, - LongConsumer committedTotalBytesCallback, - Consumer onSuccessCallback) -> - new FsyncEveryFlusher( - write, - deps, - alg, - bucketName, - committedTotalBytesCallback, - onSuccessCallback, - baseContextSupplier); - } - /** * Create a {@link BidiFlusher} which will keep a bidirectional stream open, flushing and sending * the appropriate signals to GCS when the buffer is full. @@ -97,20 +72,6 @@ static BidiFlusherFactory defaultBidiFlusher( baseContextSupplier); } - /** - * Create a {@link Flusher} which will "fsync" only on {@link Flusher#close(WriteObjectRequest)}. - * Calls to {@link Flusher#flush(List)} will be sent but not synced. 
- * - * @see FlusherFactory#newFlusher(String, LongConsumer, Consumer) - */ - static FlusherFactory fsyncOnClose( - ClientStreamingCallable write) { - return (String bucketName, - LongConsumer committedTotalBytesCallback, - Consumer onSuccessCallback) -> - new FsyncOnClose(write, bucketName, committedTotalBytesCallback, onSuccessCallback); - } - static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext baseContext) { if (bucketName != null && !bucketName.isEmpty()) { return baseContext.withExtraHeaders( @@ -120,32 +81,6 @@ static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext return baseContext; } - /** - * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, - * this utility method centralizes the logic necessary to clear those fields for use by subsequent - * messages. - */ - private static WriteObjectRequest possiblyPairDownRequest( - WriteObjectRequest message, boolean firstMessageOfStream) { - if (firstMessageOfStream && message.getWriteOffset() == 0) { - return message; - } - - WriteObjectRequest.Builder b = message.toBuilder(); - if (!firstMessageOfStream) { - b.clearUploadId(); - } - - if (message.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (message.getWriteOffset() > 0 && !message.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b.build(); - } - private static BidiWriteObjectRequest possiblyPairDownBidiRequest( BidiWriteObjectRequest message, boolean firstMessageOfStream) { if (firstMessageOfStream && message.getWriteOffset() == 0) { @@ -167,25 +102,6 @@ private static BidiWriteObjectRequest possiblyPairDownBidiRequest( return b.build(); } - @FunctionalInterface - interface FlusherFactory { - /** - * @param committedTotalBytesCallback Callback to signal the total number of bytes committed by - * this flusher. - * @param onSuccessCallback Callback to signal success, and provide the final response. 
- */ - Flusher newFlusher( - String bucketName, - LongConsumer committedTotalBytesCallback, - Consumer onSuccessCallback); - } - - interface Flusher { - void flush(@NonNull List segments); - - void close(@Nullable WriteObjectRequest req); - } - @FunctionalInterface interface BidiFlusherFactory { /** @@ -205,65 +121,6 @@ interface BidiFlusher { void close(@Nullable BidiWriteObjectRequest req); } - private static final class FsyncEveryFlusher implements Flusher { - - private final ClientStreamingCallable write; - private final RetryingDependencies deps; - private final ResultRetryAlgorithm alg; - private final String bucketName; - private final LongConsumer sizeCallback; - private final Consumer completeCallback; - private final Supplier baseContextSupplier; - - private FsyncEveryFlusher( - ClientStreamingCallable write, - RetryingDependencies deps, - ResultRetryAlgorithm alg, - String bucketName, - LongConsumer sizeCallback, - Consumer completeCallback, - Supplier baseContextSupplier) { - this.write = write; - this.deps = deps; - this.alg = alg; - this.bucketName = bucketName; - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; - this.baseContextSupplier = baseContextSupplier; - } - - public void flush(@NonNull List segments) { - Retrying.run( - deps, - alg, - () -> { - Observer observer = new Observer(sizeCallback, completeCallback); - GrpcCallContext internalContext = - contextWithBucketName(bucketName, baseContextSupplier.get()); - ApiStreamObserver write = - this.write.withDefaultCallContext(internalContext).clientStreamingCall(observer); - - boolean first = true; - for (WriteObjectRequest message : segments) { - message = possiblyPairDownRequest(message, first); - - write.onNext(message); - first = false; - } - write.onCompleted(); - observer.await(); - return null; - }, - Decoder.identity()); - } - - public void close(@Nullable WriteObjectRequest req) { - if (req != null) { - flush(ImmutableList.of(req)); - } - } - } - public static final class DefaultBidiFlusher implements BidiFlusher { private final BidiStreamingCallable write; @@ -340,123 +197,6 @@ private void ensureOpen() { } } - private static final class FsyncOnClose implements Flusher { - - private final ClientStreamingCallable write; - private final String bucketName; - private final Observer responseObserver; - - private volatile ApiStreamObserver stream; - private boolean first = true; - - private FsyncOnClose( - ClientStreamingCallable write, - String bucketName, - LongConsumer sizeCallback, - Consumer completeCallback) { - this.write = write; - this.bucketName = bucketName; - this.responseObserver = new Observer(sizeCallback, completeCallback); - } - - @Override - public void flush(@NonNull List segments) { - ensureOpen(); - for (WriteObjectRequest message : segments) { - message = possiblyPairDownRequest(message, first); - - stream.onNext(message); - first = false; - } - } - - @Override - public void close(@Nullable WriteObjectRequest message) { - ensureOpen(); - if (message != null) { - message = possiblyPairDownRequest(message, first); - stream.onNext(message); - } - stream.onCompleted(); - responseObserver.await(); - } - - private void ensureOpen() { - if (stream == null) { - synchronized (this) { - if (stream == null) { - GrpcCallContext internalContext = - contextWithBucketName(bucketName, GrpcCallContext.createDefault()); - stream = - this.write - .withDefaultCallContext(internalContext) - .clientStreamingCall(responseObserver); - } - } - } - } - } - - static class Observer implements 
ApiStreamObserver { - - private final LongConsumer sizeCallback; - private final Consumer completeCallback; - - private final SettableApiFuture invocationHandle; - private volatile WriteObjectResponse last; - - Observer(LongConsumer sizeCallback, Consumer completeCallback) { - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; - this.invocationHandle = SettableApiFuture.create(); - } - - @Override - public void onNext(WriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - sizeCallback.accept(value.getPersistedSize()); - } else if (value.hasResource()) { - sizeCallback.accept(value.getResource().getSize()); - } - last = value; - } - - /** - * observed exceptions so far - * - *

- * <ol>
- *   <li>{@link com.google.api.gax.rpc.OutOfRangeException}
- *   <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
- *   <li>{@link io.grpc.StatusRuntimeException}
- * </ol>
- */ - @Override - public void onError(Throwable t) { - invocationHandle.setException(t); - } - - @Override - public void onCompleted() { - if (last != null && last.hasResource()) { - completeCallback.accept(last); - } - invocationHandle.set(null); - } - - void await() { - try { - invocationHandle.get(); - } catch (InterruptedException | ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new RuntimeException(e); - } - } - } - } - static class BidiObserver implements ApiStreamObserver { private final LongConsumer sizeCallback; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/WriteFlushStrategyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/WriteFlushStrategyTest.java deleted file mode 100644 index f4c037b18..000000000 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/WriteFlushStrategyTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.storage; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.rpc.ApiCallContext; -import com.google.api.gax.rpc.ApiStreamObserver; -import com.google.api.gax.rpc.ClientStreamingCallable; -import com.google.cloud.storage.Retrying.RetryingDependencies; -import com.google.cloud.storage.WriteFlushStrategy.Flusher; -import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.storage.v2.WriteObjectRequest; -import com.google.storage.v2.WriteObjectResponse; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import org.junit.Test; - -public final class WriteFlushStrategyTest { - private static final Map> expectedHeaderNonNullNonEmpty = - ImmutableMap.of("x-goog-request-params", ImmutableList.of("bucket=bucket-name")); - private static final Map> expectedHeaderNonNullEmpty = ImmutableMap.of(); - private static final Map> expectedHeaderNull = ImmutableMap.of(); - - @Test - public void bucketNameAddedToXGoogRequestParams_nonNull_nonEmpty_fsyncEveryFlush() { - doTest( - write -> - WriteFlushStrategy.fsyncEveryFlush( - write, - RetryingDependencies.attemptOnce(), - Retrying.neverRetry(), - GrpcCallContext::createDefault), - "bucket-name", - expectedHeaderNonNullNonEmpty); - } - - @Test - public void bucketNameAddedToXGoogRequestParams_nonNull_nonEmpty_fsyncOnClose() { - doTest(WriteFlushStrategy::fsyncOnClose, "bucket-name", expectedHeaderNonNullNonEmpty); - } - - @Test - public void bucketNameNotAddedToXGoogRequestParams_nonNull_empty_fsyncEveryFlush() { - doTest( - write -> - WriteFlushStrategy.fsyncEveryFlush( - write, - 
RetryingDependencies.attemptOnce(), - Retrying.neverRetry(), - GrpcCallContext::createDefault), - "", - expectedHeaderNonNullEmpty); - } - - @Test - public void bucketNameNotAddedToXGoogRequestParams_nonNull_empty_fsyncOnClose() { - doTest(WriteFlushStrategy::fsyncOnClose, "", expectedHeaderNonNullEmpty); - } - - @Test - public void bucketNameNotAddedToXGoogRequestParams_null_fsyncEveryFlush() { - doTest( - write -> - WriteFlushStrategy.fsyncEveryFlush( - write, - RetryingDependencies.attemptOnce(), - Retrying.neverRetry(), - GrpcCallContext::createDefault), - null, - expectedHeaderNull); - } - - @Test - public void bucketNameNotAddedToXGoogRequestParams_null_fsyncOnClose() { - doTest(WriteFlushStrategy::fsyncOnClose, null, expectedHeaderNull); - } - - private static void doTest( - Function, FlusherFactory> ff, - String bucketName, - Map> expectedHeader) { - AtomicLong c = new AtomicLong(0); - AtomicReference ref = new AtomicReference<>(); - AtomicReference>> actualHeader = new AtomicReference<>(); - ClientStreamingCallable write = - new ClientStreamingCallable() { - @Override - public ApiStreamObserver clientStreamingCall( - ApiStreamObserver responseObserver, ApiCallContext context) { - Map> extraHeaders = context.getExtraHeaders(); - actualHeader.compareAndSet(null, extraHeaders); - return new ApiStreamObserver() { - @Override - public void onNext(WriteObjectRequest value) {} - - @Override - public void onError(Throwable t) {} - - @Override - public void onCompleted() { - responseObserver.onCompleted(); - } - }; - } - }; - FlusherFactory factory = ff.apply(write); - Flusher flusher = factory.newFlusher(bucketName, c::addAndGet, ref::set); - flusher.flush(Collections.emptyList()); - flusher.close(null); - assertThat(actualHeader.get()).isEqualTo(expectedHeader); - } -} From d87105aea7869048ef08827f45519659dd67f14f Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 23 Apr 2024 20:24:45 -0400 Subject: [PATCH 6/7] chore: pt.4 BidiWrite chunked resumable upload --- ...apicBidiUnbufferedWritableByteChannel.java | 185 +++++++++++-- ...BidiWritableByteChannelSessionBuilder.java | 57 ++-- ...edChunkedResumableWritableByteChannel.java | 2 +- ...icUnbufferedDirectWritableByteChannel.java | 2 +- ...zeOnCloseResumableWritableByteChannel.java | 2 +- .../com/google/cloud/storage/GrpcUtils.java | 35 +++ ...ndUploadUnbufferedWritableByteChannel.java | 2 +- .../cloud/storage/WriteFlushStrategy.java | 261 ------------------ .../it/ITObjectChecksumSupportTest.java | 73 ++++- 9 files changed, 302 insertions(+), 317 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java delete mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java index d19a88026..19aba735e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -16,9 +16,17 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.GrpcUtils.contextWithBucketName; + import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import 
com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; @@ -31,33 +39,45 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.NonNull; -final class GapicBidiUnbufferedWritableByteChannel< - RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory> - implements UnbufferedWritableByteChannel { - +final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel { + private final BidiStreamingCallable write; + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final String bucketName; + private final Supplier baseContextSupplier; private final SettableApiFuture resultFuture; private final ChunkSegmenter chunkSegmenter; - private final BidiWriteCtx writeCtx; - private final WriteFlushStrategy.BidiFlusher flusher; + private final BidiWriteCtx writeCtx; + private final BidiObserver responseObserver; + private volatile ApiStreamObserver stream; private boolean open = true; + private boolean first = true; private boolean finished = false; GapicBidiUnbufferedWritableByteChannel( + BidiStreamingCallable write, + RetryingDependencies deps, + ResultRetryAlgorithm alg, SettableApiFuture resultFuture, ChunkSegmenter chunkSegmenter, - RequestFactoryT requestFactory, - WriteFlushStrategy.BidiFlusherFactory flusherFactory) { + BidiResumableWrite requestFactory, + Supplier baseContextSupplier) { + this.write = write; + this.deps = deps; + this.alg = alg; + this.baseContextSupplier = baseContextSupplier; + this.bucketName = requestFactory.bucketName(); this.resultFuture = resultFuture; this.chunkSegmenter = chunkSegmenter; this.writeCtx = new BidiWriteCtx<>(requestFactory); - this.flusher = - flusherFactory.newFlusher( - requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set); + this.responseObserver = new BidiObserver(); } @Override @@ -65,6 +85,13 @@ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOEx return internalWrite(srcs, srcsOffset, srcsLength, false); } + @Override + public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException { + long written = internalWrite(srcs, offset, length, true); + close(); + return written; + } + @Override public boolean isOpen() { return open; @@ -72,23 +99,29 @@ public boolean isOpen() { @Override public void close() throws IOException { + if (!open) { + return; + } + ApiStreamObserver openedStream = openedStream(); if (!finished) { BidiWriteObjectRequest message = finishMessage(); try { - flusher.close(message); + openedStream.onNext(message); finished = true; + openedStream.onCompleted(); } catch (RuntimeException e) { resultFuture.setException(e); throw e; } } else { - flusher.close(null); + openedStream.onCompleted(); } + responseObserver.await(); open = false; } @VisibleForTesting - BidiWriteCtx getWriteCtx() { + BidiWriteCtx getWriteCtx() { return writeCtx; } 
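The rewritten channel now owns its stream lifecycle directly: the constructor captures the bucket name from the request factory, and the lazily opened stream (see the following hunks) stamps the per-bucket routing header onto the gRPC context through the newly imported GrpcUtils.contextWithBucketName. A minimal sketch of that routing behavior, assuming same-package access to the package-private helper and an illustrative bucket resource name (not part of this patch):

import com.google.api.gax.grpc.GrpcCallContext;

final class RoutingHeaderSketch {
  // Hypothetical illustration only; GrpcUtils.contextWithBucketName is added later in this series.
  static GrpcCallContext route(GrpcCallContext base, String bucketResourceName) {
    // For a non-null, non-empty name this adds the extra header
    //   x-goog-request-params: bucket=<bucketResourceName>
    // otherwise the base context is returned unchanged.
    return GrpcUtils.contextWithBucketName(bucketResourceName, base);
  }
}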
@@ -130,7 +163,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo finished = true; } - BidiWriteObjectRequest build = builder.build(); + BidiWriteObjectRequest build = possiblyPairDownBidiRequest(builder, first).build(); + first = false; messages.add(build); bytesConsumed += contentSize; } @@ -140,7 +174,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo } try { - flusher.flush(messages); + flush(messages); } catch (RuntimeException e) { resultFuture.setException(e); throw e; @@ -162,4 +196,123 @@ private BidiWriteObjectRequest finishMessage() { BidiWriteObjectRequest message = b.build(); return message; } + + private ApiStreamObserver openedStream() { + if (stream == null) { + synchronized (this) { + if (stream == null) { + GrpcCallContext internalContext = + contextWithBucketName(bucketName, baseContextSupplier.get()); + stream = + this.write + .withDefaultCallContext(internalContext) + .bidiStreamingCall(responseObserver); + responseObserver.sem.drainPermits(); + } + } + } + return stream; + } + + private void flush(@NonNull List segments) { + Retrying.run( + deps, + alg, + () -> { + try { + ApiStreamObserver opened = openedStream(); + for (BidiWriteObjectRequest message : segments) { + opened.onNext(message); + } + if (!finished) { + BidiWriteObjectRequest message = + BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build(); + opened.onNext(message); + } + responseObserver.await(); + return null; + } catch (Exception e) { + stream = null; + first = true; + throw e; + } + }, + Decoder.identity()); + } + + private static BidiWriteObjectRequest.Builder possiblyPairDownBidiRequest( + BidiWriteObjectRequest.Builder b, boolean firstMessageOfStream) { + if (firstMessageOfStream && b.getWriteOffset() == 0) { + return b; + } + + if (!firstMessageOfStream) { + b.clearUploadId(); + } + + if (b.getWriteOffset() > 0) { + b.clearWriteObjectSpec(); + } + + if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { + b.clearObjectChecksums(); + } + return b; + } + + private class BidiObserver implements ApiStreamObserver { + + private final Semaphore sem; + private volatile BidiWriteObjectResponse last; + private volatile RuntimeException previousError; + + private BidiObserver() { + this.sem = new Semaphore(0); + } + + @Override + public void onNext(BidiWriteObjectResponse value) { + // incremental update + if (value.hasPersistedSize()) { + writeCtx.getConfirmedBytes().set((value.getPersistedSize())); + } else if (value.hasResource()) { + writeCtx.getConfirmedBytes().set(value.getResource().getSize()); + } + sem.release(); + last = value; + } + + @Override + public void onError(Throwable t) { + if (t instanceof RuntimeException) { + previousError = (RuntimeException) t; + } + sem.release(); + } + + @Override + public void onCompleted() { + if (last != null && last.hasResource()) { + resultFuture.set(last); + } + sem.release(); + } + + void await() { + try { + sem.acquire(); + } catch (InterruptedException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + RuntimeException err = previousError; + if (err != null) { + previousError = null; + throw err; + } + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java index e26d33bbb..536eba7fb 
100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java @@ -30,7 +30,6 @@ import com.google.storage.v2.ServiceConstants.Values; import java.nio.ByteBuffer; import java.util.function.BiFunction; -import java.util.function.Function; final class GapicBidiWritableByteChannelSessionBuilder { @@ -78,36 +77,6 @@ GapicBidiWritableByteChannelSessionBuilder setByteStringStrategy( return this; } - /** - * When constructing a bidi channel session, there is always a {@link - * GapicBidiUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction - * which will instantiate the {@link GapicBidiUnbufferedWritableByteChannel} when provided with a - * {@code StartT} value and a {@code SettableApiFuture}. - * - *

As part of providing the function, the provided parameters {@code BidiFlusherFactory} and - * {@code f} are "bound" into the returned function. In conjunction with the configured fields of - * this class a new instance of {@link GapicBidiUnbufferedWritableByteChannel} can be constructed. - */ - private - BiFunction, UnbufferedWritableByteChannel> - bindFunction( - WriteFlushStrategy.BidiFlusherFactory flusherFactory, - Function f) { - // it is theoretically possible that the setter methods for the following variables could - // be called again between when this method is invoked and the resulting function is invoked. - // To ensure we are using the specified values at the point in time they are bound to the - // function read them into local variables which will be closed over rather than the class - // fields. - ByteStringStrategy boundStrategy = byteStringStrategy; - Hasher boundHasher = hasher; - return (start, resultFuture) -> - new GapicBidiUnbufferedWritableByteChannel<>( - resultFuture, - new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), - f.apply(start), - flusherFactory); - } - GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder resumable() { return new GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder(); } @@ -164,12 +133,30 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture start } BufferedWritableByteChannelSession build() { + // it is theoretically possible that the setter methods for the following variables could + // be called again between when this method is invoked and the resulting function is + // invoked. + // To ensure we are using the specified values at the point in time they are bound to the + // function read them into local variables which will be closed over rather than the class + // fields. 
+ ByteStringStrategy boundStrategy = byteStringStrategy; + Hasher boundHasher = hasher; return new BufferedWriteSession<>( requireNonNull(start, "start must be non null"), - bindFunction( - WriteFlushStrategy.defaultBidiFlusher( - write, deps, alg, Retrying::newCallContext), - BidiResumableWrite::identity) + ((BiFunction< + BidiResumableWrite, + SettableApiFuture, + UnbufferedWritableByteChannel>) + (start, resultFuture) -> + new GapicBidiUnbufferedWritableByteChannel( + write, + deps, + alg, + resultFuture, + new ChunkSegmenter( + boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), + start, + Retrying::newCallContext)) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java index 37d92c4ec..aa48810d4 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java @@ -16,7 +16,7 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; +import static com.google.cloud.storage.GrpcUtils.contextWithBucketName; import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java index 2ae762916..861e867db 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedDirectWritableByteChannel.java @@ -16,7 +16,7 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; +import static com.google.cloud.storage.GrpcUtils.contextWithBucketName; import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java index f9f8befb5..16c6ec0ae 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java @@ -16,7 +16,7 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; +import static com.google.cloud.storage.GrpcUtils.contextWithBucketName; import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java new file mode 100644 index 000000000..7e8101155 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java 
@@ -0,0 +1,35 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +final class GrpcUtils { + + private GrpcUtils() {} + + static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext baseContext) { + if (bucketName != null && !bucketName.isEmpty()) { + return baseContext.withExtraHeaders( + ImmutableMap.of( + "x-goog-request-params", ImmutableList.of(String.format("bucket=%s", bucketName)))); + } + return baseContext; + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java index 4169b8410..67b5c089e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/SyncAndUploadUnbufferedWritableByteChannel.java @@ -16,7 +16,7 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.WriteFlushStrategy.contextWithBucketName; +import static com.google.cloud.storage.GrpcUtils.contextWithBucketName; import com.google.api.core.SettableApiFuture; import com.google.api.gax.grpc.GrpcCallContext; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java deleted file mode 100644 index 520a8c2d0..000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Copyright 2022 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.google.cloud.storage; - -import com.google.api.core.SettableApiFuture; -import com.google.api.gax.grpc.GrpcCallContext; -import com.google.api.gax.retrying.ResultRetryAlgorithm; -import com.google.api.gax.rpc.ApiStreamObserver; -import com.google.api.gax.rpc.BidiStreamingCallable; -import com.google.cloud.storage.Conversions.Decoder; -import com.google.cloud.storage.Retrying.RetryingDependencies; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.storage.v2.BidiWriteObjectRequest; -import com.google.storage.v2.BidiWriteObjectResponse; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; -import java.util.function.LongConsumer; -import java.util.function.Supplier; -import org.checkerframework.checker.nullness.qual.NonNull; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * When writing to GCS using the WriteObject rpc, there are some behavioral differences between - * performing a "direct" upload vs a "resumable" upload. - * - *

This class provides the encapsulation of the stream and "fsync" lifecycles and possible - * automatic retry where applicable. - * - *

In this class "fsync" is used to mean "complete the client stream to GCS and await its - * response". We are loosely following the concept used in linux to flush data to disk fsync(2) - */ -final class WriteFlushStrategy { - - private WriteFlushStrategy() {} - - /** - * Create a {@link BidiFlusher} which will keep a bidirectional stream open, flushing and sending - * the appropriate signals to GCS when the buffer is full. - */ - static BidiFlusherFactory defaultBidiFlusher( - BidiStreamingCallable write, - RetryingDependencies deps, - ResultRetryAlgorithm alg, - Supplier baseContextSupplier) { - return (String bucketName, - LongConsumer committedTotalBytesCallback, - Consumer onSuccessCallback) -> - new DefaultBidiFlusher( - write, - deps, - alg, - bucketName, - committedTotalBytesCallback, - onSuccessCallback, - baseContextSupplier); - } - - static GrpcCallContext contextWithBucketName(String bucketName, GrpcCallContext baseContext) { - if (bucketName != null && !bucketName.isEmpty()) { - return baseContext.withExtraHeaders( - ImmutableMap.of( - "x-goog-request-params", ImmutableList.of(String.format("bucket=%s", bucketName)))); - } - return baseContext; - } - - private static BidiWriteObjectRequest possiblyPairDownBidiRequest( - BidiWriteObjectRequest message, boolean firstMessageOfStream) { - if (firstMessageOfStream && message.getWriteOffset() == 0) { - return message; - } - - BidiWriteObjectRequest.Builder b = message.toBuilder(); - if (!firstMessageOfStream) { - b.clearUploadId(); - } - - if (message.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (message.getWriteOffset() > 0 && !message.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b.build(); - } - - @FunctionalInterface - interface BidiFlusherFactory { - /** - * @param committedTotalBytesCallback Callback to signal the total number of bytes committed by - * this flusher. - * @param onSuccessCallback Callback to signal success, and provide the final response. 
- */ - BidiFlusher newFlusher( - String bucketName, - LongConsumer committedTotalBytesCallback, - Consumer onSuccessCallback); - } - - interface BidiFlusher { - void flush(@NonNull List segments); - - void close(@Nullable BidiWriteObjectRequest req); - } - - public static final class DefaultBidiFlusher implements BidiFlusher { - - private final BidiStreamingCallable write; - private final RetryingDependencies deps; - private final ResultRetryAlgorithm alg; - private final String bucketName; - private final LongConsumer sizeCallback; - private final Consumer completeCallback; - private final Supplier baseContextSupplier; - private volatile ApiStreamObserver stream; - - private final BidiObserver responseObserver; - - private DefaultBidiFlusher( - BidiStreamingCallable write, - RetryingDependencies deps, - ResultRetryAlgorithm alg, - String bucketName, - LongConsumer sizeCallback, - Consumer completeCallback, - Supplier baseContextSupplier) { - this.write = write; - this.deps = deps; - this.alg = alg; - this.bucketName = bucketName; - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; - this.baseContextSupplier = baseContextSupplier; - this.responseObserver = new BidiObserver(sizeCallback, completeCallback); - } - - public void flush(@NonNull List segments) { - ensureOpen(); - Retrying.run( - deps, - alg, - () -> { - boolean first = true; - for (BidiWriteObjectRequest message : segments) { - message = possiblyPairDownBidiRequest(message, first); - - stream.onNext(message); - first = false; - } - BidiWriteObjectRequest message = - BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build(); - stream.onNext(message); - responseObserver.await(); - return null; - }, - Decoder.identity()); - } - - public void close(@Nullable BidiWriteObjectRequest req) { - ensureOpen(); - if (req != null) { - flush(ImmutableList.of(req)); - } - } - - private void ensureOpen() { - if (stream == null) { - synchronized (this) { - if (stream == null) { - GrpcCallContext internalContext = - contextWithBucketName(bucketName, baseContextSupplier.get()); - stream = - this.write - .withDefaultCallContext(internalContext) - .bidiStreamingCall(responseObserver); - } - } - } - } - } - - static class BidiObserver implements ApiStreamObserver { - - private final LongConsumer sizeCallback; - private final Consumer completeCallback; - - private final SettableApiFuture invocationHandle; - private volatile BidiWriteObjectResponse last; - - BidiObserver(LongConsumer sizeCallback, Consumer completeCallback) { - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; - this.invocationHandle = SettableApiFuture.create(); - } - - @Override - public void onNext(BidiWriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - sizeCallback.accept(value.getPersistedSize()); - invocationHandle.set(null); - } else if (value.hasResource()) { - sizeCallback.accept(value.getResource().getSize()); - } - last = value; - } - - /** - * observed exceptions so far - * - *

- * <ol>
- *   <li>{@link com.google.api.gax.rpc.OutOfRangeException}
- *   <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
- *   <li>{@link io.grpc.StatusRuntimeException}
- * </ol>
- */ - @Override - public void onError(Throwable t) { - invocationHandle.setException(t); - } - - @Override - public void onCompleted() { - - if (last != null && last.hasResource()) { - completeCallback.accept(last); - } - invocationHandle.set(null); - } - - void await() { - try { - invocationHandle.get(); - } catch (InterruptedException | ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } else { - throw new RuntimeException(e); - } - } - } - } -} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java index 12bbf3df5..a151ca030 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java @@ -23,11 +23,14 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.BlobWriteSessionConfigs; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.DataGenerator; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.TmpFile; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.it.ITObjectChecksumSupportTest.ChecksummedTestContentProvider; @@ -47,8 +50,10 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -75,6 +80,7 @@ public static final class ChecksummedTestContentProvider implements ParametersPr @Override public ImmutableList parameters() { DataGenerator gen = DataGenerator.base64Characters(); + int _256KiB = 256 * 1024; int _2MiB = 2 * 1024 * 1024; int _24MiB = 24 * 1024 * 1024; @@ -84,7 +90,9 @@ public ImmutableList parameters() { // med, multiple messages single stream when resumable ChecksummedTestContent.of(gen.genBytes(_2MiB + 3)), // large, multiple messages and multiple streams when resumable - ChecksummedTestContent.of(gen.genBytes(_24MiB + 5))); + ChecksummedTestContent.of(gen.genBytes(_24MiB + 5)), + // quantum aligned number of bytes + ChecksummedTestContent.of(gen.genBytes(_2MiB * 8 + _256KiB))); } } @@ -282,4 +290,67 @@ public void testMd5Validated_writer_expectSuccess() throws IOException { Blob blob = storage.get(blobId); assertThat(blob.getMd5()).isEqualTo(content.getMd5Base64()); } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void testCrc32cValidated_bidiWrite_expectSuccess() throws Exception { + String blobName = generator.randomObjectName(); + BlobId blobId = BlobId.of(bucket.getName(), blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(content.getCrc32cBase64()).build(); + + byte[] bytes = content.getBytes(); + + StorageOptions optionsWithBidi = + this.storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + + try (Storage storage = 
optionsWithBidi.getService()) { + BlobWriteSession session = + storage.blobWriteSession( + blobInfo, BlobWriteOption.doesNotExist(), BlobWriteOption.crc32cMatch()); + + try (ReadableByteChannel src = Channels.newChannel(new ByteArrayInputStream(bytes)); + WritableByteChannel dst = session.open()) { + ByteStreams.copy(src, dst); + } + + BlobInfo gen1 = session.getResult().get(5, TimeUnit.SECONDS); + assertThat(gen1.getCrc32c()).isEqualTo(content.getCrc32cBase64()); + } + } + + @Test + @CrossRun.Exclude(transports = Transport.HTTP) + public void testCrc32cValidated_bidiWrite_expectFailure() throws Exception { + String blobName = generator.randomObjectName(); + BlobId blobId = BlobId.of(bucket.getName(), blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(content.getCrc32cBase64()).build(); + + byte[] bytes = content.concat('x'); + + StorageOptions optionsWithBidi = + this.storage + .getOptions() + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + + try (Storage storage = optionsWithBidi.getService()) { + BlobWriteSession session = + storage.blobWriteSession( + blobInfo, BlobWriteOption.doesNotExist(), BlobWriteOption.crc32cMatch()); + + WritableByteChannel dst = session.open(); + try (ReadableByteChannel src = Channels.newChannel(new ByteArrayInputStream(bytes))) { + ByteStreams.copy(src, dst); + } + + StorageException expected = assertThrows(StorageException.class, dst::close); + + assertThat(expected.getCode()).isEqualTo(400); + } + } } From 77c4925ca2b5954cc612615f3bb59625eba05e0f Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 24 Apr 2024 15:28:28 -0400 Subject: [PATCH 7/7] chore: clirr --- google-cloud-storage/clirr-ignored-differences.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index cc9d69330..df4e590cc 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -78,4 +78,10 @@ com.google.cloud.storage.StorageOptions$Builder setBlobWriteSessionConfig(com.google.cloud.storage.BlobWriteSessionConfig) + + + 8001 + com/google/cloud/storage/WriteFlushStrategy$DefaultBidiFlusher + +
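Clirr difference type 8001 denotes a removed class. Because the package-private WriteFlushStrategy$DefaultBidiFlusher disappears along with the rest of WriteFlushStrategy in this series, an ignore entry is added; in clirr's XML schema the entry added above presumably reads:

  <difference>
    <differenceType>8001</differenceType>
    <className>com/google/cloud/storage/WriteFlushStrategy$DefaultBidiFlusher</className>
  </difference>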