Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterize vreplgen + PR Feedback #15

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions vreplgen/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
A golang CLI utility to generate vtctlclient commands to add vreplication
rules:

```
Usage: vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] <tablet_id> <src_keyspace> <src_shard> <dest_keyspace> <dest_table1> 'filter1' [<dest_table2> 'filter2']...
```

E.g.:

```
./vreplgen cell-0000000001 main -80 main_copy transactionhistory 'select * from transactionhistory where in_keyrange(merchant_id, "hash", "80-")'
```

The utility also supports multiple table filters, which allows multiple tables
to be specified in a single vreplication stream (good for if you have
a lot of tables you want to process via vreplication). E.g.:

```
./vreplgen cell-0000000001 main -80 main_copy transactionhistory 'select * from transactionhistory where in_keyrange(merchant_id, "hash", "80-")' transactionhistory2 'select * from transactionhistory2 where in_keyrange(merchant_id, "hash", "-80")'
```

An important thing to note is that a single vreplication stream cannot use
the same source table in the same stream. The utility will not prevent
you from doing this, however.

`vreplgen` assumes you are running vtctld on localhost port 15999. If not,
you can set your VTCTLCLIENT environment variable to the vtctlclient command
you want `vreplgen` to generate, e.g.:

```
export VTCTLCLIENT="vtctlclient -server vtctld:15999"
```

Lastly, you can control the on_ddl flag using vreplgen. The default if you
do not specify the `-on_ddl` option is `ignore`, but you can specify:

* `-on_ddl ignore`
* `-on_ddl stop`
* `-on_ddl exec`
* `-on_ddl exec_ignore`

depending on how you want your DDL to be handled in your replication streams.
See the main vreplication documentation for more details.
58 changes: 47 additions & 11 deletions vreplgen/vreplgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,71 @@ package main

import (
"bytes"
"flag"
"fmt"
"os"
"strings"

"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

var onDDL string

func init() {
flag.StringVar(&onDDL, "on_ddl", "ignore", "Set on_ddl value for replication stream - ignore, stop, exec, exec_ignore")
}

func main() {
vtctl := "./lvtctl.sh"
tabletID := "test-400"
dbName := "vt_merchant"
flag.Parse()

if len(os.Args) < 9 {
fmt.Println("Usage: vreplgen [-on_ddl (ignore|stop|exec|exec_ignore)] <tablet_id> <src_keyspace> <src_shard> <dest_keyspace> <dest_table1> 'filter1' [<dest_table2> 'filter2']...")
os.Exit(1)
}

vtctl := os.Getenv("VTCTLCLIENT")
if vtctl == "" {
vtctl = "vtctlclient -server localhost:15999"
}

// First, we process fixed positional arguments
// such as the intended target and source
tabletID := os.Args[3]
sourceKeyspace := os.Args[4]
sourceShard := os.Args[5]
destKeyspace := os.Args[6]
destDbName := destKeyspace
var rules []*binlogdatapb.Rule

// Next, we iterate over all possible rules
// Note this can be a variable number!
for i := 7; i < len(os.Args); i = i + 2 {
destTable := os.Args[i]
destFilter := os.Args[i+1]
rule := new(binlogdatapb.Rule)
rule.Match = destTable
rule.Filter = destFilter
rules = append(rules, rule)
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "morder",
Filter: "select * from uorder where in_keyrange(mname, 'unicode_loose_md5', '-80')",
}},
Rules: rules,
}

onDDLAction := binlogdatapb.OnDDLAction(binlogdatapb.OnDDLAction_value[strings.ToUpper(onDDL)])

bls := &binlogdatapb.BinlogSource{
Keyspace: "user",
Shard: "-80",
Keyspace: sourceKeyspace,
Shard: sourceShard,
Filter: filter,
OnDdl: binlogdatapb.OnDDLAction_IGNORE,
OnDdl: onDDLAction,
}
val := sqltypes.NewVarBinary(fmt.Sprintf("%v", bls))
var sqlEscaped bytes.Buffer
val.EncodeSQL(&sqlEscaped)
query := fmt.Sprintf("insert into _vt.vreplication "+
"(db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
"('%s', %s, '', 9999, 9999, 'master', 0, 0, 'Running')", dbName, sqlEscaped.String())
"('%s', %s, '', 9999, 9999, 'master', 0, 0, 'Running')", destDbName, sqlEscaped.String())

fmt.Printf("%s VReplicationExec %s '%s'\n", vtctl, tabletID, strings.Replace(query, "'", "'\"'\"'", -1))
}