forked from tsaikd/gogstash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttplisten.go
170 lines (155 loc) · 4.57 KB
/
httplisten.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package inputhttplisten
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"os"
codecjson "github.com/tsaikd/gogstash/codec/json"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)
// ModuleName is the name used in config file
const ModuleName = "httplisten"
const invalidMethodError = "Method not allowed: '%v'"
const invalidRequestError = "Invalid request received on HTTP listener. Decoder error: %+v"
const invalidAccessToken = "Invalid access token. Access denied."
// InputConfig holds the configuration json fields and internal objects
type InputConfig struct {
config.InputConfig
Address string `json:"address"` // host:port to listen on
Path string `json:"path"` // The path to accept json HTTP POST requests on
ServerCert string `json:"cert"`
ServerKey string `json:"key"`
CA string `json:"ca"` // for client certification
RequireHeader []string `json:"require_header"` // Require this header to be present to accept the POST ("X-Access-Token: Potato")
}
// DefaultInputConfig returns an InputConfig struct with default values
func DefaultInputConfig() InputConfig {
return InputConfig{
InputConfig: config.InputConfig{
CommonConfig: config.CommonConfig{
Type: ModuleName,
},
},
Address: "0.0.0.0:8080",
Path: "/",
RequireHeader: []string{},
}
}
// InitHandler initialize the input plugin
func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeInputConfig, error) {
conf := DefaultInputConfig()
err := config.ReflectConfig(raw, &conf)
if err != nil {
return nil, err
}
conf.Codec, err = config.GetCodecDefault(ctx, *raw, codecjson.ModuleName)
if err != nil {
return nil, err
}
return &conf, nil
}
// Start wraps the actual function starting the plugin
func (i *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error) {
logger := goglog.Logger
http.HandleFunc(i.Path, func(rw http.ResponseWriter, req *http.Request) {
// Only allow POST requests (for now).
if req.Method != http.MethodPost {
logger.Warnf(invalidMethodError, req.Method)
rw.WriteHeader(http.StatusMethodNotAllowed)
rw.Write([]byte(fmt.Sprintf(invalidMethodError, req.Method)))
return
}
// Check for header
if len(i.RequireHeader) == 2 {
// get returns empty string if header not found
if req.Header.Get(i.RequireHeader[0]) != i.RequireHeader[1] {
logger.Warn(invalidAccessToken)
rw.WriteHeader(http.StatusForbidden)
rw.Write([]byte(invalidAccessToken))
return
}
}
i.postHandler(msgChan, rw, req)
})
go func() {
logger.Infof("accepting POST requests to %s%s", i.Address, i.Path)
if i.ServerCert != "" && i.ServerKey != "" {
var tlsConfig *tls.Config
srvCert, err := tls.LoadX509KeyPair(i.ServerCert, i.ServerKey)
if err != nil {
logger.Fatal(err)
return
}
if i.CA != "" {
// enable client certificate
f, err := os.Open(i.CA)
if err != nil {
logger.Fatal(err)
return
}
content, err := ioutil.ReadAll(f)
ferr := f.Close()
if ferr != nil {
logger.Warning(ferr)
}
if err != nil {
logger.Fatal(err)
return
}
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM(content)
tlsConfig = &tls.Config{
ClientCAs: certPool,
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{srvCert},
}
} else {
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{srvCert},
}
}
l, err := tls.Listen("tcp", i.Address, tlsConfig)
if err != nil {
logger.Fatal(err)
return
}
err = http.Serve(l, nil)
} else {
err = http.ListenAndServe(i.Address, nil)
}
if err != nil {
logger.Fatal(err)
}
}()
return nil
}
// Handle HTTP POST requests
func (i *InputConfig) postHandler(msgChan chan<- logevent.LogEvent, rw http.ResponseWriter, req *http.Request) {
logger := goglog.Logger
logger.Debugf("Received request")
data, err := ioutil.ReadAll(req.Body)
if err != nil {
logger.Errorf("read request body error: %v", err)
return
}
ok, err := i.Codec.Decode(context.TODO(), data, nil, []string{}, msgChan)
if err != nil {
logger.Errorf("decode request body error: %v", err)
}
if !ok {
// event not sent to msgChan
rw.WriteHeader(http.StatusInternalServerError)
if err != nil {
rw.Write([]byte(err.Error()))
}
} else if err != nil {
// event sent to msgChan
rw.WriteHeader(http.StatusBadRequest)
rw.Write([]byte(fmt.Sprintf(invalidRequestError, err)))
}
}