Skip to content

Commit

Permalink
Add timeout for write operations on client connections (0xPolygon#120)
Browse files Browse the repository at this point in the history
* Add a timeout for write operations on client connections

* DeadlineWrite renamed to TimeoutWrite. Log to show when timeout occurs

---------

Co-authored-by: agnusmor <[email protected]>
  • Loading branch information
dpunish3r and agnusmor authored May 31, 2024
1 parent 5e35df4 commit 311fa30
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 34 deletions.
18 changes: 16 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ func main() {
Value: 1000000, // nolint:gomnd
DefaultText: "1000000",
},
&cli.Uint64Flag{
Name: "writetout",
Usage: "timeout for write operations on client connections in ms (0=no timeout)",
Value: 3000, // nolint:gomnd
DefaultText: "3000",
},
},
Action: runServer,
},
Expand Down Expand Up @@ -188,6 +194,12 @@ func main() {
Value: "info",
DefaultText: "info",
},
&cli.Uint64Flag{
Name: "writetout",
Usage: "timeout for write operations on client connections in ms (0=no timeout)",
Value: 3000, // nolint:gomnd
DefaultText: "3000",
},
},
Action: runRelay,
},
Expand Down Expand Up @@ -217,12 +229,13 @@ func runServer(ctx *cli.Context) error {
port := ctx.Uint64("port")
sleep := ctx.Uint64("sleep")
numOpersLoop := ctx.Uint64("opers")
writeTout := ctx.Uint64("writetout")
if file == "" || port <= 0 {
return errors.New("bad/missing parameters")
}

// Create stream server
s, err := datastreamer.NewServer(uint16(port), 1, 137, StSequencer, file, nil) // nolint:gomnd
s, err := datastreamer.NewServer(uint16(port), 1, 137, StSequencer, file, time.Duration(writeTout)*time.Millisecond, nil) // nolint:gomnd
if err != nil {
return err
}
Expand Down Expand Up @@ -740,9 +753,10 @@ func runRelay(ctx *cli.Context) error {
if server == "" || file == "" || port <= 0 {
return errors.New("bad/missing parameters")
}
writeTout := ctx.Uint64("writetout")

// Create relay server
r, err := datastreamer.NewRelay(server, uint16(port), 1, 137, StSequencer, file, nil) // nolint:gomnd
r, err := datastreamer.NewRelay(server, uint16(port), 1, 137, StSequencer, file, time.Duration(writeTout)*time.Millisecond, nil) // nolint:gomnd
if err != nil {
return err
}
Expand Down
8 changes: 7 additions & 1 deletion datastreamer/config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package datastreamer

import "github.com/0xPolygonHermez/zkevm-data-streamer/log"
import (
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/log"
)

// Config type for datastreamer server
type Config struct {
// Port to listen on
Port uint16 `mapstructure:"Port"`
// Filename of the binary data file
Filename string `mapstructure:"Filename"`
// WriteTimeout for write opeations on client connections
WriteTimeout time.Duration
// Log
Log log.Config `mapstructure:"Log"`
}
4 changes: 3 additions & 1 deletion datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
Expand Down Expand Up @@ -101,6 +102,7 @@ var (
Level: "debug",
Outputs: []string{"stdout"},
},
WriteTimeout: time.Duration(3 * time.Second),
}
leveldb = config.Filename[0:strings.IndexRune(config.Filename, '.')] + ".db"
streamServer *datastreamer.StreamServer
Expand Down Expand Up @@ -199,7 +201,7 @@ func TestServer(t *testing.T) {
if err != nil {
panic(err)
}
streamServer, err = datastreamer.NewServer(config.Port, 1, 137, streamType, config.Filename, &config.Log)
streamServer, err = datastreamer.NewServer(config.Port, 1, 137, streamType, config.Filename, config.WriteTimeout, &config.Log)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion datastreamer/streamfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (f *StreamFile) writeHeaderEntry() error {
log.Debugf("writing header entry: %v", binaryHeader)
_, err = f.fileHeader.Write(binaryHeader)
if err != nil {
log.Errorf("Error writing the header: %v", err)
log.Errorf("Error writing the header %v: %v", binaryHeader, err)
return err
}

Expand Down
10 changes: 7 additions & 3 deletions datastreamer/streamrelay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package datastreamer

import "github.com/0xPolygonHermez/zkevm-data-streamer/log"
import (
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/log"
)

// StreamRelay type to manage a data stream relay
type StreamRelay struct {
Expand All @@ -9,7 +13,7 @@ type StreamRelay struct {
}

// NewRelay creates a new data stream relay
func NewRelay(server string, port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, cfg *log.Config) (*StreamRelay, error) {
func NewRelay(server string, port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, writeTimeout time.Duration, cfg *log.Config) (*StreamRelay, error) {
var r StreamRelay
var err error

Expand All @@ -21,7 +25,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64, stream
}

// Create server side
r.server, err = NewServer(port, version, systemID, streamType, fileName, cfg)
r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout, cfg)
if err != nil {
log.Errorf("Error creating relay server side: %v", err)
return nil, err
Expand Down
64 changes: 47 additions & 17 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package datastreamer

import (
"encoding/binary"
"errors"
"io"
"math"
"net"
"os"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -106,9 +108,10 @@ var (

// StreamServer type to manage a data stream server
type StreamServer struct {
port uint16 // Server stream port
fileName string // Stream file name
started bool // Flag server started
port uint16 // Server stream port
fileName string // Stream file name
writeTimeout time.Duration // Timeout for write operations on client connection
started bool // Flag server started

version uint8
systemID uint64
Expand Down Expand Up @@ -150,12 +153,13 @@ type ResultEntry struct {
}

// NewServer creates a new data stream server
func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, cfg *log.Config) (*StreamServer, error) {
func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string, writeTimeout time.Duration, cfg *log.Config) (*StreamServer, error) {
// Create the server data stream
s := StreamServer{
port: port,
fileName: fileName,
started: false,
port: port,
fileName: fileName,
writeTimeout: writeTimeout,
started: false,

version: version,
systemID: systemID,
Expand Down Expand Up @@ -393,7 +397,7 @@ func (s *StreamServer) addStream(desc string, etype EntryType, data []byte) (uin
// CommitAtomicOp commits the current atomic operation and streams it to the clients
func (s *StreamServer) CommitAtomicOp() error {
start := time.Now().UnixNano()
defer log.Debugf("CommitAtomicOp process time: %vns", time.Now().UnixNano()-start)
defer log.Infof("CommitAtomicOp process time: %vns", time.Now().UnixNano()-start)

log.Infof("!AtomicOp COMMIT (%d)", s.atomicOp.startEntry)
if s.atomicOp.status != aoStarted {
Expand All @@ -417,7 +421,9 @@ func (s *StreamServer) CommitAtomicOp() error {
atomic.entries = make([]FileEntry, len(s.atomicOp.entries))
copy(atomic.entries, s.atomicOp.entries)

log.Infof("[ds-debug] CommitAtomicOp before send to channel")
s.stream <- atomic
log.Infof("[ds-debug] CommitAtomicOp after send to channel")

// No atomic operation in progress
s.clearAtomicOp()
Expand Down Expand Up @@ -642,42 +648,51 @@ func (s *StreamServer) broadcastAtomicOp() {
broadcastOp := <-s.stream
start := time.Now().UnixMilli()
var killedClientMap = map[string]struct{}{}
log.Infof("[ds-debug] broadcastAtomicOp before mutexClients lock")
s.mutexClients.Lock()
// For each connected and started client
log.Debugf("Clients: %d, AO-entries: %d", len(s.clients), len(broadcastOp.entries))
log.Infof("Clients: %d, AO-entries: %d", len(s.clients), len(broadcastOp.entries))
for id, cli := range s.clients {
log.Infof("Client %s status %d[%s]", id, cli.status, StrClientStatus[cli.status])
if cli.status != csSynced {
continue
}

// Send entries
for _, entry := range broadcastOp.entries {
for index, entry := range broadcastOp.entries {
if entry.Number >= cli.fromEntry {
log.Debugf("Sending data entry %d (type %d) to %s", entry.Number, entry.Type, id)
binaryEntry := encodeFileEntryToBinary(entry)

// Send the file data entry
if cli.conn != nil {
_, err = cli.conn.Write(binaryEntry)
if index == 0 {
log.Infof("[ds-debug] before conn Write %s", id)
}
_, err = TimeoutWrite(cli.conn, binaryEntry, s.writeTimeout)
if index == 0 {
log.Infof("[ds-debug] after conn Write %s", id)
}
} else {
err = ErrNilConnection
}
if err != nil {
// Kill client connection
log.Warnf("Error sending entry to %s: %v", id, err)
killedClientMap[id] = struct{}{}
break // skip rest of entries for this client
}
}
}
}
s.mutexClients.Unlock()
log.Infof("[ds-debug] broadcastAtomicOp after mutexClients unlock")

for k := range killedClientMap {
s.killClient(k)
}

log.Debugf("broadcastAtomicOp process time: %vms", time.Now().UnixMilli()-start)
log.Infof("broadcastAtomicOp process time: %vms", time.Now().UnixMilli()-start)
}
}

Expand Down Expand Up @@ -883,7 +898,7 @@ func (s *StreamServer) processCmdHeader(client *client) error {

// Send header entry to the client
if client.conn != nil {
_, err = client.conn.Write(binaryHeader)
_, err = TimeoutWrite(client.conn, binaryHeader, s.writeTimeout)
} else {
err = ErrNilConnection
}
Expand Down Expand Up @@ -924,7 +939,7 @@ func (s *StreamServer) processCmdEntry(client *client) error {

// Send entry to the client
if client.conn != nil {
_, err = client.conn.Write(binaryEntry)
_, err = TimeoutWrite(client.conn, binaryEntry, s.writeTimeout)
} else {
err = ErrNilConnection
}
Expand Down Expand Up @@ -978,7 +993,7 @@ func (s *StreamServer) processCmdBookmark(client *client) error {

// Send entry to the client
if client.conn != nil {
_, err = client.conn.Write(binaryEntry)
_, err = TimeoutWrite(client.conn, binaryEntry, s.writeTimeout)
} else {
err = ErrNilConnection
}
Expand Down Expand Up @@ -1017,7 +1032,7 @@ func (s *StreamServer) streamingFromEntry(client *client, fromEntry uint64) erro
binaryEntry := encodeFileEntryToBinary(iterator.Entry)
log.Debugf("Sending data entry %d (type %d) to %s", iterator.Entry.Number, iterator.Entry.Type, client.clientId)
if client.conn != nil {
_, err = client.conn.Write(binaryEntry)
_, err = TimeoutWrite(client.conn, binaryEntry, s.writeTimeout)
} else {
err = ErrNilConnection
}
Expand Down Expand Up @@ -1054,7 +1069,7 @@ func (s *StreamServer) sendResultEntry(errorNum uint32, errorStr string, client
// Send the result entry to the client
var err error
if client.conn != nil {
_, err = client.conn.Write(binaryEntry)
_, err = TimeoutWrite(client.conn, binaryEntry, s.writeTimeout)
} else {
err = ErrNilConnection
}
Expand Down Expand Up @@ -1190,3 +1205,18 @@ func PrintResultEntry(e ResultEntry) {
func (c Command) IsACommand() bool {
return c >= CmdStart && c <= CmdBookmark
}

// TimeoutWrite sets a deadline time before write
func TimeoutWrite(conn net.Conn, data []byte, timeout time.Duration) (int, error) {
err := conn.SetWriteDeadline(time.Now().Add(timeout))
if err != nil {
log.Warnf("Error setting write deadline: %v", err)
}
n, err := conn.Write(data)
if err != nil {
if errors.Is(err, os.ErrDeadlineExceeded) {
log.Infof("[ds-debug] Write deadline exceeded! %v", err)
}
}
return n, err
}
21 changes: 12 additions & 9 deletions relay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
Expand All @@ -19,10 +20,11 @@ const (
)

type config struct {
Server string
Port uint64
File string
Log string
Server string
Port uint64
File string
WriteTimeout time.Duration
Log string
}

func main() {
Expand Down Expand Up @@ -73,10 +75,11 @@ func main() {
// defaultConfig parses the default configuration values
func defaultConfig() (*config, error) {
cfg := config{
Server: "127.0.0.1:6900",
Port: 7900, // nolint:gomnd
File: "datarelay.bin",
Log: "info",
Server: "127.0.0.1:6900",
Port: 7900, // nolint:gomnd
File: "datarelay.bin",
WriteTimeout: time.Duration(3 * time.Second), // nolint:gomnd
Log: "info",
}

viper.SetConfigType("toml")
Expand Down Expand Up @@ -170,7 +173,7 @@ func run(ctx *cli.Context) error {
log.Infof(">> Relay server started: port[%d] file[%s] server[%s] log[%s]", cfg.Port, cfg.File, cfg.Server, cfg.Log)

// Create relay server
r, err := datastreamer.NewRelay(cfg.Server, uint16(cfg.Port), 1, 137, StSequencer, cfg.File, nil) // nolint:gomnd
r, err := datastreamer.NewRelay(cfg.Server, uint16(cfg.Port), 1, 137, StSequencer, cfg.File, cfg.WriteTimeout, nil) // nolint:gomnd
if err != nil {
log.Errorf(">> Relay server: NewRelay error! (%v)", err)
return err
Expand Down

0 comments on commit 311fa30

Please sign in to comment.