-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
185 lines (162 loc) · 4.8 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Lambda writing random weather events to Dynamodb.
// Meant to be triggered every minute by an EventBridge scheduler.
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
var dynamodbClient *dynamodb.Client
var dynamoTable *string
var ctx context.Context = context.Background()
func init() {
dynamoTable = aws.String(os.Getenv("DYNAMO_TABLE"))
sdkConfig, err := config.LoadDefaultConfig(ctx)
if err != nil {
log.Fatal("Could not connect to dynamo ", err)
}
dynamodbClient = dynamodb.NewFromConfig(sdkConfig)
}
type WeatherEvent struct {
DeviceId int64
Time time.Time
EventType string
Value float64
}
func handler(ctx context.Context, request events.EventBridgeEvent) {
log.Println("generating random weather event")
events := make([]WeatherEvent, 0, 50)
for i := range 10 {
deviceId := int64(1000 + i)
events = append(events, randomEvents(deviceId)...)
}
addAllSamples(ctx, events)
log.Println("done")
}
// randomEvents creates one random weather event of each type for the given deviceID
func randomEvents(deviceId int64) []WeatherEvent {
return []WeatherEvent{
randomPressureEvent(deviceId),
randomTemperatureEvent(deviceId),
randomHumidityEvent(deviceId),
randomWindSpeedEvent(deviceId),
randomWindDirectionEvent(deviceId),
}
}
func randomPressureEvent(deviceId int64) WeatherEvent {
return WeatherEvent{
DeviceId: deviceId,
Time: time.Now(),
EventType: "Pressure",
Value: float64(rand.Int31n(100) + 950),
}
}
func randomTemperatureEvent(deviceId int64) WeatherEvent {
return WeatherEvent{
DeviceId: deviceId,
Time: time.Now(),
EventType: "Temperature",
Value: rand.Float64()*40 - 10,
}
}
func randomHumidityEvent(deviceId int64) WeatherEvent {
return WeatherEvent{
DeviceId: deviceId,
Time: time.Now(),
EventType: "Humidity",
Value: rand.Float64() * 100,
}
}
func randomWindSpeedEvent(deviceId int64) WeatherEvent {
return WeatherEvent{
DeviceId: deviceId,
Time: time.Now(),
EventType: "WindSpeed",
Value: float64(rand.Int31n(50)),
}
}
func randomWindDirectionEvent(deviceId int64) WeatherEvent {
return WeatherEvent{
DeviceId: deviceId,
Time: time.Now(),
EventType: "WindDirection",
Value: rand.Float64() * 360,
}
}
// addAllSamples slices the given array into batches of 25 (i.e. the maximum allowed
// by DynamoDB) and sends them to addSamples.
// (in theory we should check if keys overlap, although here we know they never do)
func addAllSamples(ctx context.Context, weatherEvents []WeatherEvent) {
log.Println("sending generated data to DB")
var waiter = sync.WaitGroup{}
for i := 0; i < len(weatherEvents); i += 25 {
fromIdx := i
toIdx := min(i+25, len(weatherEvents))
waiter.Add(1)
go func() {
defer waiter.Done()
if err := addSamples(ctx, weatherEvents[fromIdx:toIdx]); err != nil {
log.Println("failed to insert data in Dynamo", err)
}
}()
}
waiter.Wait()
}
// addSamples inserts the given weather events into DynamoDB as one single batch
func addSamples(ctx context.Context, weatherEvents []WeatherEvent) error {
log.Println("inserting batch")
if len(weatherEvents) > 0 && len(weatherEvents) < 26 {
putRequests := make([]types.WriteRequest, 0, len(weatherEvents))
for _, weatherEvent := range weatherEvents {
putRequest := types.WriteRequest{
PutRequest: &types.PutRequest{
Item: map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{
Value: fmt.Sprintf("DeviceId#%d", weatherEvent.DeviceId),
},
"SK": &types.AttributeValueMemberS{
Value: fmt.Sprintf("Time#%d#Type%s", weatherEvent.Time.Unix(), weatherEvent.EventType),
},
"DeviceId": &types.AttributeValueMemberN{
Value: fmt.Sprintf("%d", weatherEvent.DeviceId),
},
"EventType": &types.AttributeValueMemberS{
Value: weatherEvent.EventType,
},
"Value": &types.AttributeValueMemberN{
Value: fmt.Sprintf("%f", weatherEvent.Value),
},
"Time": &types.AttributeValueMemberN{
Value: fmt.Sprintf("%d", weatherEvent.Time.Unix()),
},
},
},
}
putRequests = append(putRequests, putRequest)
}
input := dynamodb.BatchWriteItemInput{
RequestItems: map[string][]types.WriteRequest{
*dynamoTable: putRequests,
},
}
if _, err := dynamodbClient.BatchWriteItem(ctx, &input); err != nil {
return fmt.Errorf("error while inserting events in DyanmoDB %w", err)
}
} else {
log.Printf("refusing to insert a batch of size %d", len(weatherEvents))
}
return nil
}
func main() {
lambda.Start(handler)
}