-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathexecutor.go
263 lines (211 loc) · 5.95 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package dqlx
import (
"context"
"encoding/json"
"errors"
dgo "github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
)
// OperationExecutor represents a Dgraph executor for operations
// such as Queries and Mutations using the official dgo client
type OperationExecutor struct {
client *dgo.Dgraph
tnx *dgo.Txn
readOnly bool
bestEffort bool
}
// OperationExecutorOptionFn used to modify options of the executor
type OperationExecutorOptionFn func(executor *OperationExecutor)
// WithTnx configures a transaction to be used
// for the current execution
func WithTnx(tnx *dgo.Txn) OperationExecutorOptionFn {
return func(executor *OperationExecutor) {
executor.tnx = tnx
}
}
// WithClient configures a client for the current execution
func WithClient(client *dgo.Dgraph) OperationExecutorOptionFn {
return func(executor *OperationExecutor) {
executor.client = client
}
}
// WithReadOnly marks the execution as a read-only operation
// you can use this only on queries
func WithReadOnly(readOnly bool) OperationExecutorOptionFn {
return func(executor *OperationExecutor) {
executor.readOnly = readOnly
}
}
// WithBestEffort sets the best effort flag for the current execution
func WithBestEffort(bestEffort bool) OperationExecutorOptionFn {
return func(executor *OperationExecutor) {
executor.bestEffort = bestEffort
}
}
// NewDGoExecutor creates a new OperationExecutor
func NewDGoExecutor(client *dgo.Dgraph) *OperationExecutor {
return &OperationExecutor{
client: client,
}
}
// ExecuteQueries executes a query operation. If multiple queries are provided they will
// get merged into a single one.
// the transaction will be automatically committed if a custom tnx is not provided.
// only non-readonly transactions will be committed.
func (executor OperationExecutor) ExecuteQueries(ctx context.Context, queries ...QueryBuilder) (*Response, error) {
if err := executor.ensureClient(); err != nil {
return nil, err
}
query, variables, err := QueriesToDQL(queries...)
if err != nil {
return nil, err
}
tx := executor.getTnx()
defer tx.Discard(ctx)
resp, err := tx.QueryWithVars(ctx, query, variables)
if err != nil {
return nil, err
}
if !executor.readOnly {
if executor.tnx != nil {
err := tx.Commit(ctx)
if err != nil {
return nil, err
}
}
}
return executor.toResponse(resp, queries...)
}
// ExecuteMutations executes one ore more mutations.
// the transaction will be automatically committed if a custom tnx is not provided.
func (executor OperationExecutor) ExecuteMutations(ctx context.Context, mutations ...MutationBuilder) (*Response, error) {
if err := executor.ensureClient(); err != nil {
return nil, err
}
var queries []QueryBuilder
var mutationRequests []*api.Mutation
for _, mutation := range mutations {
var condition string
if mutation.condition != nil {
conditionDql, _, err := mutation.condition.ToDQL()
if err != nil {
return nil, err
}
condition = conditionDql
}
queries = append(queries, mutation.query)
setData, deleteData, err := mutationData(mutation)
if err != nil {
return nil, err
}
mutationRequest := &api.Mutation{
SetJson: setData,
DeleteJson: deleteData,
Cond: condition,
CommitNow: executor.tnx == nil,
}
mutationRequests = append(mutationRequests, mutationRequest)
}
query, variables, err := QueriesToDQL(queries...)
if IsEmptyQuery(query) {
query = ""
variables = nil
}
request := &api.Request{
Query: query,
Vars: variables,
ReadOnly: executor.readOnly,
BestEffort: executor.bestEffort,
Mutations: mutationRequests,
CommitNow: executor.tnx == nil,
RespFormat: api.Request_JSON,
}
tx := executor.getTnx()
defer tx.Discard(ctx)
resp, err := tx.Do(ctx, request)
if err != nil {
return nil, err
}
return executor.toResponse(resp, queries...)
}
func (executor OperationExecutor) toResponse(resp *api.Response, queries ...QueryBuilder) (*Response, error) {
var dataPathKey string
if len(queries) == 1 {
dataPathKey = queries[0].rootEdge.Name
} else {
dataPathKey = ""
}
queryResponse := &Response{
dataKeyPath: dataPathKey,
Raw: resp,
}
queries = ensureUniqueQueryNames(queries)
for _, queryBuilder := range queries {
if queryBuilder.unmarshalInto == nil {
continue
}
singleResponse := &Response{
dataKeyPath: queryBuilder.rootEdge.Name,
Raw: resp,
}
err := singleResponse.Unmarshal(queryBuilder.unmarshalInto)
if err != nil {
return nil, err
}
}
return queryResponse, nil
}
func mutationData(mutation MutationBuilder) (updateData []byte, deleteData []byte, err error) {
var setDataBytes []byte
var deleteDataBytes []byte
if mutation.setData != nil {
setBytes, err := json.Marshal(mutation.setData)
if err != nil {
return nil, nil, err
}
setDataBytes = setBytes
}
if mutation.delData != nil {
deleteBytes, err := json.Marshal(mutation.delData)
if err != nil {
return nil, nil, err
}
deleteDataBytes = deleteBytes
}
return setDataBytes, deleteDataBytes, nil
}
func (executor OperationExecutor) ensureClient() error {
if executor.client == nil {
return errors.New("cannot execute query without setting a dqlx. use DClient() to set one")
}
return nil
}
func (executor OperationExecutor) getTnx() *dgo.Txn {
tx := executor.tnx
if tx == nil {
if executor.readOnly {
tx = executor.client.NewReadOnlyTxn()
} else {
tx = executor.client.NewTxn()
}
}
return tx
}
// Response represents an operation response
type Response struct {
Raw *api.Response
dataKeyPath string
}
// Unmarshal allows to dynamically marshal the result set of a query
// into an interface value
func (response Response) Unmarshal(value interface{}) error {
if response.dataKeyPath == "" {
return json.Unmarshal(response.Raw.Json, value)
}
values := map[string]json.RawMessage{}
err := json.Unmarshal(response.Raw.Json, &values)
if err != nil {
return err
}
return json.Unmarshal(values[response.dataKeyPath], value)
}