Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: add google-c2p dependence to DirectPath
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Oct 19, 2021
1 parent d8188d3 commit 8bfcd10
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
Expand Up @@ -109,6 +109,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
@Nullable private final Boolean allowNonDefaultServiceAccount;
@VisibleForTesting final ImmutableMap<String, ?> directPathServiceConfig;
@Nullable private final MtlsProvider mtlsProvider;
@Nullable @VisibleForTesting private String activeEndpoint;

@Nullable
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
Expand Down Expand Up @@ -136,6 +137,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
builder.directPathServiceConfig == null
? getDefaultDirectPathServiceConfig()
: builder.directPathServiceConfig;
this.activeEndpoint = builder.endpoint;
}

/**
Expand Down Expand Up @@ -339,15 +341,16 @@ && isOnComputeEngine()) {
isDirectPathXdsEnabled = Boolean.parseBoolean(envProvider.getenv(DIRECT_PATH_ENV_ENABLE_XDS));
if (isDirectPathXdsEnabled) {
// google-c2p resolver target must not have a port number
builder = ComputeEngineChannelBuilder.forTarget("google-c2p:///" + serviceAddress);
activeEndpoint = "google-c2p:///" + serviceAddress;
builder = ComputeEngineChannelBuilder.forTarget(activeEndpoint);
} else {
builder = ComputeEngineChannelBuilder.forAddress(serviceAddress, port);
// Set default keepAliveTime and keepAliveTimeout when directpath environment is enabled.
// Will be overridden by user defined values if any.
builder.keepAliveTime(DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS);
builder.keepAliveTimeout(DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
builder.defaultServiceConfig(directPathServiceConfig);
}
// Set default keepAliveTime and keepAliveTimeout when directpath environment is enabled.
// Will be overridden by user defined values if any.
builder.keepAliveTime(DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS);
builder.keepAliveTimeout(DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} else {
ChannelCredentials channelCredentials = createMtlsChannelCredentials();
if (channelCredentials != null) {
Expand Down Expand Up @@ -398,11 +401,16 @@ && isOnComputeEngine()) {
return managedChannel;
}

/** The endpoint to be used for the channel. */
/** The endpoint passed to the builder. It must have a pot, and must not have a URI scheme. */
public String getEndpoint() {
return endpoint;
}

/** The actual endpoint to be used for building a channel. */
public String getActiveEndpoint() {
return activeEndpoint;
}

/** The time without read activity before sending a keepalive ping. */
public Duration getKeepAliveTime() {
return keepAliveTime;
Expand Down Expand Up @@ -535,6 +543,12 @@ Builder setMtlsProvider(MtlsProvider mtlsProvider) {
return this;
}

@VisibleForTesting
Builder setEnvProvider(EnvironmentProvider envProvider) {
this.envProvider = envProvider;
return this;
}

/**
* Sets the GrpcInterceptorProvider for this TransportChannelProvider.
*
Expand Down
Expand Up @@ -67,6 +67,23 @@

@RunWith(JUnit4.class)
public class InstantiatingGrpcChannelProviderTest extends AbstractMtlsTransportChannelTest {
static class TestEnvironmentProvider
implements InstantiatingGrpcChannelProvider.EnvironmentProvider {
private final String isDirectPathXdsEnabled;

TestEnvironmentProvider(String isDirectPathXdsEnabled) {
this.isDirectPathXdsEnabled = isDirectPathXdsEnabled;
}

@Override
public String getenv(String env) {
if (env.equals("GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS")) {
return isDirectPathXdsEnabled;
}
return System.getenv(env);
}
}

@Test
public void testEndpoint() {
String endpoint = "localhost:8080";
Expand Down Expand Up @@ -386,6 +403,30 @@ public ManagedChannelBuilder apply(ManagedChannelBuilder channelBuilder) {
provider.getTransportChannel().shutdownNow();
}

@Test
public void testWithDirectPathXds() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
executor.shutdown();

InstantiatingGrpcChannelProvider iProvider =
InstantiatingGrpcChannelProvider.newBuilder()
.setAttemptDirectPath(true)
.setEnvProvider(new TestEnvironmentProvider(/*isDirectPathXdsEnabled = */ "true"))
.build();

TransportChannelProvider provider =
iProvider
.withHeaders(Collections.<String, String>emptyMap())
.withExecutor((Executor) executor)
.withEndpoint("localhost:8080")
.withCredentials(ComputeEngineCredentials.create());
;

provider.getTransportChannel().shutdownNow();

assertEquals(iProvider.getActiveEndpoint(), "google-c2p:///localhost");
}

@Test
public void testWithIPv6Address() throws IOException {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
Expand Down

0 comments on commit 8bfcd10

Please sign in to comment.