-
Notifications
You must be signed in to change notification settings - Fork 594
Rewrite stream manager back presssure algorithm #1785
base: master
Are you sure you want to change the base?
Conversation
@@ -76,6 +76,9 @@ class StMgrClientMgr { | |||
|
|||
sp_int64 high_watermark_; | |||
sp_int64 low_watermark_; | |||
|
|||
// Counters for remote instance traffic, this is used for back pressure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add more description here as to whats the key, value means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
sp_int32 StMgrClientMgr::FindBusiestTaskOnStmgr(const sp_string& _stmgr_id) { | ||
sp_int32 task_id; | ||
sp_int64 max = 0; | ||
for (auto iter = instance_stats_[_stmgr_id].begin(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if instance_stats_[_stmgr_id] does not exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If instance_stats_[_stmgr_id] does not exist that means we don't have any traffic to that stmgr, therefore impossible to trigger a back pressure.
void StMgrClientMgr::SendTupleStreamMessage(sp_int32 _task_id, const sp_string& _stmgr_id, | ||
const proto::system::HeronTupleSet2& _msg) { | ||
auto iter = clients_.find(_stmgr_id); | ||
CHECK(iter != clients_.end()); | ||
|
||
instance_stats_[_stmgr_id][_task_id] += _msg.GetCachedSize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where are you clearing this? Shouldn;t this be cleared on a regular basis?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What makes you think a += operator is clearing it?? I am totally confused...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, replied too fast. Good point, we need to clear it after a backpressure is triggered.
The current approach has limitations, but it also has simplicity and transparency. It's easy for someone to rationalize that one agent invoked back pressure and all the spouts stopped. With this new approach, understanding the system becomes a more complex problem. @congwang can you discuss what metrics, logging, etc that should be (or are being) included to help someone troubleshooting to understand the current state of the system and how it got there? For example why some bolts might be emitting and not others, or visualizations around the reference counting mechanism, or which agents are contributing to the backpressure and in what order. Also, the shortcomings and the features of the new approach are well described above, but could you expand on the specifics of the new algorithms at play, either here or in another doc or state diagram? I think that will be a critical thing to understand both when troubleshooting and when reading the code. |
@billonahill The new one is not any harder to understand, we still throttle things, just that we only throttle relevant things in the path, in a smart way. The reference counting might be slightly harder to understand, but think about the cases where paths overlap, it is natural to have. The logging is extended to include task id rather than just stmgr id, to reflect this change. But if you feel we need more logging in some places for trouble shooting purpose, I am very happy to add (actually I removed some debugging logs before sending this PR). The metrics should be the same as before, I don't understand your question about bolts emitting tuples, this totally depends on their code, right? I am not sure about the doc, perhaps need to add a page under |
Thanks @congwang, yes we'll certainly want docs under website/content/docs so might as well start there. For the reference counting I understand the reason for the feature, I was questioning how the effect of the feature would be made clear to the user during backpressure. I wanted to make sure we've thought about how to help a user answer these questions (for example) when backpressure is occurring, ideally without having to sift too much through logs:
I haven't yet reviewed the code, but wanted to hear your thoughts on these so I'd know what to expect. From past experience understanding backpressure hasn't always been easy with the existing implementation. My question about the bolts not emitting was because with the new approach you would stop upstream bolts from emitting but others would continue to emit, as I understand it. I was using that as an example for something that we might need to clearly point out to our users (or not?). |
@billonahill I will open a separate issue on github for the doc, since my English writing skill is not good at all, I expect someone else could write it. We do have metrics for back pressure initiater, Of course we should not throttle downstream ones, otherwise no one will consume the packets, deadlock!! ;-) |
For documenting the specifics of the algorithm, an README targeted to just Heron developers would suffice, if you're not comfortable writing the public documentation. This is a critical thing to clearly describe to make it possible to best review and maintain the code. |
@billonahill I believe I already clarify the high-level overview of the new algorithm in the description of this PR. If anything not clear, please point it out, I am happy to add more. BTW, I don't think we should cover any code details in description. |
@@ -148,9 +148,6 @@ void StMgrClient::HandleHelloResponse(void*, proto::stmgr::StrMgrHelloResponse* | |||
Stop(); | |||
} | |||
delete _response; | |||
if (client_manager_->DidAnnounceBackPressure()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it failed the unit test test_back_pressure_stmgr_reconnect.
when the stmgr X in backpressure disconnects and reconnects, stmgr X is supposed to receive backpressure notice according to test_back_pressure_stmgr_reconnect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
follow up question:
how do you maintain the state eg. backpressure_starters_ after stmgr restarts/reconnects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@huijunw That should be handled in HandleConnectionClose().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@huijunw I already noticed the test case failure, will fix it.
i have question on point 2. |
@huijunw It is not a guess, the topology is in pplan, it is 100% accurate. FindBusiestTaskOnStmgr() is the best effort we can do to find out which one to blame, but we can't predicate the future, if we blame a wrong one with this, the next time when BP is triggered again we could catch up. |
The current back pressure algorithm we use in stream manager is problematic in several ways:
It simply throttles all spouts in the whole topology when congestion happens in even just one container. This unfairly penalizes irrelevant paths in the topology;
It does not do anything to these bolts in the middle even when they generate tuples too hence contribute to the congestion;
In an entire topology at one time there is only one instance could trigger back pressure, we don't allow multiple instances to trigger back pressure again because all spouts are already throttled so we don't need to throttle them again.
We could improve it by:
Stream managers need to learn all the paths in the topology and penalize all the bolts and spouts upstream along its paths, because they all could contribute to the congestion;
For congestions on the data path between two stream managers, there is no way to define "upstream" by the design of Heron, but we could add some counters to count packets from which instances contribute most to the congestion and do back pressure on behalf of these instances;
Back pressure should be able to trigger multiple times in a whole topology to reflect the fact that multiple instances on different paths of the topology could have congestions independently. Therefore, we need a real "reference counting" to make it work correctly.
This resolves #1567.