Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update all automatic retry behavior to be idempotency aware #1132

Merged
merged 4 commits into from Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -87,7 +87,7 @@ public Builder setTransportOptions(TransportOptions transportOptions) {
* @return the builder
* @see StorageRetryStrategy#getDefaultStorageRetryStrategy()
*/
Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) {
public Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy) {
this.storageRetryStrategy =
requireNonNull(storageRetryStrategy, "storageRetryStrategy must be non null");
return this;
Expand Down Expand Up @@ -125,7 +125,7 @@ public TransportOptions getDefaultTransportOptions() {
}

public StorageRetryStrategy getStorageRetryStrategy() {
return StorageRetryStrategy.getLegacyStorageRetryStrategy();
return StorageRetryStrategy.getDefaultStorageRetryStrategy();
}
}

Expand Down
Expand Up @@ -29,7 +29,7 @@
* @see #getDefaultStorageRetryStrategy()
* @see #getUniformStorageRetryStrategy()
*/
interface StorageRetryStrategy extends Serializable {
public interface StorageRetryStrategy extends Serializable {

/**
* Factory method to provide a {@link ResultRetryAlgorithm} which will be used to evaluate whether
Expand Down
Expand Up @@ -65,8 +65,12 @@ public class BlobWriteChannelTest {
private static final String BLOB_NAME = "n";
private static final String UPLOAD_ID = "uploadid";
private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME).build();
private static final BlobInfo BLOB_INFO_WITH_GENERATION =
BlobInfo.newBuilder(BUCKET_NAME, BLOB_NAME, 1L).build();
private static final StorageObject UPDATED_BLOB = new StorageObject();
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final Map<StorageRpc.Option, ?> RPC_OPTIONS_GENERATION =
ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 1L);
private static final int MIN_CHUNK_SIZE = 256 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 60 * MIN_CHUNK_SIZE; // 15MiB
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
Expand Down Expand Up @@ -112,11 +116,12 @@ public void testCreate() {

@Test
public void testCreateRetryableError() {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS))
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andThrow(socketClosedException);
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
}
Expand Down Expand Up @@ -146,7 +151,8 @@ public void testWriteWithoutFlush() throws Exception {
public void testWriteWithFlushRetryChunk() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -167,7 +173,7 @@ public void testWriteWithFlushRetryChunk() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
Expand All @@ -179,7 +185,8 @@ public void testWriteWithFlushRetryChunk() throws Exception {
public void testWriteWithRetryFullChunk() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID), (byte[]) anyObject(), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(false)))
Expand All @@ -204,7 +211,7 @@ public void testWriteWithRetryFullChunk() throws Exception {
eq(true)))
.andReturn(BLOB_INFO.toPb());
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -217,7 +224,8 @@ public void testWriteWithRetryFullChunk() throws Exception {
public void testWriteWithRemoteProgressMade() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -239,7 +247,7 @@ public void testWriteWithRemoteProgressMade() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
Expand All @@ -251,7 +259,8 @@ public void testWriteWithRemoteProgressMade() throws Exception {
public void testWriteWithDriftRetryCase4() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -272,7 +281,7 @@ public void testWriteWithDriftRetryCase4() throws Exception {
eq(false)))
.andReturn(null);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
Expand All @@ -288,7 +297,8 @@ public void testWriteWithDriftRetryCase4() throws Exception {
public void testWriteWithUnreachableRemoteOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -300,7 +310,7 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(MIN_CHUNK_SIZE + 10L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -317,7 +327,8 @@ public void testWriteWithUnreachableRemoteOffset() throws Exception {
public void testWriteWithRetryAndObjectMetadata() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand Down Expand Up @@ -345,7 +356,7 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception {
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -358,7 +369,8 @@ public void testWriteWithRetryAndObjectMetadata() throws Exception {
public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -380,7 +392,7 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -399,7 +411,8 @@ public void testWriteWithUploadCompletedByAnotherClient() throws Exception {
public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -420,7 +433,7 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
Expand All @@ -437,7 +450,8 @@ public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws Exception {
public void testGetCurrentUploadOffset() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand Down Expand Up @@ -468,7 +482,7 @@ public void testGetCurrentUploadOffset() throws Exception {
eq(true)))
.andReturn(BLOB_INFO.toPb());
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
Expand All @@ -481,7 +495,8 @@ public void testGetCurrentUploadOffset() throws Exception {
public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception {
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(BLOB_INFO_WITH_GENERATION.toPb(), RPC_OPTIONS_GENERATION))
.andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
Expand All @@ -495,7 +510,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws Exception {
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = newWriter();
writer = newWriter(true);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
assertFalse(writer.isRetrying());
Expand Down Expand Up @@ -825,17 +840,23 @@ public void testSaveAndRestoreWithSignedURL() throws Exception {
}

private BlobWriteChannel newWriter() {
Map<StorageRpc.Option, ?> optionsMap = EMPTY_RPC_OPTIONS;
return newWriter(false);
}

private BlobWriteChannel newWriter(boolean withGeneration) {
Map<StorageRpc.Option, ?> optionsMap =
withGeneration ? RPC_OPTIONS_GENERATION : EMPTY_RPC_OPTIONS;
ResultRetryAlgorithm<?> createResultAlgorithm =
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap);
ResultRetryAlgorithm<?> writeResultAlgorithm =
retryAlgorithmManager.getForResumableUploadSessionWrite(optionsMap);
final BlobInfo blobInfo = withGeneration ? BLOB_INFO_WITH_GENERATION : BLOB_INFO;
return BlobWriteChannel.newBuilder()
.setStorageOptions(options)
.setBlobInfo(BLOB_INFO)
.setBlobInfo(blobInfo)
.setUploadIdSupplier(
ResumableMedia.startUploadForBlobInfo(
options, BLOB_INFO, optionsMap, createResultAlgorithm))
options, blobInfo, optionsMap, createResultAlgorithm))
.setAlgorithmForWrite(writeResultAlgorithm)
.build();
}
Expand Down
Expand Up @@ -38,9 +38,4 @@ public static Blob blobCopyWithStorage(Blob b, Storage s) {
BlobInfo.BuilderImpl builder = (BlobInfo.BuilderImpl) BlobInfo.fromPb(b.toPb()).toBuilder();
return new Blob(s, builder);
}

public static StorageOptions.Builder useDefaultStorageRetryStrategy(
StorageOptions.Builder builder) {
return builder.setStorageRetryStrategy(StorageRetryStrategy.getDefaultStorageRetryStrategy());
}
}
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -121,6 +122,8 @@ public class StorageImplMockitoTest {

// Empty StorageRpc options
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final Map<StorageRpc.Option, ?> BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION =
ImmutableMap.of(StorageRpc.Option.IF_GENERATION_MATCH, 24L);

// Bucket target options
private static final Storage.BucketTargetOption BUCKET_TARGET_METAGENERATION =
Expand Down Expand Up @@ -726,7 +729,10 @@ public void testCreateBlobRetry() throws IOException {
.doReturn(BLOB_INFO1.toPb())
.doThrow(UNEXPECTED_CALL_EXCEPTION)
.when(storageRpcMock)
.create(Mockito.eq(storageObject), capturedStream.capture(), Mockito.eq(EMPTY_RPC_OPTIONS));
.create(
Mockito.eq(storageObject),
capturedStream.capture(),
Mockito.eq(BLOB_INFO1_RPC_OPTIONS_WITH_GENERATION));

storage =
options
Expand All @@ -736,7 +742,7 @@ public void testCreateBlobRetry() throws IOException {
.getService();
initializeServiceDependentObjects();

Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT);
Blob blob = storage.create(BLOB_INFO1, BLOB_CONTENT, BlobTargetOption.generationMatch());

assertEquals(expectedBlob1, blob);

Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.google.cloud.NoCredentials;
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
import com.google.cloud.storage.PackagePrivateMethodWorkarounds;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.conformance.retry.TestBench.RetryTestResource;
Expand Down Expand Up @@ -141,7 +140,6 @@ private Storage newStorage(boolean forTest) {
.setHost(testBench.getBaseUri())
.setCredentials(NoCredentials.getInstance())
.setProjectId(testRetryConformance.getProjectId());
builder = PackagePrivateMethodWorkarounds.useDefaultStorageRetryStrategy(builder);
RetrySettings.Builder retrySettingsBuilder =
StorageOptions.getDefaultRetrySettings().toBuilder();
if (forTest) {
Expand Down