Skip to content

Commit 607362a

Browse files
authored
Add support for anonymous in-process servers. (#8589)
Support anonymous in-process servers, and InProcessChannelBuilder.forTarget. Anonymous servers aren't registered statically, meaning they can't be looked up by name. Only the AnonymousInProcessSocketAddress passed to InProcessServerBuilder.forAddress(), (or subsequently fetched from Server.getListenSockets()) can be used to connect to the server. Supporting InProcessChannelBuilder.forTarget is particularly useful for production Android usage of in-process servers, where process startup latency is crucial. A custom name resolver can be used to create the server instance on demand without directly impacting the startup latency of in-process gRPC clients. Together, these features support a more-standard approach to "OnDeviceServer" referenced in gRFC L73. https://github.com/grpc/proposal/blob/master/L73-java-binderchannel.md#ondeviceserver
1 parent 203515d commit 607362a

File tree

8 files changed

+316
-46
lines changed

8 files changed

+316
-46
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2021 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.inprocess;
18+
19+
import static com.google.common.base.Preconditions.checkState;
20+
21+
import io.grpc.ExperimentalApi;
22+
import java.io.IOException;
23+
import java.net.SocketAddress;
24+
import javax.annotation.Nullable;
25+
import javax.annotation.concurrent.GuardedBy;
26+
27+
/**
28+
* Custom SocketAddress class for {@link InProcessTransport}, for
29+
* a server which can only be referenced via this address instance.
30+
*/
31+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8626")
32+
public final class AnonymousInProcessSocketAddress extends SocketAddress {
33+
private static final long serialVersionUID = -8567592561863414695L;
34+
35+
@Nullable
36+
@GuardedBy("this")
37+
private InProcessServer server;
38+
39+
/** Creates a new AnonymousInProcessSocketAddress. */
40+
public AnonymousInProcessSocketAddress() { }
41+
42+
@Nullable
43+
synchronized InProcessServer getServer() {
44+
return server;
45+
}
46+
47+
synchronized void setServer(InProcessServer server) throws IOException {
48+
if (this.server != null) {
49+
throw new IOException("Server instance already registered");
50+
}
51+
this.server = server;
52+
}
53+
54+
synchronized void clearServer(InProcessServer server) {
55+
checkState(this.server == server);
56+
this.server = null;
57+
}
58+
}

core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,28 @@ public final class InProcessChannelBuilder extends
5555
* @return a new builder
5656
*/
5757
public static InProcessChannelBuilder forName(String name) {
58-
return new InProcessChannelBuilder(name);
58+
return forAddress(new InProcessSocketAddress(checkNotNull(name, "name")));
5959
}
6060

6161
/**
62-
* Always fails. Call {@link #forName} instead.
62+
* Create a channel builder that will connect to the server referenced by the given target URI.
63+
* Only intended for use with a custom name resolver.
64+
*
65+
* @param target the identity of the server to connect to
66+
* @return a new builder
6367
*/
64-
@DoNotCall("Unsupported. Use forName() instead")
6568
public static InProcessChannelBuilder forTarget(String target) {
66-
throw new UnsupportedOperationException("call forName() instead");
69+
return new InProcessChannelBuilder(null, checkNotNull(target, "target"));
70+
}
71+
72+
/**
73+
* Create a channel builder that will connect to the server referenced by the given address.
74+
*
75+
* @param address the address of the server to connect to
76+
* @return a new builder
77+
*/
78+
public static InProcessChannelBuilder forAddress(SocketAddress address) {
79+
return new InProcessChannelBuilder(checkNotNull(address, "address"), null);
6780
}
6881

6982
/**
@@ -75,13 +88,11 @@ public static InProcessChannelBuilder forAddress(String name, int port) {
7588
}
7689

7790
private final ManagedChannelImplBuilder managedChannelImplBuilder;
78-
private final String name;
7991
private ScheduledExecutorService scheduledExecutorService;
8092
private int maxInboundMetadataSize = Integer.MAX_VALUE;
8193
private boolean transportIncludeStatusCause = false;
8294

83-
private InProcessChannelBuilder(String name) {
84-
this.name = checkNotNull(name, "name");
95+
private InProcessChannelBuilder(@Nullable SocketAddress directAddress, @Nullable String target) {
8596

8697
final class InProcessChannelTransportFactoryBuilder implements ClientTransportFactoryBuilder {
8798
@Override
@@ -90,8 +101,13 @@ public ClientTransportFactory buildClientTransportFactory() {
90101
}
91102
}
92103

93-
managedChannelImplBuilder = new ManagedChannelImplBuilder(new InProcessSocketAddress(name),
94-
"localhost", new InProcessChannelTransportFactoryBuilder(), null);
104+
if (directAddress != null) {
105+
managedChannelImplBuilder = new ManagedChannelImplBuilder(directAddress, "localhost",
106+
new InProcessChannelTransportFactoryBuilder(), null);
107+
} else {
108+
managedChannelImplBuilder = new ManagedChannelImplBuilder(target,
109+
new InProcessChannelTransportFactoryBuilder(), null);
110+
}
95111

96112
// In-process transport should not record its traffic to the stats module.
97113
// https://github.com/grpc/grpc-java/issues/2284
@@ -204,7 +220,7 @@ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
204220

205221
ClientTransportFactory buildTransportFactory() {
206222
return new InProcessClientTransportFactory(
207-
name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
223+
scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
208224
}
209225

210226
void setStatsEnabled(boolean value) {
@@ -215,18 +231,15 @@ void setStatsEnabled(boolean value) {
215231
* Creates InProcess transports. Exposed for internal use, as it should be private.
216232
*/
217233
static final class InProcessClientTransportFactory implements ClientTransportFactory {
218-
private final String name;
219234
private final ScheduledExecutorService timerService;
220235
private final boolean useSharedTimer;
221236
private final int maxInboundMetadataSize;
222237
private boolean closed;
223238
private final boolean includeCauseWithStatus;
224239

225240
private InProcessClientTransportFactory(
226-
String name,
227241
@Nullable ScheduledExecutorService scheduledExecutorService,
228242
int maxInboundMetadataSize, boolean includeCauseWithStatus) {
229-
this.name = name;
230243
useSharedTimer = scheduledExecutorService == null;
231244
timerService = useSharedTimer
232245
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
@@ -242,7 +255,7 @@ public ConnectionClientTransport newClientTransport(
242255
}
243256
// TODO(carl-mastrangelo): Pass channelLogger in.
244257
return new InProcessTransport(
245-
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
258+
addr, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
246259
options.getEagAttributes(), includeCauseWithStatus);
247260
}
248261

core/src/main/java/io/grpc/inprocess/InProcessServer.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,16 @@ final class InProcessServer implements InternalServer {
4040
private static final ConcurrentMap<String, InProcessServer> registry
4141
= new ConcurrentHashMap<>();
4242

43-
static InProcessServer findServer(String name) {
44-
return registry.get(name);
43+
static InProcessServer findServer(SocketAddress addr) {
44+
if (addr instanceof AnonymousInProcessSocketAddress) {
45+
return ((AnonymousInProcessSocketAddress) addr).getServer();
46+
} else if (addr instanceof InProcessSocketAddress) {
47+
return registry.get(((InProcessSocketAddress) addr).getName());
48+
}
49+
return null;
4550
}
4651

47-
private final String name;
52+
private final SocketAddress listenAddress;
4853
private final int maxInboundMetadataSize;
4954
private final List<ServerStreamTracer.Factory> streamTracerFactories;
5055
private ServerListener listener;
@@ -60,7 +65,7 @@ static InProcessServer findServer(String name) {
6065
InProcessServer(
6166
InProcessServerBuilder builder,
6267
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
63-
this.name = builder.name;
68+
this.listenAddress = builder.listenAddress;
6469
this.schedulerPool = builder.schedulerPool;
6570
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
6671
this.streamTracerFactories =
@@ -72,14 +77,25 @@ public void start(ServerListener serverListener) throws IOException {
7277
this.listener = serverListener;
7378
this.scheduler = schedulerPool.getObject();
7479
// Must be last, as channels can start connecting after this point.
75-
if (registry.putIfAbsent(name, this) != null) {
76-
throw new IOException("name already registered: " + name);
80+
registerInstance();
81+
}
82+
83+
private void registerInstance() throws IOException {
84+
if (listenAddress instanceof AnonymousInProcessSocketAddress) {
85+
((AnonymousInProcessSocketAddress) listenAddress).setServer(this);
86+
} else if (listenAddress instanceof InProcessSocketAddress) {
87+
String name = ((InProcessSocketAddress) listenAddress).getName();
88+
if (registry.putIfAbsent(name, this) != null) {
89+
throw new IOException("name already registered: " + name);
90+
}
91+
} else {
92+
throw new AssertionError();
7793
}
7894
}
7995

8096
@Override
8197
public SocketAddress getListenSocketAddress() {
82-
return new InProcessSocketAddress(name);
98+
return listenAddress;
8399
}
84100

85101
@Override
@@ -99,19 +115,30 @@ public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
99115

100116
@Override
101117
public void shutdown() {
102-
if (!registry.remove(name, this)) {
103-
throw new AssertionError();
104-
}
118+
unregisterInstance();
105119
scheduler = schedulerPool.returnObject(scheduler);
106120
synchronized (this) {
107121
shutdown = true;
108122
listener.serverShutdown();
109123
}
110124
}
111125

126+
private void unregisterInstance() {
127+
if (listenAddress instanceof AnonymousInProcessSocketAddress) {
128+
((AnonymousInProcessSocketAddress) listenAddress).clearServer(this);
129+
} else if (listenAddress instanceof InProcessSocketAddress) {
130+
String name = ((InProcessSocketAddress) listenAddress).getName();
131+
if (!registry.remove(name, this)) {
132+
throw new AssertionError();
133+
}
134+
} else {
135+
throw new AssertionError();
136+
}
137+
}
138+
112139
@Override
113140
public String toString() {
114-
return MoreObjects.toStringHelper(this).add("name", name).toString();
141+
return MoreObjects.toStringHelper(this).add("listenAddress", listenAddress).toString();
115142
}
116143

117144
synchronized ServerTransportListener register(InProcessTransport transport) {

core/src/main/java/io/grpc/inprocess/InProcessServerBuilder.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
3535
import io.grpc.internal.SharedResourcePool;
3636
import java.io.File;
37+
import java.net.SocketAddress;
3738
import java.util.List;
3839
import java.util.UUID;
3940
import java.util.concurrent.ScheduledExecutorService;
@@ -81,7 +82,16 @@ public final class InProcessServerBuilder extends
8182
* @return a new builder
8283
*/
8384
public static InProcessServerBuilder forName(String name) {
84-
return new InProcessServerBuilder(name);
85+
return forAddress(new InProcessSocketAddress(checkNotNull(name, "name")));
86+
}
87+
88+
/**
89+
* Create a server builder which listens on the given address.
90+
* @param listenAddress The SocketAddress this server will listen on.
91+
* @return a new builder
92+
*/
93+
public static InProcessServerBuilder forAddress(SocketAddress listenAddress) {
94+
return new InProcessServerBuilder(listenAddress);
8595
}
8696

8797
/**
@@ -100,13 +110,13 @@ public static String generateName() {
100110
}
101111

102112
private final ServerImplBuilder serverImplBuilder;
103-
final String name;
113+
final SocketAddress listenAddress;
104114
int maxInboundMetadataSize = Integer.MAX_VALUE;
105115
ObjectPool<ScheduledExecutorService> schedulerPool =
106116
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
107117

108-
private InProcessServerBuilder(String name) {
109-
this.name = Preconditions.checkNotNull(name, "name");
118+
private InProcessServerBuilder(SocketAddress listenAddress) {
119+
this.listenAddress = checkNotNull(listenAddress, "listenAddress");
110120

111121
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
112122
@Override

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import io.grpc.internal.StatsTraceContext;
6060
import io.grpc.internal.StreamListener;
6161
import java.io.InputStream;
62+
import java.net.SocketAddress;
6263
import java.util.ArrayDeque;
6364
import java.util.ArrayList;
6465
import java.util.Collections;
@@ -80,7 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
8081
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
8182

8283
private final InternalLogId logId;
83-
private final String name;
84+
private final SocketAddress address;
8485
private final int clientMaxInboundMetadataSize;
8586
private final String authority;
8687
private final String userAgent;
@@ -119,29 +120,29 @@ protected void handleNotInUse() {
119120
}
120121
};
121122

122-
private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
123+
private InProcessTransport(SocketAddress address, int maxInboundMetadataSize, String authority,
123124
String userAgent, Attributes eagAttrs,
124125
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
125-
this.name = name;
126+
this.address = address;
126127
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
127128
this.authority = authority;
128129
this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
129130
checkNotNull(eagAttrs, "eagAttrs");
130131
this.attributes = Attributes.newBuilder()
131132
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
132133
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
133-
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
134-
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
134+
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
135+
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
135136
.build();
136137
this.optionalServerListener = optionalServerListener;
137-
logId = InternalLogId.allocate(getClass(), name);
138+
logId = InternalLogId.allocate(getClass(), address.toString());
138139
this.includeCauseWithStatus = includeCauseWithStatus;
139140
}
140141

141142
public InProcessTransport(
142-
String name, int maxInboundMetadataSize, String authority, String userAgent,
143+
SocketAddress address, int maxInboundMetadataSize, String authority, String userAgent,
143144
Attributes eagAttrs, boolean includeCauseWithStatus) {
144-
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
145+
this(address, maxInboundMetadataSize, authority, userAgent, eagAttrs,
145146
Optional.<ServerListener>absent(), includeCauseWithStatus);
146147
}
147148

@@ -150,7 +151,7 @@ public InProcessTransport(
150151
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
151152
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
152153
ServerListener serverListener) {
153-
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
154+
this(new InProcessSocketAddress(name), maxInboundMetadataSize, authority, userAgent, eagAttrs,
154155
Optional.of(serverListener), false);
155156
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
156157
this.serverSchedulerPool = serverSchedulerPool;
@@ -165,7 +166,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
165166
serverScheduler = serverSchedulerPool.getObject();
166167
serverTransportListener = optionalServerListener.get().transportCreated(this);
167168
} else {
168-
InProcessServer server = InProcessServer.findServer(name);
169+
InProcessServer server = InProcessServer.findServer(address);
169170
if (server != null) {
170171
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
171172
serverSchedulerPool = server.getScheduledExecutorServicePool();
@@ -176,7 +177,7 @@ public synchronized Runnable start(ManagedClientTransport.Listener listener) {
176177
}
177178
}
178179
if (serverTransportListener == null) {
179-
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);
180+
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + address);
180181
final Status localShutdownStatus = shutdownStatus;
181182
return new Runnable() {
182183
@Override
@@ -194,8 +195,8 @@ public void run() {
194195
public void run() {
195196
synchronized (InProcessTransport.this) {
196197
Attributes serverTransportAttrs = Attributes.newBuilder()
197-
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
198-
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
198+
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, address)
199+
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, address)
199200
.build();
200201
serverStreamAttributes = serverTransportListener.transportReady(serverTransportAttrs);
201202
clientTransportListener.transportReady();
@@ -307,7 +308,7 @@ public void shutdownNow(Status reason) {
307308
public String toString() {
308309
return MoreObjects.toStringHelper(this)
309310
.add("logId", logId.getId())
310-
.add("name", name)
311+
.add("address", address)
311312
.toString();
312313
}
313314

0 commit comments

Comments
 (0)