Skip to content

Commit

Permalink
fix: unreadable symbols in DS stream on client side
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanBelyakoff committed Jan 30, 2025
1 parent 589e949 commit e6eaaf4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 44 deletions.
81 changes: 39 additions & 42 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func NewClient(ctx context.Context, server string, useTLS bool, checkTimeout tim
mtxStreaming: &sync.Mutex{},
useTLS: useTLS,
tlsConfig: &tls.Config{},
allowStops: true,
}

// Extract hostname from server address (removing port if present)
Expand Down Expand Up @@ -133,6 +134,7 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful

default:
}

if err := c.stopStreamingIfStarted(); err != nil {
return nil, fmt.Errorf("stopStreamingIfStarted: %w", err)
}
Expand Down Expand Up @@ -181,10 +183,6 @@ func (c *StreamClient) getL2BlockByNumber(blockNum uint64) (l2Block *types.FullL
return nil, fmt.Errorf("expected block number %d but got %d", blockNum, l2Block.L2BlockNumber)
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand Down Expand Up @@ -225,6 +223,24 @@ func (c *StreamClient) stopStreamingIfStarted() error {
if err := c.sendStopCmd(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}

// Read packet
packet, err := c.readBuffer(1)
if err != nil {
return fmt.Errorf("readBuffer: %w", err)
}

// Check packet type
if packet[0] != PtResult {
log.Error("[Datastream client] Expecting result packet type", "packet", packet[0], "client", c)
return fmt.Errorf("expecting result packet type %d and received %d", PtResult, packet[0])
}

// Read server result entry for the command
if _, err := c.readResultEntry(packet); err != nil {
return fmt.Errorf("readResultEntry: %w", err)
}

c.setStreaming(false)
}

Expand Down Expand Up @@ -262,10 +278,6 @@ func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error
return nil, errors.New("no block found")
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand Down Expand Up @@ -296,7 +308,7 @@ func (c *StreamClient) Stop() error {
if c.conn == nil || !c.allowStops {
return nil
}
if err := c.sendStopCmd(); err != nil {
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}

Expand Down Expand Up @@ -434,44 +446,31 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
return fmt.Errorf("context done - stopping")
default:
}
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("stopStreamingIfStarted: %w", err)

if err = c.stopStreamingIfStarted(); err != nil {
err = fmt.Errorf("stopStreamingIfStarted: %w", err)
return err
}

// first load up the header of the stream
if _, err := c.GetHeader(); err != nil {
return fmt.Errorf("GetHeader: %w", err)
if _, err = c.GetHeader(); err != nil {
err = fmt.Errorf("GetHeader: %w, client %v", err, c)
return err
}

if err = c.readAllEntriesToChannel(); err != nil {
c.lastError = err
return err
}

return nil
}

func (c *StreamClient) handleSocketError(socketErr error) bool {
if socketErr != nil {
log.Warn(fmt.Sprintf("%v", socketErr))
}
if err := c.tryReConnect(); err != nil {
log.Warn(fmt.Sprintf("try reconnect: %v", err))
return false
}

c.RenewEntryChannel()

return true
}

// reads entries to the end of the stream
// at end will wait for new entries to arrive
func (c *StreamClient) readAllEntriesToChannel() (err error) {
defer func() {
if err != nil {
c.setStreaming(false)
c.lastError = err
}
}()

Expand Down Expand Up @@ -542,14 +541,8 @@ LOOP:
break LOOP
}

var timeout time.Time
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
} else {
timeout = time.Now().Add(c.checkTimeout)
}

if err = c.conn.SetReadDeadline(timeout); err != nil {
err = c.resetReadTimeout()
if err != nil {
return err
}

Expand Down Expand Up @@ -893,7 +886,7 @@ func (c *StreamClient) readResultEntry(packet []byte) (re *types.ResultEntry, er
case types.CmdErrInvalidCommand:
return re, fmt.Errorf("%w: %s", types.ErrInvalidCommand, re.ErrorStr)
default:
return re, fmt.Errorf("unknown error code: %s", re.ErrorStr)
return re, fmt.Errorf("unknown error code: %d str: %s", re.ErrorNum, re.ErrorStr)
}
}

Expand Down Expand Up @@ -964,14 +957,14 @@ func (c *StreamClient) resetWriteTimeout() error {
}

func (c *StreamClient) resetReadTimeout() error {
var timeout time.Time
var timeout time.Duration
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
timeout = minimumCheckTimeout
} else {
timeout = time.Now().Add(c.checkTimeout)
timeout = c.checkTimeout
}

if err := c.conn.SetReadDeadline(timeout); err != nil {
if err := c.setReadTimeout(timeout); err != nil {
return fmt.Errorf("%w: conn.SetReadDeadline: %v", ErrSocket, err)
}

Expand All @@ -985,3 +978,7 @@ func (c *StreamClient) PrepUnwind() {
// is activated.
c.setStreaming(true)
}

func (c *StreamClient) setReadTimeout(timeout time.Duration) error {
return c.conn.SetReadDeadline(time.Now().Add(timeout))
}
6 changes: 5 additions & 1 deletion zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,11 @@ func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (
}
} else {
dsClient = cfg.dsClient
stopFn = func() {}
stopFn = func() {
if err := dsClient.Stop(); err != nil {
log.Warn("Failed to stop datastream client", "err", err)
}
}
}

return dsClient, stopFn, nil
Expand Down
2 changes: 1 addition & 1 deletion zk/stages/stage_batches_datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *DatastreamClientRunner) StartRead(errorChan chan struct{}, diffBlock ui
} else {
r.dsClient.RenewEntryChannel()
}
r.dsClient.RenewEntryChannel()

if r.isReading.Load() {
return fmt.Errorf("tried starting datastream client runner thread while another is running")
}
Expand Down

0 comments on commit e6eaaf4

Please sign in to comment.