Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tatanka: switch to lexi db package and clean up message handling #3153

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dex/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func Uint64Bytes(i uint64) []byte {
return b
}

// BytesToUint64 converts the length-8, big-endian encoded byte slice to a uint64.
func BytesToUint64(i []byte) uint64 {
return IntCoder.Uint64(i[:8])
}

// CopySlice makes a copy of the slice.
func CopySlice(b []byte) []byte {
newB := make([]byte, len(b))
Expand Down
160 changes: 96 additions & 64 deletions tatanka/client_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (t *Tatanka) registerRemoteClient(tankaID, clientID tanka.PeerID) {
}
}

// Tatanka.specialHandlers

// handleClientConnect handles a new locally-connected client. checking
// reputation before adding the client to the map.
func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
Expand All @@ -102,7 +104,7 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms
return msgjson.NewError(mj.ErrBadRequest, "error unmarshaling client connection configuration from %q: %v", cl.PeerID(), err)
}

p, rrs, err := t.loadPeer(conn.ID)
p, err := t.db.Peer(conn.ID)
if err != nil {
return msgjson.NewError(mj.ErrInternal, "error getting peer info for peer %q: %v", conn.ID, err)
}
Expand All @@ -113,7 +115,7 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms

cl.SetPeerID(p.ID)

pp := &peer{Peer: p, Sender: cl, rrs: rrs}
pp := &peer{Peer: p, Sender: cl, rrs: make(map[tanka.PeerID]*tanka.Reputation)}
if pp.banned() {
return msgjson.NewError(mj.ErrBannned, "your tier is <= 0. post some bonds")
}
Expand All @@ -132,67 +134,25 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms

t.sendResult(cl, msg.ID, t.generateConfig(bondTier))

note := mj.MustNotification(mj.RouteNewClient, conn)
for _, s := range t.tatankaNodes() {
if err := t.send(s, note); err != nil {
req := mj.MustRequest(mj.RouteNewClient, conn)
for tt, s := range t.tatankaNodes() {
if err := t.request(s, req, func(m *msgjson.Message) {
var rep *tanka.Reputation
if err := m.UnmarshalResult(&rep); err != nil {
t.log.Errorf("error parsing response for new client request to %s: %w", tt, err)
return
}
pp.mtx.Lock()
pp.rrs[conn.ID] = rep
pp.mtx.Unlock()
}); err != nil {
t.log.Errorf("error sharing new client info with tatanka node %q", s.ID)
}
}

return nil
}

// handleClientMessage handles incoming message from locally-connected clients.
// All messages except for handleClientConnect and handlePostBond are handled
// here, with some common pre-processing and validation done before the
// subsequent route handler is called.
func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
peerID := cl.PeerID()
c := t.clientNode(peerID)
if c == nil {
t.log.Errorf("Ignoring message from unknown client %s", peerID)
cl.Disconnect()
return nil
}

if err := mj.CheckSig(msg, c.PubKey); err != nil {
t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err)
return msgjson.NewError(mj.ErrSig, "signature doesn't check")
}

t.clientMtx.RLock()
c, found := t.clients[peerID]
t.clientMtx.RUnlock()
if !found {
t.log.Errorf("client %s sent a message requiring tier before connecting", peerID)
return msgjson.NewError(mj.ErrAuth, "not connected")
}

switch msg.Type {
case msgjson.Request:
switch msg.Route {
case mj.RouteSubscribe:
return t.handleSubscription(c, msg)
// case mj.RouteUnsubscribe:
// return t.handleUnsubscribe(c, msg)
case mj.RouteBroadcast:
return t.handleBroadcast(c.peer, msg, true)
case mj.RouteTankagram:
return t.handleTankagram(c, msg)
default:
return msgjson.NewError(mj.ErrBadRequest, "unknown request route %q", msg.Route)
}
case msgjson.Notification:
switch msg.Route {
default:
// TODO: What? Can't let this happen too much.
return msgjson.NewError(mj.ErrBadRequest, "unknown notification route %q", msg.Route)
}
default:
return msgjson.NewError(mj.ErrBadRequest, "unknown message type %d", msg.Type)
}
}

// handlePostBond handles a new bond sent from a locally connected client.
// handlePostBond is the only client route than can be invoked before the user
// is bonded.
Expand Down Expand Up @@ -220,7 +180,6 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
}
}

var allBonds []*tanka.Bond
for _, b := range bonds {
if b == nil {
t.log.Errorf("Bond-posting client %s sent a nil bond", peerID)
Expand All @@ -244,18 +203,21 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
return msgjson.NewError(mj.ErrBadRequest, "failed validation")
}

var err error
allBonds, err = t.db.StoreBond(b)
if err != nil {
if err := t.db.StoreBond(b); err != nil {
t.log.Errorf("Error storing bond for client %s in db: %v", peerID, err)
return msgjson.NewError(mj.ErrInternal, "internal error")
}
}

liveBonds, err := t.db.GetBonds(peerID)
if err != nil {
t.log.Errorf("Error retrieving bonds for client %s in db: %v", peerID, err)
msgjson.NewError(mj.ErrInternal, "internal error")
}

if len(allBonds) > 0 { // Probably no way to get here with empty allBonds, but checking anyway.
if len(liveBonds) > 0 { // Probably no way to get here with empty allBonds, but checking anyway.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(liveBonds) > 0 { // Probably no way to get here with empty allBonds, but checking anyway.
if len(liveBonds) > 0 { // Probably no way to get here with empty liveBonds, but checking anyway.

if c := t.clientNode(peerID); c != nil {
c.updateBonds(allBonds)
c.updateBonds(liveBonds)
}
}

Expand All @@ -264,6 +226,38 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
return nil
}

// Tatanka.clientHandlers

type clientRequestHandler = func(c *client, msg *msgjson.Message) *msgjson.Error
type clientNotificationHandler = func(c *client, msg *msgjson.Message)

// handleClientMessage handles incoming message from locally-connected clients.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// handleClientMessage handles incoming message from locally-connected clients.
// handleClientMessage handles incoming messages from locally-connected clients.

// All messages except for handleClientConnect and handlePostBond are handled
// here, with some common pre-processing and validation done before the
// subsequent route handler is called.
func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
peerID := cl.PeerID()
c := t.clientNode(peerID)
if c == nil {
t.log.Errorf("Ignoring message from unknown client %s", peerID)
cl.Disconnect()
return nil
}

if err := mj.CheckSig(msg, c.PubKey); err != nil {
t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err)
return msgjson.NewError(mj.ErrSig, "signature doesn't check")
}

switch handle := t.clientHandlers[msg.Route].(type) {
case clientRequestHandler:
return handle(c, msg)
case clientNotificationHandler:
handle(c, msg)
}
return nil // Notification
}

// handleSubscription handles a new subscription, adding the subject to the
// map if it doesn't exist. It then distributes a NewSubscriber broadcast
// to all current subscribers and remote tatankas.
Expand Down Expand Up @@ -478,7 +472,8 @@ func (t *Tatanka) distributeBroadcastedMessage(bcast *mj.Broadcast, mustExist bo

// handleBroadcast handles a broadcast from a locally connected client,
// forwarding the message to all remote tatankas and local subscribers.
func (t *Tatanka) handleBroadcast(p *peer, msg *msgjson.Message, mustExist bool) *msgjson.Error {
func (t *Tatanka) handleBroadcast(c *client, msg *msgjson.Message) *msgjson.Error {
p := c.peer
if t.skipRelay(msg) {
return nil
}
Expand All @@ -503,7 +498,7 @@ func (t *Tatanka) handleBroadcast(p *peer, msg *msgjson.Message, mustExist bool)
t.relayBroadcast(bcast, p.ID)

// Send to local subscribers.
if msgErr := t.distributeBroadcastedMessage(bcast, mustExist); msgErr != nil {
if msgErr := t.distributeBroadcastedMessage(bcast, true); msgErr != nil {
return msgErr
}

Expand Down Expand Up @@ -635,6 +630,43 @@ func (t *Tatanka) handleTankagram(c *client, msg *msgjson.Message) *msgjson.Erro
return nil
}

func (t *Tatanka) handleSetScore(c *client, msg *msgjson.Message) {
scorer := c.peer.ID
var score *mj.ScoreReport
if err := msg.Unmarshal(&score); err != nil {
t.log.Errorf("error unmarshaling set_score from %s: %v", scorer, err)
return
}
if err := t.db.SetScore(score.PeerID, scorer, score.Score, time.Now()); err != nil {
t.log.Errorf("error adding score from %s for %s to db: %v", scorer, score.PeerID, err)
}
rep, err := t.db.Reputation(score.PeerID)
if err != nil {
t.log.Errorf("error getting reputation after score update from %s for %s: %v", score.PeerID, scorer, err)
return
}
t.clientMtx.RLock()
c, found := t.clients[score.PeerID]
t.clientMtx.RUnlock()
if found {
c.mtx.Lock()
c.Reputation = rep
c.mtx.Unlock()
}

note := mj.MustNotification(mj.RouteShareScore, &mj.SharedScore{
Scorer: scorer,
Scored: score.PeerID,
Score: score.Score,
Reputation: rep,
})
for _, tt := range t.tatankaNodes() {
if err := t.send(tt, note); err != nil {
t.log.Errorf("error notifying %s of new score: %v", tt.ID, err)
}
}
}

const ErrNoPath = dex.ErrorKind("no path")

// requestAnyOne tries to request from the senders in order until one succeeds.
Expand Down
95 changes: 50 additions & 45 deletions tatanka/db/bonds.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,65 +4,70 @@
package db

import (
"encoding/json"
"fmt"
"time"

"decred.org/dcrdex/dex/encode"
"decred.org/dcrdex/dex/lexi"
"decred.org/dcrdex/tatanka/tanka"
)

type jsonCoder struct {
thing interface{}
type dbBond struct {
*tanka.Bond
}

func newJSON(thing interface{}) *jsonCoder {
return &jsonCoder{thing}
func (bond *dbBond) MarshalBinary() ([]byte, error) {
const bondVer = 0
var b encode.BuildyBytes = make([]byte, 1, 1+tanka.PeerIDLength+4+len(bond.CoinID)+8+8)
b[0] = bondVer
b = b.AddData(bond.PeerID[:]).
AddData(encode.Uint32Bytes(bond.AssetID)).
AddData(bond.CoinID).
AddData(encode.Uint64Bytes(bond.Strength)).
AddData(encode.Uint64Bytes(uint64(bond.Expiration.Unix())))
return b, nil
}

func (p *jsonCoder) MarshalBinary() ([]byte, error) {
return json.Marshal(p.thing)
}

func (p *jsonCoder) UnmarshalBinary(b []byte) error {
return json.Unmarshal(b, p.thing)
}

func (d *DB) StoreBond(newBond *tanka.Bond) (goodBonds []*tanka.Bond, err error) {
var existingBonds []*tanka.Bond
if _, err = d.bondsDB.Get(newBond.PeerID[:], newJSON(existingBonds)); err != nil {
return nil, fmt.Errorf("error reading bonds db: %w", err)
func (bond *dbBond) UnmarshalBinary(b []byte) error {
const bondVer = 0
bond.Bond = new(tanka.Bond)
ver, pushes, err := encode.DecodeBlob(b, 5)
if err != nil {
return fmt.Errorf("error decoding bond blob: %w", err)
}

for _, b := range existingBonds {
if time.Now().After(b.Expiration) {
continue
}
goodBonds = append(goodBonds, b)
if ver != bondVer {
return fmt.Errorf("unknown bond version %d", ver)
}
if len(pushes) != 5 {
return fmt.Errorf("unknown number of bond blob pushes %d", len(pushes))
}
copy(bond.PeerID[:], pushes[0])
bond.AssetID = encode.BytesToUint32(pushes[1])
bond.CoinID = pushes[2]
bond.Strength = encode.BytesToUint64(pushes[3])
bond.Expiration = time.Unix(int64(encode.BytesToUint64(pushes[4])), 0)
return nil
}

goodBonds = append(goodBonds, newBond)
return goodBonds, d.bondsDB.Store(newBond.PeerID[:], newJSON(goodBonds))
func (d *DB) StoreBond(newBond *tanka.Bond) error {
return d.bonds.Set(lexi.B(newBond.CoinID), &dbBond{newBond}, lexi.WithReplace())
}

func (d *DB) GetBonds(peerID tanka.PeerID) ([]*tanka.Bond, error) {
var existingBonds []*tanka.Bond
if _, err := d.bondsDB.Get(peerID[:], newJSON(&existingBonds)); err != nil {
return nil, fmt.Errorf("error reading bonds db: %w", err)
}

var goodBonds []*tanka.Bond
for _, b := range existingBonds {
if time.Now().After(b.Expiration) {
continue
}
goodBonds = append(goodBonds, b)
}

if len(goodBonds) != len(existingBonds) {
if err := d.bondsDB.Store(peerID[:], newJSON(goodBonds)); err != nil {
return nil, fmt.Errorf("error storing bonds after pruning: %v", err)
}
}

return goodBonds, nil
var bonds []*tanka.Bond
now := time.Now()
return bonds, d.bonderIdx.Iterate(peerID[:], func(it *lexi.Iter) error {
return it.V(func(vB []byte) error {
var bond dbBond
if err := bond.UnmarshalBinary(vB); err != nil {
return fmt.Errorf("error unmarshaling bond: %w", err)
}
if bond.Expiration.Before(now) {
it.Delete()
return nil
}
bonds = append(bonds, bond.Bond)
return nil
})
})
}
Loading
Loading