diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java index 50a1d55fd..e0e44397d 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageTest.java @@ -418,6 +418,70 @@ public void upload_failure_runtimeException() throws Exception { .inOrder(); } + @Test + public void upload_retry_requestTimeout() throws Exception { + byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + emptyResponse(408), // HTTP 408 Request Timeout + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(testData.length)))); + + GoogleCloudStorage gcs = mockedGcs(GCS_OPTIONS, transport); + + try (WritableByteChannel writeChannel = gcs.create(RESOURCE_ID)) { + writeChannel.write(ByteBuffer.wrap(testData)); + } + + assertThat(trackingRequestInitializerWithRetries.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 2)) + .inOrder(); + } + + @Test + public void upload_noRetries_forbidden() throws Exception { + byte[] testData = new byte[MediaHttpUploader.MINIMUM_CHUNK_SIZE]; + new Random().nextBytes(testData); + + MockHttpTransport transport = + mockTransport( + emptyResponse(HttpStatusCodes.STATUS_CODE_NOT_FOUND), + resumableUploadResponse(BUCKET_NAME, OBJECT_NAME), + emptyResponse(HttpStatusCodes.STATUS_CODE_FORBIDDEN), + jsonDataResponse( + newStorageObject(BUCKET_NAME, OBJECT_NAME) + .setSize(BigInteger.valueOf(testData.length)))); + + GoogleCloudStorage gcs = mockedGcs(GCS_OPTIONS, transport); + + WritableByteChannel writeChannel = gcs.create(RESOURCE_ID); + writeChannel.write(ByteBuffer.wrap(testData)); + + IOException thrown = assertThrows(IOException.class, writeChannel::close); + assertThat(thrown) + .hasCauseThat() + .hasMessageThat() + .contains(String.valueOf(HttpStatusCodes.STATUS_CODE_FORBIDDEN)); + + assertThat(trackingRequestInitializerWithRetries.getAllRequestStrings()) + .containsExactly( + getRequestString(BUCKET_NAME, OBJECT_NAME), + resumableUploadRequestString( + BUCKET_NAME, OBJECT_NAME, /* generationId= */ 0, /* replaceGenerationId= */ false), + resumableUploadChunkRequestString(BUCKET_NAME, OBJECT_NAME, /* uploadId= */ 1)) + .inOrder(); + } + @Test public void reupload_success_singleWrite_multipleUploadChunks() throws Exception { byte[] testData = new byte[2 * MediaHttpUploader.MINIMUM_CHUNK_SIZE]; diff --git a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java index 722653603..15c8d62f4 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java @@ -16,169 +16,192 @@ package com.google.cloud.hadoop.util; import static com.google.common.base.Strings.isNullOrEmpty; +import static java.lang.Math.toIntExact; import static java.util.concurrent.TimeUnit.SECONDS; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpBackOffIOExceptionHandler; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; +import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpIOExceptionHandler; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.http.HttpUnsuccessfulResponseHandler; -import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; -import com.google.api.client.util.Sleeper; -import com.google.common.annotations.VisibleForTesting; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.common.collect.ImmutableSet; import com.google.common.flogger.GoogleLogger; +import com.google.common.flogger.LogContext; import java.io.IOException; import java.util.Set; -import org.apache.http.HttpStatus; +/** An implementation of {@link HttpRequestInitializer} with retries. */ public class RetryHttpInitializer implements HttpRequestInitializer { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); - /** HTTP status code indicating too many requests in a given amount of time. */ - private static final int HTTP_SC_TOO_MANY_REQUESTS = 429; - - // Base impl of BackOffRequired determining the default set of cases where we'll retry on - // unsuccessful HTTP responses; we'll mix in additional retryable response cases on top - // of the bases cases defined by this instance. - private static final HttpBackOffUnsuccessfulResponseHandler.BackOffRequired - BASE_HTTP_BACKOFF_REQUIRED = - HttpBackOffUnsuccessfulResponseHandler.BackOffRequired.ON_SERVER_ERROR; + private static final ExponentialBackOff.Builder BACKOFF_BUILDER = + new ExponentialBackOff.Builder() + // Set initial timeout to 1.25 seconds to have a 1-second minimum initial interval + // after 0.2 randomization factor will be applied + .setInitialIntervalMillis(1_250) + .setMultiplier(1.6) + .setRandomizationFactor(0.2) + .setMaxIntervalMillis(20_000) + // 30 minutes + .setMaxElapsedTimeMillis(1_800_000); // To be used as a request interceptor for filling in the "Authorization" header field, as well - // as a response handler for certain unsuccessful error codes wherein the Credential must refresh + // as a response handler for certain unsuccessful error codes wherein the Credentials must refresh // its token for a retry. - private final Credential credential; + private final HttpCredentialsAdapter credentials; private final RetryHttpInitializerOptions options; - // If non-null, the backoff handlers will be set to use this sleeper instead of their defaults. - // Only used for testing. - private Sleeper sleeperOverride; - - /** A HttpUnsuccessfulResponseHandler logs the URL that generated certain failures. */ - private static class LoggingResponseHandler - implements HttpUnsuccessfulResponseHandler, HttpIOExceptionHandler { - - private static final String LOG_MESSAGE_FORMAT = - "Encountered status code %d when sending %s request to URL '%s'. " - + "Delegating to response handler for possible retry."; - - private final HttpUnsuccessfulResponseHandler delegateResponseHandler; - private final HttpIOExceptionHandler delegateIOExceptionHandler; - private final ImmutableSet responseCodesToLog; - private final ImmutableSet responseCodesToLogWithRateLimit; + /** + * @param credentials A credentials which will be used to initialize on HttpRequests and as the + * delegate for a {@link UnsuccessfulResponseHandler}. + * @param options An options that configure {@link RetryHttpInitializer} instance behaviour. + */ + public RetryHttpInitializer(Credentials credentials, RetryHttpInitializerOptions options) { + this.credentials = credentials == null ? null : new HttpCredentialsAdapter(credentials); + this.options = options; + } - /** - * @param delegateResponseHandler The HttpUnsuccessfulResponseHandler to invoke to really handle - * errors. - * @param delegateIOExceptionHandler The HttpIOExceptionResponseHandler to delegate to. - * @param responseCodesToLog The set of response codes to log URLs for. - * @param responseCodesToLogWithRateLimit The set of response codes to log URLs for with reate - * limit. - */ - public LoggingResponseHandler( - HttpUnsuccessfulResponseHandler delegateResponseHandler, - HttpIOExceptionHandler delegateIOExceptionHandler, - Set responseCodesToLog, - Set responseCodesToLogWithRateLimit) { - this.delegateResponseHandler = delegateResponseHandler; - this.delegateIOExceptionHandler = delegateIOExceptionHandler; - this.responseCodesToLog = ImmutableSet.copyOf(responseCodesToLog); - this.responseCodesToLogWithRateLimit = ImmutableSet.copyOf(responseCodesToLogWithRateLimit); + @Override + public void initialize(HttpRequest request) throws IOException { + // Initialize request with credentials and let CredentialsOrBackoffResponseHandler + // to refresh credentials later if necessary + if (credentials != null) { + credentials.initialize(request); } - @Override - public boolean handleResponse( - HttpRequest httpRequest, HttpResponse httpResponse, boolean supportsRetry) - throws IOException { - if (responseCodesToLogWithRateLimit.contains(httpResponse.getStatusCode())) { - switch (httpResponse.getStatusCode()) { - case HTTP_SC_TOO_MANY_REQUESTS: - logger.atInfo().atMostEvery(10, SECONDS).log( - LOG_MESSAGE_FORMAT, - httpResponse.getStatusCode(), - httpRequest.getRequestMethod(), - httpRequest.getUrl()); - break; - default: - logger.atInfo().atMostEvery(10, SECONDS).log( - "Encountered status code %d (and maybe others) when sending %s request to URL '%s'." - + " Delegating to response handler for possible retry.", - httpResponse.getStatusCode(), httpRequest.getRequestMethod(), httpRequest.getUrl()); - } - } else if (responseCodesToLog.contains(httpResponse.getStatusCode())) { - logger.atInfo().log( - LOG_MESSAGE_FORMAT, - httpResponse.getStatusCode(), - httpRequest.getRequestMethod(), - httpRequest.getUrl()); - } - - return delegateResponseHandler.handleResponse(httpRequest, httpResponse, supportsRetry); + request + // Request will be retried if server errors (5XX) or I/O errors are encountered. + .setNumberOfRetries(options.getMaxRequestRetries()) + // Set the timeout configurations. + .setConnectTimeout(toIntExact(options.getConnectTimeout().toMillis())) + .setReadTimeout(toIntExact(options.getReadTimeout().toMillis())) + .setUnsuccessfulResponseHandler(new UnsuccessfulResponseHandler(credentials)) + .setIOExceptionHandler(new IoExceptionHandler()); + + HttpHeaders headers = request.getHeaders(); + if (isNullOrEmpty(headers.getUserAgent()) && !isNullOrEmpty(options.getDefaultUserAgent())) { + logger.atFiner().log( + "Request is missing a user-agent header, adding default value of '%s'", + options.getDefaultUserAgent()); + headers.setUserAgent(options.getDefaultUserAgent()); } + headers.putAll(options.getHttpHeaders()); + } - @Override - public boolean handleIOException(HttpRequest httpRequest, boolean supportsRetry) - throws IOException { - // We sadly don't get anything helpful to see if this is something we want to log. As a result - // we'll turn down the logging level to debug. - logger.atFine().log("Encountered an IOException when accessing URL %s", httpRequest.getUrl()); - return delegateIOExceptionHandler.handleIOException(httpRequest, supportsRetry); - } + public Credentials getCredentials() { + return credentials == null ? null : credentials.getCredentials(); } /** - * An inner class allowing this initializer to create a new handler instance per HttpRequest which - * shares the Credential of the outer class and which will compose the Credential with a backoff - * handler to handle unsuccessful HTTP codes. + * Handles unsuccessful responses: + * + *
    + *
  • log responses based on the response code + *
  • 401 Unauthorized responses are handled by the {@link HttpCredentialsAdapter} + *
  • 5XX are handled by the a backoff handler. + *
*/ - private class CredentialOrBackoffResponseHandler implements HttpUnsuccessfulResponseHandler { - // The backoff-handler instance to use whenever the outer-class's Credential does not handle - // the error. - private final HttpUnsuccessfulResponseHandler delegateHandler; - - public CredentialOrBackoffResponseHandler() { - HttpBackOffUnsuccessfulResponseHandler errorCodeHandler = - new HttpBackOffUnsuccessfulResponseHandler(getDefaultBackOff()); - errorCodeHandler.setBackOffRequired( - response -> - BASE_HTTP_BACKOFF_REQUIRED.isRequired(response) - || response.getStatusCode() == HTTP_SC_TOO_MANY_REQUESTS); - if (sleeperOverride != null) { - errorCodeHandler.setSleeper(sleeperOverride); - } - this.delegateHandler = errorCodeHandler; + private static class UnsuccessfulResponseHandler implements HttpUnsuccessfulResponseHandler { + + private static final int HTTP_SC_GONE = 410; + + /** HTTP status code indicating too many requests in a given amount of time. */ + private static final int HTTP_SC_TOO_MANY_REQUESTS = 429; + + /** + * HTTP status code indicating that the server has decided to close the connection rather than + * continue waiting + */ + private static final int HTTP_REQUEST_TIMEOUT = 408; + + private static final Set RETRYABLE_CODES = + ImmutableSet.of(HTTP_SC_TOO_MANY_REQUESTS, HTTP_REQUEST_TIMEOUT); + + // The set of response codes to log URLs for with a rate limit. + private static final ImmutableSet RESPONSE_CODES_TO_LOG_WITH_RATE_LIMIT = + ImmutableSet.of(HTTP_SC_TOO_MANY_REQUESTS); + + // The set of response codes to log URLs for. + private static final ImmutableSet RESPONSE_CODES_TO_LOG = + ImmutableSet.builder() + .addAll(RESPONSE_CODES_TO_LOG_WITH_RATE_LIMIT) + .add(HTTP_SC_GONE, HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE) + .build(); + + // Base implementation of BackOffRequired determining the default set of cases where we'll retry + // on unsuccessful HTTP responses; we'll mix in additional retryable response cases on top + // of the bases cases defined by this instance. + private static final HttpBackOffUnsuccessfulResponseHandler.BackOffRequired BACK_OFF_REQUIRED = + response -> + RETRYABLE_CODES.contains(response.getStatusCode()) + || HttpBackOffUnsuccessfulResponseHandler.BackOffRequired.ON_SERVER_ERROR + .isRequired(response); + + private final HttpCredentialsAdapter credentials; + private final HttpBackOffUnsuccessfulResponseHandler delegate; + + public UnsuccessfulResponseHandler(HttpCredentialsAdapter credentials) { + this.credentials = credentials; + this.delegate = + new HttpBackOffUnsuccessfulResponseHandler(BACKOFF_BUILDER.build()) + .setBackOffRequired(BACK_OFF_REQUIRED); } @Override public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { - if (credential != null && credential.handleResponse(request, response, supportsRetry)) { - // If credential decides it can handle it, the return code or message indicated something + logResponseCode(request, response); + + if (credentials != null && credentials.handleResponse(request, response, supportsRetry)) { + // If credentials decides it can handle it, the return code or message indicated something // specific to authentication, and no backoff is desired. return true; } - if (delegateHandler.handleResponse(request, response, supportsRetry)) { + if (delegate.handleResponse(request, response, supportsRetry)) { // Otherwise, we defer to the judgement of our internal backoff handler. return true; } + escapeRedirectPath(request, response); + + return false; + } + + private void logResponseCode(HttpRequest request, HttpResponse response) { + if (RESPONSE_CODES_TO_LOG.contains(response.getStatusCode())) { + logger + .atInfo() + // Apply rate limit (atMostEvery) based on the response status code + .with(LogContext.Key.LOG_SITE_GROUPING_KEY, response.getStatusCode()) + .atMostEvery( + RESPONSE_CODES_TO_LOG_WITH_RATE_LIMIT.contains(response.getStatusCode()) ? 10 : 0, + SECONDS) + .log( + "Encountered status code %d when sending %s request to URL '%s'." + + " Delegating to response handler for possible retry.", + response.getStatusCode(), request.getRequestMethod(), request.getUrl()); + } + } + + private void escapeRedirectPath(HttpRequest request, HttpResponse response) { if (HttpStatusCodes.isRedirect(response.getStatusCode()) && request.getFollowRedirects() && response.getHeaders() != null && response.getHeaders().getLocation() != null) { // Hack: Reach in and fix any '+' in the URL but still report 'false'. The client library // incorrectly tries to decode '+' into ' ', even though the backend servers treat '+' - // as a legitimate path character, and so do not encode it. This is safe to do whether - // or not the client library fixes the bug, since %2B will correctly be decoded as '+' + // as a legitimate path character, and so do not encode it. This is safe to do regardless + // whether the client library fixes the bug, since %2B will correctly be decoded as '+' // even after the fix. String redirectLocation = response.getHeaders().getLocation(); if (redirectLocation.contains("+")) { @@ -189,100 +212,25 @@ public boolean handleResponse(HttpRequest request, HttpResponse response, boolea response.getHeaders().setLocation(escapedLocation); } } - - return false; } } - /** - * @param credential A credential which will be set as an interceptor on HttpRequests and as the - * delegate for a CredentialOrBackoffResponseHandler. - * @param defaultUserAgent A String to set as the user-agent when initializing an HttpRequest if - * the HttpRequest doesn't already have a user-agent header. - */ - public RetryHttpInitializer(Credential credential, String defaultUserAgent) { - this( - credential, - RetryHttpInitializerOptions.DEFAULT - .toBuilder() - .setDefaultUserAgent(defaultUserAgent) - .build()); - } + private static class IoExceptionHandler implements HttpIOExceptionHandler { - /** - * @param credential A credential which will be set as an interceptor on HttpRequests and as the - * delegate for a CredentialOrBackoffResponseHandler. - * @param options An options that configure {@link RetryHttpInitializer} instance behaviour. - */ - public RetryHttpInitializer(Credential credential, RetryHttpInitializerOptions options) { - this.credential = credential; - this.options = options; - this.sleeperOverride = null; - } + private final HttpIOExceptionHandler delegate; - @Override - public void initialize(HttpRequest request) { - // Credential must be the interceptor to fill in accessToken fields. - request.setInterceptor(credential); - - // Request will be retried if server errors (5XX) or I/O errors are encountered. - request.setNumberOfRetries(options.getMaxRequestRetries()); - - // Set the timeout configurations. - request.setConnectTimeout(Math.toIntExact(options.getConnectTimeout().toMillis())); - request.setReadTimeout(Math.toIntExact(options.getReadTimeout().toMillis())); - - // IOExceptions such as "socket timed out" of "insufficient bytes written" will follow a - // straightforward backoff. - HttpBackOffIOExceptionHandler exceptionHandler = - new HttpBackOffIOExceptionHandler(getDefaultBackOff()); - if (sleeperOverride != null) { - exceptionHandler.setSleeper(sleeperOverride); + public IoExceptionHandler() { + // Retry IOExceptions such as "socket timed out" of "insufficient bytes written" with backoff. + this.delegate = new HttpBackOffIOExceptionHandler(BACKOFF_BUILDER.build()); } - // Supply a new composite handler for unsuccessful return codes. 401 Unauthorized will be - // handled by the Credential, 410 Gone will be logged, and 5XX will be handled by a backoff - // handler. - LoggingResponseHandler loggingResponseHandler = - new LoggingResponseHandler( - new CredentialOrBackoffResponseHandler(), - exceptionHandler, - ImmutableSet.of(HttpStatus.SC_GONE, HttpStatus.SC_SERVICE_UNAVAILABLE), - ImmutableSet.of(HTTP_SC_TOO_MANY_REQUESTS)); - request.setUnsuccessfulResponseHandler(loggingResponseHandler); - request.setIOExceptionHandler(loggingResponseHandler); - - if (isNullOrEmpty(request.getHeaders().getUserAgent()) - && !isNullOrEmpty(options.getDefaultUserAgent())) { - logger.atFiner().log( - "Request is missing a user-agent, adding default value of '%s'", - options.getDefaultUserAgent()); - request.getHeaders().setUserAgent(options.getDefaultUserAgent()); + @Override + public boolean handleIOException(HttpRequest httpRequest, boolean supportsRetry) + throws IOException { + // We sadly don't get anything helpful to see if this is something we want to log. + // As a result we'll turn down the logging level to debug. + logger.atFine().log("Encountered an IOException when accessing URL %s", httpRequest.getUrl()); + return delegate.handleIOException(httpRequest, supportsRetry); } - - request.getHeaders().putAll(options.getHttpHeaders()); - } - - public Credential getCredential() { - return credential; - } - - private static BackOff getDefaultBackOff() { - return new ExponentialBackOff.Builder() - // Set initial timeout to 1.25 seconds to have a 1 second minimum initial interval - // after 0.2 randomization factor will be applied - .setInitialIntervalMillis(1_250) - .setMultiplier(1.6) - .setRandomizationFactor(0.2) - .setMaxIntervalMillis(20_000) - // 30 minutes - .setMaxElapsedTimeMillis(1_800_000) - .build(); - } - - /** Overrides the default Sleepers used in backoff retry handler instances. */ - @VisibleForTesting - void setSleeperOverride(Sleeper sleeper) { - sleeperOverride = sleeper; } }