Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unreadable symbols in DS stream on client side #1698

Open
wants to merge 2 commits into
base: zkevm
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions zk/datastream/client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func NewClient(ctx context.Context, server string, useTLS bool, checkTimeout tim
mtxStreaming: &sync.Mutex{},
useTLS: useTLS,
tlsConfig: &tls.Config{},
allowStops: true,
}

// Extract hostname from server address (removing port if present)
Expand Down Expand Up @@ -133,6 +134,7 @@ func (c *StreamClient) GetL2BlockByNumber(blockNum uint64) (fullBLock *types.Ful

default:
}

if err := c.stopStreamingIfStarted(); err != nil {
return nil, fmt.Errorf("stopStreamingIfStarted: %w", err)
}
Expand Down Expand Up @@ -181,10 +183,6 @@ func (c *StreamClient) getL2BlockByNumber(blockNum uint64) (l2Block *types.FullL
return nil, fmt.Errorf("expected block number %d but got %d", blockNum, l2Block.L2BlockNumber)
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand Down Expand Up @@ -225,6 +223,24 @@ func (c *StreamClient) stopStreamingIfStarted() error {
if err := c.sendStopCmd(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}

// Read packet
packet, err := c.readBuffer(1)
if err != nil {
return fmt.Errorf("readBuffer: %w", err)
}

// Check packet type
if packet[0] != PtResult {
log.Error("[Datastream client] Expecting result packet type", "packet", packet[0], "client", c)
return fmt.Errorf("expecting result packet type %d and received %d", PtResult, packet[0])
}

// Read server result entry for the command
if _, err := c.readResultEntry(packet); err != nil {
return fmt.Errorf("readResultEntry: %w", err)
}

c.setStreaming(false)
}

Expand Down Expand Up @@ -262,10 +278,6 @@ func (c *StreamClient) getLatestL2Block() (l2Block *types.FullL2Block, err error
return nil, errors.New("no block found")
}

if err := c.Stop(); err != nil {
return nil, fmt.Errorf("Stop: %w", err)
}

return l2Block, nil
}

Expand Down Expand Up @@ -296,7 +308,7 @@ func (c *StreamClient) Stop() error {
if c.conn == nil || !c.allowStops {
return nil
}
if err := c.sendStopCmd(); err != nil {
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("sendStopCmd: %w", err)
}

Expand Down Expand Up @@ -434,44 +446,31 @@ func (c *StreamClient) ReadAllEntriesToChannel() (err error) {
return fmt.Errorf("context done - stopping")
default:
}
if err := c.stopStreamingIfStarted(); err != nil {
return fmt.Errorf("stopStreamingIfStarted: %w", err)

if err = c.stopStreamingIfStarted(); err != nil {
err = fmt.Errorf("stopStreamingIfStarted: %w", err)
return err
}

// first load up the header of the stream
if _, err := c.GetHeader(); err != nil {
return fmt.Errorf("GetHeader: %w", err)
if _, err = c.GetHeader(); err != nil {
err = fmt.Errorf("GetHeader: %w, client %v", err, c)
return err
}

if err = c.readAllEntriesToChannel(); err != nil {
c.lastError = err
return err
}

return nil
}

func (c *StreamClient) handleSocketError(socketErr error) bool {
if socketErr != nil {
log.Warn(fmt.Sprintf("%v", socketErr))
}
if err := c.tryReConnect(); err != nil {
log.Warn(fmt.Sprintf("try reconnect: %v", err))
return false
}

c.RenewEntryChannel()

return true
}

// reads entries to the end of the stream
// at end will wait for new entries to arrive
func (c *StreamClient) readAllEntriesToChannel() (err error) {
defer func() {
if err != nil {
c.setStreaming(false)
c.lastError = err
}
}()

Expand Down Expand Up @@ -542,14 +541,8 @@ LOOP:
break LOOP
}

var timeout time.Time
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
} else {
timeout = time.Now().Add(c.checkTimeout)
}

if err = c.conn.SetReadDeadline(timeout); err != nil {
err = c.resetReadTimeout()
if err != nil {
return err
}

Expand Down Expand Up @@ -893,7 +886,7 @@ func (c *StreamClient) readResultEntry(packet []byte) (re *types.ResultEntry, er
case types.CmdErrInvalidCommand:
return re, fmt.Errorf("%w: %s", types.ErrInvalidCommand, re.ErrorStr)
default:
return re, fmt.Errorf("unknown error code: %s", re.ErrorStr)
return re, fmt.Errorf("unknown error code: %d str: %s", re.ErrorNum, re.ErrorStr)
}
}

Expand Down Expand Up @@ -949,29 +942,29 @@ func (c *StreamClient) writeToConn(data interface{}) error {
}

func (c *StreamClient) resetWriteTimeout() error {
var timeout time.Time
var timeout time.Duration
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
timeout = minimumCheckTimeout
} else {
timeout = time.Now().Add(c.checkTimeout)
timeout = c.checkTimeout
}

if err := c.conn.SetWriteDeadline(timeout); err != nil {
if err := c.setWriteTimeout(timeout); err != nil {
return fmt.Errorf("%w: conn.SetWriteDeadline: %v", ErrSocket, err)
}

return nil
}

func (c *StreamClient) resetReadTimeout() error {
var timeout time.Time
var timeout time.Duration
if c.checkTimeout < minimumCheckTimeout {
timeout = time.Now().Add(minimumCheckTimeout)
timeout = minimumCheckTimeout
} else {
timeout = time.Now().Add(c.checkTimeout)
timeout = c.checkTimeout
}

if err := c.conn.SetReadDeadline(timeout); err != nil {
if err := c.setReadTimeout(timeout); err != nil {
return fmt.Errorf("%w: conn.SetReadDeadline: %v", ErrSocket, err)
}

Expand All @@ -985,3 +978,11 @@ func (c *StreamClient) PrepUnwind() {
// is activated.
c.setStreaming(true)
}

func (c *StreamClient) setReadTimeout(timeout time.Duration) error {
return c.conn.SetReadDeadline(time.Now().Add(timeout))
}

func (c *StreamClient) setWriteTimeout(timeout time.Duration) error {
return c.conn.SetWriteDeadline(time.Now().Add(timeout))
}
6 changes: 5 additions & 1 deletion zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,11 @@ func newStreamClient(ctx context.Context, cfg BatchesCfg, latestForkId uint64) (
}
} else {
dsClient = cfg.dsClient
stopFn = func() {}
stopFn = func() {
if err := dsClient.Stop(); err != nil {
log.Warn("Failed to stop datastream client", "err", err)
}
}
}

return dsClient, stopFn, nil
Expand Down
2 changes: 1 addition & 1 deletion zk/stages/stage_batches_datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *DatastreamClientRunner) StartRead(errorChan chan struct{}, diffBlock ui
} else {
r.dsClient.RenewEntryChannel()
}
r.dsClient.RenewEntryChannel()

if r.isReading.Load() {
return fmt.Errorf("tried starting datastream client runner thread while another is running")
}
Expand Down
Loading