diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 8e9171b57..e43502fd3 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -106,11 +106,11 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; - enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN } + enum ConnectionHandshake { READY, ACTIVE, DRAINING, GRACEFUL_SHUTDOWN, SHUTDOWN } enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final ProtocolIOSession ioSession; - private final FrameFactory frameFactory; + final FrameFactory frameFactory; private final HttpProcessor httpProcessor; private final H2Config localConfig; private final BasicH2TransportMetrics inputMetrics; @@ -121,14 +121,14 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private final Deque outputQueue; private final HPackEncoder hPackEncoder; private final HPackDecoder hPackDecoder; - private final H2Streams streams; + final H2Streams streams; private final Queue pingHandlers; private final AtomicInteger connInputWindow; private final AtomicInteger connOutputWindow; private final AtomicInteger outputRequests; private final H2StreamListener streamListener; - private ConnectionHandshake connState = ConnectionHandshake.READY; + ConnectionHandshake connState = ConnectionHandshake.READY; private SettingsHandshake localSettingState = SettingsHandshake.READY; private SettingsHandshake remoteSettingState = SettingsHandshake.READY; @@ -297,7 +297,7 @@ private void commitFrameInternal(final RawFrame frame) throws IOException { ioSession.setEvent(SelectionKey.OP_WRITE); } - private void commitFrame(final RawFrame frame) throws IOException { + void commitFrame(final RawFrame frame) throws IOException { Args.notNull(frame, "Frame"); ioSession.getLock().lock(); try { @@ -506,6 +506,10 @@ public final void onOutput() throws HttpException, IOException { ioSession.getLock().unlock(); } + if (beforeOutputProcessing()) { + return; + } + if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) { if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) { @@ -581,23 +585,7 @@ public final void onOutput() throws HttpException, IOException { validateStreamTimeouts(); } - if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { - int liveStreams = 0; - for (final Iterator it = streams.iterator(); it.hasNext(); ) { - final H2Stream stream = it.next(); - if (stream.isClosedPastLingerDeadline()) { - streams.dropStreamId(stream.getId()); - it.remove(); - } else { - if (streams.isSameSide(stream.getId()) || stream.getId() <= streams.getLastRemoteId()) { - liveStreams++; - } - } - } - if (liveStreams == 0) { - connState = ConnectionHandshake.SHUTDOWN; - } - } + maybeTransitionFromGracefulShutdown(); if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) >= 0) { for (;;) { final Command command = ioSession.poll(); @@ -628,6 +616,9 @@ public final void onOutput() throws HttpException, IOException { } public final void onTimeout(final Timeout timeout) throws HttpException, IOException { + if (onShutdownTimeout(timeout)) { + return; + } connState = ConnectionHandshake.SHUTDOWN; final RawFrame goAway; @@ -665,14 +656,100 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc connState = ConnectionHandshake.SHUTDOWN; } else { if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { - final RawFrame goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, "Graceful shutdown"); - commitFrame(goAway); - connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN; + initiateGracefulShutdown(); + } + } + } + + /** + * Initiates the graceful shutdown sequence. The default implementation emits a single + * GOAWAY frame carrying the last processed remote stream id and transitions to + * {@code GRACEFUL_SHUTDOWN} (or straight to {@code SHUTDOWN} when no streams are open). + * Subclasses may override to implement alternate shutdown sequences. + */ + void initiateGracefulShutdown() throws IOException { + final RawFrame goAway = frameFactory.createGoAway(streams.getLastRemoteId(), H2Error.NO_ERROR, "Graceful shutdown"); + commitFrame(goAway); + connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN; + } + + /** + * Whether the multiplexer is currently accepting new remote-initiated streams. + * The default allows new streams while the connection is in {@code READY} or {@code ACTIVE}. + */ + boolean canAcceptNewRemoteStream() { + return connState.compareTo(ConnectionHandshake.ACTIVE) <= 0; + } + + /** + * Notification hook fired right after a remote-initiated stream has been created. + * The default is a no-op. + */ + void onRemoteStreamAccepted(final int streamId) { + } + + /** + * Timeout hook that lets subclasses intercept inactivity-driven shutdowns. + * Returning {@code true} signals that the subclass has handled the timeout and the + * default full-shutdown path should be skipped. + */ + boolean onShutdownTimeout(final Timeout timeout) throws HttpException, IOException { + return false; + } + + /** + * Early {@code onOutput} hook fired after the output buffer has been flushed and before + * the main state-machine processing. Returning {@code true} aborts the rest of the + * {@code onOutput} cycle, allowing subclasses to interleave additional frames between + * flush and further work. + */ + boolean beforeOutputProcessing() throws HttpException, IOException { + return false; + } + + void applyRemoteGracefulGoAway(final int processedLocalStreamId) { + if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + final int activeStreamId = stream.getId(); + if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) { + stream.fail(new RequestNotExecutedException()); + it.remove(); + } + } + } + connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN; + } + + /** + * Evaluates whether the {@code GRACEFUL_SHUTDOWN} state can be promoted to + * {@code SHUTDOWN}. The default transitions once no live streams remain. + */ + void maybeTransitionFromGracefulShutdown() { + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) { + int liveStreams = 0; + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (stream.isClosedPastLingerDeadline()) { + streams.dropStreamId(stream.getId()); + it.remove(); + } else { + if (streams.isSameSide(stream.getId()) || stream.getId() <= streams.getLastRemoteId()) { + liveStreams++; + } + } + } + if (liveStreams == 0) { + connState = ConnectionHandshake.SHUTDOWN; } } } - private void executePing(final PingCommand pingCommand) throws IOException { + boolean hasPendingOutput() { + return !outputBuffer.isEmpty() || !outputQueue.isEmpty(); + } + + void executePing(final PingCommand pingCommand) throws IOException { final AsyncPingHandler handler = pingCommand.getHandler(); pingHandlers.add(handler); final RawFrame ping = frameFactory.createPing(handler.getData()); @@ -817,8 +894,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } final H2StreamChannel channel = createChannel(streamId); - if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { + if (canAcceptNewRemoteStream()) { stream = streams.createActive(channel, incomingRequest(channel)); + onRemoteStreamAccepted(streamId); streams.resetIfExceedsMaxConcurrentLimit(stream, localConfig.getMaxConcurrentStreams()); } else { channel.localReset(H2Error.REFUSED_STREAM); @@ -1026,8 +1104,9 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final H2StreamChannel channel = createChannel(promisedStreamId); final H2Stream promisedStream; - if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { + if (canAcceptNewRemoteStream()) { promisedStream = streams.createReserved(channel, incomingPushPromise(channel, stream.getPushHandlerFactory())); + onRemoteStreamAccepted(promisedStreamId); } else { channel.localReset(H2Error.REFUSED_STREAM); promisedStream = streams.createActive(channel, NoopH2StreamHandler.INSTANCE); @@ -1053,17 +1132,7 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio final int errorCode = payload.getInt(); goAwayReceived = true; if (errorCode == H2Error.NO_ERROR.getCode()) { - if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) { - for (final Iterator it = streams.iterator(); it.hasNext(); ) { - final H2Stream stream = it.next(); - final int activeStreamId = stream.getId(); - if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) { - stream.fail(new RequestNotExecutedException()); - it.remove(); - } - } - } - connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN; + applyRemoteGracefulGoAway(processedLocalStreamId); } else { for (final Iterator it = streams.iterator(); it.hasNext(); ) { final H2Stream stream = it.next(); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java index 9a70b3c95..64f940390 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamMultiplexer.java @@ -28,12 +28,14 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.RequestHeaderFieldsTooLargeException; +import org.apache.hc.core5.http.RequestNotExecutedException; import org.apache.hc.core5.http.config.CharCodingConfig; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; @@ -50,10 +52,14 @@ import org.apache.hc.core5.http2.config.H2Setting; import org.apache.hc.core5.http2.frame.DefaultFrameFactory; import org.apache.hc.core5.http2.frame.FrameFactory; +import org.apache.hc.core5.http2.frame.RawFrame; import org.apache.hc.core5.http2.frame.StreamIdGenerator; import org.apache.hc.core5.http2.hpack.HeaderListConstraintException; +import org.apache.hc.core5.http2.nio.AsyncPingHandler; +import org.apache.hc.core5.http2.nio.command.PingCommand; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Timeout; /** * I/O event handler for events fired by {@link ProtocolIOSession} that implements @@ -67,6 +73,10 @@ public class ServerH2StreamMultiplexer extends AbstractH2StreamMultiplexer { private final HandlerFactory exchangeHandlerFactory; + private int shutdownLastStreamId; + private int lastProcessedRemoteStreamId; + private boolean drainPingSent; + public ServerH2StreamMultiplexer( final ProtocolIOSession ioSession, final FrameFactory frameFactory, @@ -158,6 +168,128 @@ boolean allowGracefulAbort(final H2Stream stream) { return false; } + @Override + boolean canAcceptNewRemoteStream() { + return connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 + || connState == ConnectionHandshake.DRAINING; + } + + @Override + void onRemoteStreamAccepted(final int streamId) { + if (streamId > lastProcessedRemoteStreamId) { + lastProcessedRemoteStreamId = streamId; + } + } + + @Override + void initiateGracefulShutdown() throws IOException { + shutdownLastStreamId = Integer.MAX_VALUE; + drainPingSent = false; + final RawFrame goAway = frameFactory.createGoAway( + shutdownLastStreamId, H2Error.NO_ERROR, "Graceful shutdown"); + commitFrame(goAway); + connState = ConnectionHandshake.DRAINING; + requestSessionOutput(); + } + + @Override + boolean onShutdownTimeout(final Timeout timeout) throws HttpException, IOException { + if (connState == ConnectionHandshake.DRAINING) { + completeGracefulShutdown(); + return true; + } + return false; + } + + @Override + boolean beforeOutputProcessing() throws HttpException, IOException { + if (connState == ConnectionHandshake.DRAINING && !drainPingSent && !hasPendingOutput()) { + drainPingSent = true; + executePing(new PingCommand(createDrainPingHandler())); + return true; + } + return false; + } + + @Override + void applyRemoteGracefulGoAway(final int processedLocalStreamId) { + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + final int activeStreamId = stream.getId(); + if (!streams.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) { + stream.fail(new RequestNotExecutedException()); + it.remove(); + } + } + if (connState != ConnectionHandshake.DRAINING) { + shutdownLastStreamId = processedLocalStreamId; + connState = streams.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN; + } + } + + @Override + void maybeTransitionFromGracefulShutdown() { + if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) != 0) { + return; + } + int liveStreams = 0; + for (final Iterator it = streams.iterator(); it.hasNext(); ) { + final H2Stream stream = it.next(); + if (stream.isClosedPastLingerDeadline()) { + streams.dropStreamId(stream.getId()); + it.remove(); + } else { + if (streams.isSameSide(stream.getId()) + || shutdownLastStreamId == 0 + || stream.getId() <= shutdownLastStreamId) { + liveStreams++; + } + } + } + if (shutdownLastStreamId != Integer.MAX_VALUE && liveStreams == 0) { + connState = ConnectionHandshake.SHUTDOWN; + } + } + + private void completeGracefulShutdown() throws IOException { + if (connState != ConnectionHandshake.DRAINING) { + return; + } + shutdownLastStreamId = lastProcessedRemoteStreamId; + final RawFrame goAway = frameFactory.createGoAway(shutdownLastStreamId, H2Error.NO_ERROR, "Graceful shutdown"); + commitFrame(goAway); + connState = ConnectionHandshake.GRACEFUL_SHUTDOWN; + } + + private AsyncPingHandler createDrainPingHandler() { + final ByteBuffer data = ByteBuffer.allocate(8); + data.putLong(System.nanoTime()); + data.flip(); + return new AsyncPingHandler() { + + @Override + public ByteBuffer getData() { + return data.asReadOnlyBuffer(); + } + + @Override + public void consumeResponse(final ByteBuffer feedback) throws IOException { + if (connState == ConnectionHandshake.DRAINING) { + completeGracefulShutdown(); + } + } + + @Override + public void failed(final Exception cause) { + } + + @Override + public void cancel() { + } + + }; + } + @Override List
decodeHeaders(final ByteBuffer payload) throws HttpException, IOException { try { diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2GracefulShutdownDrainExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2GracefulShutdownDrainExample.java new file mode 100644 index 000000000..ff08adb0e --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2GracefulShutdownDrainExample.java @@ -0,0 +1,336 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.annotation.Experimental; +import org.apache.hc.core5.http.ClassicHttpRequest; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester; +import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; +import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientEndpoint; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResponseChannel; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncRequestProducer; +import org.apache.hc.core5.http.nio.support.classic.ClassicToAsyncResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.frame.FrameFlag; +import org.apache.hc.core5.http2.frame.FrameType; +import org.apache.hc.core5.http2.frame.RawFrame; +import org.apache.hc.core5.http2.impl.nio.H2StreamListener; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.ListenerEndpoint; +import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; + +/** + * Example that demonstrates server-side graceful HTTP/2 connection drain. + *

+ * This example starts an embedded HTTP/2 server and an HTTP/2 client, executes + * a single request over a persistent connection, and then triggers graceful + * server shutdown. + *

+ * With two-phase GOAWAY drain support on the server side, the client + * side frame log should show: + *

+ * << GOAWAY lastStreamId=2147483647 errorCode=0
+ * << PING ack=false
+ * >> PING ack=true
+ * << GOAWAY lastStreamId=1 errorCode=0
+ * 
+ */ +@Experimental +public class H2GracefulShutdownDrainExample { + + private static final int PORT = 8080; + + public static void main(final String[] args) throws Exception { + + final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() + .setSoTimeout(30, TimeUnit.SECONDS) + .setTcpNoDelay(true) + .build(); + + final H2Config h2Config = H2Config.custom() + .setPushEnabled(false) + .setMaxConcurrentStreams(100) + .build(); + + final CountDownLatch finalGoAwayLatch = new CountDownLatch(1); + final AtomicInteger clientGoAwayCount = new AtomicInteger(); + + final HttpAsyncServer server = H2ServerBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new LoggingH2StreamListener("SERVER", null, null)) + .register("/hello", () -> new AsyncServerExchangeHandler() { + + private final ByteBuffer content = StandardCharsets.UTF_8.encode("hello over h2\n"); + private volatile boolean responseSubmitted; + + @Override + public void handleRequest( + final HttpRequest request, + final EntityDetails entityDetails, + final ResponseChannel responseChannel, + final HttpContext context) throws HttpException, IOException { + final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_OK); + response.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString()); + responseChannel.sendResponse(response, null, context); + responseSubmitted = true; + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + capacityChannel.update(Integer.MAX_VALUE); + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + while (src.hasRemaining()) { + src.get(); + } + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + } + + @Override + public int available() { + return responseSubmitted ? content.remaining() : 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + if (content.hasRemaining()) { + channel.write(content); + } + if (!content.hasRemaining()) { + channel.endStream(); + } + } + + @Override + public void failed(final Exception cause) { + cause.printStackTrace(System.out); + } + + @Override + public void releaseResources() { + } + + }) + .create(); + + final HttpAsyncRequester requester = H2RequesterBootstrap.bootstrap() + .setIOReactorConfig(ioReactorConfig) + .setH2Config(h2Config) + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .setStreamListener(new LoggingH2StreamListener("CLIENT", finalGoAwayLatch, clientGoAwayCount)) + .create(); + + server.start(); + final Future listenerFuture = server.listen(new InetSocketAddress(PORT), URIScheme.HTTP); + final ListenerEndpoint listenerEndpoint = listenerFuture.get(); + System.out.println("Server listening on " + listenerEndpoint.getAddress()); + + requester.start(); + + final HttpHost target = new HttpHost("http", "127.0.0.1", PORT); + final Future endpointFuture = requester.connect(target, Timeout.ofSeconds(30)); + final AsyncClientEndpoint clientEndpoint = endpointFuture.get(); + + final ClassicHttpRequest request = ClassicRequestBuilder.get() + .setHttpHost(target) + .setPath("/hello") + .build(); + + final ClassicToAsyncRequestProducer requestProducer = + new ClassicToAsyncRequestProducer(request, Timeout.ofSeconds(30)); + final ClassicToAsyncResponseConsumer responseConsumer = + new ClassicToAsyncResponseConsumer(Timeout.ofSeconds(30)); + + clientEndpoint.execute(requestProducer, responseConsumer, null); + + requestProducer.blockWaiting().execute(); + try (ClassicHttpResponse response = responseConsumer.blockWaiting()) { + System.out.println("/hello -> " + response.getCode()); + final HttpEntity entity = response.getEntity(); + if (entity != null) { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + System.out.println(line); + } + } + } + } + + System.out.println(); + System.out.println("Triggering graceful server shutdown"); + server.initiateShutdown(); + + final boolean completed = finalGoAwayLatch.await(10, TimeUnit.SECONDS); + System.out.println("Final GOAWAY observed: " + completed); + if (!completed) { + throw new IllegalStateException("Did not observe the final GOAWAY frame"); + } + + Thread.sleep(1000); + + System.out.println(); + System.out.println("Triggering requester shutdown"); + requester.initiateShutdown(); + + requester.awaitShutdown(TimeValue.ofSeconds(5)); + server.awaitShutdown(TimeValue.ofSeconds(5)); + + requester.close(CloseMode.GRACEFUL); + server.close(CloseMode.GRACEFUL); + } + + static final class LoggingH2StreamListener implements H2StreamListener { + + private final String name; + private final CountDownLatch finalGoAwayLatch; + private final AtomicInteger goAwayCount; + + LoggingH2StreamListener( + final String name, + final CountDownLatch finalGoAwayLatch, + final AtomicInteger goAwayCount) { + this.name = name; + this.finalGoAwayLatch = finalGoAwayLatch; + this.goAwayCount = goAwayCount; + } + + @Override + public void onHeaderInput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(name + " " + connection.getRemoteAddress() + " (" + streamId + ") << " + headers.get(i)); + } + } + + @Override + public void onHeaderOutput(final HttpConnection connection, final int streamId, final List headers) { + for (int i = 0; i < headers.size(); i++) { + System.out.println(name + " " + connection.getRemoteAddress() + " (" + streamId + ") >> " + headers.get(i)); + } + } + + @Override + public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) { + System.out.println(name + " " + connection.getRemoteAddress() + " (" + streamId + ") << " + formatFrame(frame)); + if (finalGoAwayLatch != null && goAwayCount != null && FrameType.valueOf(frame.getType()) == FrameType.GOAWAY) { + if (goAwayCount.incrementAndGet() == 2) { + finalGoAwayLatch.countDown(); + } + } + } + + @Override + public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) { + System.out.println(name + " " + connection.getRemoteAddress() + " (" + streamId + ") >> " + formatFrame(frame)); + } + + @Override + public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + @Override + public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) { + } + + private static String formatFrame(final RawFrame frame) { + final FrameType frameType = FrameType.valueOf(frame.getType()); + if (frameType == null) { + return "UNKNOWN(" + frame.getType() + ")"; + } + switch (frameType) { + case GOAWAY: { + final ByteBuffer payload = frame.getPayload(); + if (payload == null || payload.remaining() < 8) { + return "GOAWAY invalid"; + } + final ByteBuffer dup = payload.asReadOnlyBuffer(); + final int lastStreamId = dup.getInt() & 0x7fffffff; + final int errorCode = dup.getInt(); + return "GOAWAY lastStreamId=" + lastStreamId + " errorCode=" + errorCode; + } + case PING: + return "PING ack=" + frame.isFlagSet(FrameFlag.ACK); + case SETTINGS: + return frame.isFlagSet(FrameFlag.ACK) ? "SETTINGS ack=true" : "SETTINGS ack=false"; + case HEADERS: + return "HEADERS endStream=" + frame.isFlagSet(FrameFlag.END_STREAM) + + " endHeaders=" + frame.isFlagSet(FrameFlag.END_HEADERS); + case DATA: + return "DATA endStream=" + frame.isFlagSet(FrameFlag.END_STREAM) + + " length=" + frame.getLength(); + default: + return frameType.name() + " length=" + frame.getLength(); + } + } + + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 6c9e88e4f..73df51ca9 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -46,9 +46,11 @@ import org.apache.hc.core5.http.impl.CharCodingSupport; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushProducer; import org.apache.hc.core5.http.nio.HandlerFactory; +import org.apache.hc.core5.http.nio.command.ShutdownCommand; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http2.H2ConnectionException; @@ -68,6 +70,7 @@ import org.apache.hc.core5.http2.frame.StreamIdGenerator; import org.apache.hc.core5.http2.hpack.HPackEncoder; import org.apache.hc.core5.http2.hpack.HPackException; +import org.apache.hc.core5.reactor.Command; import org.apache.hc.core5.reactor.ProtocolIOSession; import org.apache.hc.core5.util.ByteArrayBuffer; import org.apache.hc.core5.util.Timeout; @@ -198,6 +201,30 @@ boolean allowGracefulAbort(final H2Stream stream) { } + static class ServerH2TestMultiplexer extends ServerH2StreamMultiplexer { + + private final Supplier streamHandlerSupplier; + + ServerH2TestMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final HttpProcessor httpProcessor, + final HandlerFactory exchangeHandlerFactory, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Supplier streamHandlerSupplier) { + super(ioSession, frameFactory, httpProcessor, exchangeHandlerFactory, charCodingConfig, h2Config, + streamListener); + this.streamHandlerSupplier = streamHandlerSupplier; + } + + @Override + H2StreamHandler incomingRequest(final H2StreamChannel channel) { + return streamHandlerSupplier.get(); + } + } + @Test void testInputOneFrame() throws Exception { final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024); @@ -2017,5 +2044,116 @@ void testHeadersWithPrioritySelfDependencyIsStreamProtocolError() throws Excepti .consumeHeader(ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean()); } + @Test + void testGracefulShutdownUsesTwoPhaseGoAwayWithPingBarrier() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[buffer.remaining()]; + buffer.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + Mockito.when(protocolIOSession.poll()).thenReturn(null); + + final H2Config h2Config = H2Config.custom().build(); + + @SuppressWarnings("unchecked") + final HandlerFactory exchangeHandlerFactory = + (HandlerFactory) Mockito.mock(HandlerFactory.class); + final AbstractH2StreamMultiplexer mux = new ServerH2TestMultiplexer( + protocolIOSession, + FRAME_FACTORY, + httpProcessor, + exchangeHandlerFactory, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + mux.onConnect(); + mux.onOutput(); + completeSettingsHandshake(mux); + mux.onOutput(); + + final ByteArrayBuffer headerBuf = new ByteArrayBuffer(128); + final HPackEncoder encoder = new HPackEncoder( + h2Config.getHeaderTableSize(), + CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + + final List
headers = Arrays.asList( + new BasicHeader(":method", "GET"), + new BasicHeader(":scheme", "https"), + new BasicHeader(":path", "/"), + new BasicHeader(":authority", "example.test")); + + encoder.encodeHeaders(headerBuf, headers, h2Config.isCompressionEnabled()); + + final RawFrame headersFrame = FRAME_FACTORY.createHeaders( + 1, + ByteBuffer.wrap(headerBuf.array(), 0, headerBuf.length()), + true, + true); + feedFrame(mux, headersFrame); + + writes.clear(); + + Mockito.when(protocolIOSession.poll()).thenReturn(ShutdownCommand.GRACEFUL, (Command) null); + + // 1st pass: consume shutdown command, queue initial GOAWAY + mux.onOutput(); + + // 2nd pass: flush initial GOAWAY, queue drain PING + mux.onOutput(); + + // 3rd pass: flush drain PING + mux.onOutput(); + + List frames = parseFrames(concat(writes)); + + final FrameStub initialGoAway = frames.stream() + .filter(FrameStub::isGoAway) + .findFirst() + .orElse(null); + Assertions.assertNotNull(initialGoAway, "Initial GOAWAY not emitted"); + Assertions.assertEquals(Integer.MAX_VALUE, goAwayLastStreamId(initialGoAway)); + + final FrameStub ping = frames.stream() + .filter(f -> f.isPing() && !f.isAck()) + .findFirst() + .orElse(null); + Assertions.assertNotNull(ping, "Drain PING not emitted"); + + final RawFrame pingAck = new RawFrame( + FrameType.PING.getValue(), + FrameFlag.ACK.getValue(), + 0, + ByteBuffer.wrap(ping.payload)); + + writes.clear(); + + feedFrame(mux, pingAck); + + // final GOAWAY gets queued by consumeResponse -> completeGracefulShutdown() + mux.onOutput(); + + frames = parseFrames(concat(writes)); + + final FrameStub finalGoAway = frames.stream() + .filter(FrameStub::isGoAway) + .findFirst() + .orElse(null); + Assertions.assertNotNull(finalGoAway, "Final GOAWAY not emitted"); + Assertions.assertEquals(1, goAwayLastStreamId(finalGoAway)); + } + + private static int goAwayLastStreamId(final FrameStub frame) { + final ByteBuffer buffer = ByteBuffer.wrap(frame.payload); + return buffer.getInt() & 0x7fffffff; + } } \ No newline at end of file