-
Notifications
You must be signed in to change notification settings - Fork 0
/
druidtestplan.go
106 lines (91 loc) · 2.76 KB
/
druidtestplan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"context"
"github.com/xiaoyao1991/chukonu/core"
"github.com/xiaoyao1991/chukonu/impl"
)
// DruidRequestProvider is a stateless type whose methods select the HTTP
// engine and metrics manager and generate the Druid request workflows.
type DruidRequestProvider struct{}
// UseEngine returns the constructor used to build the engine for this test
// plan; it always hands back impl.NewHttpEngine.
func (m *DruidRequestProvider) UseEngine() func(core.ChukonuConfig) core.Engine {
	newEngine := impl.NewHttpEngine
	return newEngine
}
// MetricsManager returns a new HTTP metrics manager obtained from
// impl.NewHttpMetricsManager on every call.
func (m *DruidRequestProvider) MetricsManager() core.MetricsManager {
	manager := impl.NewHttpMetricsManager()
	return manager
}
// Provide builds Druid workflows in an endless loop and sends each one on
// queue. The loop never terminates and queue is never closed here, so the
// send blocks whenever the receiver is not ready.
//
// Every workflow has two steps:
//  1. GET the datasource listing and store the first datasource name in the
//     context under the key "datasource".
//  2. POST a topN query against that datasource and drain the response.
//
// NOTE(review): step 2 assumes the engine passes the context returned by
// step 1's callback into step 2 — confirm against the engine implementation.
func (m *DruidRequestProvider) Provide(queue chan *core.ChukonuWorkflow) {
	const (
		datasourcesURL = "http://40.71.182.255:8082/druid/v2/datasources"
		queryURL       = "http://40.71.182.255:8082/druid/v2/"
	)

	// throttle := time.Tick(200 * time.Millisecond)
	// i is only consumed by the commented-out request limit below.
	i := 0
	for {
		// <-throttle
		// fmt.Printf("Generating %dth request\n", i)
		workflow := core.NewWorkflow("druid_workflow")

		listDatasources := func(ctx context.Context) core.ChukonuRequest {
			req, err := http.NewRequest("GET", datasourcesURL, nil)
			if err != nil {
				fmt.Println(err)
				return nil
			}
			return impl.NewChukonuHttpRequest("datasources_req", 0, true, true, func(ctx context.Context, resp core.ChukonuResponse) context.Context {
				httpResp := resp.RawResponse().(*http.Response)
				defer httpResp.Body.Close()

				body, err := ioutil.ReadAll(httpResp.Body)
				if err != nil {
					fmt.Println(err)
					return ctx
				}
				// A short or malformed body used to panic on a fixed-offset
				// slice; now the context is simply left without a datasource.
				datasource, ok := firstDatasource(body)
				if !ok {
					fmt.Printf("unexpected datasources response: %q\n", body)
					return ctx
				}
				return context.WithValue(ctx, "datasource", datasource)
			}, nil, req)
		}
		workflow.AddRequest(listDatasources)

		queryTopN := func(ctx context.Context) core.ChukonuRequest {
			// Fall back to an empty name when the first step stored nothing,
			// instead of letting fmt print "%!s(<nil>)" into the JSON.
			datasource, _ := ctx.Value("datasource").(string)
			payload := []byte(fmt.Sprintf(`
{
"queryType" : "topN",
"dataSource" : "%s",
"intervals" : ["2013-08-01/2013-08-03"],
"granularity" : "all",
"dimension" : "page",
"metric" : "edits",
"threshold" : 25,
"aggregations" : [
{
"type" : "longSum",
"name" : "edits",
"fieldName" : "count"
}
]
}`, datasource))
			req, err := http.NewRequest("POST", queryURL, bytes.NewBuffer(payload))
			// Check err before touching req: req is nil when NewRequest fails.
			if err != nil {
				fmt.Println(err)
				return nil
			}
			req.Header.Set("Content-Type", "application/json")
			return impl.NewChukonuHttpRequest("topN", 0, true, true, func(ctx context.Context, resp core.ChukonuResponse) context.Context {
				httpResp := resp.RawResponse().(*http.Response)
				defer httpResp.Body.Close()
				// Best-effort drain so the connection can be reused; the
				// payload itself is not inspected.
				_, _ = io.Copy(ioutil.Discard, httpResp.Body)
				return ctx
			}, nil, req)
		}
		workflow.AddRequest(queryTopN)

		queue <- workflow
		i++
		// if i == 100 {
		// break
		// }
	}
	// close(queue)
}

// firstDatasource extracts the first name from a datasource listing body that
// is expected to look like ["name"] or ["name","other"]. It reports false when
// the body does not start with [" or the first name is empty or unterminated.
// Names containing escaped quotes are not supported.
func firstDatasource(body []byte) (string, bool) {
	trimmed := bytes.TrimSpace(body)
	if !bytes.HasPrefix(trimmed, []byte(`["`)) {
		return "", false
	}
	rest := trimmed[2:]
	end := bytes.IndexByte(rest, '"')
	if end <= 0 {
		return "", false
	}
	return string(rest[:end]), true
}
// TestPlan is the package-level DruidRequestProvider value.
// NOTE(review): presumably looked up by name by the load-test runner — confirm.
var TestPlan = DruidRequestProvider{}