-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsegment
executable file
·139 lines (108 loc) · 4.44 KB
/
segment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys, os
class gt:
    """Ground-truth segmenter: break the stream wherever the label changes.

    Each call formats one sample as ``label<TAB>v1<TAB>v2...<NEWLINE>``. A
    blank line (segment separator) is emitted in front of the sample when its
    label differs from the previous sample's label, or when the running
    segment already holds ``args.window_size`` samples (0 disables the limit).
    The very first sample always differs from the initial ``None`` label, so
    the output starts with a separator line.
    """

    def __init__(self, args):
        """Initialise from parsed arguments; only ``args.window_size`` is read."""
        self.label = None
        # Not consulted anywhere in this class; kept so the attribute set
        # stays the same for any external reader.
        self.firstcall = True
        self.max_size = args.window_size
        self.count = 0

    def __call__(self, data, label=None):
        """Format one sample and return it, prefixed by a separator if needed.

        Args:
            data: sequence of strings, the sample's columns.
            label: the sample's label; must be a string because it is
                concatenated into the output row.

        Returns:
            The text to write for this sample.
        """
        # ">=" (not ">") so that a segment never exceeds max_size samples:
        # count is the number of samples already written to the segment.
        limit_reached = self.max_size != 0 and self.count >= self.max_size

        parts = []
        if self.label != label or limit_reached:
            self.label = label
            self.count = 0
            parts.append("\n")

        parts.append(label + "\t" + "\t".join(data) + "\n")
        self.count += 1
        return "".join(parts)
class sw:
    """Sliding-window segmenter with a configurable overlap.

    Samples are collected in ``backlog``. Once the backlog holds
    ``args.window_size`` rows, the next call returns those rows as one
    segment (rows joined by newlines, followed by a blank line) and then
    drops the oldest rows so that roughly ``args.overlap`` of the window is
    carried over into the next one.
    """

    def __init__(self, args):
        """Initialise from ``args.window_size`` and ``args.overlap``."""
        self.size = args.window_size
        self.overlap = args.overlap
        self.backlog = []

    def _hop(self):
        """Return how many rows to drop after a window has been emitted."""
        # A plain int() on the float product loses a whole sample to rounding
        # error, e.g. (1 - 0.8) * 10 == 1.9999999999999996 -> 1 and
        # (1 - 0.9) * 10 -> 0. The small tolerance restores the intended
        # value while still truncating genuinely fractional hops.
        hop = int(self.size * (1 - self.overlap) + 1e-9)
        # A hop of 0 would never shrink the backlog: it would grow without
        # bound and be re-emitted on every call.
        return max(1, hop)

    def __call__(self, data, label=None):
        """Buffer one sample; return a finished window or the empty string.

        Args:
            data: sequence of strings, the sample's columns.
            label: the sample's label; must be a string because it is
                concatenated into the buffered row.

        Returns:
            The completed window as text, or ``""`` while the window is
            still filling.
        """
        s = ""
        if len(self.backlog) >= self.size:
            s = "\n".join(self.backlog) + "\n\n"
            self.backlog = self.backlog[self._hop():]
        self.backlog.append(label + "\t" + "\t".join(data))
        return s
class nrg:
    """Energy-based segmenter: break the stream when the energy crosses a threshold.

    The mean of ``x*x + y*y + z*z`` (first three columns of each sample,
    parsed as floats) is computed over the last ``args.window_size`` samples.
    Every time that mean moves from one side of ``args.threshold`` to the
    other, a blank line (segment separator) is appended after the emitted
    sample. The emitted sample is the one in the middle of the window, so the
    output lags the input by ``window_size // 2`` samples.
    """

    def __init__(self, args):
        """Initialise from ``args.threshold`` and ``args.window_size``.

        Raises:
            ValueError: if ``args.window_size`` is smaller than 1; an empty
                window has no centre sample and no defined mean.
        """
        if args.window_size < 1:
            raise ValueError("window size must be at least 1")

        self.firstcall = True
        # the command line params
        self.threshold = args.threshold
        self.intWinSize = args.window_size
        # Not used by this class; kept so the attribute set is unchanged.
        self.labels = {}
        # the shortened horizon where the energy is calculated over
        self.buffer = [['0', '0', '0'] for _ in range(self.intWinSize)]
        # Labels of the buffered samples, kept in lockstep with self.buffer so
        # the delayed row is written with its own label.
        self.labelBuffer = [None] * self.intWinSize
        self.sampleCount = 0
        # mark if we crossed the threshold
        self.wasBelowThreshold = True

    @staticmethod
    def _squared_norm(sample):
        """Return x*x + y*y + z*z for the first three columns of ``sample``."""
        x = float(sample[0])
        y = float(sample[1])
        z = float(sample[2])
        return x * x + y * y + z * z

    def __call__(self, data, label=None):
        """Push one sample into the window and return the delayed output row.

        Args:
            data: sequence of strings; the first three are parsed as floats.
            label: the sample's label; must be a string because it is
                concatenated into the output row once the sample reaches the
                centre of the window.

        Returns:
            ``""`` until ``window_size`` samples have been seen; afterwards
            the centre sample as ``label<TAB>columns<NEWLINE>``, followed by
            an extra newline when the threshold was crossed on this call.
        """
        self.firstcall = False

        # Slide the window: drop the oldest sample, append the newest.
        self.buffer.pop(0)
        self.buffer.append(data)
        self.labelBuffer.pop(0)
        self.labelBuffer.append(label)
        self.sampleCount += 1

        # Until the zero-filled initial window is fully replaced by real
        # samples, nothing is written.
        if self.sampleCount < self.intWinSize:
            return ""

        # calculate the signal energy of the current buffer
        energy = 0
        for sample in self.buffer:
            energy += self._squared_norm(sample)
        energy /= float(self.intWinSize)

        # Write the centre sample together with the label it arrived with
        # (previously the newest sample's label was paired with older data).
        center = self.intWinSize // 2
        s = self.labelBuffer[center] + "\t" + "\t".join(self.buffer[center]) + "\n"

        # The energy is on the other side of the threshold than last
        # recorded: flip the state and close the segment with a blank line.
        if (energy > self.threshold) is self.wasBelowThreshold:
            self.wasBelowThreshold = not self.wasBelowThreshold
            s += "\n"
        return s
# Maps the sub-command name given on the command line to the class that
# implements that segmentation method.
methods = dict(gt=gt, sw=sw, nrg=nrg)
def _build_parser():
    """Build the argument parser with one sub-command per segmentation method."""
    import argparse

    # The text is passed as ``description``; given positionally it would be
    # taken as the program name shown in the usage line.
    cmdline = argparse.ArgumentParser(description='segment stream of data into timeseries')
    subparser = cmdline.add_subparsers(help="sub command for the chosen segmentation method", dest="method")

    sw_cmdline = subparser.add_parser('sw', help="sliding window method")
    sw_cmdline.add_argument('-W', '--window-size', type=int, default=10, help="number of samples in each window")
    sw_cmdline.add_argument('-O', '--overlap', type=float, default=0., help="percentage from [0,1] of overlap in each window")
    sw_cmdline.add_argument('files', metavar='FILES', type=str, nargs='*', help="input files or - for stdin")

    gt_cmdline = subparser.add_parser('gt', help="groundtruth method")
    gt_cmdline.add_argument('-W', '--window-size', type=int, default=0, help="set to non-zero for maximum sample size before split")
    gt_cmdline.add_argument('files', metavar='FILES', type=str, nargs='*', help="input files or - for stdin")

    eb_cmdline = subparser.add_parser('nrg', help="energy based method")
    eb_cmdline.add_argument('-T', '--threshold', type=float, default=0, help="creates a new segment every time the threshold is crossed")
    eb_cmdline.add_argument('-W', '--window-size', type=int, default=1, help="size of the integration window")
    eb_cmdline.add_argument('files', metavar='FILES', type=str, nargs='*', help="input files or - for stdin")

    return cmdline


def main(argv=None):
    """Segment the samples read from the input files (or stdin) to stdout.

    Each non-empty, non-comment input line is split on whitespace into a
    label (first field) and the sample's columns (remaining fields) and handed
    to the selected segmentation method; whatever the method returns is
    written immediately. Lines whose first non-blank character is ``#`` are
    copied through unchanged.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: with status -1 when no segmentation method was given.
    """
    import fileinput

    cmdline = _build_parser()
    args = cmdline.parse_args(argv)

    if args.method is None:
        cmdline.print_help()
        print("ERR: need a segmentation method")
        # sys.exit (unlike os._exit) flushes stdio on the way out, so the
        # help text above is not lost when stdout is a pipe.
        sys.exit(-1)

    method = methods[args.method](args)

    # No ``bufsize`` argument: it was removed from fileinput.input in
    # Python 3.8 and passing it raises TypeError there.
    for line in fileinput.input(args.files):
        stripped = line.strip()
        if not stripped:
            continue

        if stripped[0] == '#':
            # Drop the line's own newline before printing; otherwise each
            # comment would be followed by a blank line, which downstream
            # reads as a segment separator.
            print(line.rstrip("\r\n"))
            continue

        label, *rest = line.split()
        sys.stdout.write(method(rest, label))
        # Flush per sample so the tool can be used in a live pipeline.
        sys.stdout.flush()


if __name__ == "__main__":
    main()