This repository has been archived by the owner on Sep 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcore.go
151 lines (139 loc) · 4.33 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Kudos to <github.com/strpc/zaptelegram> for the initial implementation
package zap2telegram
import (
"context"
"errors"
"go.uber.org/zap"
"time"
"go.uber.org/zap/zapcore"
)
// Default option values used by NewTelegramCore before any Option is applied.
const (
	// defaultLevel is the lowest level that is sent: messages at this level
	// or above are forwarded.
	defaultLevel = zapcore.WarnLevel
	// defaultAsyncOpt enables asynchronous sending by default.
	defaultAsyncOpt = true
	// defaultQueueOpt keeps the queue disabled by default.
	defaultQueueOpt = false
)
// AllLevels lists the zap levels known to this package, in the order
// debug, info, warn, error, fatal, panic.
//
// NOTE(review): zapcore.DPanicLevel is not part of this list, and FatalLevel
// is placed before PanicLevel here — confirm whether callers rely on that
// order before changing it.
var AllLevels = [...]zapcore.Level{
	zapcore.DebugLevel,
	zapcore.InfoLevel,
	zapcore.WarnLevel,
	zapcore.ErrorLevel,
	zapcore.FatalLevel,
	zapcore.PanicLevel,
}
// Possible errors when creating a new Zap Core.
var (
	// ErrBotAccessToken is returned by NewTelegramCore when the bot access
	// token is empty.
	ErrBotAccessToken = errors.New("bot access token not defined")
	// ErrChatIDs is returned by NewTelegramCore when no chat ID is given.
	ErrChatIDs = errors.New("chat ids not defined")
	// ErrAsyncOpt reports a conflict between the async and queue options.
	// NOTE(review): presumably returned by an Option defined outside this
	// section — confirm.
	ErrAsyncOpt = errors.New("async option not worked with queue option")
)
// TelegramCore is a zapcore.Core whose Write method hands log entries to a
// telegramClient, either directly, from a goroutine, or through a queue.
type TelegramCore struct {
	zapcore.Core

	// inheritedFields is the collection of fields added to the logger through
	// `.With()`. These fields must never be cleared after logging a single
	// entry.
	inheritedFields []zapcore.Field

	// telegramClient is the client used to send the messages.
	telegramClient *telegramClient
	// enabler decides whether a given level is sent.
	enabler zapcore.LevelEnabler
	// async makes Write send each message from its own goroutine.
	async bool
	// queue makes Write push entries onto entriesChan; Write only looks at
	// it when async is false.
	queue bool
	// intervalQueue is the interval between two queue flushes.
	intervalQueue time.Duration
	// entriesChan stores the entries waiting in the queue.
	entriesChan chan chanEntry
}
// chanEntry bundles a log entry with its fields so that both can be carried
// through the queue channel as a single value.
type chanEntry struct {
	entry  zapcore.Entry   // the log entry to send
	fields []zapcore.Field // fields that accompany the entry
}
// NewTelegramCore builds a zapcore.Core that sends log entries to Telegram.
//
// botAccessToken must be non-empty (otherwise ErrBotAccessToken is returned)
// and chatIDs must contain at least one ID (otherwise ErrChatIDs is
// returned). The core starts with the package defaults for level, async and
// queue; opts are then applied in order. An error from creating the Telegram
// client or from any option is returned as is, together with a nil core.
func NewTelegramCore(botAccessToken string, chatIDs []int64, opts ...Option) (zapcore.Core, error) {
	// Validate the mandatory arguments first.
	switch {
	case botAccessToken == "":
		return nil, ErrBotAccessToken
	case len(chatIDs) == 0:
		return nil, ErrChatIDs
	}

	client, err := newTelegramClient(botAccessToken, chatIDs)
	if err != nil {
		return nil, err
	}

	core := &TelegramCore{
		inheritedFields: []zapcore.Field{},
		telegramClient:  client,
		enabler:         zap.NewAtomicLevelAt(defaultLevel),
		async:           defaultAsyncOpt,
		queue:           defaultQueueOpt,
	}

	// Let every option adjust the core; the first failure aborts construction.
	for _, apply := range opts {
		if optErr := apply(core); optErr != nil {
			return nil, optErr
		}
	}

	return core, nil
}
// Enabled reports whether the core's level enabler accepts level l.
func (c *TelegramCore) Enabled(l zapcore.Level) bool {
	accepted := c.enabler.Enabled(l)
	return accepted
}
// Check registers this core on checked when the entry's level is enabled and
// returns the resulting checked entry; for a disabled level it returns
// checked untouched.
func (c *TelegramCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if !c.Enabled(entry.Level) {
		return checked
	}
	return checked.AddCore(entry, c)
}
// Write sends entry to Telegram with the given fields followed by the fields
// inherited through With.
//
// The delivery mode is picked in this order:
//   - async: the message is sent from a new goroutine; its error is dropped
//     and Write returns nil immediately;
//   - queue: the entry is pushed onto entriesChan and Write returns nil;
//   - otherwise: the message is sent synchronously (blocking) and the send
//     error, if any, is returned.
func (c *TelegramCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
	// Merge into a fresh slice. Appending to fields directly could write into
	// the caller's backing array when it has spare capacity, and the merged
	// slice outlives this call in the async and queue modes.
	entryFields := make([]zapcore.Field, 0, len(fields)+len(c.inheritedFields))
	entryFields = append(entryFields, fields...)
	entryFields = append(entryFields, c.inheritedFields...)

	switch {
	case c.async:
		go func() {
			// Best effort: nobody is left to receive the error.
			_ = c.telegramClient.sendMessage(entry, entryFields)
		}()
	case c.queue:
		c.entriesChan <- chanEntry{entry: entry, fields: entryFields}
	default:
		// Neither async nor queue: send immediately and report failures.
		if err := c.telegramClient.sendMessage(entry, entryFields); err != nil {
			return err
		}
	}
	return nil
}
// With returns a copy of the core whose inherited fields are the receiver's
// inherited fields followed by fields. The receiver is not modified.
func (c *TelegramCore) With(fields []zapcore.Field) zapcore.Core {
	cloned := *c

	// Give the clone its own backing array. Appending to the parent's slice
	// would let two cores derived from the same parent share (and overwrite)
	// the same spare capacity.
	merged := make([]zapcore.Field, 0, len(c.inheritedFields)+len(fields))
	merged = append(merged, c.inheritedFields...)
	merged = append(merged, fields...)
	cloned.inheritedFields = merged

	return &cloned
}
// Sync sends the entries currently waiting in the queue when the queue option
// is enabled and does nothing otherwise. It always returns nil.
func (c *TelegramCore) Sync() error {
	if !c.queue {
		return nil
	}
	c.handleNewQueueEntries()
	return nil
}
// consumeEntriesQueue sends all queued entries (messages) to Telegram every
// time intervalQueue elapses. Once ctx is done it flushes the queue one last
// time and returns ctx.Err().
func (c TelegramCore) consumeEntriesQueue(ctx context.Context) error {
	flushTicker := time.NewTicker(c.intervalQueue)
	defer flushTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Final flush before shutting down.
			c.handleNewQueueEntries()
			return ctx.Err()
		case <-flushTicker.C:
			c.handleNewQueueEntries()
		}
	}
}
// handleNewQueueEntries sends every entry currently buffered in entriesChan
// to Telegram and returns as soon as the channel has nothing more to offer.
//
// Entries are taken with a non-blocking receive. Testing len() first and then
// receiving would block forever if another goroutine emptied the channel
// between the two steps (Sync and the periodic consumer can both drain it).
func (c TelegramCore) handleNewQueueEntries() {
	for {
		select {
		case queued, ok := <-c.entriesChan:
			if !ok {
				// Channel closed and fully drained.
				return
			}
			// Best effort: a failed send must not stop the remaining entries.
			_ = c.telegramClient.sendMessage(queued.entry, queued.fields)
		default:
			// Nothing buffered right now (or the channel is nil).
			return
		}
	}
}
// getLevelThreshold returns the levels of AllLevels that are equal to or above
// l, in the order they appear in AllLevels.
//
// Levels are compared by value instead of by their position in AllLevels, so
// the result follows zapcore's severity ordering even though AllLevels lists
// FatalLevel before PanicLevel. The returned slice is freshly allocated;
// changing it does not affect AllLevels. When no level qualifies the result is
// empty (never nil).
func getLevelThreshold(l zapcore.Level) []zapcore.Level {
	levels := make([]zapcore.Level, 0, len(AllLevels))
	for _, candidate := range AllLevels {
		if candidate >= l {
			levels = append(levels, candidate)
		}
	}
	return levels
}