Skip to content

Commit

Permalink
fix: Improve performance of NetHttpRequest write timeouts by providin…
Browse files Browse the repository at this point in the history
…g a possibility to use custom threadpool
  • Loading branch information
dzikoysk committed Nov 1, 2021
1 parent fbdecb4 commit e622b5e
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -153,6 +154,12 @@ public final class HttpRequest {
/** Timeout in milliseconds to set POST/PUT data or {@code 0} for an infinite timeout. */
private int writeTimeout = 0;

/**
* Custom executor used to handle write timeouts, enabled by {@link #writeTimeout}.
* By default, a new thread is created per each request.
*/
private ExecutorService writeTimeoutExecutor;

/** HTTP unsuccessful (non-2XX) response handler or {@code null} for none. */
private HttpUnsuccessfulResponseHandler unsuccessfulResponseHandler;

Expand Down Expand Up @@ -490,6 +497,17 @@ public HttpRequest setWriteTimeout(int writeTimeout) {
}

/**
* Sets custom executor used to handle write timeouts, enabled by {@link #setWriteTimeout(int)}.
* By default, a new thread is created per each request.
*
* @since 1.40.2
*/
public HttpRequest setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {
this.writeTimeoutExecutor = writeTimeoutExecutor;
return this;
}

/**
* Returns the HTTP request headers.
*
* @since 1.5
Expand Down Expand Up @@ -1003,6 +1021,7 @@ public HttpResponse execute() throws IOException {
// execute
lowLevelHttpRequest.setTimeout(connectTimeout, readTimeout);
lowLevelHttpRequest.setWriteTimeout(writeTimeout);
lowLevelHttpRequest.setWriteTimeoutExecutor(writeTimeoutExecutor);

// switch tracing scope to current span
@SuppressWarnings("MustBeClosedChecker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.api.client.util.StreamingContent;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
* Low-level HTTP request.
Expand Down Expand Up @@ -158,6 +159,16 @@ public void setTimeout(int connectTimeout, int readTimeout) throws IOException {
*/
public void setWriteTimeout(int writeTimeout) throws IOException {}

/**
* Sets custom timeout executor for POST/PUT requests.
*
* <p>Default implementation uses a new thread per each {@link #execute()} call.
*
* @param writeTimeoutExecutor custom timeout executor to use
* @since 1.40.2
*/
public void setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {}

/** Executes the request and returns a low-level HTTP response object. */
public abstract LowLevelHttpResponse execute() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class NetHttpRequest extends LowLevelHttpRequest {

private final HttpURLConnection connection;
private int writeTimeout;
private ExecutorService writeTimeoutExecutor;

/** @param connection HTTP URL connection */
NetHttpRequest(HttpURLConnection connection) {
Expand Down Expand Up @@ -65,6 +66,11 @@ public void setWriteTimeout(int writeTimeout) throws IOException {
this.writeTimeout = writeTimeout;
}

@Override
public void setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {
this.writeTimeoutExecutor = writeTimeoutExecutor;
}

interface OutputWriter {
void write(OutputStream outputStream, StreamingContent content) throws IOException;
}
Expand Down Expand Up @@ -184,9 +190,13 @@ public Boolean call() throws IOException {
}
};

final ExecutorService executor = Executors.newSingleThreadExecutor();
final boolean externalWriteTimeoutExecutor = writeTimeoutExecutor != null;
final ExecutorService executor = externalWriteTimeoutExecutor ? writeTimeoutExecutor : Executors.newSingleThreadExecutor();
final Future<Boolean> future = executor.submit(new FutureTask<Boolean>(writeContent), null);
executor.shutdown();

if (!externalWriteTimeoutExecutor) {
executor.shutdown();
}

try {
future.get(writeTimeout, TimeUnit.MILLISECONDS);
Expand All @@ -197,7 +207,8 @@ public Boolean call() throws IOException {
} catch (TimeoutException e) {
throw new IOException("Socket write timed out", e);
}
if (!executor.isTerminated()) {

if (!externalWriteTimeoutExecutor && !executor.isTerminated()) {
executor.shutdown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import javax.annotation.Nullable;

public class NetHttpRequestTest {

Expand Down Expand Up @@ -44,7 +47,7 @@ public void testHangingWrite() throws InterruptedException {
@Override
public void run() {
try {
postWithTimeout(0);
postWithTimeout(0, null);
} catch (IOException e) {
// expected to be interrupted
assertEquals(e.getCause().getClass(), InterruptedException.class);
Expand All @@ -63,9 +66,9 @@ public void run() {
}

@Test(timeout = 1000)
public void testOutputStreamWriteTimeout() throws Exception {
public void testOutputStreamWriteTimeout() {
try {
postWithTimeout(100);
postWithTimeout(100, null);
fail("should have timed out");
} catch (IOException e) {
assertEquals(e.getCause().getClass(), TimeoutException.class);
Expand All @@ -74,10 +77,25 @@ public void testOutputStreamWriteTimeout() throws Exception {
}
}

private static void postWithTimeout(int timeout) throws Exception {
@Test(timeout = 1000)
public void testOutputStreamWriteTimeoutWithCustomExecutor() {
ExecutorService customWriteTimeoutExecutor = Executors.newSingleThreadExecutor();
try {
postWithTimeout(100, customWriteTimeoutExecutor);
fail("should have timed out");
} catch (IOException e) {
assertEquals(e.getCause().getClass(), TimeoutException.class);
assertFalse(customWriteTimeoutExecutor.isTerminated());
} catch (Exception e) {
fail("Expected an IOException not a " + e.getCause().getClass().getName());
}
}

private static void postWithTimeout(int timeout, @Nullable ExecutorService writeTimeoutExecutor) throws Exception {
MockHttpURLConnection connection = new MockHttpURLConnection(new URL(HttpTesting.SIMPLE_URL));
connection.setRequestMethod("POST");
NetHttpRequest request = new NetHttpRequest(connection);
request.setWriteTimeoutExecutor(writeTimeoutExecutor);
InputStream is = NetHttpRequestTest.class.getClassLoader().getResourceAsStream("file.txt");
HttpContent content = new InputStreamContent("text/plain", is);
request.setStreamingContent(content);
Expand Down

0 comments on commit e622b5e

Please sign in to comment.