diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 301ccd27c..2b302681a 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -152,6 +152,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1); private final Timeout validateAfterInactivity; + private final Timeout pingAckTimeout; private volatile long lastActivityNanos; AbstractH2StreamMultiplexer( @@ -174,6 +175,20 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } final H2Config h2Config, final H2StreamListener streamListener, final Timeout validateAfterInactivity) { + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + validateAfterInactivity, Timeout.ofSeconds(5)); + } + + AbstractH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Timeout validateAfterInactivity, + final Timeout pingAckTimeout) { this.ioSession = Args.notNull(ioSession, "IO session"); this.frameFactory = Args.notNull(frameFactory, "Frame factory"); this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); @@ -202,6 +217,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED } this.streamListener = streamListener; this.lastActivityNanos = System.nanoTime(); this.validateAfterInactivity = validateAfterInactivity; + this.pingAckTimeout = Args.notNull(pingAckTimeout, "PING ACK timeout"); } @Override @@ -544,7 +560,7 @@ public final void onOutput() throws HttpException, IOException { final boolean hasBeenIdleTooLong = t > 0 && System.nanoTime() - lastActivityNanos > t; if (hasBeenIdleTooLong && ioSession.hasCommands() && pingHandlers.isEmpty()) { final Timeout socketTimeout = ioSession.getSocketTimeout(); - ioSession.setSocketTimeout(Timeout.ofSeconds(5)); + ioSession.setSocketTimeout(pingAckTimeout); executePing(new PingCommand(new BasicPingHandler(result -> { // restore timeout ioSession.setSocketTimeout(socketTimeout); diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java index 4d23d5144..83b7b4d62 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexer.java @@ -60,6 +60,24 @@ public class ClientH2StreamMultiplexer extends AbstractH2StreamMultiplexer { private final HandlerFactory pushHandlerFactory; + /** + * @since 5.5 + */ + public ClientH2StreamMultiplexer( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener, + final Timeout validateAfterInactivity, + final Timeout pingAckTimeout) { + super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, + streamListener, validateAfterInactivity, pingAckTimeout); + this.pushHandlerFactory = pushHandlerFactory; + } + public ClientH2StreamMultiplexer( final ProtocolIOSession ioSession, final FrameFactory frameFactory, @@ -69,8 +87,8 @@ public ClientH2StreamMultiplexer( final CharCodingConfig charCodingConfig, final H2StreamListener streamListener, final Timeout validateAfterInactivity) { - super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, streamListener, - validateAfterInactivity); + super(ioSession, frameFactory, StreamIdGenerator.ODD, httpProcessor, charCodingConfig, h2Config, + streamListener, validateAfterInactivity); this.pushHandlerFactory = pushHandlerFactory; } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java index a2315554e..31edc23f9 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamMultiplexerFactory.java @@ -59,7 +59,11 @@ public final class ClientH2StreamMultiplexerFactory { private final H2StreamListener streamListener; private final FrameFactory frameFactory; private final Supplier validateAfterInactivitySupplier; + private final Timeout pingAckTimeout; + /** + * @since 5.5 + */ public ClientH2StreamMultiplexerFactory( final HttpProcessor httpProcessor, final HandlerFactory pushHandlerFactory, @@ -67,7 +71,8 @@ public ClientH2StreamMultiplexerFactory( final CharCodingConfig charCodingConfig, final H2StreamListener streamListener, final FrameFactory frameFactory, - final Supplier validateAfterInactivitySupplier) { + final Supplier validateAfterInactivitySupplier, + final Timeout pingAckTimeout) { this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor"); this.pushHandlerFactory = pushHandlerFactory; this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT; @@ -75,6 +80,19 @@ public ClientH2StreamMultiplexerFactory( this.streamListener = streamListener; this.frameFactory = frameFactory != null ? frameFactory : DefaultFrameFactory.INSTANCE; this.validateAfterInactivitySupplier = validateAfterInactivitySupplier; + this.pingAckTimeout = pingAckTimeout; + } + + public ClientH2StreamMultiplexerFactory( + final HttpProcessor httpProcessor, + final HandlerFactory pushHandlerFactory, + final H2Config h2Config, + final CharCodingConfig charCodingConfig, + final H2StreamListener streamListener, + final FrameFactory frameFactory, + final Supplier validateAfterInactivitySupplier) { + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, + validateAfterInactivitySupplier, null); } public ClientH2StreamMultiplexerFactory( @@ -84,7 +102,7 @@ public ClientH2StreamMultiplexerFactory( final CharCodingConfig charCodingConfig, final H2StreamListener streamListener, final FrameFactory frameFactory) { - this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null); + this(httpProcessor, pushHandlerFactory, h2Config, charCodingConfig, streamListener, frameFactory, null, null); } public ClientH2StreamMultiplexerFactory( @@ -111,8 +129,14 @@ public ClientH2StreamMultiplexerFactory( } public ClientH2StreamMultiplexer create(final ProtocolIOSession ioSession) { + final Timeout validateAfterInactivity = resolveValidateAfterInactivity(); + if (pingAckTimeout != null) { + return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor, + pushHandlerFactory, h2Config, charCodingConfig, streamListener, + validateAfterInactivity, pingAckTimeout); + } return new ClientH2StreamMultiplexer(ioSession, frameFactory, httpProcessor, - pushHandlerFactory, h2Config, charCodingConfig, streamListener, resolveValidateAfterInactivity()); + pushHandlerFactory, h2Config, charCodingConfig, streamListener, validateAfterInactivity); } private Timeout resolveValidateAfterInactivity() { diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index c4523c800..39f23c6e1 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -55,6 +55,7 @@ import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; /** * {@link H2MultiplexingRequester} bootstrap. @@ -81,6 +82,8 @@ public class H2MultiplexingRequesterBootstrap { private int maxCommandsPerConnection; + private Timeout pingAckTimeout; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -202,6 +205,18 @@ public final H2MultiplexingRequesterBootstrap setMaxCommandsPerConnection(final return this; } + /** + * Sets the timeout applied while waiting for the HTTP/2 PING ACK emitted during + * pre-flight connection validation. When unset, the default of 5 seconds is used. + * + * @return this instance. + * @since 5.5 + */ + public final H2MultiplexingRequesterBootstrap setPingAckTimeout(final Timeout pingAckTimeout) { + this.pingAckTimeout = pingAckTimeout; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -287,7 +302,8 @@ public H2MultiplexingRequester create() { charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT, streamListener, frameFactory, - validateAfterInactivityRef::get); + validateAfterInactivityRef::get, + pingAckTimeout); return new H2MultiplexingRequester( ioReactorConfig, (ioSession, attachment) -> diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java index 14b1c2b03..e64023847 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java @@ -105,6 +105,7 @@ public class H2RequesterBootstrap { private IOReactorMetricsListener threadPoolListener; private FrameFactory frameFactory; private int maxPendingCommandsPerConnection; + private Timeout pingAckTimeout; private H2RequesterBootstrap() { @@ -216,6 +217,18 @@ public final H2RequesterBootstrap setMaxPendingCommandsPerConnection(final int m return this; } + /** + * Sets the timeout applied while waiting for the HTTP/2 PING ACK emitted during + * pre-flight connection validation. When unset, the default of 5 seconds is used. + * + * @return this instance. + * @since 5.5 + */ + public final H2RequesterBootstrap setPingAckTimeout(final Timeout pingAckTimeout) { + this.pingAckTimeout = pingAckTimeout; + return this; + } + /** * Sets {@link TlsStrategy} instance. * @@ -405,7 +418,9 @@ public H2AsyncRequester create() { h2Config != null ? h2Config : H2Config.DEFAULT, charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT, streamListener, - frameFactory); + frameFactory, + null, + pingAckTimeout); final TlsStrategy actualTlsStrategy = tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(); diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 1e3b43456..59033864a 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -120,7 +120,7 @@ public H2StreamMultiplexerImpl( final H2StreamListener streamListener, final Supplier streamHandlerSupplier) { this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, - streamHandlerSupplier, null); + streamHandlerSupplier, null, Timeout.ofSeconds(5)); } public H2StreamMultiplexerImpl( @@ -133,8 +133,23 @@ public H2StreamMultiplexerImpl( final H2StreamListener streamListener, final Supplier streamHandlerSupplier, final Timeout validateAfterInactivity) { + this(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, + streamHandlerSupplier, validateAfterInactivity, Timeout.ofSeconds(5)); + } + + public H2StreamMultiplexerImpl( + final ProtocolIOSession ioSession, + final FrameFactory frameFactory, + final StreamIdGenerator idGenerator, + final HttpProcessor httpProcessor, + final CharCodingConfig charCodingConfig, + final H2Config h2Config, + final H2StreamListener streamListener, + final Supplier streamHandlerSupplier, + final Timeout validateAfterInactivity, + final Timeout pingAckTimeout) { super(ioSession, frameFactory, idGenerator, httpProcessor, charCodingConfig, h2Config, streamListener, - validateAfterInactivity); + validateAfterInactivity, pingAckTimeout); this.streamHandlerSupplier = streamHandlerSupplier; } @@ -1632,6 +1647,49 @@ void testValidateAfterInactivityPingAckRestoresPreviousTimeout() throws Exceptio Mockito.verify(protocolIOSession, Mockito.atLeastOnce()).setSocketTimeout(ArgumentMatchers.eq(previousTimeout)); } + @Test + void testValidateAfterInactivityUsesConfiguredPingAckTimeout() throws Exception { + final List writes = new ArrayList<>(); + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(inv -> { + final ByteBuffer b = inv.getArgument(0, ByteBuffer.class); + final byte[] copy = new byte[b.remaining()]; + b.get(copy); + writes.add(copy); + return copy.length; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + Mockito.when(protocolIOSession.hasCommands()).thenReturn(true); + Mockito.when(protocolIOSession.getSocketTimeout()).thenReturn(Timeout.ofSeconds(30)); + + final Timeout customPingAckTimeout = Timeout.ofSeconds(15); + final H2Config h2Config = H2Config.custom() + .build(); + final Timeout validateAfterInactivity = Timeout.ofMilliseconds(1); + + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, FRAME_FACTORY, StreamIdGenerator.ODD, + httpProcessor, CharCodingConfig.DEFAULT, h2Config, h2StreamListener, () -> streamHandler, + validateAfterInactivity, customPingAckTimeout); + + mux.onConnect(); + completeSettingsHandshake(mux); + + writes.clear(); + makeMuxIdle(mux, validateAfterInactivity); + + mux.onOutput(); + + Mockito.verify(protocolIOSession, Mockito.atLeastOnce()) + .setSocketTimeout(ArgumentMatchers.eq(customPingAckTimeout)); + + final List frames = parseFrames(concat(writes)); + Assertions.assertTrue(frames.stream().anyMatch(f -> f.isPing() && !f.isAck()), + "Must emit pre-flight PING"); + } + @Test void testKeepAliveAckTimeoutShutsDownAndFailsStreams() throws Exception { final List writes = new ArrayList<>();