Skip to content

Commit eac4178

Browse files
authored
okhttp: add max connection idle at OkHttpServer and fix test (#9533)
* Revert "Revert "okhttp: add max connection idle at OkHttpServer (#9494)" (#9528)" This reverts commit 95b9d6d and fixed flaky test.
1 parent 88a035e commit eac4178

File tree

3 files changed

+120
-6
lines changed

3 files changed

+120
-6
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.okhttp;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
20+
1921
import com.google.common.annotations.VisibleForTesting;
2022
import com.google.common.base.Preconditions;
2123
import com.google.errorprone.annotations.DoNotCall;
@@ -62,6 +64,10 @@
6264
public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpServerBuilder> {
6365
private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName());
6466
private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;
67+
68+
static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
69+
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
70+
6571
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
6672
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
6773
OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL;
@@ -110,6 +116,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
110116
int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
111117
int maxInboundMetadataSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
112118
int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
119+
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
113120

114121
@VisibleForTesting
115122
OkHttpServerBuilder(
@@ -178,6 +185,27 @@ public OkHttpServerBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit)
178185
return this;
179186
}
180187

188+
/**
189+
* Sets a custom max connection idle time, connection being idle for longer than which will be
190+
* gracefully terminated. Idleness duration is defined since the most recent time the number of
191+
* outstanding RPCs became zero or the connection establishment. An unreasonably small value might
192+
* be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
193+
* max connection idle.
194+
*/
195+
@Override
196+
public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) {
197+
checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s",
198+
maxConnectionIdle);
199+
maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle);
200+
if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) {
201+
maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
202+
}
203+
if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) {
204+
maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO;
205+
}
206+
return this;
207+
}
208+
181209
/**
182210
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
183211
* without any read activity on the connection, the connection is considered dead. An unreasonably

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.okhttp;
1818

19+
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
20+
1921
import com.google.common.base.Preconditions;
2022
import com.google.common.util.concurrent.Futures;
2123
import com.google.common.util.concurrent.ListenableFuture;
@@ -28,6 +30,7 @@
2830
import io.grpc.Status;
2931
import io.grpc.internal.GrpcUtil;
3032
import io.grpc.internal.KeepAliveManager;
33+
import io.grpc.internal.MaxConnectionIdleManager;
3134
import io.grpc.internal.ObjectPool;
3235
import io.grpc.internal.SerializingExecutor;
3336
import io.grpc.internal.ServerTransport;
@@ -91,6 +94,7 @@ final class OkHttpServerTransport implements ServerTransport,
9194
private ScheduledExecutorService scheduledExecutorService;
9295
private Attributes attributes;
9396
private KeepAliveManager keepAliveManager;
97+
private MaxConnectionIdleManager maxConnectionIdleManager;
9498

9599
private final Object lock = new Object();
96100
@GuardedBy("lock")
@@ -189,6 +193,11 @@ private void startIo(SerializingExecutor serializingExecutor) {
189193
keepAliveManager.onTransportStarted();
190194
}
191195

196+
if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
197+
maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
198+
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
199+
}
200+
192201
transportExecutor.execute(
193202
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
194203
} catch (Error | IOException | RuntimeException ex) {
@@ -311,6 +320,9 @@ private void terminated() {
311320
if (keepAliveManager != null) {
312321
keepAliveManager.onTransportTermination();
313322
}
323+
if (maxConnectionIdleManager != null) {
324+
maxConnectionIdleManager.onTransportTermination();
325+
}
314326
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
315327
scheduledExecutorService =
316328
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
@@ -369,6 +381,9 @@ public OutboundFlowController.StreamState[] getActiveStreams() {
369381
void streamClosed(int streamId, boolean flush) {
370382
synchronized (lock) {
371383
streams.remove(streamId);
384+
if (maxConnectionIdleManager != null && streams.isEmpty()) {
385+
maxConnectionIdleManager.onTransportIdle();
386+
}
372387
if (gracefulShutdown && streams.isEmpty()) {
373388
frameWriter.close();
374389
} else {
@@ -433,6 +448,7 @@ static final class Config {
433448
final int flowControlWindow;
434449
final int maxInboundMessageSize;
435450
final int maxInboundMetadataSize;
451+
final long maxConnectionIdleNanos;
436452

437453
public Config(
438454
OkHttpServerBuilder builder,
@@ -452,6 +468,7 @@ public Config(
452468
flowControlWindow = builder.flowControlWindow;
453469
maxInboundMessageSize = builder.maxInboundMessageSize;
454470
maxInboundMetadataSize = builder.maxInboundMetadataSize;
471+
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
455472
}
456473
}
457474

@@ -697,6 +714,9 @@ public void headers(boolean outFinished,
697714
authority == null ? null : asciiString(authority),
698715
statsTraceCtx,
699716
tracer);
717+
if (maxConnectionIdleManager != null && streams.isEmpty()) {
718+
maxConnectionIdleManager.onTransportActive();
719+
}
700720
streams.put(streamId, stream);
701721
listener.streamCreated(streamForApp, method, metadata);
702722
stream.onStreamAllocated();
@@ -953,6 +973,9 @@ private void respondWithHttpError(
953973
synchronized (lock) {
954974
Http2ErrorStreamState stream =
955975
new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
976+
if (maxConnectionIdleManager != null && streams.isEmpty()) {
977+
maxConnectionIdleManager.onTransportActive();
978+
}
956979
streams.put(streamId, stream);
957980
if (inFinished) {
958981
stream.inboundDataReceived(new Buffer(), 0, true);

okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
public class OkHttpServerTransportTest {
9191
private static final int TIME_OUT_MS = 2000;
9292
private static final int INITIAL_WINDOW_SIZE = 65535;
93+
private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1);
9394

9495
private MockServerTransportListener mockTransportListener = new MockServerTransportListener();
9596
private ServerTransportListener transportListener
@@ -105,10 +106,11 @@ public class OkHttpServerTransportTest {
105106
private ExecutorService threadPool = Executors.newCachedThreadPool();
106107
private HandshakerSocketFactory handshakerSocketFactory
107108
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
109+
private final FakeClock fakeClock = new FakeClock();
108110
private OkHttpServerBuilder serverBuilder
109111
= new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory)
110112
.executor(new FakeClock().getScheduledExecutorService()) // Executor unused
111-
.scheduledExecutorService(new FakeClock().getScheduledExecutorService())
113+
.scheduledExecutorService(fakeClock.getScheduledExecutorService())
112114
.transportExecutor(new Executor() {
113115
@Override public void execute(Runnable runnable) {
114116
if (runnable instanceof OkHttpServerTransport.FrameHandler) {
@@ -119,7 +121,8 @@ public class OkHttpServerTransportTest {
119121
}
120122
}
121123
})
122-
.flowControlWindow(INITIAL_WINDOW_SIZE);
124+
.flowControlWindow(INITIAL_WINDOW_SIZE)
125+
.maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS);
123126

124127
@Rule public final Timeout globalTimeout = Timeout.seconds(10);
125128

@@ -146,6 +149,64 @@ public void startThenShutdown() throws Exception {
146149
shutdownAndTerminate(/*lastStreamId=*/ 0);
147150
}
148151

152+
@Test
153+
public void maxConnectionIdleTimer() throws Exception {
154+
initTransport();
155+
handshake();
156+
clientFrameWriter.headers(1, Arrays.asList(
157+
HTTP_SCHEME_HEADER,
158+
METHOD_HEADER,
159+
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
160+
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
161+
CONTENT_TYPE_HEADER,
162+
TE_HEADER));
163+
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
164+
new Header("some-client-sent-trailer", "trailer-value")));
165+
pingPong();
166+
167+
MockStreamListener streamListener = mockTransportListener.newStreams.pop();
168+
assertThat(streamListener.messages.peek()).isNull();
169+
assertThat(streamListener.halfClosedCalled).isTrue();
170+
171+
streamListener.stream.close(Status.OK, new Metadata());
172+
173+
List<Header> responseTrailers = Arrays.asList(
174+
new Header(":status", "200"),
175+
CONTENT_TYPE_HEADER,
176+
new Header("grpc-status", "0"));
177+
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
178+
verify(clientFramesRead)
179+
.headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS);
180+
181+
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
182+
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
183+
verifyGracefulShutdown(1);
184+
}
185+
186+
@Test
187+
public void maxConnectionIdleTimer_respondWithError() throws Exception {
188+
initTransport();
189+
handshake();
190+
191+
clientFrameWriter.headers(1, Arrays.asList(
192+
HTTP_SCHEME_HEADER,
193+
METHOD_HEADER,
194+
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
195+
CONTENT_TYPE_HEADER,
196+
TE_HEADER,
197+
new Header("host", "example.com:80"),
198+
new Header("host", "example.com:80")));
199+
clientFrameWriter.flush();
200+
201+
verifyHttpError(
202+
1, 400, Status.Code.INTERNAL, "Multiple host headers disallowed. RFC7230 section 5.4");
203+
204+
pingPong();
205+
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
206+
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
207+
verifyGracefulShutdown(1);
208+
}
209+
149210
@Test
150211
public void startThenShutdownTwice() throws Exception {
151212
initTransport();
@@ -316,7 +377,8 @@ public void activeRpc_delaysShutdownTermination() throws Exception {
316377
clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size());
317378
pingPong();
318379

319-
shutdownAndVerifyGraceful(1);
380+
serverTransport.shutdown();
381+
verifyGracefulShutdown(1);
320382
verify(transportListener, never()).transportTerminated();
321383

322384
MockStreamListener streamListener = mockTransportListener.newStreams.pop();
@@ -1038,8 +1100,8 @@ private Metadata metadata(String... keysAndValues) {
10381100
return metadata;
10391101
}
10401102

1041-
private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
1042-
serverTransport.shutdown();
1103+
private void verifyGracefulShutdown(int lastStreamId)
1104+
throws IOException {
10431105
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
10441106
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
10451107
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
@@ -1052,7 +1114,8 @@ private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
10521114

10531115
private void shutdownAndTerminate(int lastStreamId) throws IOException {
10541116
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
1055-
shutdownAndVerifyGraceful(lastStreamId);
1117+
serverTransport.shutdown();
1118+
verifyGracefulShutdown(lastStreamId);
10561119
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
10571120
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
10581121
}

0 commit comments

Comments
 (0)