-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
99 lines (85 loc) · 2.4 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"github.com/kine-dmd/athena-query-speed-test/appleWatch3Row"
"github.com/kine-dmd/athena-query-speed-test/parquetHandler"
"github.com/kine-dmd/athena-query-speed-test/s3Connection"
"log"
"math/rand"
"os"
"runtime"
"strconv"
"sync"
"time"
)
const (
	// bucketName is the S3 bucket that receives the generated parquet files.
	bucketName = "athena-query-size-test"

	// localFilePrefix is the local directory (including the trailing slash)
	// where parquet files are written before upload. It is stripped from the
	// local path when building the S3 key.
	localFilePrefix = "/data/rp1615/"

	// totalRows is the number of random rows to generate across all files
	// (one hundred million).
	totalRows = 100 * 1000 * 1000
)
// main generates totalRows random rows split into parquet files of
// os.Args[1] rows each. It uploads every file to the bucketName S3 bucket
// under the key "<rows-per-file>/<file name>" and deletes each local file
// after its upload attempt.
//
// Files are generated and uploaded by up to 3*runtime.NumCPU() goroutines at
// a time. When totalRows is not a multiple of rows-per-file, the remainder is
// dropped (integer division), so fewer than totalRows rows are written.
func main() {
	// Validate the argument up front. Without these checks a missing argument
	// panics with an index error, and a non-numeric or zero argument panics
	// with an integer divide by zero.
	if len(os.Args) < 2 {
		log.Fatal("missing argument: number of rows per parquet file")
	}
	numRowsPerFile, parseErr := strconv.Atoi(os.Args[1])
	if parseErr != nil {
		log.Fatalf("invalid rows per file %q: %v", os.Args[1], parseErr)
	}
	// A value above totalRows would yield zero files and silently do nothing.
	if numRowsPerFile < 1 || numRowsPerFile > totalRows {
		log.Fatalf("rows per file must be between 1 and %d, got %d", totalRows, numRowsPerFile)
	}
	numFiles := totalRows / numRowsPerFile

	// Make a file uploader
	s3Conn := s3Connection.MakeS3Connection()

	// routinePool is a counting semaphore that bounds the number of in-flight
	// generate+upload goroutines. wg lets main wait for all of them.
	var wg sync.WaitGroup
	routinePool := make(chan struct{}, runtime.NumCPU()*3)

	for i := 0; i < numFiles; i++ {
		// Acquire a slot; this blocks while the pool is full.
		routinePool <- struct{}{}
		wg.Add(1)
		go func() {
			// Release the slot only after the local file has been uploaded and
			// deleted. This also limits how many generated files sit on disk at
			// once.
			defer wg.Done()
			defer func() { <-routinePool }()

			filePath := makeFileWithRandomRows(numRowsPerFile)
			s3FilePath := strconv.Itoa(numRowsPerFile) + "/" + filePath[len(localFilePrefix):]

			file, openErr := os.Open(filePath)
			if openErr != nil {
				log.Print("Error opening file for upload: ", openErr)
			} else {
				if uploadErr := s3Conn.UploadFile(bucketName, s3FilePath, file); uploadErr != nil {
					log.Print("Error uploading file: ", uploadErr)
				}
				// The file was only read, so a Close error cannot lose data.
				_ = file.Close()
			}

			// Delete the local copy to save disk space.
			if removeErr := os.Remove(filePath); removeErr != nil {
				log.Print("Error removing local file: ", removeErr)
			}
		}()
	}

	// Wait for all goroutines to finish.
	wg.Wait()
}
// generateRandomRow builds one AppleWatch3Row. Ts gets a random uint64 and
// each of Rx, Ry, Rz, Rl, Pt, Yw, Ax, Ay, Az and Hr gets a random value from
// rand.Float64. Any other fields keep their zero values.
func generateRandomRow() appleWatch3Row.AppleWatch3Row {
	var row appleWatch3Row.AppleWatch3Row

	// Assign one field per statement, in the same order as the struct fields
	// are listed, so the random source is consumed in the same sequence.
	row.Ts = rand.Uint64()
	row.Rx = rand.Float64()
	row.Ry = rand.Float64()
	row.Rz = rand.Float64()
	row.Rl = rand.Float64()
	row.Pt = rand.Float64()
	row.Yw = rand.Float64()
	row.Ax = rand.Float64()
	row.Ay = rand.Float64()
	row.Az = rand.Float64()
	row.Hr = rand.Float64()

	return row
}
// makeFileWithRandomRows writes numRows rows from generateRandomRow into a
// new parquet file under localFilePrefix and returns the file's path.
//
// Row-write and close failures are logged and otherwise ignored, so the
// returned file may be incomplete. Failing to create the file is fatal,
// because there is nothing to write the rows to.
func makeFileWithRandomRows(numRows int) string {
	// main calls this from several goroutines at once. A bare nanosecond
	// timestamp can repeat across goroutines (clock resolution is platform
	// dependent), which would make two goroutines write the same file, so a
	// random suffix is appended. math/rand's top-level functions are safe for
	// concurrent use. FormatInt also avoids truncating the int64 timestamp
	// on 32-bit platforms.
	filePath := localFilePrefix +
		strconv.FormatInt(time.Now().UnixNano(), 10) + "-" +
		strconv.FormatUint(rand.Uint64(), 10) + ".parquet"

	pqFile, err := parquetHandler.MakeParquetFile(filePath)
	if err != nil {
		log.Fatalf("Unable to create parquet file %s: %v", filePath, err)
	}

	// Write rows to the file
	for i := 0; i < numRows; i++ {
		if writeErr := pqFile.WriteRow(generateRandomRow()); writeErr != nil {
			log.Println("Error writing row to parquet file. ", writeErr)
		}
	}

	// Close the file
	if closeErr := pqFile.CloseFile(); closeErr != nil {
		log.Println("Unable to close file: ", filePath, closeErr)
	}
	return filePath
}