-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFraudGraph_ddl.gsql
511 lines (425 loc) · 17.3 KB
/
FraudGraph_ddl.gsql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
// Keep executing the remaining commands of this script even when one of them reports an error.
SET exit_on_error = FALSE
// WARNING: destructive. DROP ALL clears the whole catalog (graphs, vertex/edge types, jobs, queries)
// together with the loaded data, so this script always rebuilds FraudGraph from scratch.
DROP ALL
// =========================================================================
// Vertex types
// Each CREATE VERTEX is wrapped in BEGIN/END (GSQL multi-line mode) so it can be laid out one attribute per line.
// Attribute order matters: the VALUES(...) clauses of the loading job are positional.

// merchant: num_frauds / num_trans are not loaded from the file; fix_statistics fills them in.
BEGIN
CREATE VERTEX merchant (
  PRIMARY_ID id STRING,
  num_frauds INT,
  num_trans INT
) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
END

// category: same shape as merchant.
BEGIN
CREATE VERTEX category (
  PRIMARY_ID id STRING,
  num_frauds INT,
  num_trans INT
) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
END

// global_stats: whole-data-set totals, written by fix_statistics (note: num_trans comes before num_frauds here).
BEGIN
CREATE VERTEX global_stats (
  PRIMARY_ID id STRING,
  num_trans INT,
  num_frauds INT
) WITH STATS="OUTDEGREE_BY_EDGETYPE"
END

// customer: per-customer totals plus mean / standard deviation of purchase amounts (fix_statistics).
BEGIN
CREATE VERTEX customer (
  PRIMARY_ID id STRING,
  num_frauds INT,
  average_purchase DOUBLE,
  stddev_purchase DOUBLE,
  num_trans INT
) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
END

// transaction: amount, fraud and step come from the input file; the prev_* attributes are
// written by fix_txn_aggs. rank_fraud and dist_fraud are not written by anything in this script.
BEGIN
CREATE VERTEX transaction (
  PRIMARY_ID id UINT,
  amount DOUBLE,
  fraud INT,
  step INT,
  prev_frauds_cust INT,
  prev_frauds_merch INT,
  prev_frauds_cat INT,
  rank_fraud DOUBLE,
  dist_fraud INT,
  prev_trans_cat INT,
  prev_trans_merch INT,
  prev_trans_cust INT,
  prev_sum_cat DOUBLE,
  prev_sum_merch DOUBLE,
  prev_sum_cust DOUBLE,
  prev_max_cat DOUBLE,
  prev_max_merch DOUBLE,
  prev_max_cust DOUBLE
) WITH STATS="OUTDEGREE_BY_EDGETYPE", PRIMARY_ID_AS_ATTRIBUTE="true"
END
// =========================================================================
// Edge types
// Every transaction is the hub of a small star: one customer, one merchant, one category.
// All edges are undirected, so queries can traverse them from either end.
// Who made the purchase.
CREATE UNDIRECTED EDGE customer2transaction (FROM customer, TO transaction)
// Where the purchase was made.
CREATE UNDIRECTED EDGE transaction2merchant (FROM transaction, TO merchant)
// What kind of purchase it was.
CREATE UNDIRECTED EDGE transaction2category (FROM transaction, TO category)
// =========================================================================
// Graph
// FraudGraph contains every vertex type (first five entries) and every edge type (last three) defined above.
BEGIN
CREATE GRAPH FraudGraph (
  merchant,
  category,
  global_stats,
  customer,
  transaction,
  transaction2category,
  transaction2merchant,
  customer2transaction
)
END
// =========================================================================
// Indices
// One-off global schema change job: adds a secondary index on transaction.step.
// fix_txn_aggs repeatedly filters transactions with "step == i_step", which is the lookup this index targets.
BEGIN
CREATE GLOBAL SCHEMA_CHANGE JOB add_index {
ALTER VERTEX transaction ADD INDEX transaction_step_ix ON (step);
}
END
// Apply the change, then remove the job from the global scope since it is not needed again.
RUN GLOBAL SCHEMA_CHANGE JOB add_index
USE GLOBAL
DROP JOB add_index
// Everything below (data source grant, loading job, queries) works in the FraudGraph scope.
USE GRAPH FraudGraph
// =========================================================================
// Data sources
// S3 data source read by the loading job below.
// NOTE(review): {$ACCESS_KEY} and {$SECRET_KEY} look like placeholders that must be replaced with real
// AWS credentials before running - confirm how they are substituted, and never commit real keys.
CREATE DATA_SOURCE S3 s3ds = "{'file.reader.settings.fs.s3a.access.key':'{$ACCESS_KEY}','file.reader.settings.fs.s3a.secret.key':'{$SECRET_KEY}'}"
// Allow FraudGraph to use the data source.
GRANT DATA_SOURCE s3ds TO GRAPH FraudGraph
// =========================================================================
// Loading jobs
// Loading job: reads one comma-separated file (with a header row) from the s3ds data source and
// populates every vertex and edge type of FraudGraph except global_stats.
// Columns used (0-based): $0 transaction id, $1 step, $2 customer id, $6 merchant id,
//                         $8 category id, $9 amount, $10 fraud flag.
// "_" means the attribute is not loaded from the file; the statistics and prev_* attributes
// are computed afterwards by fix_statistics and fix_txn_aggs.
// All destinations share one LOAD statement because they use the same file and the same parsing options.
BEGIN
CREATE LOADING JOB load_data FOR GRAPH FraudGraph {
  DEFINE FILENAME MyDataSource = "$s3ds:{\"file.uris\":\"s3://tg-workshop-2020-10-04/newdata.csv\"}";
  LOAD MyDataSource
    TO VERTEX merchant VALUES($6, _, _),
    TO VERTEX customer VALUES($2, _, _, _, _),
    TO VERTEX transaction VALUES($0, $9, $10, $1, _, _, _, _, _, _, _, _, _, _, _, _, _, _),
    TO VERTEX category VALUES($8, _, _),
    TO EDGE customer2transaction VALUES($2, $0),
    TO EDGE transaction2merchant VALUES($0, $6),
    TO EDGE transaction2category VALUES($0, $8)
    USING SEPARATOR=",", HEADER="true", EOL="\n";
}
END
RUN LOADING JOB load_data USING EOF="true"
// =========================================================================
// Queries
BEGIN
CREATE QUERY fix_statistics() FOR GRAPH FraudGraph {
  /*
    Bulk recomputation of the all-history statistics stored in the graph:
      - customer : num_trans, num_frauds, average_purchase, stddev_purchase
      - merchant : num_trans, num_frauds
      - category : num_trans, num_frauds
      - global_stats (single vertex with id "1") : num_trans, num_frauds
    Only vertices that have at least one attached transaction are touched.
    In a real deployment these figures would be maintained incrementally per
    transaction instead of being recomputed in bulk like this.
  */

  // Per-vertex working accumulators, shared by the customer, merchant and category passes.
  SumAccum<INT> @frauds;       // fraud_fraction_alltime = frauds/numTrans
  SumAccum<INT> @numTrans;
  ListAccum<DOUBLE> @amounts;  // every purchase amount of a customer, kept for the stddev pass
  AvgAccum @mean;
  // Total number of fraudulent transactions in the whole graph.
  SumAccum<INT> @@num_frauds;

  // ---- Customers: counts, mean and sample standard deviation of purchase amounts ----
  cust_set = {customer.*};
  cust_set =
    SELECT cu
    FROM cust_set:cu -(customer2transaction)- transaction:tx
    ACCUM
      cu.@numTrans += 1,
      cu.@frauds += tx.fraud,
      cu.@mean += tx.amount,
      cu.@amounts += tx.amount
    POST-ACCUM
      DOUBLE sq_dev_total = 0,
      cu.num_trans = cu.@numTrans,
      cu.num_frauds = cu.@frauds,
      cu.average_purchase = cu.@mean,
      IF cu.@numTrans <= 1 THEN
        // A single observation has no spread.
        cu.stddev_purchase = 0
      ELSE
        FOREACH amt IN cu.@amounts DO
          sq_dev_total = sq_dev_total + pow(amt - cu.@mean, 2) // squared deviation from the mean
        END,
        // Divide by n-1 (Bessel's correction) to get the corrected sample standard deviation.
        cu.stddev_purchase = sqrt(sq_dev_total / (cu.@numTrans - 1))
      END;

  // ---- Merchants: transaction and fraud counts ----
  merch_set = {merchant.*};
  merch_set =
    SELECT me
    FROM merch_set:me -(transaction2merchant)- transaction:tx
    ACCUM
      me.@numTrans += 1,
      me.@frauds += tx.fraud
    POST-ACCUM
      me.num_trans = me.@numTrans,
      me.num_frauds = me.@frauds;

  // ---- Categories: transaction and fraud counts ----
  cat_set = {category.*};
  cat_set =
    SELECT ca
    FROM cat_set:ca -(transaction2category)- transaction:tx
    ACCUM
      ca.@numTrans += 1,
      ca.@frauds += tx.fraud
    POST-ACCUM
      ca.num_trans = ca.@numTrans,
      ca.num_frauds = ca.@frauds;

  // ---- Global totals ----
  txn_set = {transaction.*};
  txn_set =
    SELECT tx
    FROM txn_set:tx
    POST-ACCUM
      @@num_frauds += tx.fraud;

  // Update the global_stats vertices when any exist, otherwise create the single one with id "1".
  stats_set = {global_stats.*};
  IF stats_set.size() > 0 THEN
    UPDATE gs FROM stats_set:gs
    SET gs.num_trans = txn_set.size(),
        gs.num_frauds = @@num_frauds;
  ELSE
    INSERT INTO global_stats
      VALUES ("1", txn_set.size(), @@num_frauds);
  END;

  PRINT "Global, category, merchant and customer all-history statistics updated" as complete;
}
END
// -------------------------------------------------------------------------
BEGIN
CREATE QUERY fix_txn_aggs() FOR GRAPH FraudGraph {
/*
Writes "earlier in the same step" aggregates onto every transaction.
For each transaction t and each of its customer, merchant and category, this query counts/sums
the other transactions of that entity that are in the same step as t and have a lower id than t:
  prev_frauds_{cust,merch,cat} - number of such transactions with fraud == 1
  prev_trans_{cust,merch,cat}  - number of such transactions
  prev_sum_{cust,merch,cat}    - sum of their amounts
  prev_max_{cust,merch,cat}    - largest of their amounts (0 when there are none)
"Earlier" is decided purely by comparing transaction ids.
NOTE(review): this assumes ids increase in arrival order within a step - confirm against the data set.
A step stands for one day. In production, this would be aggregated live based on the transactions that day.
Cost: inside every step, each entity's transactions are compared pairwise.
Time: About 3':34''
*/
// Scratch sets attached to customer / merchant / category vertices; cleared after every use.
SetAccum<VERTEX<transaction>> @my_fraud_ts, @my_ts;
// Per-transaction results, copied into the prev_* attributes at the end of each step.
SumAccum<INT> @prev_f_cust, @prev_f_merch, @prev_f_cat; // Prev's are for this day only
SumAccum<INT> @prev_t_cust, @prev_t_merch, @prev_t_cat;
SumAccum<DOUBLE> @prev_sum_cust, @prev_sum_merch, @prev_sum_cat;
// MaxAccum: "+=" keeps the larger of the stored and the new value; starts at 0.
MaxAccum<DOUBLE> @prev_max_cust = 0, @prev_max_merch = 0, @prev_max_cat = 0;
// Smallest and largest step present in the data; they bound the loop below.
MinAccum<INT> @@min_step;
MaxAccum<INT> @@max_step;
allT = {transaction.*};
allT =
SELECT t
FROM allT:t
POST-ACCUM
@@min_step += t.step
, @@max_step += t.step;
// Fixing up transactions in each step, one at a time.
// In the production world, this would be a single day.
FOREACH i_step IN RANGE[@@min_step, @@max_step] DO
// Get all fraudulent transactions of the current step
myFrauds =
SELECT t
FROM allT:t
WHERE t.step == i_step AND t.fraud == 1;
// Add all frauds on this step to the attached customer
fraudC =
SELECT cs
FROM myFrauds:t -()- customer:cs
ACCUM cs.@my_fraud_ts += t;
// And attach them to all of that customer's transactions in this step
fraudC =
SELECT cs
FROM fraudC:cs -()- transaction:t
WHERE t.step == i_step
ACCUM FOREACH it IN cs.@my_fraud_ts DO
IF it.id < t.id THEN
t.@prev_f_cust += 1 // one more earlier fraud (lower id) for this customer
END
END
POST-ACCUM
cs.@my_fraud_ts.clear();
// Add all frauds on this step to the merchant
fraudM =
SELECT ms
FROM myFrauds:t -()- merchant:ms
ACCUM ms.@my_fraud_ts += t;
// And attach them to all of that merchant's transactions in this step
fraudM =
SELECT ms
FROM fraudM:ms -()- transaction:t
WHERE t.step == i_step
ACCUM FOREACH it IN ms.@my_fraud_ts DO
IF it.id < t.id THEN
t.@prev_f_merch += 1
END
END
POST-ACCUM
ms.@my_fraud_ts.clear();
// Add all frauds on this step to the category
fraudCat =
SELECT cat
FROM myFrauds:t -()- category:cat
ACCUM cat.@my_fraud_ts += t;
// And attach them to all of that category's transactions in this step
fraudCat =
SELECT cat
FROM fraudCat:cat -()- transaction:t
WHERE t.step == i_step
ACCUM FOREACH it IN cat.@my_fraud_ts DO
IF it.id < t.id THEN
t.@prev_f_cat += 1
END
END
POST-ACCUM
cat.@my_fraud_ts.clear();
// Transactions of this step
myTrans =
SELECT tt
FROM allT:tt
WHERE tt.step == i_step;
// Assign all transactions of this step to all customers, merchants and categories affected in this step
// (the untyped target ":cmc" matches every neighbour of a transaction, whatever its vertex type)
myCMC =
SELECT cmc
FROM myTrans:tt -()- :cmc
ACCUM cmc.@my_ts += tt;
// Aggregate: for every transaction tt of an entity, fold in the entity's earlier (lower id) transactions.
// cmc.type selects which family of accumulators (category / merchant / customer) receives the values.
myCMC =
SELECT cmc
FROM myCMC:cmc -()- transaction:tt
WHERE tt.step == i_step
ACCUM FOREACH it IN cmc.@my_ts DO
IF it.id < tt.id THEN
CASE cmc.type
WHEN "category" THEN
tt.@prev_t_cat += 1
, tt.@prev_sum_cat += it.amount
, tt.@prev_max_cat += it.amount
WHEN "merchant" THEN
tt.@prev_t_merch += 1
, tt.@prev_sum_merch += it.amount
, tt.@prev_max_merch += it.amount
WHEN "customer" THEN
tt.@prev_t_cust += 1
, tt.@prev_sum_cust += it.amount
, tt.@prev_max_cust += it.amount
END // CASE
END // IF
END // FOREACH
POST-ACCUM
cmc.@my_ts.clear();
// Update transactions' attributes with the aggregated data
myTrans =
SELECT ts
FROM myTrans:ts
POST-ACCUM
ts.prev_frauds_cust = ts.@prev_f_cust
, ts.prev_frauds_merch = ts.@prev_f_merch
, ts.prev_frauds_cat = ts.@prev_f_cat
, ts.prev_trans_cat = ts.@prev_t_cat
, ts.prev_trans_merch = ts.@prev_t_merch
, ts.prev_trans_cust = ts.@prev_t_cust
, ts.prev_sum_cat = ts.@prev_sum_cat
, ts.prev_sum_merch = ts.@prev_sum_merch
, ts.prev_sum_cust = ts.@prev_sum_cust
, ts.prev_max_cat = ts.@prev_max_cat
, ts.prev_max_merch = ts.@prev_max_merch
, ts.prev_max_cust = ts.@prev_max_cust;
END; // FOR loop
PRINT "Daily transaction aggregates updated" as complete;
}
END
// -------------------------------------------------------------------------
BEGIN
CREATE QUERY get_all_trans_tabular3(INT days_history = 1) FOR GRAPH FraudGraph {
/*
Returns one row per transaction with features aggregated over recent history.
Parameter:
  days_history - how many steps (days) before the transaction's own step are included (default 1).
History window per transaction = the days_history previous steps (taken from per-step aggregates
built in this query) + the earlier transactions of the same step (taken from the prev_* attributes).
The prev_* attributes are written by fix_txn_aggs, so that query has to run before this one.
Printed per transaction:
  Basic info       : amount, fraud, step, @my_cust, @my_merch, @my_cat
  Customer history : @cust_sum, @cust_max, @cust_num_t, @cust_num_f, @cust_f_t (= frauds / transactions)
  Merchant history : @merch_sum, @merch_max, @merch_num_t, @merch_num_f, @merch_f_t
  Category history : @cat_sum, @cat_max, @cat_num_t, @cat_num_f, @cat_f_t
  Customer sequence: @f_prev  - fraud flag of the customer's previous transaction (by id)
                     @f2_prev - fraud flag of the customer's transaction before that
Amounts are returned as sums and maxima; no average is emitted and the transaction id is not printed.
NOTE(review): the earlier header advertised "hops_to_fraud"; nothing in this query computes it.
*/
// Heap element: transaction id (sort key), its fraud flag and the vertex itself
TYPEDEF TUPLE <INT ttid, INT fraud, VERTEX <transaction> tt> tt_tuple;
// Maps: day/step value -> aggregate values ready for pulling into transactions
// (attached to customer, merchant and category vertices)
GroupByAccum<INT step, SumAccum<DOUBLE> sum_amount, MaxAccum<DOUBLE> max_amount, SumAccum<INT> num_trans, SumAccum<INT> num_frauds> @my_aggs;
// This is what we'll use to calc outputs per transaction
MinAccum<STRING> @my_merch, @my_cat, @my_cust;
SumAccum<DOUBLE> @cust_sum, @merch_sum, @cat_sum;
// The *_max accumulators track maxima; the *_f_t accumulators are only ever assigned with "=" (fraud/transaction ratio)
MaxAccum<DOUBLE> @cust_max=0, @merch_max=0, @cat_max=0, @cust_f_t=0, @merch_f_t=0, @cat_f_t=0;
SumAccum<INT> @cust_num_t, @cust_num_f, @merch_num_t, @merch_num_f, @cat_num_t, @cat_num_f, @f_prev, @f2_prev;
// ASC means first pop off the top is the lowest value
// Declared with capacity 1; resized per customer further down so it can hold all of that customer's transactions
HeapAccum<tt_tuple> (1, ttid ASC) @transactions_in_order;
// transaction -> 2-bit history of the same customer's two preceding transactions (only non-zero values are stored)
MapAccum<VERTEX<transaction>, UINT> @@tran_fraud_bitmaps;
ListAccum<VERTEX<transaction>> @@bunch_of_transactions;
// Probably this would be windowed in the real world i.e. only look at 3 months history
// Accumulate aggregates per historical step
// Tuple order after "->" matches @my_aggs: sum_amount, max_amount, num_trans, num_frauds
AllC = {customer.*};
AllC =
SELECT c
FROM AllC:c -(customer2transaction)- transaction:t
ACCUM c.@my_aggs += (t.step -> t.amount, t.amount, 1, t.fraud);
AllM = {merchant.*};
AllM =
SELECT m
FROM AllM:m -(transaction2merchant)- transaction:t
ACCUM m.@my_aggs += (t.step -> t.amount, t.amount, 1, t.fraud);
AllCat = {category.*};
AllCat =
SELECT cat
FROM AllCat:cat -(transaction2category)- transaction:t
ACCUM cat.@my_aggs += (t.step -> t.amount, t.amount, 1, t.fraud);
// OK, so now we go through all transactions, grabbing agg's from related historical records as we go
// offset from step in transaction itself from previous day
AllT = {transaction.*};
// Grab customer history
// NOTE(review): .get(step) is also called for steps the entity has no entry for (including steps below
// the first one) - presumably it then yields the accumulators' default values; confirm.
AllT =
SELECT t
FROM AllT:t -(customer2transaction)- customer:c
ACCUM // will be one customer per transaction
t.@my_cust = c.id
, FOREACH step in RANGE[t.step - days_history, t.step-1] DO
t.@cust_sum += c.@my_aggs.get(step).sum_amount
, t.@cust_max += c.@my_aggs.get(step).max_amount
, t.@cust_num_t += c.@my_aggs.get(step).num_trans
, t.@cust_num_f += c.@my_aggs.get(step).num_frauds
END
, t.@cust_sum += t.prev_sum_cust // same-step history: earlier transactions of this step (from fix_txn_aggs)
, t.@cust_max += t.prev_max_cust
, t.@cust_num_t += t.prev_trans_cust
, t.@cust_num_f += t.prev_frauds_cust
POST-ACCUM
// Fraud ratio over the window; stays 0 when there is no history or no fraud in it
IF t.@cust_num_t > 0 AND t.@cust_num_f > 0 THEN
t.@cust_f_t = 1.0 * t.@cust_num_f / t.@cust_num_t
END;
// Grab merchant history (same pattern as for the customer)
AllT =
SELECT t
FROM AllT:t -(transaction2merchant)- merchant:m
ACCUM t.@my_merch = m.id
, FOREACH step in RANGE[t.step-days_history, t.step-1] DO
t.@merch_sum += m.@my_aggs.get(step).sum_amount
, t.@merch_max += m.@my_aggs.get(step).max_amount
, t.@merch_num_t += m.@my_aggs.get(step).num_trans
, t.@merch_num_f += m.@my_aggs.get(step).num_frauds
END
, t.@merch_sum += t.prev_sum_merch
, t.@merch_max += t.prev_max_merch
, t.@merch_num_t += t.prev_trans_merch
, t.@merch_num_f += t.prev_frauds_merch
POST-ACCUM
IF t.@merch_num_t > 0 AND t.@merch_num_f > 0 THEN
t.@merch_f_t = 1.0 * t.@merch_num_f/t.@merch_num_t
END;
// Grab category history (same pattern). The result set is stored in T, which is what gets printed at the end.
T =
SELECT t
FROM AllT:t -(transaction2category)- category:cat
ACCUM t.@my_cat = cat.id
, FOREACH step in RANGE[t.step-days_history, t.step-1] DO
t.@cat_sum += cat.@my_aggs.get(step).sum_amount
, t.@cat_max += cat.@my_aggs.get(step).max_amount
, t.@cat_num_t += cat.@my_aggs.get(step).num_trans
, t.@cat_num_f += cat.@my_aggs.get(step).num_frauds
END
, t.@cat_sum += t.prev_sum_cat
, t.@cat_max += t.prev_max_cat
, t.@cat_num_t += t.prev_trans_cat
, t.@cat_num_f += t.prev_frauds_cat
POST-ACCUM
IF t.@cat_num_t > 0 AND t.@cat_num_f > 0 THEN
t.@cat_f_t = 1.0 * t.@cat_num_f/t.@cat_num_t
END;
// Set f_prev for each customer
all_c = {customer.*};
// Attach correctly sized heap: one slot per customer2transaction edge of the customer
all_c =
SELECT cc
FROM all_c:cc
POST-ACCUM
cc.@transactions_in_order.resize(cc.outdegree("customer2transaction"));
// Fill each customer's heap with its transactions, then walk them in ascending id order.
// pre_fraud_bitmap holds the fraud flags of the transactions already visited:
//   bit 0 = the immediately preceding transaction, bit 1 = the one before that.
all_c =
SELECT cc
FROM all_c:cc -()- transaction:tt
ACCUM cc.@transactions_in_order += tt_tuple(tt.id, tt.fraud, tt)
POST-ACCUM
UINT pre_fraud_bitmap = 0
// Set transaction bitmap for later update (skipped when both bits are 0, which is the accumulators' default anyway)
, WHILE cc.@transactions_in_order.size() > 0 DO
IF pre_fraud_bitmap != 0 THEN
@@tran_fraud_bitmaps += (cc.@transactions_in_order.top().tt -> pre_fraud_bitmap)
END
// Grab fraud bit for next record: shift the history left and append the current transaction's flag
, pre_fraud_bitmap = (pre_fraud_bitmap << 1) + cc.@transactions_in_order.top().fraud
// Keep only the two most recent flags
, pre_fraud_bitmap = pre_fraud_bitmap & 3
// And pop old record
, cc.@transactions_in_order.pop()
END
, cc.@transactions_in_order.clear(); // clear memory
// So now we have @@tran_fraud_bitmaps, need to stuff into real records
// Collect the map's keys into a list so they can seed a vertex set
FOREACH (tt, bitmap) IN @@tran_fraud_bitmaps DO
@@bunch_of_transactions += tt;
END;
my_tt = {@@bunch_of_transactions};
// Unpack the two bits into the per-transaction accumulators read by the PRINT below
my_tt =
SELECT t1
FROM my_tt:t1
POST-ACCUM
UINT bitmap = 0
, bitmap = @@tran_fraud_bitmaps.get(t1)
, t1.@f_prev = bitmap & 1
, t1.@f2_prev = bitmap >> 1 & 1;
// Tabular output: one record per transaction in T with the listed attributes and accumulators
PRINT T[
//T.id
T.amount
, T.fraud
, T.step
, T.@my_cust
, T.@my_merch
, T.@my_cat
, T.@cust_sum
, T.@cust_max
, T.@cust_f_t
, T.@merch_sum
, T.@merch_max
, T.@merch_f_t
, T.@cat_sum
, T.@cat_max
, T.@cat_f_t
, T.@f_prev
, T.@f2_prev
, T.@cust_num_f
, T.@cust_num_t
, T.@merch_num_f
, T.@merch_num_t
, T.@cat_num_f
, T.@cat_num_t
];
}
END
// Install every query created above (fix_statistics, fix_txn_aggs, get_all_trans_tabular3) so they can be run.
INSTALL QUERY ALL
// EOF