Skip to content

Commit

Permalink
Envoy运行和连接创建注释
Browse files Browse the repository at this point in the history
  • Loading branch information
hanxiantao committed Jul 21, 2024
1 parent f3e0fad commit 8b72ff5
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 4 deletions.
1 change: 1 addition & 0 deletions source/common/api/api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Event::DispatcherPtr Impl::allocateDispatcher(const std::string& name) {
Event::DispatcherPtr
Impl::allocateDispatcher(const std::string& name,
const Event::ScaledRangeTimerManagerFactory& scaled_timer_factory) {
// 创建调度器DispatcherImpl
return std::make_unique<Event::DispatcherImpl>(name, *this, time_system_, scaled_timer_factory,
watermark_factory_);
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/listener_manager/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
config),
tcp_conn_handler_(parent), connection_balancer_(connection_balancer),
listen_address_(listen_address) {
// 将当前监听器注册到连接均衡选择器中
connection_balancer_.registerHandler(*this);
}

Expand Down Expand Up @@ -122,7 +123,7 @@ void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
return;
}
}

// 创建ActiveTcpSocket对象封装并继续调用onSocketAccepted方法
auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
hand_off_restored_destination_connections);

Expand Down
1 change: 1 addition & 0 deletions source/common/listener_manager/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void ConnectionHandlerImpl::addListener(absl::optional<uint64_t> overridden_list
disable_listeners_, std::move(internal_listener),
config.shouldBypassOverloadManager() ? null_overload_manager_ : overload_manager_);
} else if (config.listenSocketFactories()[0]->socketType() == Network::Socket::Type::Stream) {
// 对于TCP,将创建ActiveTcpListener实例
auto overload_state =
config.shouldBypassOverloadManager()
? (null_overload_manager_
Expand Down
3 changes: 3 additions & 0 deletions source/common/listener_manager/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
init_target_("LDS", [this]() { subscription_->start({}); }) {
const auto resource_name = getResourceName();
if (lds_resources_locator == nullptr) {
// 注册LDS订阅subscription
// 当有LDS变更事件到来时,通过SubscriptionCallbacks回调方法进入Envoy,然后主线程调用LdsApiImpl::onConfigUpdate方法
// 执行ListenerManager的addOrUpdateListener或removeListener方法来添加或删除监听器
subscription_ = THROW_OR_RETURN_VALUE(cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Expand Down
9 changes: 8 additions & 1 deletion source/common/listener_manager/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ ListenerManagerImpl::ListenerManagerImpl(Instance& server,
return dumpListenerConfigs(name_matcher);
});
}

// 根据配置的工作线程数创建工作线程,并通过ProdWorkerFactory来创建新的工作线程对象,工作线程名称以worker_为前缀
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
workers_.emplace_back(worker_factory.createWorker(
i, server.overloadManager(), server.nullOverloadManager(), absl::StrCat("worker_", i)));
Expand Down Expand Up @@ -581,6 +581,7 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(
stats_.listener_in_place_updated_.inc();
} else {
ENVOY_LOG(debug, "use full listener update path for listener name={} hash={}", name, hash);
// 创建新的监听器对象
new_listener = std::make_unique<ListenerImpl>(config, version_info, *this, name, added_via_api,
workers_started_, hash);
}
Expand Down Expand Up @@ -609,11 +610,15 @@ bool ListenerManagerImpl::addOrUpdateListenerInternal(
// We have no warming or active listener so we need to make a new one. What we do depends on
// whether workers have been started or not.
setNewOrDrainingSocketFactory(name, *new_listener);
// 判断工作线程是否已经启动
if (workers_started_) {
new_listener->debugLog("add warming listener");
// 如果工作线程已经启动,则新添加的监听器处于warming状态,此时还需要获取路由的配置及Cluster的配置后才能为工作线程提供服务
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("add active listener");
// 如果工作线程还未启动,则此时ClusterManager、ListenerManager将通过xDS虎丘监听器相关的RDS及CDS配置,
// 这样在监听器关联的工作线程启动后,这些监听器将被设置为active状态,表示可以立即提供服务
active_listeners_.emplace_back(std::move(new_listener));
}

Expand Down Expand Up @@ -953,6 +958,7 @@ void ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog, std::function
removeListenerInternal(listener->name(), false);
continue;
}
// 遍历所有工作线程workers_,并依次执行addListenerToWorker方法将监听器与工作线程进行绑定
for (const auto& worker : workers_) {
addListenerToWorker(*worker, absl::nullopt, *listener,
[this, listeners_pending_init, callback]() {
Expand All @@ -963,6 +969,7 @@ void ListenerManagerImpl::startWorkers(OptRef<GuardDog> guard_dog, std::function
});
}
}
// 轮询工作线程列表workers_,并运行WorkerImpl::start方法启动每个线程
for (const auto& worker : workers_) {
ENVOY_LOG(debug, "starting worker {}", i);
worker->start(guard_dog, worker_started_running);
Expand Down
1 change: 1 addition & 0 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ void IoSocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Even
Event::FileTriggerType trigger, uint32_t events) {
ASSERT(file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
"file descriptor. This is not allowed.");
// 通过Dispatcher向libevent库传递监听fd_及网络事件的回调方法onSocketEvent
file_event_ = dispatcher.createFileEvent(fd_, cb, trigger, events);
}

Expand Down
5 changes: 3 additions & 2 deletions source/common/network/tcp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void TcpListenerImpl::onSocketEvent(short flags) {

sockaddr_storage remote_addr;
socklen_t remote_addr_len = sizeof(remote_addr);

// 执行系统的accept方法来接收新连接Socket对象
IoHandlePtr io_handle =
socket_->ioHandle().accept(reinterpret_cast<sockaddr*>(&remote_addr), &remote_addr_len);
if (io_handle == nullptr) {
Expand Down Expand Up @@ -104,7 +104,7 @@ void TcpListenerImpl::onSocketEvent(short flags) {
: Address::addressFromSockAddrOrThrow(remote_addr, remote_addr_len,
local_address->ip()->version() ==
Address::IpVersion::v6);

// 执行ActiveTcpListener::onAccept回调方法进入连接接收阶段
cb_.onAccept(std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address,
remote_address, overload_state_,
track_global_cx_limit_in_overload_manager_));
Expand Down Expand Up @@ -136,6 +136,7 @@ TcpListenerImpl::TcpListenerImpl(Event::Dispatcher& dispatcher, Random::RandomGe
// Use level triggered mode to avoid potential loss of the trigger due to
// transient accept errors or early termination due to accepting
// max_connections_to_accept_per_socket_event connections.
// 创建网络监听,并设置收到新连接的回调方法为onSocketEvent
socket_->ioHandle().initializeFileEvent(
dispatcher,
[this](uint32_t events) {
Expand Down
3 changes: 3 additions & 0 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
ASSERT(!shutdown_);

if (main_thread) {
// 主线程单独记录
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else {
ASSERT(!containsReference(registered_threads_, dispatcher));
// 将新工作线程注册到列表中
registered_threads_.push_back(dispatcher);
// 主线程轮询每个工作线程发送post任务
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,
helper_(cm, "cds"), cm_(cm), scope_(scope.createScope("cluster_manager.cds.")) {
const auto resource_name = getResourceName();
if (cds_resources_locator == nullptr) {
// 注册对CDS资源的订阅subscription
// 每当有CDS配置事件发生变化时,都通过SubscriptionCallbacks注册的回调方法执行CdsApiImpl::onConfigUpdate方法,
// 然后执行ClusterManager中的addOrUpdateCluster或removeCluster反复噶添加或删除Cluster
subscription_ = THROW_OR_RETURN_VALUE(cm_.subscriptionFactory().subscriptionFromConfigSource(
cds_config, Grpc::Common::typeUrl(resource_name),
*scope_, *this, resource_decoder_, {}),
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo

// Once the initial set of static bootstrap clusters are created (including the local cluster),
// we can instantiate the thread local cluster manager.
// 针对每个线程创建ThreadLocalClusterManagerImpl,解决多个工作线程访问Cluster配置的锁问题
tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
});
Expand Down
1 change: 1 addition & 0 deletions source/server/configuration_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ absl::Status MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap&
ENVOY_LOG(info, "loading {} cluster(s)", bootstrap.static_resources().clusters().size());

// clusterManagerFromProto() and init() have to be called consecutively.
// 创建Cluster管理器
cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);
status = cluster_manager_->initialize(bootstrap);
RETURN_IF_NOT_OK(status);
Expand Down
4 changes: 4 additions & 0 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar
}

// Workers get created first so they register for thread local updates.
// 初始化每个工作线程Worker对象
listener_manager_ = listener_manager_factory->createListenerManager(
*this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);

Expand Down Expand Up @@ -760,6 +761,7 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar
bootstrap_.dynamic_resources().lds_resources_locator()),
xds::core::v3::ResourceLocator));
}
// 加载启动文件里的LDS配置,调用父类ListenerManagerImpl创建对LDS配置的订阅
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config(),
lds_resources_locator.get());
}
Expand Down Expand Up @@ -894,6 +896,7 @@ RunHelper::RunHelper(Instance& instance, const Options& options, Event::Dispatch
OverloadManager& null_overload_manager, std::function<void()> post_init_cb)
: init_watcher_("RunHelper", [&instance, post_init_cb]() {
if (!instance.isShutdown()) {
// 将调用startWorkers
post_init_cb();
}
}) {
Expand Down Expand Up @@ -967,6 +970,7 @@ RunHelper::RunHelper(Instance& instance, const Options& options, Event::Dispatch
void InstanceBase::run() {
// RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
// startup (see RunHelperTest in server_test.cc).
// 创建RunHelper对象来负责工作线程启动前的准备工作,并在准备工作完成后执行InstanceBase::startWorkers方法启动工作线程
const auto run_helper =
RunHelper(*this, options_, *dispatcher_, clusterManager(), access_log_manager_, init_manager_,
overloadManager(), nullOverloadManager(), [this] {
Expand Down
5 changes: 5 additions & 0 deletions source/server/worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
: tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
api_(api), reset_streams_counter_(
api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
// 在构造方法中注册工作线程对象
tls_.registerThread(*dispatcher_, false);
overload_manager.registerForAction(
OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
Expand All @@ -64,6 +65,7 @@ WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
Network::ListenerConfig& listener, AddListenerCompletion completion,
Runtime::Loader& runtime, Random::RandomGenerator& random) {
// 调用目标线程Dispatcher的post方法,将连接管理器ConnectionHandler与监听器进行绑定
dispatcher_->post(
[this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
handler_->addListener(overridden_listener, listener, runtime, random);
Expand Down Expand Up @@ -146,13 +148,16 @@ void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<v
ENVOY_LOG(debug, "worker entering dispatch loop");
// The watch dog must be created after the dispatcher starts running and has post events flushed,
// as this is when TLS stat scopes start working.
// 发送post异步任务到目标工作线程任务调度队列
dispatcher_->post([this, &guard_dog, cb]() {
cb();
if (guard_dog.has_value()) {
// 创建工作线程自己的WatchDog
watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
dispatcher_->name(), *dispatcher_);
}
});
// 执行线程Dispatcher中的run方法来阻塞等待处理新事件
dispatcher_->run(Event::Dispatcher::RunType::Block);
ENVOY_LOG(debug, "worker exited dispatch loop");
if (guard_dog.has_value()) {
Expand Down

0 comments on commit 8b72ff5

Please sign in to comment.