Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GH-1375 Improve performance of NetHttpRequest write timeouts by provding a possibility to use custom threadpool #1494

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -153,6 +154,12 @@ public final class HttpRequest {
/** Timeout in milliseconds to set POST/PUT data or {@code 0} for an infinite timeout. */
private int writeTimeout = 0;

/**
* Custom executor used to handle write timeouts, enabled by {@link #writeTimeout}.
* By default, a new thread is created per each request.
*/
private ExecutorService writeTimeoutExecutor;

/** HTTP unsuccessful (non-2XX) response handler or {@code null} for none. */
private HttpUnsuccessfulResponseHandler unsuccessfulResponseHandler;

Expand Down Expand Up @@ -490,6 +497,17 @@ public HttpRequest setWriteTimeout(int writeTimeout) {
}

/**
* Sets custom executor used to handle write timeouts, enabled by {@link #setWriteTimeout(int)}.
* By default, a new thread is created per each request.
*
* @since 1.40.2
*/
public HttpRequest setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {
this.writeTimeoutExecutor = writeTimeoutExecutor;
return this;
}

/**
* Returns the HTTP request headers.
*
* @since 1.5
Expand Down Expand Up @@ -1003,6 +1021,7 @@ public HttpResponse execute() throws IOException {
// execute
lowLevelHttpRequest.setTimeout(connectTimeout, readTimeout);
lowLevelHttpRequest.setWriteTimeout(writeTimeout);
lowLevelHttpRequest.setWriteTimeoutExecutor(writeTimeoutExecutor);

// switch tracing scope to current span
@SuppressWarnings("MustBeClosedChecker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.api.client.util.StreamingContent;
import java.io.IOException;
import java.util.concurrent.ExecutorService;

/**
* Low-level HTTP request.
Expand Down Expand Up @@ -158,6 +159,16 @@ public void setTimeout(int connectTimeout, int readTimeout) throws IOException {
*/
public void setWriteTimeout(int writeTimeout) throws IOException {}

/**
* Sets custom timeout executor for POST/PUT requests.
*
* <p>Default implementation uses a new thread per each {@link #execute()} call.
*
* @param writeTimeoutExecutor custom timeout executor to use
* @since 1.40.2
*/
public void setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {}

/** Executes the request and returns a low-level HTTP response object. */
public abstract LowLevelHttpResponse execute() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class NetHttpRequest extends LowLevelHttpRequest {

private final HttpURLConnection connection;
private int writeTimeout;
private ExecutorService writeTimeoutExecutor;

/** @param connection HTTP URL connection */
NetHttpRequest(HttpURLConnection connection) {
Expand Down Expand Up @@ -65,6 +66,11 @@ public void setWriteTimeout(int writeTimeout) throws IOException {
this.writeTimeout = writeTimeout;
}

@Override
public void setWriteTimeoutExecutor(ExecutorService writeTimeoutExecutor) {
this.writeTimeoutExecutor = writeTimeoutExecutor;
}

interface OutputWriter {
void write(OutputStream outputStream, StreamingContent content) throws IOException;
}
Expand Down Expand Up @@ -184,9 +190,13 @@ public Boolean call() throws IOException {
}
};

final ExecutorService executor = Executors.newSingleThreadExecutor();
final boolean externalWriteTimeoutExecutor = writeTimeoutExecutor != null;
final ExecutorService executor = externalWriteTimeoutExecutor ? writeTimeoutExecutor : Executors.newSingleThreadExecutor();
final Future<Boolean> future = executor.submit(new FutureTask<Boolean>(writeContent), null);
executor.shutdown();

if (!externalWriteTimeoutExecutor) {
executor.shutdown();
}

try {
future.get(writeTimeout, TimeUnit.MILLISECONDS);
Expand All @@ -197,7 +207,8 @@ public Boolean call() throws IOException {
} catch (TimeoutException e) {
throw new IOException("Socket write timed out", e);
}
if (!executor.isTerminated()) {

if (!externalWriteTimeoutExecutor && !executor.isTerminated()) {
executor.shutdown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.junit.Test;
import javax.annotation.Nullable;

public class NetHttpRequestTest {

Expand Down Expand Up @@ -44,7 +47,7 @@ public void testHangingWrite() throws InterruptedException {
@Override
public void run() {
try {
postWithTimeout(0);
postWithTimeout(0, null);
} catch (IOException e) {
// expected to be interrupted
assertEquals(e.getCause().getClass(), InterruptedException.class);
Expand All @@ -63,9 +66,9 @@ public void run() {
}

@Test(timeout = 1000)
public void testOutputStreamWriteTimeout() throws Exception {
public void testOutputStreamWriteTimeout() {
try {
postWithTimeout(100);
postWithTimeout(100, null);
fail("should have timed out");
} catch (IOException e) {
assertEquals(e.getCause().getClass(), TimeoutException.class);
Expand All @@ -74,10 +77,25 @@ public void testOutputStreamWriteTimeout() throws Exception {
}
}

private static void postWithTimeout(int timeout) throws Exception {
@Test(timeout = 1000)
public void testOutputStreamWriteTimeoutWithCustomExecutor() {
ExecutorService customWriteTimeoutExecutor = Executors.newSingleThreadExecutor();
try {
postWithTimeout(100, customWriteTimeoutExecutor);
fail("should have timed out");
} catch (IOException e) {
assertEquals(e.getCause().getClass(), TimeoutException.class);
assertFalse(customWriteTimeoutExecutor.isTerminated());
} catch (Exception e) {
fail("Expected an IOException not a " + e.getCause().getClass().getName());
}
}

private static void postWithTimeout(int timeout, @Nullable ExecutorService writeTimeoutExecutor) throws Exception {
MockHttpURLConnection connection = new MockHttpURLConnection(new URL(HttpTesting.SIMPLE_URL));
connection.setRequestMethod("POST");
NetHttpRequest request = new NetHttpRequest(connection);
request.setWriteTimeoutExecutor(writeTimeoutExecutor);
InputStream is = NetHttpRequestTest.class.getClassLoader().getResourceAsStream("file.txt");
HttpContent content = new InputStreamContent("text/plain", is);
request.setStreamingContent(content);
Expand Down