Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove WriteFlusher instead having dedicated WritableByteChannels #2514

Merged
merged 7 commits into from May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Expand Up @@ -78,4 +78,10 @@
<method>com.google.cloud.storage.StorageOptions$Builder setBlobWriteSessionConfig(com.google.cloud.storage.BlobWriteSessionConfig)</method>
</difference>

<!-- somehow clirr things a public class in a package-private class is part of the public api -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/storage/WriteFlushStrategy$DefaultBidiFlusher</className>
</difference>

</differences>
Expand Up @@ -16,9 +16,17 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.GrpcUtils.contextWithBucketName;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
Expand All @@ -31,64 +39,89 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

final class GapicBidiUnbufferedWritableByteChannel<
RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory>
implements UnbufferedWritableByteChannel {

final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
private final BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write;
private final RetryingDependencies deps;
private final ResultRetryAlgorithm<?> alg;
private final String bucketName;
private final Supplier<GrpcCallContext> baseContextSupplier;
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
private final ChunkSegmenter chunkSegmenter;

private final BidiWriteCtx<RequestFactoryT> writeCtx;
private final WriteFlushStrategy.BidiFlusher flusher;
private final BidiWriteCtx<BidiResumableWrite> writeCtx;
private final BidiObserver responseObserver;

private volatile ApiStreamObserver<BidiWriteObjectRequest> stream;
private boolean open = true;
private boolean first = true;
private boolean finished = false;

GapicBidiUnbufferedWritableByteChannel(
BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write,
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
ChunkSegmenter chunkSegmenter,
RequestFactoryT requestFactory,
WriteFlushStrategy.BidiFlusherFactory flusherFactory) {
BidiResumableWrite requestFactory,
Supplier<GrpcCallContext> baseContextSupplier) {
this.write = write;
this.deps = deps;
this.alg = alg;
this.baseContextSupplier = baseContextSupplier;
this.bucketName = requestFactory.bucketName();
this.resultFuture = resultFuture;
this.chunkSegmenter = chunkSegmenter;

this.writeCtx = new BidiWriteCtx<>(requestFactory);
this.flusher =
flusherFactory.newFlusher(
requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set);
this.responseObserver = new BidiObserver();
}

@Override
public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
return internalWrite(srcs, srcsOffset, srcsLength, false);
}

@Override
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
long written = internalWrite(srcs, offset, length, true);
close();
return written;
}

@Override
public boolean isOpen() {
return open;
}

@Override
public void close() throws IOException {
if (!open) {
return;
}
ApiStreamObserver<BidiWriteObjectRequest> openedStream = openedStream();
if (!finished) {
BidiWriteObjectRequest message = finishMessage();
try {
flusher.close(message);
openedStream.onNext(message);
finished = true;
openedStream.onCompleted();
} catch (RuntimeException e) {
resultFuture.setException(e);
throw e;
}
} else {
flusher.close(null);
openedStream.onCompleted();
}
responseObserver.await();
open = false;
}

@VisibleForTesting
BidiWriteCtx<RequestFactoryT> getWriteCtx() {
BidiWriteCtx<BidiResumableWrite> getWriteCtx() {
return writeCtx;
}

Expand Down Expand Up @@ -130,7 +163,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
finished = true;
}

BidiWriteObjectRequest build = builder.build();
BidiWriteObjectRequest build = possiblyPairDownBidiRequest(builder, first).build();
first = false;
messages.add(build);
bytesConsumed += contentSize;
}
Expand All @@ -140,7 +174,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
}

try {
flusher.flush(messages);
flush(messages);
} catch (RuntimeException e) {
resultFuture.setException(e);
throw e;
Expand All @@ -162,4 +196,123 @@ private BidiWriteObjectRequest finishMessage() {
BidiWriteObjectRequest message = b.build();
return message;
}

private ApiStreamObserver<BidiWriteObjectRequest> openedStream() {
if (stream == null) {
synchronized (this) {
if (stream == null) {
GrpcCallContext internalContext =
contextWithBucketName(bucketName, baseContextSupplier.get());
stream =
this.write
.withDefaultCallContext(internalContext)
.bidiStreamingCall(responseObserver);
responseObserver.sem.drainPermits();
}
}
}
return stream;
}

private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
Retrying.run(
deps,
alg,
() -> {
try {
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream();
for (BidiWriteObjectRequest message : segments) {
opened.onNext(message);
}
if (!finished) {
BidiWriteObjectRequest message =
BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build();
opened.onNext(message);
}
responseObserver.await();
return null;
} catch (Exception e) {
stream = null;
first = true;
throw e;
}
},
Decoder.identity());
}

private static BidiWriteObjectRequest.Builder possiblyPairDownBidiRequest(
BidiWriteObjectRequest.Builder b, boolean firstMessageOfStream) {
if (firstMessageOfStream && b.getWriteOffset() == 0) {
return b;
}

if (!firstMessageOfStream) {
b.clearUploadId();
}

if (b.getWriteOffset() > 0) {
b.clearWriteObjectSpec();
}

if (b.getWriteOffset() > 0 && !b.getFinishWrite()) {
b.clearObjectChecksums();
}
return b;
}

private class BidiObserver implements ApiStreamObserver<BidiWriteObjectResponse> {

private final Semaphore sem;
private volatile BidiWriteObjectResponse last;
private volatile RuntimeException previousError;

private BidiObserver() {
this.sem = new Semaphore(0);
}

@Override
public void onNext(BidiWriteObjectResponse value) {
// incremental update
if (value.hasPersistedSize()) {
writeCtx.getConfirmedBytes().set((value.getPersistedSize()));
} else if (value.hasResource()) {
writeCtx.getConfirmedBytes().set(value.getResource().getSize());
}
sem.release();
last = value;
}

@Override
public void onError(Throwable t) {
if (t instanceof RuntimeException) {
previousError = (RuntimeException) t;
}
sem.release();
}

@Override
public void onCompleted() {
if (last != null && last.hasResource()) {
resultFuture.set(last);
}
sem.release();
}

void await() {
try {
sem.acquire();
} catch (InterruptedException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
}
RuntimeException err = previousError;
if (err != null) {
previousError = null;
throw err;
}
}
}
}
Expand Up @@ -30,7 +30,6 @@
import com.google.storage.v2.ServiceConstants.Values;
import java.nio.ByteBuffer;
import java.util.function.BiFunction;
import java.util.function.Function;

final class GapicBidiWritableByteChannelSessionBuilder {

Expand Down Expand Up @@ -78,36 +77,6 @@ GapicBidiWritableByteChannelSessionBuilder setByteStringStrategy(
return this;
}

/**
* When constructing a bidi channel session, there is always a {@link
* GapicBidiUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction
* which will instantiate the {@link GapicBidiUnbufferedWritableByteChannel} when provided with a
* {@code StartT} value and a {@code SettableApiFuture<BidiWriteObjectResponse>}.
*
* <p>As part of providing the function, the provided parameters {@code BidiFlusherFactory} and
* {@code f} are "bound" into the returned function. In conjunction with the configured fields of
* this class a new instance of {@link GapicBidiUnbufferedWritableByteChannel} can be constructed.
*/
private <StartT, RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory>
BiFunction<StartT, SettableApiFuture<BidiWriteObjectResponse>, UnbufferedWritableByteChannel>
bindFunction(
WriteFlushStrategy.BidiFlusherFactory flusherFactory,
Function<StartT, RequestFactoryT> f) {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return (start, resultFuture) ->
new GapicBidiUnbufferedWritableByteChannel<>(
resultFuture,
new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
f.apply(start),
flusherFactory);
}

GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder resumable() {
return new GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder();
}
Expand Down Expand Up @@ -164,12 +133,30 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture<BidiResumableWrite> start
}

BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is
// invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction(
WriteFlushStrategy.defaultBidiFlusher(
write, deps, alg, Retrying::newCallContext),
BidiResumableWrite::identity)
((BiFunction<
BidiResumableWrite,
SettableApiFuture<BidiWriteObjectResponse>,
UnbufferedWritableByteChannel>)
(start, resultFuture) ->
new GapicBidiUnbufferedWritableByteChannel(
write,
deps,
alg,
resultFuture,
new ChunkSegmenter(
boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
start,
Retrying::newCallContext))
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
}
Expand Down