Skip to content

Commit

Permalink
Rewrite stream id pool
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 16, 2024
1 parent 0086a26 commit 1adff02
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 15 deletions.
2 changes: 1 addition & 1 deletion protocol/czar/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// Close the specified stream.
//
// +-----+-----+-----+-----+
// | Sid | 2 | Rsv |
// | Sid | 2 | 0/1 | Rsv |
// +-----+-----+-----+-----+

// Server implemented the czar protocol.
Expand Down
28 changes: 14 additions & 14 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

// A Stream managed by the multiplexer.
type Stream struct {
idp chan uint8
idp *Sip
idx uint8
mux *Mux
rbf []byte
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewStream(idx uint8, mux *Mux) *Stream {
type Mux struct {
ach chan *Stream
con net.Conn
idp chan uint8
idp *Sip
rer *Err
usb []*Stream
wm0 sync.Mutex
Expand All @@ -132,11 +132,14 @@ func (m *Mux) Close() error {

// Open is used to create a new stream as a net.Conn.
func (m *Mux) Open() (*Stream, error) {
idx := <-m.idp
_, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
if err != nil {
m.idp <- idx
return nil, err
idx, er0 := m.idp.Get()
if er0 != nil {
return nil, er0
}
_, er1 := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
if er1 != nil {
m.idp.Put(idx)
return nil, er1
}
stm := NewStream(idx, m)
stm.idp = m.idp
Expand Down Expand Up @@ -174,7 +177,8 @@ func (m *Mux) Spawn() {
}
stm = NewStream(idx, m)
// The mux server does not need to using an id pool.
stm.idp = make(chan uint8, 1)
stm.idp = NewSip()
stm.idp.Set(idx)
m.usb[idx] = stm
m.ach <- stm
case cmd == 0x01:
Expand All @@ -196,7 +200,7 @@ func (m *Mux) Spawn() {
stm.Esolc()
}
stm.zon[2].Do(func() {
stm.idp <- stm.idx
stm.idp.Put(stm.idx)
})
case cmd >= 0x03:
// Packet format error, connection closed.
Expand Down Expand Up @@ -251,12 +255,8 @@ func NewMuxServer(conn net.Conn) *Mux {

// NewMuxClient returns a new MuxClient.
func NewMuxClient(conn net.Conn) *Mux {
idp := make(chan uint8, 256)
for i := range 256 {
idp <- uint8(i)
}
mux := NewMux(conn)
mux.idp = idp
mux.idp = NewSip()
go mux.Spawn()
return mux
}
49 changes: 49 additions & 0 deletions protocol/czar/sip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package czar

import (
"errors"
"math/big"
"sync"
)

// A stream id generator. Stream id can be reused, and the smallest available stream id is guaranteed to be generated
// each time.
type Sip struct {
i *big.Int
m *sync.Mutex
}

// Get selects an stream id from the pool, removes it from the pool, and returns it to the caller.
func (s *Sip) Get() (uint8, error) {
s.m.Lock()
defer s.m.Unlock()
n := big.NewInt(0).Not(s.i)
m := n.TrailingZeroBits()
if m == 256 {
return 0, errors.New("daze: out of stream")
}
s.i.SetBit(s.i, int(m), 1)
return uint8(m), nil
}

// Put adds x to the pool.
func (s *Sip) Put(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 0)
}

// Set removes x from the pool.
func (s *Sip) Set(x uint8) {
s.m.Lock()
defer s.m.Unlock()
s.i = s.i.SetBit(s.i, int(x), 1)
}

// NewSip returns a new sid.
func NewSip() *Sip {
return &Sip{
i: big.NewInt(0),
m: &sync.Mutex{},
}
}
19 changes: 19 additions & 0 deletions protocol/czar/sip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package czar

import (
"testing"

"github.com/mohanson/daze/lib/doa"
)

func TestSip(t *testing.T) {
sid := NewSip()
for i := range 256 {
doa.Doa(doa.Try(sid.Get()) == uint8(i))
}
doa.Doa(doa.Err(sid.Get()) != nil)
sid.Put(65)
sid.Put(15)
doa.Doa(doa.Try(sid.Get()) == 15)
doa.Doa(doa.Try(sid.Get()) == 65)
}

0 comments on commit 1adff02

Please sign in to comment.