Skip to content

Commit

Permalink
Envoy 发送数据到服务端注释
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiantao committed Oct 7, 2024
1 parent 3bc45f4 commit cb092e1
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 25 deletions.
16 changes: 9 additions & 7 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() &&
connecting_clients_.empty() && early_data_clients_.empty())) {
ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size());
// instantiateActiveClient是创建新连接的主要方法,在连接创建成功后,将根据连接的当前状态将其放入可用连接ready_clients_、
// 已用连接busy_clients_、正在建立连接connecting_clients_的列表中
// 如果连接建立很快,其可以马上变成ready_clients_
// 如果已经有待处理请求,则会在onUpstreamReady回调方法中关联待处理请求并将其立即变成busy_clients_
// instantiateActiveClient 是创建新连接的主要方法,在连接创建成功后,
// 将根据连接的当前状态将其放入可用连接 ready_clients_、已用连接 busy_clients_、
// 正在建立连接 connecting_clients_ 的列表中
// 如果连接建立较慢,首先将连接放入 connecting_clients_ 列表中
// 如果连接建立很快,其可以马上变成 ready_clients_
// 如果已经有待处理请求,则会在 onUpstreamReady 回调方法中关联待处理请求并将其立即变成 busy_clients_
ActiveClientPtr client = instantiateActiveClient();
if (client.get() == nullptr) {
ENVOY_LOG(trace, "connection creation failed");
Expand Down Expand Up @@ -273,7 +275,6 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& cont
attachStreamToClient(client, context);
// Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
tryCreateNewConnections();
// 无法将新请求放入等待队列
return nullptr;
}

Expand All @@ -292,17 +293,18 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& cont
onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
context);
host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
// 无法将新请求放入等待队列
return nullptr;
}

// 创建一个新的待处理流,并将其添加到 pending_streams_ 中
ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
ENVOY_LOG(debug, "trying to create new connection");
ENVOY_LOG(trace, fmt::format("{}", *this));

auto old_capacity = connecting_stream_capacity_;
// This must come after newPendingStream() because this function uses the
// length of pending_streams_ to determine if a new connection is needed.
// 放入等待队列
// 调用 tryCreateNewConnections 方法检查 pending_streams_ 的长度,以此来判断是否需要创建新的连接
const ConnectionResult result = tryCreateNewConnections();
// If there is not enough connecting capacity, the only reason to not
// increase capacity is if the connection limits are exceeded.
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::Route
snapScopedRouteConfig();
}
if (snapped_route_config_ != nullptr) {
// 根据HTTP请求头进行后续路由处理
// 根据 HTTP 请求头进行后续路由处理
route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
stream_id_);
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& con
ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
Envoy::ConnectionPool::PendingStreamPtr pending_stream(
new HttpPendingStream(*this, decoder, callbacks, can_send_early_data));
// 将请求放入连接池待处理请求队列pending_stream中
// 将请求放入连接池待处理请求队列 pending_streams_ 中
return addPendingStream(std::move(pending_stream));
}

Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
}
// The static cast makes sure we call the base class host() and not
// HttpConnPoolImplBase::host which is of a different type.
// 创建 TCP 网络连接
Upstream::Host::CreateConnectionData data =
static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
// 创建解码器对象
initialize(data, parent);
}

Expand Down
1 change: 1 addition & 0 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ void ActiveClient::StreamWrapper::onResetStream(StreamResetReason, absl::string_

ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
OptRef<Upstream::Host::CreateConnectionData> data)
// HTTP1 连接每次只能处理一个并发请求
: Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
/* effective_concurrent_stream_limit */ 1,
/* configured_concurrent_stream_limit */ 1, data) {
Expand Down
9 changes: 5 additions & 4 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1942,7 +1942,7 @@ RouteConstSharedPtr VirtualHostImpl::getRouteFromRoutes(
const RouteCallback& cb, const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info, uint64_t random_value,
absl::Span<const RouteEntryImplBaseConstSharedPtr> routes) const {
// 根据HTTP请求头轮询配置文件routes列表来查找Cluster
// 根据 HTTP 请求头轮询配置文件 routes 列表来查找 Cluster
for (auto route = routes.begin(); route != routes.end(); ++route) {
if (!headers.Path() && !(*route)->supportsPathlessHeaders()) {
continue;
Expand Down Expand Up @@ -2123,7 +2123,7 @@ const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMa
}

// If 'ignore_port_in_host_matching' is set, ignore the port number in the host header(if any).
// 从HTTP请求头中取出Host值
// 从 HTTP 请求头中取出 Host 值
absl::string_view host_header_value = headers.getHostValue();
if (ignorePortInHostMatching()) {
if (const absl::string_view::size_type port_start =
Expand All @@ -2135,12 +2135,13 @@ const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMa
// TODO (@rshriram) Match Origin header in WebSocket
// request with VHost, using wildcard match
// Lower-case the value of the host header, as hostnames are case insensitive.
// 在配置文件virtual_hosts_列表中查找,如果匹配不到项目,则继续尝试使用findWildcardVirtualHost方法从通配符规则中查找路由
// 在配置文件 virtual_hosts_ 列表中查找
const std::string host = absl::AsciiStrToLower(host_header_value);
const auto iter = virtual_hosts_.find(host);
if (iter != virtual_hosts_.end()) {
return iter->second.get();
}
// 如果匹配不到项目,则继续尝试使用 findWildcardVirtualHost 方法从通配符规则中查找路由
if (!wildcard_virtual_host_suffixes_.empty()) {
const VirtualHostImpl* vhost = findWildcardVirtualHost(
host, wildcard_virtual_host_suffixes_,
Expand All @@ -2164,7 +2165,7 @@ RouteConstSharedPtr RouteMatcher::route(const RouteCallback& cb,
const Http::RequestHeaderMap& headers,
const StreamInfo::StreamInfo& stream_info,
uint64_t random_value) const {
// 根据HTTP请求头在配置中查找虚拟主机virtualHost
// 根据 HTTP 请求头在配置中查找虚拟主机 virtualHost
const VirtualHostImpl* virtual_host = findVirtualHost(headers);
if (virtual_host) {
return virtual_host->getRouteFromEntries(cb, headers, stream_info, random_value);
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/retry_state_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ RetryStateImpl::~RetryStateImpl() { resetRetry(); }

void RetryStateImpl::enableBackoffTimer() {
if (!retry_timer_) {
// 在定时器时间到达后调用 onRetry 回调方法
retry_timer_ = dispatcher_.createTimer([this]() -> void { backoff_callback_(); });
}

Expand Down Expand Up @@ -352,7 +353,7 @@ RetryStatus RetryStateImpl::shouldRetryHeaders(const Http::ResponseHeaderMap& re
backoff_interval.value().count(), random_);
}
}

// 进行重试
return shouldRetry(retry_decision,
[disable_early_data, callback]() { callback(disable_early_data); });
}
Expand Down Expand Up @@ -391,7 +392,7 @@ RetryStateImpl::wouldRetryFromHeaders(const Http::ResponseHeaderMap& response_he
}

uint64_t response_status = Http::Utility::getResponseStatus(response_headers);

// 根据响应头部状态码判断是否匹配设置的重试条件,retry_on_ 变量是 Envoy 的重试策略配置
if (retry_on_ & RetryPolicy::RETRY_ON_5XX) {
if (Http::CodeUtility::is5xx(response_status)) {
return RetryDecision::RetryWithBackoff;
Expand Down
18 changes: 11 additions & 7 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};

// Determine if there is a route entry or a direct response for the request.
// 根据请求计算路由目标Cluster
// 匹配路由项
route_ = callbacks_->route();
if (!route_) {
stats_.no_route_.inc();
Expand All @@ -465,9 +465,12 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

// Determine if there is a direct response for the request.
const auto* direct_response = route_->directResponseEntry();
// 如果 Cluster 配置了 HTTP 本地响应 directResponse
if (direct_response != nullptr) {
stats_.rq_direct_response_.inc();
// 根据配置决定是否重写请求头中的 Path
direct_response->rewritePathHeader(headers, !config_->suppress_envoy_headers_);
// 调用 sendLocalReply 方法返回本地响应
callbacks_->sendLocalReply(
direct_response->responseCode(), direct_response->responseBody(),
[this, direct_response,
Expand All @@ -486,6 +489,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
// 不再继续执行其他 L7 过滤器
return Http::FilterHeadersStatus::StopIteration;
}

Expand All @@ -503,7 +507,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
route_entry_->clusterName());
};
}
// 获取目标服务线程安全Cluster
// 根据路由项匹配 Cluster
Upstream::ThreadLocalCluster* cluster =
config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
if (!cluster) {
Expand Down Expand Up @@ -642,7 +646,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
}
// 根据目标Cluster计算负载均衡目标主机Host,根据目标主机Host获取连接池
// 获取 Cluster 的上游连接池
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);

if (!generic_conn_pool) {
Expand Down Expand Up @@ -732,7 +736,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,

// Ensure an http transport scheme is selected before continuing with decoding.
ASSERT(headers.Scheme());
// 创建用于请求重试的对象retry_state_
// 创建用于请求重试的对象 retry_state_
retry_state_ = createRetryState(
route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
Expand Down Expand Up @@ -766,12 +770,12 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
// will never transition from false to true.
bool can_use_http3 =
!transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
// 创建上游对象upstream_request
// 创建上游 HTTP 请求
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
can_use_http3, false /*enable_half_close*/);
LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
// 处理上游请求
// 处理上游 HTTP 请求
upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
if (streaming_shadows_) {
// start the shadow streams.
Expand Down Expand Up @@ -819,7 +823,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
if (end_stream) {
onRequestComplete();
}
// 不再继续执行其他L7过滤器
// 不再继续执行其他 L7 过滤器
return Http::FilterHeadersStatus::StopIteration;
}

Expand Down
7 changes: 4 additions & 3 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek) {
// 调用负载均衡器的chooseHost方法,在当前Cluster的实例中挑选一个最合适的目标主机
// 调用负载均衡器的 chooseHost 方法,在当前 Cluster 的实例中挑选一个最合适的目标主机
HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
if (!host) {
if (!peek) {
Expand Down Expand Up @@ -2023,14 +2023,15 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImp
context->downstreamConnection()) {
context->downstreamConnection()->hashKey(hash_key);
}

// 创建目标主机的连接池容器
ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);

// Note: to simplify this, we assume that the factory is only called in the scope of this
// function. Otherwise, we'd need to capture a few of these variables by value.
// 根据目标主机 Host 支持的协议类型计算散列值 hash_key,并在目标主机已创建的活跃连接池中通过 getPool 进行连接池查找
ConnPoolsContainer::ConnPools::PoolOptRef pool =
container.pools_->getPool(priority, hash_key, [&]() {
// 创建目标主机Host连接池
// 如果连接池不存在,调用 allocateConnPool 方法创建新连接池
auto pool = parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ratelimit/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Http::FilterFactoryCb RateLimitFilterConfig::createFilterFactoryFromProtoTyped(
THROW_IF_NOT_OK(Config::Utility::checkTransportVersion(proto_config.rate_limit_service()));
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key =
Grpc::GrpcServiceConfigWithHashKey(proto_config.rate_limit_service().grpc_service());
// 返回过滤器创建方法
return [config_with_hash_key, &context, timeout,
filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<Filter>(
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/router/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Http::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoTyped(
proto_config));

return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
// L7 路由过滤器
callbacks.addStreamDecoderFilter(
std::make_shared<Router::ProdFilter>(filter_config, filter_config->default_stats_));
};
Expand Down

0 comments on commit cb092e1

Please sign in to comment.