Skip to content

Commit

Permalink
2024-08-15 16:45:37
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 15, 2024
1 parent d5a9be9 commit 835fe8f
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,39 @@ import (
type Stream struct {
idx uint8
mux *Mux
ocn sync.Once // once close negative
ocp sync.Once // once close positive
rbf []byte
rch chan []byte
rer *Err
sip *Sip
son sync.Once
wer *Err
wmu sync.Mutex
}

// Close implements io.Closer.
func (s *Stream) Close() error {
// Close the stream negative.
func (s *Stream) CloseNegative() error {
s.rer.Put(io.EOF)
s.wer.Put(io.ErrClosedPipe)
s.ocn.Do(func() { s.mux.Write(0, []byte{s.idx, 0x03, 0x00, 0x00}) })
return nil
}

// Close the stream positive.
func (s *Stream) ClosePositive() error {
s.rer.Put(io.ErrClosedPipe)
s.wer.Put(io.ErrClosedPipe)
s.son.Do(func() {
s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00})
s.sip.Put(s.idx)
})
s.wmu.Lock()
defer s.wmu.Unlock()
s.ocp.Do(func() { s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00}) })
return nil
}

// Close implements io.Closer.
func (s *Stream) Close() error {
return s.ClosePositive()
}

// Read implements io.Reader.
func (s *Stream) Read(p []byte) (int, error) {
if len(s.rbf) != 0 {
Expand Down Expand Up @@ -59,6 +73,8 @@ func (s *Stream) Read(p []byte) (int, error) {

// Write implements io.Writer.
func (s *Stream) Write(p []byte) (int, error) {
s.wmu.Lock()
defer s.wmu.Unlock()
n := 0
l := 0
b := make([]byte, 2048)
Expand Down Expand Up @@ -93,12 +109,14 @@ func NewStream(idx uint8, mux *Mux) *Stream {
return &Stream{
idx: idx,
mux: mux,
ocn: sync.Once{},
ocp: sync.Once{},
rbf: make([]byte, 0),
rch: make(chan []byte, 32),
rer: NewErr(),
sip: nil,
son: sync.Once{},
wer: NewErr(),
wmu: sync.Mutex{},
}
}

Expand Down Expand Up @@ -186,10 +204,11 @@ func (m *Mux) Spawn() {
}
case cmd == 0x02:
stm := m.usb[idx]
stm.rer.Put(io.EOF)
stm.wer.Put(io.ErrClosedPipe)
stm.son.Do(func() { stm.sip.Put(stm.idx) })
case cmd >= 0x03:
stm.CloseNegative()
case cmd == 0x03:
stm := m.usb[idx]
stm.sip.Put(idx)
case cmd >= 0x04:
// Packet format error, connection closed.
m.con.Close()
}
Expand Down Expand Up @@ -230,8 +249,8 @@ func NewMuxServer(conn net.Conn) *Mux {
mux := NewMux(conn)
for i := range len(mux.usb) {
old := NewStream(uint8(i), mux)
old.son.Do(func() {})
old.Close()
old.ocp.Do(func() {})
old.ClosePositive()
mux.usb[i] = old
}
go mux.Spawn()
Expand Down

0 comments on commit 835fe8f

Please sign in to comment.