Skip to content

Commit

Permalink
Merge branch 'GoogleCloudDataproc:master' into retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Deependra-Patel committed Jan 24, 2022
2 parents 24dbc8f + 3543b1e commit 9830b2a
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 123 deletions.
Expand Up @@ -367,12 +367,6 @@ public class GoogleHadoopFileSystemConfiguration {
public static final HadoopConfigurationProperty<Long> GCS_GRPC_READ_MESSAGE_TIMEOUT_MS =
new HadoopConfigurationProperty<>("fs.gs.grpc.read.message.timeout.ms", 5 * 1_000L);

/**
* Configuration key for the connection timeout (in millisecond) for gRPC read requests to GCS.
*/
public static final HadoopConfigurationProperty<Long> GCS_GRPC_READ_SPEED_BYTES_PER_SEC =
new HadoopConfigurationProperty<>("fs.gs.grpc.read.speed.bytespersec", 50 * 1024 * 1024L);

/**
* Configuration key for the connection timeout (in millisecond) for gRPC metadata requests to
* GCS.
Expand Down Expand Up @@ -502,7 +496,6 @@ private static GoogleCloudStorageReadOptions getReadChannelOptions(Configuration
.setMinRangeRequestSize(GCS_INPUT_STREAM_MIN_RANGE_REQUEST_SIZE.get(config, config::getInt))
.setGrpcChecksumsEnabled(GCS_GRPC_CHECKSUMS_ENABLE.get(config, config::getBoolean))
.setGrpcReadTimeoutMillis(GCS_GRPC_READ_TIMEOUT_MS.get(config, config::getLong))
.setGrpcReadSpeedBytesPerSec(GCS_GRPC_READ_SPEED_BYTES_PER_SEC.get(config, config::getLong))
.setGrpcReadMessageTimeoutMillis(
GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.get(config, config::getLong))
.setGrpcReadMetadataTimeoutMillis(
Expand Down
Expand Up @@ -23,7 +23,6 @@
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_ENABLE;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_MESSAGE_TIMEOUT_MS;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_METADATA_TIMEOUT_MS;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_SPEED_BYTES_PER_SEC;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_READ_TIMEOUT_MS;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_TRAFFICDIRECTOR_ENABLE;
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GRPC_UPLOAD_BUFFERED_REQUESTS;
Expand Down Expand Up @@ -84,7 +83,6 @@ public class GoogleHadoopFileSystemConfigurationTest {
put("fs.gs.grpc.read.metadata.timeout.ms", 60 * 1000L);
put("fs.gs.grpc.read.timeout.ms", 30 * 1000L);
put("fs.gs.grpc.read.message.timeout.ms", 5 * 1000L);
put("fs.gs.grpc.read.speed.bytespersec", 50 * 1024 * 1024L);
put("fs.gs.grpc.read.zerocopy.enable", true);
put("fs.gs.grpc.directpath.enable", true);
put("fs.gs.grpc.server.address", "storage.googleapis.com");
Expand Down Expand Up @@ -317,7 +315,6 @@ public void testGrpcConfiguration() {
long grpcWriteTimeout = 20;
long grpcWriteMessageTimeout = 25;
long grpcUploadBufferedRequests = 30;
long grpcReadSpeedBytesPerSec = 100 * 1024 * 1024;
boolean directPathEnabled = true;
boolean trafficDirectorEnabled = true;
boolean grpcEnabled = true;
Expand All @@ -330,8 +327,6 @@ public void testGrpcConfiguration() {
GCS_GRPC_UPLOAD_BUFFERED_REQUESTS.getKey(), String.valueOf(grpcUploadBufferedRequests));
config.set(GCS_GRPC_DIRECTPATH_ENABLE.getKey(), String.valueOf(directPathEnabled));
config.set(GCS_GRPC_TRAFFICDIRECTOR_ENABLE.getKey(), String.valueOf(trafficDirectorEnabled));
config.set(
GCS_GRPC_READ_SPEED_BYTES_PER_SEC.getKey(), String.valueOf(grpcReadSpeedBytesPerSec));
config.set(
GCS_GRPC_CHECK_INTERVAL_TIMEOUT_MS.getKey(), String.valueOf(grpcCheckIntervalTimeout));
config.set(GCS_GRPC_READ_MESSAGE_TIMEOUT_MS.getKey(), String.valueOf(grpcReadMessageTimeout));
Expand All @@ -349,8 +344,6 @@ public void testGrpcConfiguration() {
.isEqualTo(grpcUploadBufferedRequests);
assertThat(options.isDirectPathPreferred()).isEqualTo(directPathEnabled);
assertThat(options.isGrpcEnabled()).isEqualTo(grpcEnabled);
assertThat(options.getReadChannelOptions().getGrpcReadSpeedBytesPerSec())
.isEqualTo(grpcReadSpeedBytesPerSec);
assertThat(options.getGrpcMessageTimeoutCheckInterval()).isEqualTo(grpcCheckIntervalTimeout);
assertThat(options.getReadChannelOptions().getGrpcReadMessageTimeoutMillis())
.isEqualTo(grpcReadMessageTimeout);
Expand Down
Expand Up @@ -86,6 +86,7 @@
import org.apache.hadoop.service.Service;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -1319,7 +1320,9 @@ public void xattr_statistics() throws IOException {
assertThat(myGhfs.delete(testRoot, true)).isTrue();
}

public void http_IOstatistics() throws IOException {
@Test
@Ignore("Test is failing")
public void http_IOStatistics() throws IOException {
FSDataOutputStream fout = ghfs.create(new Path("/file1"));
fout.writeBytes("Test Content");
fout.close();
Expand Down
Expand Up @@ -14,7 +14,6 @@
package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.DEFAULT_GRPC_READ_SPEED_BYTES_PER_SEC;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.Math.max;
Expand Down Expand Up @@ -125,8 +124,6 @@ public class GoogleCloudStorageGrpcReadChannel implements SeekableByteChannel {
// Offset in the object for the end of the range-requests
private long contentChannelEndOffset = -1;

private final long readTimeout;

private final Watchdog watchdog;

private final long gRPCReadMessageTimeout;
Expand Down Expand Up @@ -311,8 +308,7 @@ private static GoogleCloudStorageGrpcReadChannel openChannel(
long footerOffsetInBytes = max(0, (objectSize - prefetchSizeInBytes));

long startTime = System.currentTimeMillis();
ByteString footerContent =
getFooterContent(resourceId, readOptions, stub, footerOffsetInBytes, objectSize);
ByteString footerContent = getFooterContent(resourceId, readOptions, stub, footerOffsetInBytes);
long endTime = System.currentTimeMillis();
if (footerContent == null) {
logger.atFiner().log(
Expand All @@ -328,7 +324,7 @@ private static GoogleCloudStorageGrpcReadChannel openChannel(
stub,
resourceId,
itemInfo.getContentGeneration(),
objectSize,
itemInfo.getSize(),
footerOffsetInBytes,
footerContent,
watchdog,
Expand Down Expand Up @@ -393,12 +389,11 @@ private static ByteString getFooterContent(
StorageResourceId resourceId,
GoogleCloudStorageReadOptions readOptions,
StorageBlockingStub stub,
long footerOffset,
long objectSize)
long footerOffset)
throws IOException {
try {
Iterator<ReadObjectResponse> footerContentResponse =
stub.withDeadlineAfter(getReadTimeoutMillis(readOptions, objectSize), MILLISECONDS)
stub.withDeadlineAfter(readOptions.getGrpcReadTimeoutMillis(), MILLISECONDS)
.readObject(
ReadObjectRequest.newBuilder()
.setReadOffset(footerOffset)
Expand All @@ -424,16 +419,6 @@ private static ByteString getFooterContent(
}
}

// Estimates a read time out based on read speeds
static long getReadTimeoutMillis(GoogleCloudStorageReadOptions readOptions, long objectSize) {
long readSpeedInBytesPerSec = DEFAULT_GRPC_READ_SPEED_BYTES_PER_SEC;
if (readOptions.getGrpcReadSpeedBytesPerSec() > 0) {
readSpeedInBytesPerSec = readOptions.getGrpcReadSpeedBytesPerSec();
}

return readOptions.getGrpcReadTimeoutMillis() + ((objectSize / readSpeedInBytesPerSec) * 1000);
}

private GoogleCloudStorageGrpcReadChannel(
StorageBlockingStub gcsGrpcBlockingStub,
StorageResourceId resourceId,
Expand All @@ -446,7 +431,6 @@ private GoogleCloudStorageGrpcReadChannel(
BackOffFactory backOffFactory) {
this.useZeroCopyMarshaller =
ZeroCopyReadinessChecker.isReady() && readOptions.isGrpcReadZeroCopyEnabled();
this.readTimeout = getReadTimeoutMillis(readOptions, objectSize);
this.stub = gcsGrpcBlockingStub;
this.resourceId = resourceId;
this.objectGeneration = objectGeneration;
Expand Down Expand Up @@ -731,7 +715,8 @@ private void requestObjectMedia(OptionalLong bytesToRead) throws StatusRuntimeEx

requestContext = Context.current().withCancellation();
Context toReattach = requestContext.attach();
StorageBlockingStub blockingStub = stub.withDeadlineAfter(readTimeout, MILLISECONDS);
StorageBlockingStub blockingStub =
stub.withDeadlineAfter(readOptions.getGrpcReadTimeoutMillis(), MILLISECONDS);
try {
if (useZeroCopyMarshaller) {
Iterator<ReadObjectResponse> responseIterator =
Expand Down
Expand Up @@ -1562,7 +1562,7 @@ private Storage.Objects.List createListRequest(
throws IOException {
logger.atFiner().log(
"createListRequest(%s, %s, %s, %s, %d)",
bucketName, objectNamePrefix, delimiter, maxResults);
bucketName, objectNamePrefix, objectFields, delimiter, maxResults);
checkArgument(!isNullOrEmpty(bucketName), "bucketName must not be null or empty");

Storage.Objects.List listObject =
Expand Down
Expand Up @@ -45,8 +45,7 @@ public enum Fadvise {
public static final Fadvise DEFAULT_FADVISE = Fadvise.SEQUENTIAL;
public static final int DEFAULT_MIN_RANGE_REQUEST_SIZE = 2 * 1024 * 1024;
public static final boolean GRPC_CHECKSUMS_ENABLED_DEFAULT = false;
public static final long DEFAULT_GRPC_READ_TIMEOUT_MILLIS = 30 * 1000;
public static final long DEFAULT_GRPC_READ_SPEED_BYTES_PER_SEC = 50 * 1024 * 1024;
public static final long DEFAULT_GRPC_READ_TIMEOUT_MILLIS = 20 * 60 * 1000;
public static final long DEFAULT_GRPC_READ_METADATA_TIMEOUT_MILLIS = 60 * 1000;
public static final boolean DEFAULT_GRPC_READ_ZEROCOPY_ENABLED = true;
public static final long DEFAULT_GRPC_READ_MESSAGE_TIMEOUT_MILLIS = 5 * 1000;
Expand All @@ -69,7 +68,6 @@ public static Builder builder() {
.setMinRangeRequestSize(DEFAULT_MIN_RANGE_REQUEST_SIZE)
.setGrpcChecksumsEnabled(GRPC_CHECKSUMS_ENABLED_DEFAULT)
.setGrpcReadTimeoutMillis(DEFAULT_GRPC_READ_TIMEOUT_MILLIS)
.setGrpcReadSpeedBytesPerSec(DEFAULT_GRPC_READ_SPEED_BYTES_PER_SEC)
.setGrpcReadMetadataTimeoutMillis(DEFAULT_GRPC_READ_METADATA_TIMEOUT_MILLIS)
.setGrpcReadZeroCopyEnabled(DEFAULT_GRPC_READ_ZEROCOPY_ENABLED)
.setGrpcReadMessageTimeoutMillis(DEFAULT_GRPC_READ_MESSAGE_TIMEOUT_MILLIS);
Expand Down Expand Up @@ -113,9 +111,6 @@ public static Builder builder() {
/** See {@link Builder#setGrpcReadTimeoutMillis}. */
public abstract long getGrpcReadTimeoutMillis();

/** See {@link Builder#setGrpcReadSpeedBytesPerSec}. */
public abstract long getGrpcReadSpeedBytesPerSec();

/** See {@link Builder#setGrpcReadMetadataTimeoutMillis}. */
public abstract long getGrpcReadMetadataTimeoutMillis();

Expand Down Expand Up @@ -217,9 +212,6 @@ public abstract static class Builder {
/** Sets the property to override the default GCS gRPC read stream timeout. */
public abstract Builder setGrpcReadTimeoutMillis(long grpcReadTimeoutMillis);

/** Sets the property to override the default GCS gRPC read speed in bytes per sec. */
public abstract Builder setGrpcReadSpeedBytesPerSec(long grpcReadSpeedBytesPerSec);

/** Sets the property to override the default timeout for GCS metadata reads from gRPC. */
public abstract Builder setGrpcReadMetadataTimeoutMillis(long grpcReadMetadataTimeoutMillis);

Expand Down
31 changes: 19 additions & 12 deletions gcsio/src/main/java/com/google/cloud/hadoop/gcsio/Watchdog.java
Expand Up @@ -75,7 +75,7 @@ private Watchdog(Duration scheduleInterval) {

private void start() {
executor.scheduleAtFixedRate(
this, scheduleInterval.toMillis(), scheduleInterval.toMillis(), MILLISECONDS);
this, /* initialDelay= */ 0, scheduleInterval.toMillis(), MILLISECONDS);
}

/**
Expand Down Expand Up @@ -136,8 +136,9 @@ public <R, T> StreamObserver<R> watch(
public void run() {
try {
runUnsafe();
} catch (RuntimeException t) {
logger.atSevere().withCause(t).log("Caught throwable in periodic Watchdog run. Continuing.");
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log(
"Caught RuntimeException in periodic Watchdog run, continuing.");
}
}

Expand Down Expand Up @@ -203,7 +204,9 @@ public T next() {
lastActivityAt = clock.millis();
}
T next = innerIterator.next();
this.state = State.DELIVERING;
synchronized (lock) {
state = State.DELIVERING;
}
return next;
}

Expand All @@ -222,8 +225,7 @@ public boolean cancelIfStale() {
Throwable throwable = null;
synchronized (lock) {
long waitTime = clock.millis() - lastActivityAt;
if (this.state == State.WAITING
&& (!waitTimeout.isZero() && waitTime >= waitTimeout.toMillis())) {
if (state == State.WAITING && !waitTimeout.isZero() && waitTime >= waitTimeout.toMillis()) {
throwable = new TimeoutException("Canceled due to timeout waiting for next response");
}
}
Expand All @@ -238,11 +240,15 @@ public boolean cancelIfStale() {
public boolean hasNext() {
boolean hasNext = false;
try {
this.state = State.WAITING;
synchronized (lock) {
state = State.WAITING;
}
hasNext = innerIterator.hasNext();
} finally {
// stream is complete successfully with no more items or has thrown an exception
if (!hasNext) openStreams.remove(this);
if (!hasNext) {
openStreams.remove(this);
}
}
return hasNext;
}
Expand Down Expand Up @@ -280,8 +286,7 @@ public boolean cancelIfStale() {
Throwable throwable = null;
synchronized (lock) {
long waitTime = clock.millis() - lastActivityAt;
if (this.state == State.WAITING
&& (!waitTimeout.isZero() && waitTime >= waitTimeout.toMillis())) {
if (state == State.WAITING && !waitTimeout.isZero() && waitTime >= waitTimeout.toMillis()) {
throwable = new TimeoutException("Canceled due to timeout waiting for next response");
}
}
Expand All @@ -296,10 +301,12 @@ public boolean cancelIfStale() {
public void onNext(R value) {
synchronized (lock) {
lastActivityAt = clock.millis();
state = State.WAITING;
}
this.state = State.WAITING;
innerStreamObserver.onNext(value);
this.state = State.IDLE;
synchronized (lock) {
state = State.IDLE;
}
}

@Override
Expand Down
Expand Up @@ -1518,34 +1518,6 @@ public void isOpenReturnsFalseAfterClose() throws Exception {
assertFalse(readChannel.isOpen());
}

@Test
public void readTimeOutBasedOnObjectSize() {
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setGrpcReadTimeoutMillis(60 * 1000) // 60 sec
.setGrpcReadSpeedBytesPerSec(100 * 1024 * 1024) // 100 MBps
.build();
long objectSize = 500 * 1024 * 1024; // 500MB
long readTimeoutMillis =
GoogleCloudStorageGrpcReadChannel.getReadTimeoutMillis(readOptions, objectSize);
// 60 sec + 5 sec
assertEquals(65000, readTimeoutMillis);
}

@Test
public void readTimeOutBasedOnObjectSizeMisconfigured() {
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setGrpcReadTimeoutMillis(60 * 1000) // 60 sec
.setGrpcReadSpeedBytesPerSec(0) // 0 Bps
.build();
long objectSize = 500 * 1024 * 1024; // 500MB
long readTimeoutMillis =
GoogleCloudStorageGrpcReadChannel.getReadTimeoutMillis(readOptions, objectSize);
// 60 sec + 10 sec (500 MB at 50 MBps of default value)
assertEquals(70000, readTimeoutMillis);
}

private GoogleCloudStorageGrpcReadChannel newReadChannel() throws IOException {
return newReadChannel(GoogleCloudStorageReadOptions.DEFAULT);
}
Expand Down

0 comments on commit 9830b2a

Please sign in to comment.