1 """Deal with representations of Markov Models.
2 """
3
4 import copy
5 import math
6 import random
7
8
9
10
11
12 from Bio.Seq import MutableSeq
13
14
16 """ Return an array of n random numbers, where the elements of the array sum
17 to 1.0"""
18 randArray = [random.random() for i in range(n)]
19 total = sum(randArray)
20 normalizedRandArray = [x/total for x in randArray]
21
22 return normalizedRandArray
23
24
26 """Calculate which symbols can be emitted in each state
27 """
28
29
30 emissions = dict()
31 for state, symbol in emission_probs:
32 try:
33 emissions[state].append(symbol)
34 except KeyError:
35 emissions[state] = [symbol]
36
37 return emissions
38
39
41 """Calculate which 'from transitions' are allowed for each state
42
43 This looks through all of the trans_probs, and uses this dictionary
44 to determine allowed transitions. It converts this information into
45 a dictionary, whose keys are source states and whose values are
46 lists of destination states reachable from the source state via a
47 transition.
48 """
49 transitions = dict()
50 for from_state, to_state in trans_probs:
51 try:
52 transitions[from_state].append(to_state)
53 except KeyError:
54 transitions[from_state] = [to_state]
55
56 return transitions
57
58
60 """Calculate which 'to transitions' are allowed for each state
61
62 This looks through all of the trans_probs, and uses this dictionary
63 to determine allowed transitions. It converts this information into
64 a dictionary, whose keys are destination states and whose values are
65 lists of source states from which the destination is reachable via a
66 transition.
67 """
68 transitions = dict()
69 for from_state, to_state in trans_probs:
70 try:
71 transitions[to_state].append(from_state)
72 except KeyError:
73 transitions[to_state] = [from_state]
74
75 return transitions
76
77
79 """Interface to build up a Markov Model.
80
81 This class is designed to try to separate the task of specifying the
82 Markov Model from the actual model itself. This is in hopes of making
83 the actual Markov Model classes smaller.
84
85 So, this builder class should be used to create Markov models instead
86 of trying to initiate a Markov Model directly.
87 """
88
89 DEFAULT_PSEUDO = 1
90
91 - def __init__(self, state_alphabet, emission_alphabet):
92 """Initialize a builder to create Markov Models.
93
94 Arguments:
95
96 o state_alphabet -- An alphabet containing all of the letters that
97 can appear in the states
98
99 o emission_alphabet -- An alphabet containing all of the letters for
100 states that can be emitted by the HMM.
101 """
102 self._state_alphabet = state_alphabet
103 self._emission_alphabet = emission_alphabet
104
105
106
107 self.initial_prob = {}
108
109
110
111 self.transition_prob = {}
112 self.emission_prob = self._all_blank(state_alphabet,
113 emission_alphabet)
114
115
116 self.transition_pseudo = {}
117 self.emission_pseudo = self._all_pseudo(state_alphabet,
118 emission_alphabet)
119
120 - def _all_blank(self, first_alphabet, second_alphabet):
121 """Return a dictionary with all counts set to zero.
122
123 This uses the letters in the first and second alphabet to create
124 a dictionary with keys of two tuples organized as
125 (letter of first alphabet, letter of second alphabet). The values
126 are all set to 0.
127 """
128 all_blank = {}
129 for first_state in first_alphabet.letters:
130 for second_state in second_alphabet.letters:
131 all_blank[(first_state, second_state)] = 0
132
133 return all_blank
134
135 - def _all_pseudo(self, first_alphabet, second_alphabet):
136 """Return a dictionary with all counts set to a default value.
137
138 This takes the letters in first alphabet and second alphabet and
139 creates a dictionary with keys of two tuples organized as:
140 (letter of first alphabet, letter of second alphabet). The values
141 are all set to the value of the class attribute DEFAULT_PSEUDO.
142 """
143 all_counts = {}
144 for first_state in first_alphabet.letters:
145 for second_state in second_alphabet.letters:
146 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
147
148 return all_counts
149
151 """Return the markov model corresponding with the current parameters.
152
153 Each markov model returned by a call to this function is unique
154 (ie. they don't influence each other).
155 """
156
157
158 if not self.initial_prob:
159 raise Exception("set_initial_probabilities must be called to " +
160 "fully initialize the Markov model")
161
162 initial_prob = copy.deepcopy(self.initial_prob)
163 transition_prob = copy.deepcopy(self.transition_prob)
164 emission_prob = copy.deepcopy(self.emission_prob)
165 transition_pseudo = copy.deepcopy(self.transition_pseudo)
166 emission_pseudo = copy.deepcopy(self.emission_pseudo)
167
168 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
169 transition_pseudo, emission_pseudo)
170
172 """Set initial state probabilities.
173
174 initial_prob is a dictionary mapping states to probabilities.
175 Suppose, for example, that the state alphabet is ['A', 'B']. Call
176 set_initial_prob({'A': 1}) to guarantee that the initial
177 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
178 to make each initial state equally probable.
179
180 This method must now be called in order to use the Markov model
181 because the calculation of initial probabilities has changed
182 incompatibly; the previous calculation was incorrect.
183
184 If initial probabilities are set for all states, then they should add up
185 to 1. Otherwise the sum should be <= 1. The residual probability is
186 divided up evenly between all the states for which the initial
187 probability has not been set. For example, calling
188 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
189 for the above example.
190 """
191 self.initial_prob = copy.copy(initial_prob)
192
193
194 for state in initial_prob.iterkeys():
195 assert state in self._state_alphabet.letters, \
196 "State %s was not found in the sequence alphabet" % state
197
198
199 num_states_not_set =\
200 len(self._state_alphabet.letters) - len(self.initial_prob)
201 if num_states_not_set < 0:
202 raise Exception("Initial probabilities can't exceed # of states")
203 prob_sum = sum(self.initial_prob.values())
204 if prob_sum > 1.0:
205 raise Exception("Total initial probability cannot exceed 1.0")
206 if num_states_not_set > 0:
207 prob = (1.0 - prob_sum) / num_states_not_set
208 for state in self._state_alphabet.letters:
209 if not state in self.initial_prob:
210 self.initial_prob[state] = prob
211
213 """Reset all probabilities to be an average value.
214
215 Resets the values of all initial probabilities and all allowed
216 transitions and all allowed emissions to be equal to 1 divided by the
217 number of possible elements.
218
219 This is useful if you just want to initialize a Markov Model to
220 starting values (ie. if you have no prior notions of what the
221 probabilities should be -- or if you are just feeling too lazy
222 to calculate them :-).
223
224 Warning 1 -- this will reset all currently set probabilities.
225
226 Warning 2 -- This just sets all probabilities for transitions and
227 emissions to total up to 1, so it doesn't ensure that the sum of
228 each set of transitions adds up to 1.
229 """
230
231
232 new_initial_prob = float(1) / float(len(self.transition_prob))
233 for state in self._state_alphabet.letters:
234 self.initial_prob[state] = new_initial_prob
235
236
237 new_trans_prob = float(1) / float(len(self.transition_prob))
238 for key in self.transition_prob:
239 self.transition_prob[key] = new_trans_prob
240
241
242 new_emission_prob = float(1) / float(len(self.emission_prob))
243 for key in self.emission_prob:
244 self.emission_prob[key] = new_emission_prob
245
247 """Set all initial state probabilities to a randomly generated distribution.
248 Returns the dictionary containing the initial probabilities.
249 """
250 initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
251 for state in self._state_alphabet.letters:
252 self.initial_prob[state] = initial_freqs.pop()
253
254 return self.initial_prob
255
257 """Set all allowed transition probabilities to a randomly generated distribution.
258 Returns the dictionary containing the transition probabilities.
259 """
260
261 if not self.transition_prob:
262 raise Exception("No transitions have been allowed yet. " +
263 "Allow some or all transitions by calling " +
264 "allow_transition or allow_all_transitions first.")
265
266 transitions_from = _calculate_from_transitions(self.transition_prob)
267 for from_state in transitions_from.keys():
268 freqs = _gen_random_array(len(transitions_from[from_state]))
269 for to_state in transitions_from[from_state]:
270 self.transition_prob[(from_state, to_state)] = freqs.pop()
271
272 return self.transition_prob
273
275 """Set all allowed emission probabilities to a randomly generated
276 distribution. Returns the dictionary containing the emission
277 probabilities.
278 """
279
280 if not self.emission_prob:
281 raise Exception("No emissions have been allowed yet. " +
282 "Allow some or all emissions.")
283
284 emissions = _calculate_emissions(self.emission_prob)
285 for state in emissions.iterkeys():
286 freqs = _gen_random_array(len(emissions[state]))
287 for symbol in emissions[state]:
288 self.emission_prob[(state, symbol)] = freqs.pop()
289
290 return self.emission_prob
291
301
302
303
305 """A convenience function to create transitions between all states.
306
307 By default all transitions within the alphabet are disallowed; this
308 is a way to change this to allow all possible transitions.
309 """
310
311
312 all_probs = self._all_blank(self._state_alphabet,
313 self._state_alphabet)
314
315 all_pseudo = self._all_pseudo(self._state_alphabet,
316 self._state_alphabet)
317
318
319
320 for set_key in self.transition_prob:
321 all_probs[set_key] = self.transition_prob[set_key]
322
323 for set_key in self.transition_pseudo:
324 all_pseudo[set_key] = self.transition_pseudo[set_key]
325
326
327 self.transition_prob = all_probs
328 self.transition_pseudo = all_pseudo
329
330 - def allow_transition(self, from_state, to_state, probability = None,
331 pseudocount = None):
332 """Set a transition as being possible between the two states.
333
334 probability and pseudocount are optional arguments
335 specifying the probabilities and pseudo counts for the transition.
336 If these are not supplied, then the values are set to the
337 default values.
338
339 Raises:
340 KeyError -- if the two states already have an allowed transition.
341 """
342
343 for state in [from_state, to_state]:
344 assert state in self._state_alphabet.letters, \
345 "State %s was not found in the sequence alphabet" % state
346
347
348 if (from_state, to_state) not in self.transition_prob and \
349 (from_state, to_state) not in self.transition_pseudo:
350
351 if probability is None:
352 probability = 0
353 self.transition_prob[(from_state, to_state)] = probability
354
355
356 if pseudocount is None:
357 pseudcount = self.DEFAULT_PSEUDO
358 self.transition_pseudo[(from_state, to_state)] = pseudocount
359 else:
360 raise KeyError("Transition from %s to %s is already allowed."
361 % (from_state, to_state))
362
364 """Restrict transitions between the two states.
365
366 Raises:
367 KeyError if the transition is not currently allowed.
368 """
369 try:
370 del self.transition_prob[(from_state, to_state)]
371 del self.transition_pseudo[(from_state, to_state)]
372 except KeyError:
373 raise KeyError("Transition from %s to %s is already disallowed."
374 % (from_state, to_state))
375
377 """Set the probability of a transition between two states.
378
379 Raises:
380 KeyError if the transition is not allowed.
381 """
382 if (from_state, to_state) in self.transition_prob:
383 self.transition_prob[(from_state, to_state)] = probability
384 else:
385 raise KeyError("Transition from %s to %s is not allowed."
386 % (from_state, to_state))
387
389 """Set the default pseudocount for a transition.
390
391 To avoid computational problems, it is helpful to be able to
392 set a 'default' pseudocount to start with for estimating
393 transition and emission probabilities (see p62 in Durbin et al
394 for more discussion on this. By default, all transitions have
395 a pseudocount of 1.
396
397 Raises:
398 KeyError if the transition is not allowed.
399 """
400 if (from_state, to_state) in self.transition_pseudo:
401 self.transition_pseudo[(from_state, to_state)] = count
402 else:
403 raise KeyError("Transition from %s to %s is not allowed."
404 % (from_state, to_state))
405
406
407
409 """Set the probability of a emission from a particular state.
410
411 Raises:
412 KeyError if the emission from the given state is not allowed.
413 """
414 if (seq_state, emission_state) in self.emission_prob:
415 self.emission_prob[(seq_state, emission_state)] = probability
416 else:
417 raise KeyError("Emission of %s from %s is not allowed."
418 % (emission_state, seq_state))
419
421 """Set the default pseudocount for an emission.
422
423 To avoid computational problems, it is helpful to be able to
424 set a 'default' pseudocount to start with for estimating
425 transition and emission probabilities (see p62 in Durbin et al
426 for more discussion on this. By default, all emissions have
427 a pseudocount of 1.
428
429 Raises:
430 KeyError if the emission from the given state is not allowed.
431 """
432 if (seq_state, emission_state) in self.emission_pseudo:
433 self.emission_pseudo[(seq_state, emission_state)] = count
434 else:
435 raise KeyError("Emission of %s from %s is not allowed."
436 % (emission_state, seq_state))
437
438
440 """Represent a hidden markov model that can be used for state estimation.
441 """
442 - def __init__(self, initial_prob, transition_prob, emission_prob,
443 transition_pseudo, emission_pseudo):
444 """Initialize a Markov Model.
445
446 Note: You should use the MarkovModelBuilder class instead of
447 initiating this class directly.
448
449 Arguments:
450
451 o initial_prob - A dictionary of initial probabilities for all states.
452
453 o transition_prob -- A dictionary of transition probabilities for all
454 possible transitions in the sequence.
455
456 o emission_prob -- A dictionary of emission probabilities for all
457 possible emissions from the sequence states.
458
459 o transition_pseudo -- Pseudo-counts to be used for the transitions,
460 when counting for purposes of estimating transition probabilities.
461
462 o emission_pseudo -- Pseudo-counts to be used for the emissions,
463 when counting for purposes of estimating emission probabilities.
464 """
465
466 self.initial_prob = initial_prob
467
468 self._transition_pseudo = transition_pseudo
469 self._emission_pseudo = emission_pseudo
470
471 self.transition_prob = transition_prob
472 self.emission_prob = emission_prob
473
474
475
476
477 self._transitions_from = \
478 _calculate_from_transitions(self.transition_prob)
479
480
481
482
483 self._transitions_to = \
484 _calculate_to_transitions(self.transition_prob)
485
487 """Get the default transitions for the model.
488
489 Returns a dictionary of all of the default transitions between any
490 two letters in the sequence alphabet. The dictionary is structured
491 with keys as (letter1, letter2) and values as the starting number
492 of transitions.
493 """
494 return self._transition_pseudo
495
497 """Get the starting default emmissions for each sequence.
498
499 This returns a dictionary of the default emmissions for each
500 letter. The dictionary is structured with keys as
501 (seq_letter, emmission_letter) and values as the starting number
502 of emmissions.
503 """
504 return self._emission_pseudo
505
507 """Get all destination states to which there are transitions from the
508 state_letter source state.
509
510 This returns all letters which the given state_letter can transition
511 to. An empty list is returned if state_letter has no outgoing
512 transitions.
513 """
514 if state_letter in self._transitions_from:
515 return self._transitions_from[state_letter]
516 else:
517 return []
518
520 """Get all source states from which there are transitions to the
521 state_letter destination state.
522
523 This returns all letters which the given state_letter is reachable
524 from. An empty list is returned if state_letter is unreachable.
525 """
526 if state_letter in self._transitions_to:
527 return self._transitions_to[state_letter]
528 else:
529 return []
530
531 - def viterbi(self, sequence, state_alphabet):
532 """Calculate the most probable state path using the Viterbi algorithm.
533
534 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
535 al for a full explanation -- this is where I took my implementation
536 ideas from), to allow decoding of the state path, given a sequence
537 of emissions.
538
539 Arguments:
540
541 o sequence -- A Seq object with the emission sequence that we
542 want to decode.
543
544 o state_alphabet -- The alphabet of the possible state sequences
545 that can be generated.
546 """
547
548
549 log_initial = self._log_transform(self.initial_prob)
550 log_trans = self._log_transform(self.transition_prob)
551 log_emission = self._log_transform(self.emission_prob)
552
553 viterbi_probs = {}
554 pred_state_seq = {}
555 state_letters = state_alphabet.letters
556
557
558
559
560
561
562 for i in range(0, len(sequence)):
563
564 for cur_state in state_letters:
565
566 emission_part = log_emission[(cur_state, sequence[i])]
567
568 max_prob = 0
569 if i == 0:
570
571
572 max_prob = log_initial[cur_state]
573 else:
574
575 possible_state_probs = {}
576 for prev_state in self.transitions_to(cur_state):
577
578 trans_part = log_trans[(prev_state, cur_state)]
579
580
581 viterbi_part = viterbi_probs[(prev_state, i - 1)]
582 cur_prob = viterbi_part + trans_part
583
584 possible_state_probs[prev_state] = cur_prob
585
586
587 max_prob = max(possible_state_probs.values())
588
589
590 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
591
592 if i > 0:
593
594 for state in possible_state_probs:
595 if possible_state_probs[state] == max_prob:
596 pred_state_seq[(i - 1, cur_state)] = state
597 break
598
599
600
601
602 all_probs = {}
603 for state in state_letters:
604
605 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
606
607 state_path_prob = max(all_probs.values())
608
609
610 last_state = ''
611 for state in all_probs:
612 if all_probs[state] == state_path_prob:
613 last_state = state
614
615 assert last_state != '', "Didn't find the last state to trace from!"
616
617
618 traceback_seq = MutableSeq('', state_alphabet)
619
620 loop_seq = range(1, len(sequence))
621 loop_seq.reverse()
622
623
624
625
626
627 state = last_state
628 traceback_seq.append(state)
629 for i in loop_seq:
630 state = pred_state_seq[(i - 1, state)]
631 traceback_seq.append(state)
632
633
634 traceback_seq.reverse()
635
636 return traceback_seq.toseq(), state_path_prob
637
661