Skip to content

Commit

Permalink
Consolidate ReadOnlyHttpClientConfig and ProxyConnectChannelSingle state
Browse files Browse the repository at this point in the history
  • Loading branch information
idelpivnitskiy committed Nov 8, 2023
1 parent 46ae4e4 commit 6e157a5
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,19 @@
*/
public final class ProxyConfigBuilder<A> {

private static final Consumer<HttpHeaders> NOOP_HEADERS_CONSUMER = new Consumer<HttpHeaders>() {
@Override
public void accept(final HttpHeaders headers) {
}

@Override
public String toString() {
return "NOOP";
}
};

private final A address;
private Consumer<HttpHeaders> connectRequestHeadersInitializer = __ -> { };
private Consumer<HttpHeaders> connectRequestHeadersInitializer = NOOP_HEADERS_CONSUMER;

/**
* Creates a new instance.
Expand Down Expand Up @@ -82,5 +93,36 @@ public A address() {
public Consumer<HttpHeaders> connectRequestHeadersInitializer() {
return connectRequestHeadersInitializer;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DefaultProxyConfig)) {
return false;
}

final DefaultProxyConfig<?> that = (DefaultProxyConfig<?>) o;
if (!address.equals(that.address)) {
return false;
}
return connectRequestHeadersInitializer.equals(that.connectRequestHeadersInitializer);
}

@Override
public int hashCode() {
int result = address.hashCode();
result = 31 * result + connectRequestHeadersInitializer.hashCode();
return result;
}

@Override
public String toString() {
return getClass().getSimpleName() +
"{address=" + address +
", connectRequestHeadersInitializer=" + connectRequestHeadersInitializer +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
@Nullable
private final U address;
@Nullable
private ProxyConfig<U> proxyConfig;
private U proxyAddress;
private final HttpClientConfig config;
final HttpExecutionContextBuilder executionContextBuilder;
private final ClientStrategyInfluencerChainBuilder strategyComputation;
Expand Down Expand Up @@ -148,7 +148,7 @@ final class DefaultSingleAddressHttpClientBuilder<U, R> implements SingleAddress
private DefaultSingleAddressHttpClientBuilder(@Nullable final U address,
final DefaultSingleAddressHttpClientBuilder<U, R> from) {
this.address = address;
proxyConfig = from.proxyConfig;
proxyAddress = from.proxyAddress;
config = new HttpClientConfig(from.config);
executionContextBuilder = new HttpExecutionContextBuilder(from.executionContextBuilder);
strategyComputation = from.strategyComputation.copy();
Expand Down Expand Up @@ -192,7 +192,7 @@ private static final class HttpClientBuildContext<U, R> {

U address() {
assert builder.address != null : "Attempted to buildStreaming with an unknown address";
return builder.proxyConfig != null ? builder.proxyConfig.address() : builder.address;
return builder.proxyAddress != null ? builder.proxyAddress : builder.address;
}

HttpClientConfig httpConfig() {
Expand Down Expand Up @@ -243,10 +243,10 @@ public HttpExecutionStrategy executionStrategy() {
ctx.builder.strategyComputation.buildForConnectionFactory();

if (roConfig.hasProxy() && sslContext != null) {
assert roConfig.connectAddress() != null;
assert roConfig.proxyConfig() != null;
@SuppressWarnings("deprecation")
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> proxy =
new ProxyConnectConnectionFactoryFilter<>(roConfig.connectAddress(), connectionFactoryStrategy);
new ProxyConnectConnectionFactoryFilter<>(roConfig.proxyConfig().address());
assert !proxy.requiredOffloads().hasOffloads();
connectionFactoryFilter = appendConnectionFilter(proxy, connectionFactoryFilter);
}
Expand Down Expand Up @@ -388,9 +388,8 @@ private static StreamingHttpRequestResponseFactory defaultReqRespFactory(ReadOnl

private static <U, R> String targetAddress(final HttpClientBuildContext<U, R> ctx) {
assert ctx.builder.address != null;
return ctx.builder.proxyConfig == null ?
ctx.builder.address.toString() :
ctx.builder.address + " (via " + ctx.builder.proxyConfig.address() + ")";
return ctx.builder.proxyAddress == null ?
ctx.builder.address.toString() : ctx.builder.address + " (via " + ctx.builder.proxyAddress + ")";
}

private static ContextAwareStreamingHttpClientFilterFactory appendFilter(
Expand Down Expand Up @@ -449,8 +448,8 @@ private AbsoluteAddressHttpRequesterFilter proxyAbsoluteAddressFilterFactory() {

@Override
public DefaultSingleAddressHttpClientBuilder<U, R> proxyConfig(final ProxyConfig<U> proxyConfig) {
this.proxyConfig = requireNonNull(proxyConfig);
config.proxy(proxyConfig, hostToCharSequenceFunction.apply(address));
this.proxyAddress = requireNonNull(proxyConfig.address());
config.proxyConfig(hostToCharSequenceFunction.apply(address), proxyConfig);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2018-2020 Apple Inc. and the ServiceTalk project authors
* Copyright © 2018-2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,14 @@
package io.servicetalk.http.netty;

import io.servicetalk.http.api.Http2Settings;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.ProxyConfig;
import io.servicetalk.tcp.netty.internal.TcpClientConfig;
import io.servicetalk.transport.api.ClientSslConfig;
import io.servicetalk.transport.api.DelegatingClientSslConfig;

import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.netty.handler.codec.http2.Http2CodecUtil.SETTINGS_ENABLE_PUSH;
Expand All @@ -35,9 +37,7 @@ final class HttpClientConfig {
private final TcpClientConfig tcpConfig;
private final HttpConfig protocolConfigs;
@Nullable
private ProxyConfig<?> proxyConfig;
@Nullable
private CharSequence connectAddress;
private ProxyConfig<String> proxyConfig;
@Nullable
private String fallbackPeerHost;
private int fallbackPeerPort = -1;
Expand Down Expand Up @@ -66,7 +66,6 @@ final class HttpClientConfig {
tcpConfig = new TcpClientConfig(from.tcpConfig());
protocolConfigs = new HttpConfig(from.protocolConfigs());
proxyConfig = from.proxyConfig;
connectAddress = from.connectAddress;
fallbackPeerHost = from.fallbackPeerHost;
fallbackPeerPort = from.fallbackPeerPort;
inferPeerHost = from.inferPeerHost;
Expand All @@ -83,18 +82,15 @@ HttpConfig protocolConfigs() {
}

@Nullable
ProxyConfig<?> proxyConfig() {
ProxyConfig<String> proxyConfig() {
return proxyConfig;
}

@Nullable
CharSequence connectAddress() {
return connectAddress;
}

void proxy(final ProxyConfig<?> proxyConfig, final CharSequence connectAddress) {
this.proxyConfig = requireNonNull(proxyConfig);
this.connectAddress = requireNonNull(connectAddress);
void proxyConfig(final CharSequence connectAddress, final ProxyConfig<?> proxyConfig) {
// Original ProxyConfig.address() is used only by DefaultSingleAddressHttpClientBuilder. For the actual
// ProxyConnectLBHttpConnectionFactory, we need only "connectAddress". To simplify internal state, we override
// ProxyConfig.address() with "connectAddress" and delegate all other methods to original ProxyConfig.
this.proxyConfig = new DelegatingProxyConfig(connectAddress.toString(), proxyConfig);
}

void fallbackPeerHost(@Nullable String fallbackPeerHost) {
Expand Down Expand Up @@ -179,4 +175,56 @@ private static String filterSniName(@Nullable String peerHost) {
// Literal IPv4 and IPv6 addresses are not permitted in "HostName".
return peerHost == null || isValidIpV4Address(peerHost) || isValidIpV6Address(peerHost) ? null : peerHost;
}

private static final class DelegatingProxyConfig implements ProxyConfig<String> {

private final String address;
private final ProxyConfig<?> delegate;

DelegatingProxyConfig(final String address, final ProxyConfig<?> delegate) {
this.address = requireNonNull(address);
this.delegate = requireNonNull(delegate);
}

@Override
public String address() {
return address;
}

@Override
public Consumer<HttpHeaders> connectRequestHeadersInitializer() {
return delegate.connectRequestHeadersInitializer();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DelegatingProxyConfig)) {
return false;
}

final DelegatingProxyConfig that = (DelegatingProxyConfig) o;
if (!address.equals(that.address)) {
return false;
}
return delegate.equals(that.delegate);
}

@Override
public int hashCode() {
int result = address.hashCode();
result = 31 * result + delegate.hashCode();
return result;
}

@Override
public String toString() {
return getClass().getSimpleName() +
"{address='" + address + '\'' +
", delegate=" + delegate +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,23 @@ final class ProxyConnectChannelSingle extends ChannelInitSingle<Channel> {

private final ConnectionObserver observer;
private final HttpHeadersFactory headersFactory;
private final String connectAddress;
private final ProxyConfig<?> proxyConfig;
private final ProxyConfig<String> proxyConfig;

ProxyConnectChannelSingle(final Channel channel,
final ChannelInitializer channelInitializer,
final ConnectionObserver observer,
final HttpHeadersFactory headersFactory,
final String connectAddress,
final ProxyConfig<?> proxyConfig) {
final ProxyConfig<String> proxyConfig) {
super(channel, channelInitializer);
this.observer = observer;
this.headersFactory = headersFactory;
this.connectAddress = connectAddress;
this.proxyConfig = proxyConfig;
assert !channel.config().isAutoRead();
}

@Override
protected ChannelHandler newChannelHandler(final Subscriber<? super Channel> subscriber) {
return new ProxyConnectHandler(observer, headersFactory, connectAddress, proxyConfig, subscriber);
return new ProxyConnectHandler(observer, headersFactory, proxyConfig, subscriber);
}

private static final class ProxyConnectHandler extends ChannelDuplexHandler {
Expand All @@ -88,8 +85,7 @@ private static final class ProxyConnectHandler extends ChannelDuplexHandler {

private final ConnectionObserver observer;
private final HttpHeadersFactory headersFactory;
private final String connectAddress;
private final ProxyConfig<?> proxyConfig;
private final ProxyConfig<String> proxyConfig;
@Nullable
private Subscriber<? super Channel> subscriber;
@Nullable
Expand All @@ -99,12 +95,10 @@ private static final class ProxyConnectHandler extends ChannelDuplexHandler {

private ProxyConnectHandler(final ConnectionObserver observer,
final HttpHeadersFactory headersFactory,
final String connectAddress,
final ProxyConfig<?> proxyConfig,
final ProxyConfig<String> proxyConfig,
final Subscriber<? super Channel> subscriber) {
this.observer = observer;
this.headersFactory = headersFactory;
this.connectAddress = connectAddress;
this.proxyConfig = proxyConfig;
this.subscriber = subscriber;
}
Expand All @@ -123,8 +117,8 @@ public void channelActive(final ChannelHandlerContext ctx) {
}

private void sendConnectRequest(final ChannelHandlerContext ctx) {
final HttpRequestMetaData request = newRequestMetaData(HTTP_1_1, CONNECT, connectAddress,
headersFactory.newHeaders()).addHeader(HOST, connectAddress);
final HttpRequestMetaData request = newRequestMetaData(HTTP_1_1, CONNECT, proxyConfig.address(),
headersFactory.newHeaders()).addHeader(HOST, proxyConfig.address());
proxyConfig.connectRequestHeadersInitializer().accept(request.headers());
connectObserver = observer.onProxyConnect(request);
ctx.writeAndFlush(request).addListener(f -> {
Expand All @@ -147,7 +141,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
}
response = (HttpResponseMetaData) msg;
if (response.status().statusClass() != SUCCESSFUL_2XX) {
failSubscriber(ctx, unsuccessfulResponse(ctx.channel(), response, connectAddress));
failSubscriber(ctx, unsuccessfulResponse(ctx.channel(), response, proxyConfig.address()));
}
// We do not complete subscriber here because we need to wait for the HttpResponseDecoder state machine
// to complete. Completion will be signalled by InboundDataEndEvent. Any other messages before that are
Expand Down Expand Up @@ -201,7 +195,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
connectObserver.proxyConnectComplete(response);
ctx.pipeline().remove(this);
final Channel channel = ctx.channel();
LOGGER.debug("{} Received successful response from proxy on CONNECT {}", channel, connectAddress);
LOGGER.debug("{} Received successful response from proxy on CONNECT {}", channel, proxyConfig.address());
final Subscriber<? super Channel> subscriberCopy = subscriber;
subscriber = null;
subscriberCopy.onSuccess(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;

import org.slf4j.Logger;
Expand Down Expand Up @@ -57,8 +56,8 @@ final class ProxyConnectConnectionFactoryFilter<ResolvedAddress, C extends Filte

private final String connectAddress;

ProxyConnectConnectionFactoryFilter(final CharSequence connectAddress, final ExecutionStrategy connectStrategy) {
this.connectAddress = connectAddress.toString();
ProxyConnectConnectionFactoryFilter(final String connectAddress) {
this.connectAddress = connectAddress;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
*/
final class ProxyConnectLBHttpConnectionFactory<ResolvedAddress>
extends AbstractLBHttpConnectionFactory<ResolvedAddress> {
private final String connectAddress;

ProxyConnectLBHttpConnectionFactory(
final ReadOnlyHttpClientConfig config, final HttpExecutionContext executionContext,
Expand All @@ -80,8 +79,6 @@ final class ProxyConnectLBHttpConnectionFactory<ResolvedAddress>
assert config.hasProxy() : "Unexpected hasProxy flag";
assert config.tcpConfig().sslContext() != null : "Proxy CONNECT works only for TLS connections";
assert config.proxyConfig() != null : "ProxyConfig is required";
assert config.connectAddress() != null : "Address (authority) for CONNECT request is required";
this.connectAddress = config.connectAddress().toString();
}

@Override
Expand All @@ -103,7 +100,7 @@ private Single<? extends FilterableStreamingHttpConnection> createConnection(
new TcpClientChannelInitializer(config.tcpConfig(), observer, executionContext, true)
.andThen(new HttpClientChannelInitializer(
getByteBufAllocator(executionContext.bufferAllocator()), h1Config, closeHandler)),
observer, h1Config.headersFactory(), connectAddress, config.proxyConfig())
observer, h1Config.headersFactory(), config.proxyConfig())
.flatMap(ProxyConnectLBHttpConnectionFactory::handshake)
.flatMap(protocol -> finishConnectionInitialization(protocol, channel, closeHandler, observer))
.onErrorMap(cause -> handleException(cause, channel));
Expand Down
Loading

0 comments on commit 6e157a5

Please sign in to comment.