-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwriter.go
107 lines (91 loc) · 2.15 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package gloader
import (
"context"
"errors"
"log"
"sync"
"github.com/mohammadv184/gloader/data"
"github.com/mohammadv184/gloader/driver"
)
// Writer drains rows from a data.Buffer and writes them, batch by batch, to a
// single data collection through connections taken from a
// driver.ConnectionPool.
type Writer struct {
// buffer is the source the workers read rows from.
buffer *data.Buffer
// connectionP supplies the connection each worker writes through.
connectionP *driver.ConnectionPool
// dataCollection is the destination name passed to every Write call.
dataCollection string
// workers is the number of worker goroutines Start launches.
workers uint
// rowPerBatch caps how many rows a worker reads before issuing one Write.
rowPerBatch uint64
// ctx is handed to the pool's Connect and to every Write call.
ctx context.Context
}
// NewWriter builds a Writer that moves rows from buffer into dataCollection
// using connections obtained from connectionP. The worker count and the batch
// size start at DefaultWorkers and DefaultRowsPerBatch; use SetWorkers and
// SetRowsPerBatch to change them before calling Start.
func NewWriter(ctx context.Context, dataCollection string, buffer *data.Buffer, connectionP *driver.ConnectionPool) *Writer {
	writer := new(Writer)

	// Caller-supplied collaborators.
	writer.ctx = ctx
	writer.dataCollection = dataCollection
	writer.buffer = buffer
	writer.connectionP = connectionP

	// Tuning knobs begin at the package defaults.
	writer.workers = DefaultWorkers
	writer.rowPerBatch = DefaultRowsPerBatch

	return writer
}
// SetWorkers sets how many worker goroutines Start launches. Start spawns
// exactly this many, so a value of 0 makes Start launch none and return nil
// as soon as its dependency checks pass.
func (w *Writer) SetWorkers(workers uint) {
w.workers = workers
}
// SetRowsPerBatch sets the maximum number of rows a worker reads from the
// buffer before issuing a single Write. With a value of 0 the worker's read
// loop never executes, so no rows are consumed or written.
func (w *Writer) SetRowsPerBatch(rowsPerBatch uint64) {
w.rowPerBatch = rowsPerBatch
}
// Start validates the writer's dependencies, launches the configured number of
// worker goroutines and blocks until every one of them has finished.
//
// It returns ErrBufferNotSet or ErrConnectionPoolNotSet when the respective
// dependency is nil. Otherwise it returns the first error reported by a worker
// (in worker order), or nil when every worker finished without error. Worker
// failures are returned to the caller instead of panicking inside a goroutine.
func (w *Writer) Start() error {
	if w.buffer == nil {
		return ErrBufferNotSet
	}
	if w.connectionP == nil {
		return ErrConnectionPoolNotSet
	}

	// One slot per worker: each goroutine writes only its own index, so no
	// extra locking is needed, and wg.Wait orders the writes before the reads.
	errs := make([]error, w.workers)

	var wg sync.WaitGroup
	wg.Add(len(errs))
	for i := range errs {
		go func(slot int) {
			defer wg.Done()
			errs[slot] = w.runWorker()
		}(i)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

// RunWorker runs a single write worker on the calling goroutine until the
// buffer is closed and empty.
//
// It panics with the underlying error if the worker cannot obtain a writable
// connection, cannot read from the buffer, or fails to write a batch. Start
// does not go through this method; it uses the error-returning runWorker so
// failures reach its caller as ordinary errors.
func (w *Writer) RunWorker() {
	if err := w.runWorker(); err != nil {
		panic(err)
	}
}

// runWorker is the worker loop shared by Start and RunWorker. It takes one
// connection from the pool, repeatedly collects up to rowPerBatch rows from
// the buffer and writes them to dataCollection, and returns nil once the
// buffer reports closed and empty. Any connect, read or write failure is
// returned to the caller.
func (w *Writer) runWorker() error {
	conn, cIndex, err := w.connectionP.Connect(w.ctx)
	if err != nil {
		return err
	}
	// Release the pooled connection on every exit path, not only the happy
	// one. A close failure cannot change the worker's outcome, so it is logged
	// rather than returned.
	defer func() {
		if cErr := w.connectionP.CloseConnection(cIndex); cErr != nil {
			log.Printf("error on closing connection for %s: %s", w.dataCollection, cErr)
		}
	}()

	// Checked assertion: a non-writable connection becomes a descriptive error
	// instead of an opaque runtime panic.
	wConn, ok := conn.(driver.WritableConnection)
	if !ok {
		return errors.New("gloader: connection for " + w.dataCollection + " is not writable")
	}

	batch := data.NewDataBatch()
	for {
		batch.Clear()

		for i := uint64(0); i < w.rowPerBatch; i++ {
			dSet, err := w.buffer.Read()
			if err != nil {
				if !errors.Is(err, data.ErrBufferIsClosed) {
					return err
				}
				// NOTE(review): the value returned alongside ErrBufferIsClosed
				// is still added, exactly as before. Confirm against
				// Buffer.Read whether it can carry a final row or is always
				// empty; if the latter, this Add should be dropped.
				batch.Add(dSet)
				break
			}
			batch.Add(dSet)
		}

		if batch.GetLength() > 0 {
			if err := wConn.Write(w.ctx, w.dataCollection, batch); err != nil {
				// Log line kept from the original so operators still see which
				// collection failed; the error itself goes back to the caller.
				log.Printf("error on writing data to %s: %s", w.dataCollection, err)
				return err
			}
		}

		// Nothing left to read and nothing more will arrive: the worker is done.
		if w.buffer.IsClosed() && w.buffer.IsEmpty() {
			return nil
		}
	}
}