-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfigupdater.go
127 lines (107 loc) · 4.23 KB
/
configupdater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
* Copyright (c) 2024 Johan Stenstam, [email protected]
*/
package main
import (
"encoding/json"
"log"
"path/filepath"
"github.com/dnstapir/tapir"
"github.com/spf13/viper"
)
func (pd *PopData) ConfigUpdater(conf *Config, stopch chan struct{}) {
active := viper.GetBool("tapir.config.active")
if !active {
pd.Logger.Printf("*** ConfigUpdater: not active, skipping")
return
}
// Create a new mqtt engine just for the statusupdater.
me := pd.MqttEngine
if me == nil {
POPExiter("ConfigUpdater: MQTT Engine not running")
}
ConfigChan := make(chan tapir.MqttPkgIn, 5)
configTopic := viper.GetString("tapir.config.topic")
if configTopic == "" {
POPExiter("ConfigUpdater: MQTT config topic not set")
}
keyfile := viper.GetString("tapir.config.validatorkey")
if keyfile == "" {
POPExiter("ConfigUpdater: MQTT validator key not set for topic %s", configTopic)
}
keyfile = filepath.Clean(keyfile)
validatorkey, err := tapir.FetchMqttValidatorKey(configTopic, keyfile)
if err != nil {
POPExiter("ConfigUpdater: Error fetching MQTT validator key for topic %s: %v", configTopic, err)
}
pd.Logger.Printf("ConfigUpdater: Adding sub topic '%s' to MQTT Engine", configTopic)
msg, err := me.SubToTopic(configTopic, validatorkey, ConfigChan, "struct", true) // XXX: Brr. kludge.
if err != nil {
POPExiter("ConfigUpdater: Error adding topic %s to MQTT Engine: %v", configTopic, err)
}
pd.Logger.Printf("ConfigUpdater: Topic status for MQTT engine %s: %+v", me.Creator, msg)
log.Printf("ConfigUpdater: Starting")
for inbox := range ConfigChan {
log.Printf("ConfigUpdater: got config update message on topic %s: %v", inbox.Topic)
var gconfig tapir.GlobalConfig
err = json.Unmarshal(inbox.Payload, &gconfig)
if err != nil {
log.Printf("ConfigUpdater: error unmarshalling config update message: %v", err)
continue
}
pd.ProcessTapirGlobalConfig(gconfig)
if err != nil {
log.Printf("ConfigUpdater: error processing config update message: %v", err)
}
}
}
func (pd *PopData) ProcessTapirGlobalConfig(gconfig tapir.GlobalConfig) {
log.Printf("TapirProcessGlobalConfig: %+v", gconfig)
// Assume there is only one topic and that it is the one we want
// TODO maybe sanitize or sanity check or something
newTopic := gconfig.ObservationTopics[0]
bootstrapServers := gconfig.Bootstrap.Servers
bootstrapUrl := gconfig.Bootstrap.BaseUrl
bootstrapKey := gconfig.Bootstrap.ApiToken
//for _, listtype := range []string{"allowlist", "denylist", "doubtlist"} {
for _, wbgl := range pd.Lists["doubtlist"] {
if wbgl.Immutable || wbgl.Datasource != "mqtt" {
continue
}
for topic := range wbgl.MqttDetails.ValidatorKeys {
pd.MqttEngine.RemoveTopic(topic)
delete(wbgl.MqttDetails.ValidatorKeys, topic)
break // Only one topic
}
valkey := GetValidationKeyByKeyName(newTopic.PubKeyName)
pd.mu.Lock()
wbgl.MqttDetails.ValidatorKeys[newTopic.Topic] = valkey
wbgl.MqttDetails.Bootstrap = bootstrapServers
wbgl.MqttDetails.BootstrapUrl = bootstrapUrl
wbgl.MqttDetails.BootstrapKey = bootstrapKey
pd.mu.Unlock()
_, err := pd.MqttEngine.SubToTopic(newTopic.Topic, valkey, pd.TapirObservations, "struct", true) // XXX: Brr. kludge.
if err != nil {
POPExiter("ProcessTapirGlobalConfig: Error adding topic %s: %v", newTopic, err)
}
src := SourceConf{
Bootstrap: wbgl.MqttDetails.Bootstrap,
BootstrapUrl: wbgl.MqttDetails.BootstrapUrl,
BootstrapKey: wbgl.MqttDetails.BootstrapKey,
Name: wbgl.Name,
Format: wbgl.Format,
}
if len(gconfig.Bootstrap.Servers) > 0 {
pd.Logger.Printf("ProcessTapirGlobalConfig: %d bootstrap servers advertised: %v", wbgl.Name, len(src.Bootstrap), src.Bootstrap)
tmp, err := pd.BootstrapMqttSource(src)
if err != nil {
pd.Logger.Printf("ProcessTapirGlobalConfig: Error bootstrapping MQTT source %s: %v", wbgl.Name, err)
} else {
pd.mu.Lock()
*wbgl = *tmp
pd.mu.Unlock()
}
}
pd.Logger.Printf("*** DONE Processing global config")
}
}