-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhmm.py
128 lines (97 loc) · 3.6 KB
/
hmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import math
import sys
class Hmm:
    """Hidden Markov model whose tables are built from observation counts.

    Usage pattern supported by the methods below: construct with the list
    of states, feed observations through ``updateStartP`` /
    ``updateTransP`` / ``updateEmitP``, call ``normalize`` once, then read
    log-probabilities through ``getStartP`` / ``getTransP`` / ``getEmitP``.

    Attributes:
        alphabet: the ``states`` sequence passed to the constructor; used
            to label rows/columns when printing.
        states: dict mapping each state (always a string) to its index.
        numStates: number of distinct states.
        start_p: one value per state; counts before ``normalize``,
            natural-log probabilities after.
        trans_p: ``numStates`` x ``numStates`` list of lists, indexed
            ``[previous][state]``; counts before ``normalize``, natural-log
            probabilities after.
        emit_p: one dict per entry of ``states``, mapping an output symbol
            to its count (before ``normalize``) or log-probability (after).
    """

    def __init__(self, states):
        """Create a model over ``states`` with every count initialised.

        Start and transition counts begin at 1.0 so that no entry is zero
        when ``normalize`` takes logarithms.
        """
        self.alphabet = states
        # Containers are created per instance. They used to be class-level
        # attributes mutated in place, so every Hmm shared (and kept
        # appending to) the same tables.
        self.states = {}
        self.trans_p = []
        self.emit_p = []

        self.createStates(states)
        self.numStates = len(self.states)
        self.start_p = [1.0] * self.numStates
        for _ in range(self.numStates):
            self.trans_p.append([1.0] * self.numStates)
        self.createEmitP(states)

    def createStates(self, states):
        """Record the index of every state in ``self.states``."""
        for i, state in enumerate(states):
            self.states[state] = i

    def createEmitP(self, states):
        """Append one empty emission dict to ``self.emit_p`` per state."""
        for _ in states:
            self.emit_p.append({})

    def normalize(self):
        """Convert all accumulated counts to natural-log probabilities.

        The tables are overwritten in place, so the raw counts are not
        kept; call this once, after all ``update*`` calls.

        Each emission dict also receives a ``'low'`` entry, the value used
        by ``getEmitP`` for outputs never seen in that state.

        Raises:
            ZeroDivisionError: if some state has no recorded emissions
                (its counts sum to zero).
        """
        # Start probabilities.
        total = sum(self.start_p)
        self.start_p = [math.log(count / total) for count in self.start_p]

        # Transition matrix, normalised row by row.
        for i, row in enumerate(self.trans_p):
            total = sum(row)
            self.trans_p[i] = [math.log(count / total) for count in row]

        # Emission tables, normalised per state.
        for dic in self.emit_p:
            total = sum(dic.values())
            # Only values are replaced here, so iterating the keys is safe.
            for key in dic:
                dic[key] = math.log(dic[key] / total)
            # To be able to represent situations not observed.
            # NOTE(review): an output symbol literally named 'low' would be
            # overwritten by this sentinel entry.
            dic['low'] = math.log(1.0 / total)

    def updateStartP(self, state):
        """Count one sequence starting in ``state``."""
        self.start_p[self.states[state]] += 1

    def updateTransP(self, previous, state):
        """Count one transition from ``previous`` to ``state``."""
        row = self.states[previous]
        col = self.states[state]
        self.trans_p[row][col] += 1

    def updateEmitP(self, state, output):
        """Count one emission of ``output`` while in ``state``."""
        dic = self.emit_p[self.states[state]]
        # A first sighting is stored as 2.0 (Laplace smoothing): an
        # implicit prior count of 1.0 plus this observation.
        dic[output] = dic.get(output, 1.0) + 1.0

    def getStartP(self, state):
        """Return the start value stored for ``state``."""
        return self.start_p[self.states[state]]

    def getTransP(self, previous, state):
        """Return the transition value stored for ``previous`` -> ``state``."""
        row = self.states[previous]
        col = self.states[state]
        return self.trans_p[row][col]

    def getEmitP(self, state, output):
        """Return the emission value of ``output`` in ``state``.

        Outputs that were never recorded for ``state`` fall back to the
        ``'low'`` entry created by ``normalize``.

        Raises:
            KeyError: if ``output`` is unseen and ``normalize`` has not
                been called yet (no ``'low'`` entry exists).
        """
        dic = self.emit_p[self.states[state]]
        value = dic.get(output)
        # Compare against None explicitly: a log-probability of 0.0
        # (probability 1) is a valid stored value but is falsy.
        if value is None:
            value = dic['low']
        return value

    def getNumStates(self):
        """Return the number of states in this HMM."""
        return self.numStates

    def getNumOutputs(self):
        """Return ``numStates``.

        NOTE(review): the original comment says this returns the number of
        output symbols, but the code returns the number of states. The
        behaviour is kept unchanged for callers; confirm which is intended.
        """
        return self.numStates

    def printStartP(self):
        """Print the start value of every state, one per line."""
        print("\nStart probabilities")
        for i, elem in enumerate(self.start_p):
            print("%s %s" % (self.alphabet[i], elem))

    def printTransP(self):
        """Print the transition matrix, one row per state, 3 decimals."""
        print("\nTransition probabilities")
        print("The numbers are rounded in the print but not in the calculations")
        for i, row in enumerate(self.trans_p):
            sys.stdout.write(self.alphabet[i] + " = ")
            for j, val in enumerate(row):
                sys.stdout.write(self.alphabet[j] + ":%.3f " % (val))
            sys.stdout.write("\n\n")

    def printEmitP(self):
        """Print the emission table of every state, 3 decimals."""
        print("\nEmit probabilities")
        print("The numbers are rounded in the print but not in the calculations")
        for i, dic in enumerate(self.emit_p):
            sys.stdout.write(self.alphabet[i] + " = ")
            for key, val in dic.items():
                sys.stdout.write(key + ":%.3f " % (val))
            sys.stdout.write("\n\n")