-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbootstrap.go
157 lines (135 loc) · 5.21 KB
/
bootstrap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
* Copyright (c) Johan Stenstam, [email protected]
*/
package main
import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/dnstapir/tapir"
"github.com/ryanuber/columnize"
"github.com/spf13/viper"
)
// BootstrapMqttSource fetches the current contents of the doubtlist named
// src.Name from the first server in src.Bootstrap that answers successfully.
//
// For every candidate server the method:
//  1. skips the server if its address is listed under the local
//     "bootstrapserver.addresses" or "bootstrapserver.tlsaddresses" config
//     keys (i.e. the server is this very process),
//  2. sends an API ping and logs the server uptime,
//  3. POSTs a "doubtlist-status" command (JSON encoded) to /bootstrap and
//     logs the per-topic statistics from the response,
//  4. POSTs an "export-doubtlist" command (gob encoded) to /bootstrap and
//     decodes the response body into a tapir.WBGlist.
//
// Any failure in steps 2-4 is logged and the next server is tried.
//
// It returns the decoded list from the first server that succeeds, or an
// error when src.Bootstrap is empty or no server succeeded. Missing or
// invalid TLS configuration is reported through POPExiter.
// NOTE(review): POPExiter presumably terminates the process — confirm.
func (td *PopData) BootstrapMqttSource(src SourceConf) (*tapir.WBGlist, error) {
	// Guard the src.Bootstrap[0] access below; an empty list would otherwise
	// panic with an index-out-of-range error.
	if len(src.Bootstrap) == 0 {
		return nil, fmt.Errorf("BootstrapMqttSource: no bootstrap servers configured for source %s", src.Name)
	}

	// Initialize the API client
	api := &tapir.ApiClient{
		BaseUrl:    fmt.Sprintf(src.BootstrapUrl, src.Bootstrap[0]), // Must specify a valid BaseUrl
		ApiKey:     src.BootstrapKey,
		AuthMethod: "X-API-Key",
	}

	cd := viper.GetString("certs.certdir")
	if cd == "" {
		POPExiter("BootstrapMqttSource error: missing config key: certs.certdir")
	}

	key := viper.GetString("certs.tapir-pop.key")
	cert := viper.GetString("certs.tapir-pop.cert")
	tlsConfig, err := tapir.NewClientConfig(viper.GetString("certs.cacertfile"), key, cert)
	if err != nil {
		POPExiter("BootstrapMqttSource: Error: Could not set up TLS: %v", err)
	}
	// XXX: Need to verify that the server cert is valid for the bootstrap server.
	// NOTE(review): with InsecureSkipVerify set the server certificate is not
	// checked at all, so the bootstrap server is effectively unauthenticated.
	tlsConfig.InsecureSkipVerify = true
	err = api.SetupTLS(tlsConfig)
	if err != nil {
		POPExiter("BootstrapMqttSource: error setting up TLS for the API client: %v", err)
	}

	// Addresses that this process itself serves bootstrap requests on.
	bootstrapaddrs := viper.GetStringSlice("bootstrapserver.addresses")
	tlsbootstrapaddrs := viper.GetStringSlice("bootstrapserver.tlsaddresses")
	bootstrapaddrs = append(bootstrapaddrs, tlsbootstrapaddrs...)

	// Iterate over the bootstrap servers
servers:
	for _, server := range src.Bootstrap {
		// Is this myself? The labeled continue is required: a plain continue
		// would only advance the inner loop and the server would not be skipped.
		for _, bs := range bootstrapaddrs {
			if bs == server {
				td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s is myself, skipping", server)
				continue servers
			}
		}

		api.BaseUrl = fmt.Sprintf(src.BootstrapUrl, server)

		// Send an API ping command
		pr, err := api.SendPing(0, false)
		if err != nil {
			td.Logger.Printf("BootstrapMqttSource: Ping to MQTT bootstrap server %s failed: %v", server, err)
			continue
		}

		// Only the uptime is known at this point; the real message counter is
		// logged below once the doubtlist-status response has been decoded.
		uptime := time.Since(pr.BootTime).Round(time.Second)
		td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v", server, uptime)

		status, buf, err := api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
			Command:  "doubtlist-status",
			ListName: src.Name,
			Encoding: "json", // XXX: This is our default, but we'll test other encodings later
		}, true)
		if err != nil {
			td.Logger.Printf("BootstrapMqttSource: Error from RequestNG: %v\n", err)
			continue
		}
		if status != http.StatusOK {
			td.Logger.Printf("HTTP Error: %s\n", buf)
			continue
		}

		var br tapir.BootstrapResponse
		err = json.Unmarshal(buf, &br)
		if err != nil {
			td.Logger.Printf("BootstrapMqttSource: Error decoding doubtlist-status response from %s: %v. Giving up.\n", server, err)
			continue
		}
		// NOTE(review): an error reported in the status response is only
		// logged; the export below is still attempted against this server.
		if br.Error {
			td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of doubtlist status)", server, br.ErrorMsg)
		}
		if len(br.Msg) != 0 {
			td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s", server, br.Msg)
		}

		td.Logger.Printf("BootstrapMqttSource: MQTT bootstrap server %s uptime: %v. It has processed %d MQTT messages on the %s topic (last sub msg arrived at %s), ", server, uptime, br.TopicData[src.Name].SubMsgs, src.Name, br.TopicData[src.Name].LatestSub.Format(tapir.TimeLayout))

		status, buf, err = api.RequestNG(http.MethodPost, "/bootstrap", tapir.BootstrapPost{
			Command:  "export-doubtlist",
			ListName: src.Name,
			Encoding: "gob", // XXX: This is our default, but we'll test other encodings later
		}, true)
		if err != nil {
			td.Logger.Printf("BootstrapMqttSource: Error from RequestNG: %v\n", err)
			continue
		}
		if status != http.StatusOK {
			td.Logger.Printf("BootstrapMqttSource: HTTP Error: %s\n", buf)
			continue
		}

		var doubtlist tapir.WBGlist
		decoder := gob.NewDecoder(bytes.NewReader(buf))
		if err = decoder.Decode(&doubtlist); err != nil {
			// If decoding the gob failed, perhaps we received a
			// tapir.BootstrapResponse (JSON) instead? Either way this server
			// did not deliver a usable list, so move on to the next one.
			var resp tapir.BootstrapResponse
			if err = json.Unmarshal(buf, &resp); err != nil {
				td.Logger.Printf("BootstrapMqttSource: Error decoding bootstrap response from %s: %v. Giving up.\n", server, err)
				continue
			}
			if resp.Error {
				td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded with error: %s (instead of GOB blob)", server, resp.ErrorMsg)
			}
			if len(resp.Msg) != 0 {
				td.Logger.Printf("BootstrapMqttSource: Bootstrap server %s responded: %s (instead of GOB blob)", server, resp.Msg)
			}
			continue
		}

		if td.Debug {
			td.Logger.Printf("%v", doubtlist)
			td.Logger.Printf("Names present in doubtlist %s:", src.Name)
			out := []string{"Name|Time added|TTL|Tags"}
			for _, n := range doubtlist.Names {
				out = append(out, fmt.Sprintf("%s|%v|%v|%v", n.Name, n.TimeAdded.Format(tapir.TimeLayout), n.TTL, n.TagMask))
			}
			td.Logger.Printf("%s", columnize.SimpleFormat(out))
		}

		// Successfully received and decoded bootstrap data
		return &doubtlist, nil
	}

	// If no bootstrap server succeeded
	return nil, fmt.Errorf("BootstrapMqttSource: all bootstrap servers failed")
}