-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
343 lines (292 loc) · 10.3 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.
package main
import (
"fmt"
"net/http"
"os"
"os/signal"
"runtime/pprof"
"sync"
"time"
"github.com/dcrdata/dcrdata/blockdata"
"github.com/dcrdata/dcrdata/dcrsqlite"
"github.com/dcrdata/dcrdata/mempool"
"github.com/dcrdata/dcrdata/rpcutils"
"github.com/dcrdata/dcrdata/semver"
"github.com/dcrdata/dcrdata/txhelpers"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrrpcclient"
"github.com/go-chi/chi"
)
// mainCore does all the work. Deferred functions do not run after os.Exit(),
// so main wraps this function, which returns a code.
//
// The steps, in order: load the config, optionally start CPU profiling,
// connect to dcrd, open and resync the SQLite database, create the block data
// collector and web UI, start the chain/stake/mempool monitor goroutines,
// register for node notifications, and finally start the HTTP server and wait
// for the monitor goroutines to exit.
//
// Return codes: 0 on normal shutdown; 1 config load failure; -1 CPU profile
// setup failure; 4 dcrd connection failure; 5 failure to query the current
// network; 16 SQLite init failure; 15 resync failure; 20 interrupted by the
// quit signal; 9 collector creation failure; 17 web UI creation failure;
// 10/11 initial block data collect/store failure; 13/14/19 mempool collector
// creation, initial collect, or initial store failure; 6 notification
// registration failure.
func mainCore() int {
	// Parse the configuration file, and setup logger.
	cfg, err := loadConfig()
	if err != nil {
		fmt.Printf("Failed to load dcrdata config: %s\n", err.Error())
		return 1
	}
	defer func() {
		if logRotator != nil {
			logRotator.Close()
		}
	}()

	if cfg.CPUProfile != "" {
		var f *os.File
		f, err = os.Create(cfg.CPUProfile)
		if err != nil {
			log.Critical(err)
			return -1
		}
		// Deferred calls run last-in-first-out, so the profile is stopped
		// (and flushed) before the file is closed.
		defer f.Close()
		if err = pprof.StartCPUProfile(f); err != nil {
			log.Critical(err)
			return -1
		}
		defer pprof.StopCPUProfile()
	}

	// Start with version info
	log.Infof(appName+" version %s", ver.String())

	//log.Debugf("Output folder: %v", cfg.OutFolder)
	log.Debugf("Log folder: %v", cfg.LogDir)

	// // Create data output folder if it does not already exist
	// if err = os.MkdirAll(cfg.OutFolder, 0750); err != nil {
	// 	log.Errorf("Failed to create data output folder %s. Error: %s\n",
	// 		cfg.OutFolder, err.Error())
	// 	return 2
	// }

	// Connect to dcrd RPC server using websockets

	// Set up the notification handler to deliver blocks through a channel.
	makeNtfnChans(cfg)

	// Daemon client connection
	ntfnHandlers, collectionQueue := makeNodeNtfnHandlers(cfg)
	dcrdClient, nodeVer, err := connectNodeRPC(cfg, ntfnHandlers)
	if err != nil || dcrdClient == nil {
		log.Errorf("Connection to dcrd failed: %v", err)
		return 4
	}

	defer func() {
		// Closing these channels should be unnecessary if quit was handled right
		closeNtfnChans()

		if dcrdClient != nil {
			log.Infof("Closing connection to dcrd.")
			dcrdClient.Shutdown()
		}

		log.Infof("Bye!")
		time.Sleep(250 * time.Millisecond)
	}()

	// Display connected network
	curnet, err := dcrdClient.GetCurrentNet()
	if err != nil {
		log.Errorf("Unable to get current network from dcrd: %v", err)
		return 5
	}
	log.Infof("Connected to dcrd (JSON-RPC API v%s) on %v",
		nodeVer.String(), curnet.String())

	// Another (horrible) example of saving to a map in memory
	// blockDataMapSaver := NewBlockDataToMemdb()
	// blockDataSavers = append(blockDataSavers, blockDataMapSaver)

	// Sqlite output
	dbInfo := dcrsqlite.DBInfo{FileName: cfg.DBFileName}
	//sqliteDB, err := dcrsqlite.InitDB(&dbInfo)
	sqliteDB, cleanupDB, err := dcrsqlite.InitWiredDB(&dbInfo,
		ntfnChans.updateStatusDBHeight, dcrdClient, activeChain)
	// The cleanup is registered before the error check so that it still runs
	// on the failure path. The nil guard avoids a panic at return in case
	// InitWiredDB hands back a nil cleanup function together with an error.
	defer func() {
		if cleanupDB != nil {
			cleanupDB()
		}
	}()
	if err != nil {
		log.Errorf("Unable to initialize SQLite database: %v", err)
		return 16
	}
	log.Infof("SQLite DB successfully opened: %s", cfg.DBFileName)
	defer sqliteDB.Close()

	// Ctrl-C to shut down.
	// Nothing should be sent on the quit channel. It should only be closed.
	quit := make(chan struct{})
	// Only accept a single CTRL+C
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	// Start waiting for the interrupt signal
	go func() {
		<-c
		signal.Stop(c)
		// Close the channel so multiple goroutines can get the message
		log.Infof("CTRL+C hit. Closing goroutines.")
		close(quit)
	}()

	// Resync db
	var waitSync sync.WaitGroup
	waitSync.Add(1)
	// SyncDBWithPoolValue is called synchronously here. The WaitGroup is
	// handed to it, presumably so the sync routine signals completion via
	// Done -- TODO confirm against dcrsqlite.
	if err = sqliteDB.SyncDBWithPoolValue(&waitSync, quit); err != nil {
		log.Error("Resync failed: ", err)
		return 15
	}

	// wait for resync before serving or collecting
	waitSync.Wait()

	// Bail out if the interrupt arrived during the resync.
	select {
	case <-quit:
		return 20
	default:
	}

	// Block data collector
	collector := blockdata.NewCollector(dcrdClient, activeChain, sqliteDB.GetStakeDB())
	if collector == nil {
		log.Errorf("Failed to create block data collector")
		return 9
	}

	// Build a slice of each required saver type for each data source
	var blockDataSavers []blockdata.BlockDataSaver
	var mempoolSavers []mempool.MempoolDataSaver

	// For example, dumping all mempool fees with a custom saver
	if cfg.DumpAllMPTix {
		log.Debugf("Dumping all mempool tickets to file in %s.\n", cfg.OutFolder)
		mempoolFeeDumper := mempool.NewMempoolFeeDumper(cfg.OutFolder, "mempool-fees")
		mempoolSavers = append(mempoolSavers, mempoolFeeDumper)
	}

	blockDataSavers = append(blockDataSavers, &sqliteDB)
	mempoolSavers = append(mempoolSavers, sqliteDB.MPC)

	// Web template data. WebUI implements BlockDataSaver interface
	webUI := NewWebUI()
	if webUI == nil {
		log.Info("Failed to start WebUI. Missing HTML resources?")
		return 17
	}
	defer webUI.StopWebsocketHub()
	webUI.UseSIGToReloadTemplates()
	blockDataSavers = append(blockDataSavers, webUI)
	mempoolSavers = append(mempoolSavers, webUI)

	// Initial data summary for web ui
	blockData, err := collector.Collect()
	if err != nil {
		log.Errorf("Block data collection for initial summary failed: %v",
			err.Error())
		return 10
	}

	if err = webUI.Store(blockData); err != nil {
		log.Errorf("Failed to store initial block data: %v", err.Error())
		return 11
	}

	// WaitGroup for the monitor goroutines
	var wg sync.WaitGroup

	// Blockchain monitor for the collector
	addrMap := make(map[string]txhelpers.TxAction) // for support of watched addresses
	// On reorg, only update web UI since dcrsqlite's own reorg handler will
	// deal with patching up the block info database.
	reorgBlockDataSavers := []blockdata.BlockDataSaver{webUI}
	wsChainMonitor := blockdata.NewChainMonitor(collector, blockDataSavers,
		reorgBlockDataSavers, quit, &wg, addrMap,
		ntfnChans.connectChan, ntfnChans.recvTxBlockChan,
		ntfnChans.reorgChanBlockData)
	wg.Add(2)
	go wsChainMonitor.BlockConnectedHandler()
	// The blockdata reorg handler disables collection during reorg, leaving
	// dcrsqlite to do the switch, except for the last block which gets
	// collected and stored via reorgBlockDataSavers.
	go wsChainMonitor.ReorgHandler()

	// Blockchain monitor for the stake DB
	sdbChainMonitor := sqliteDB.NewStakeDBChainMonitor(quit, &wg,
		ntfnChans.connectChanStakeDB, ntfnChans.reorgChanStakeDB)
	wg.Add(2)
	go sdbChainMonitor.BlockConnectedHandler()
	go sdbChainMonitor.ReorgHandler()

	// Blockchain monitor for the wired sqlite DB
	wiredDBChainMonitor := sqliteDB.NewChainMonitor(collector, quit, &wg,
		ntfnChans.connectChanWiredDB, ntfnChans.reorgChanWiredDB)
	wg.Add(2)
	// dcrsqlite does not handle new blocks except during reorg
	go wiredDBChainMonitor.BlockConnectedHandler()
	go wiredDBChainMonitor.ReorgHandler()

	// Setup the synchronous handler functions called by the collectionQueue via
	// OnBlockConnected.
	collectionQueue.SetSynchronousHandlers([]func(*chainhash.Hash){
		sdbChainMonitor.BlockConnectedSync,     // 1. Stake DB for pool info
		wsChainMonitor.BlockConnectedSync,      // 2. blockdata for regular block data collection and storage
		wiredDBChainMonitor.BlockConnectedSync, // 3. dcrsqlite for sqlite DB reorg handling
	})

	if cfg.MonitorMempool {
		mpoolCollector := mempool.NewMempoolDataCollector(dcrdClient, activeChain)
		if mpoolCollector == nil {
			log.Error("Failed to create mempool data collector")
			return 13
		}

		mpData, err := mpoolCollector.Collect()
		if err != nil {
			log.Errorf("Mempool info collection failed while gathering initial data: %v",
				err.Error())
			return 14
		}

		// Store initial MP data via sqliteDB.MPC. A failure is logged but is
		// not fatal, so startup proceeds exactly as it did when this result
		// was discarded.
		if err = sqliteDB.MPC.StoreMPData(mpData, time.Now()); err != nil {
			log.Errorf("Failed to store initial mempool data in DB cache: %v",
				err.Error())
		}

		// Store initial MP data to webUI
		if err = webUI.StoreMPData(mpData, time.Now()); err != nil {
			log.Errorf("Failed to store initial mempool data: %v",
				err.Error())
			return 19
		}

		// Setup monitor
		mpi := &mempool.MempoolInfo{
			CurrentHeight:               mpData.GetHeight(),
			NumTicketPurchasesInMempool: mpData.GetNumTickets(),
			NumTicketsSinceStatsReport:  0,
			LastCollectTime:             time.Now(),
		}

		newTicketLimit := int32(cfg.MPTriggerTickets)
		mini := time.Duration(cfg.MempoolMinInterval) * time.Second
		maxi := time.Duration(cfg.MempoolMaxInterval) * time.Second

		mpm := mempool.NewMempoolMonitor(mpoolCollector, mempoolSavers,
			ntfnChans.newTxChan, quit, &wg, newTicketLimit, mini, maxi, mpi)
		wg.Add(1)
		go mpm.TxHandler(dcrdClient)
	}

	// Bail out if the interrupt arrived while the monitors were being set up.
	select {
	case <-quit:
		return 20
	default:
	}

	// Register for notifications now that the monitors are listening
	cerr := registerNodeNtfnHandlers(dcrdClient)
	if cerr != nil {
		log.Errorf("RPC client error: %v (%v)", cerr.Error(), cerr.Cause())
		return 6
	}

	// Start web API
	app := newContext(dcrdClient, &sqliteDB, cfg.IndentJSON)
	// Start notification handler to keep /status up-to-date
	wg.Add(1)
	go app.StatusNtfnHandler(&wg, quit)
	// Initial setting of db_height. Subsequently, Store() will send this.
	ntfnChans.updateStatusDBHeight <- uint32(sqliteDB.GetHeight())

	apiMux := newAPIRouter(app, cfg.UseRealIP)
	expMux := newExplorerMux(app, cfg.UseRealIP)

	webMux := chi.NewRouter()
	webMux.Get("/", webUI.RootPage)
	webMux.Get("/ws", webUI.WSBlockUpdater)
	webMux.Get("/favicon.ico", func(w http.ResponseWriter, r *http.Request) {
		http.ServeFile(w, r, "./public/images/favicon.ico")
	})
	FileServer(webMux, "/js", http.Dir("./public/js"))
	FileServer(webMux, "/css", http.Dir("./public/css"))
	FileServer(webMux, "/fonts", http.Dir("./public/fonts"))
	FileServer(webMux, "/images", http.Dir("./public/images"))
	webMux.NotFound(func(w http.ResponseWriter, r *http.Request) {
		http.Error(w, r.URL.RequestURI()+" ain't no country I've ever heard of! (404)", http.StatusNotFound)
	})
	webMux.Mount("/api", apiMux.Mux)
	webMux.Mount("/explorer", expMux.Mux)
	listenAndServeProto(cfg.APIListen, cfg.APIProto, webMux)

	// Wait for notification handlers to quit
	wg.Wait()

	return 0
}
// main is the program entry point. It runs mainCore to completion first, so
// that mainCore's deferred functions have already executed, and only then
// terminates the process with the code mainCore returned.
func main() {
	exitCode := mainCore()
	os.Exit(exitCode)
}
// connectNodeRPC forwards to rpcutils.ConnectNodeRPC, passing the DcrdServ,
// DcrdUser, DcrdPass, DcrdCert and DisableDaemonTLS fields of cfg together
// with the supplied notification handlers. The client, node API version and
// error are returned to the caller unchanged.
func connectNodeRPC(cfg *config, ntfnHandlers *dcrrpcclient.NotificationHandlers) (*dcrrpcclient.Client, semver.Semver, error) {
	client, nodeVer, err := rpcutils.ConnectNodeRPC(
		cfg.DcrdServ,
		cfg.DcrdUser,
		cfg.DcrdPass,
		cfg.DcrdCert,
		cfg.DisableDaemonTLS,
		ntfnHandlers,
	)
	return client, nodeVer, err
}
// listenAndServeProto starts an HTTP server for mux on the listen address in
// a background goroutine and returns immediately.
//
// When proto is "https" the server uses TLS with the certificate and key read
// from "dcrdata.cert" and "dcrdata.key" (paths relative to the working
// directory). For any other proto value a plain HTTP server is started.
// Exactly one server is started per call: starting both on the same address
// would make one of them fail to bind.
//
// Because the server runs in its own goroutine, a failure (for example an
// address already in use) cannot be returned to the caller; it is logged via
// apiLog instead.
func listenAndServeProto(listen, proto string, mux http.Handler) {
	apiLog.Infof("Now serving on %s://%v/", proto, listen)

	if proto == "https" {
		go func() {
			// ListenAndServeTLS always returns a non-nil error when it stops.
			err := http.ListenAndServeTLS(listen, "dcrdata.cert", "dcrdata.key", mux)
			if err != nil {
				apiLog.Errorf("HTTPS server on %v stopped: %v", listen, err)
			}
		}()
		return
	}

	go func() {
		// ListenAndServe always returns a non-nil error when it stops.
		if err := http.ListenAndServe(listen, mux); err != nil {
			apiLog.Errorf("HTTP server on %v stopped: %v", listen, err)
		}
	}()
}