Skip to content

Commit

Permalink
Do not attempt to write data to a closed stream's buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 18, 2024
1 parent 84b935d commit 7cb75bd
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ func (s *Stream) Close() error {
return nil
}

// Esolc closing a stream passively.
func (s *Stream) Esolc() error {
s.rer.Put(io.EOF)
s.wer.Put(io.ErrClosedPipe)
s.son.Do(func() {
s.idp <- s.idx
})
return nil
}

// Read implements io.Reader.
func (s *Stream) Read(p []byte) (int, error) {
if len(s.rbf) != 0 {
Expand All @@ -43,6 +53,9 @@ func (s *Stream) Read(p []byte) (int, error) {
s.rbf = s.rbf[n:]
return n, nil
}
if err := s.rer.Get(); err != nil {
return 0, err
}
select {
case s.rbf = <-s.rch:
n := copy(p, s.rbf)
Expand All @@ -51,6 +64,7 @@ func (s *Stream) Read(p []byte) (int, error) {
case <-s.rer.Sig():
return 0, s.rer.Get()
case <-s.mux.rer.Sig():
s.rer.Put(s.mux.rer.Get())
return 0, s.mux.rer.Get()
}
}
Expand Down Expand Up @@ -178,15 +192,16 @@ func (m *Mux) Spawn() {
break
}
stm = m.usb[idx]
if stm.rer.Get() != nil {
break
}
select {
case stm.rch <- msg:
case <-stm.rer.Sig():
}
case cmd == 0x02:
stm = m.usb[idx]
stm.rer.Put(io.EOF)
stm.wer.Put(io.ErrClosedPipe)
stm.son.Do(func() { stm.idp <- stm.idx })
stm.Esolc()
case cmd >= 0x03:
// Packet format error, connection closed.
m.con.Close()
Expand Down

0 comments on commit 7cb75bd

Please sign in to comment.