From e7ef8736512b22915ffd1dbb5b4c8e3bf1aed11f Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 15 Nov 2024 19:06:48 -0800 Subject: [PATCH] Fixed deadlock when handling multiple stream entries errors (#576) This is an alternate approach for #572 The `closeCh` is used to propagate back the error to the client stream handler. Writing on this channel should never block and it's just necessary to write once, all other errors are just ignored. --- server/leader_controller.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/server/leader_controller.go b/server/leader_controller.go index dab298d3..b5b9739c 100644 --- a/server/leader_controller.go +++ b/server/leader_controller.go @@ -886,10 +886,10 @@ func (lc *leaderController) handleWriteStream(stream proto.OxiaClient_WriteStrea req, err := stream.Recv() if err != nil { - closeCh <- err + sendNonBlocking(closeCh, err) return } else if req == nil { - closeCh <- errors.New("stream closed") + sendNonBlocking(closeCh, errors.New("stream closed")) return } @@ -908,7 +908,7 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS offset int64, timestamp uint64, err error, timer metrics.Timer) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } @@ -917,13 +917,13 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS }, func(response *proto.WriteResponse, err error) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } if err = stream.Send(response); err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } timer.Done() @@ -1136,3 +1136,10 @@ func checkStatusIsLeader(actual proto.ServingStatus) error { } return nil } + +func sendNonBlocking(ch chan error, err error) { + select { + case ch <- err: + default: + } +}