diff --git a/server/leader_controller.go b/server/leader_controller.go index dab298d3..b5b9739c 100644 --- a/server/leader_controller.go +++ b/server/leader_controller.go @@ -886,10 +886,10 @@ func (lc *leaderController) handleWriteStream(stream proto.OxiaClient_WriteStrea req, err := stream.Recv() if err != nil { - closeCh <- err + sendNonBlocking(closeCh, err) return } else if req == nil { - closeCh <- errors.New("stream closed") + sendNonBlocking(closeCh, errors.New("stream closed")) return } @@ -908,7 +908,7 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS offset int64, timestamp uint64, err error, timer metrics.Timer) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } @@ -917,13 +917,13 @@ func (lc *leaderController) handleWalSynced(stream proto.OxiaClient_WriteStreamS }, func(response *proto.WriteResponse, err error) { if err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } if err = stream.Send(response); err != nil { timer.Done() - closeCh <- err + sendNonBlocking(closeCh, err) return } timer.Done() @@ -1136,3 +1136,10 @@ func checkStatusIsLeader(actual proto.ServingStatus) error { } return nil } + +func sendNonBlocking(ch chan error, err error) { + select { + case ch <- err: + default: + } +}