1
2
3
4
5
6 """Deal with representations of Markov Models.
7 """
8
9 import copy
10 import math
11 import random
12
13
14
15
16 from Bio._py3k import range
17
18 from Bio.Seq import MutableSeq
19
20
22 """ Return an array of n random numbers, where the elements of the array sum
23 to 1.0"""
24 randArray = [random.random() for i in range(n)]
25 total = sum(randArray)
26 normalizedRandArray = [x / total for x in randArray]
27
28 return normalizedRandArray
29
30
32 """Calculate which symbols can be emitted in each state
33 """
34
35
36 emissions = dict()
37 for state, symbol in emission_probs:
38 try:
39 emissions[state].append(symbol)
40 except KeyError:
41 emissions[state] = [symbol]
42
43 return emissions
44
45
47 """Calculate which 'from transitions' are allowed for each state
48
49 This looks through all of the trans_probs, and uses this dictionary
50 to determine allowed transitions. It converts this information into
51 a dictionary, whose keys are source states and whose values are
52 lists of destination states reachable from the source state via a
53 transition.
54 """
55 transitions = dict()
56 for from_state, to_state in trans_probs:
57 try:
58 transitions[from_state].append(to_state)
59 except KeyError:
60 transitions[from_state] = [to_state]
61
62 return transitions
63
64
66 """Calculate which 'to transitions' are allowed for each state
67
68 This looks through all of the trans_probs, and uses this dictionary
69 to determine allowed transitions. It converts this information into
70 a dictionary, whose keys are destination states and whose values are
71 lists of source states from which the destination is reachable via a
72 transition.
73 """
74 transitions = dict()
75 for from_state, to_state in trans_probs:
76 try:
77 transitions[to_state].append(from_state)
78 except KeyError:
79 transitions[to_state] = [from_state]
80
81 return transitions
82
83
85 """Interface to build up a Markov Model.
86
87 This class is designed to try to separate the task of specifying the
88 Markov Model from the actual model itself. This is in hopes of making
89 the actual Markov Model classes smaller.
90
91 So, this builder class should be used to create Markov models instead
92 of trying to initiate a Markov Model directly.
93 """
94
95 DEFAULT_PSEUDO = 1
96
97 - def __init__(self, state_alphabet, emission_alphabet):
98 """Initialize a builder to create Markov Models.
99
100 Arguments:
101
102 o state_alphabet -- An alphabet containing all of the letters that
103 can appear in the states
104
105 o emission_alphabet -- An alphabet containing all of the letters for
106 states that can be emitted by the HMM.
107 """
108 self._state_alphabet = state_alphabet
109 self._emission_alphabet = emission_alphabet
110
111
112
113 self.initial_prob = {}
114
115
116
117 self.transition_prob = {}
118 self.emission_prob = self._all_blank(state_alphabet,
119 emission_alphabet)
120
121
122 self.transition_pseudo = {}
123 self.emission_pseudo = self._all_pseudo(state_alphabet,
124 emission_alphabet)
125
126 - def _all_blank(self, first_alphabet, second_alphabet):
127 """Return a dictionary with all counts set to zero.
128
129 This uses the letters in the first and second alphabet to create
130 a dictionary with keys of two tuples organized as
131 (letter of first alphabet, letter of second alphabet). The values
132 are all set to 0.
133 """
134 all_blank = {}
135 for first_state in first_alphabet.letters:
136 for second_state in second_alphabet.letters:
137 all_blank[(first_state, second_state)] = 0
138
139 return all_blank
140
141 - def _all_pseudo(self, first_alphabet, second_alphabet):
142 """Return a dictionary with all counts set to a default value.
143
144 This takes the letters in first alphabet and second alphabet and
145 creates a dictionary with keys of two tuples organized as:
146 (letter of first alphabet, letter of second alphabet). The values
147 are all set to the value of the class attribute DEFAULT_PSEUDO.
148 """
149 all_counts = {}
150 for first_state in first_alphabet.letters:
151 for second_state in second_alphabet.letters:
152 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
153
154 return all_counts
155
157 """Return the markov model corresponding with the current parameters.
158
159 Each markov model returned by a call to this function is unique
160 (ie. they don't influence each other).
161 """
162
163 if not self.initial_prob:
164 raise Exception("set_initial_probabilities must be called to " +
165 "fully initialize the Markov model")
166
167 initial_prob = copy.deepcopy(self.initial_prob)
168 transition_prob = copy.deepcopy(self.transition_prob)
169 emission_prob = copy.deepcopy(self.emission_prob)
170 transition_pseudo = copy.deepcopy(self.transition_pseudo)
171 emission_pseudo = copy.deepcopy(self.emission_pseudo)
172
173 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
174 transition_pseudo, emission_pseudo)
175
177 """Set initial state probabilities.
178
179 initial_prob is a dictionary mapping states to probabilities.
180 Suppose, for example, that the state alphabet is ['A', 'B']. Call
181 set_initial_prob({'A': 1}) to guarantee that the initial
182 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
183 to make each initial state equally probable.
184
185 This method must now be called in order to use the Markov model
186 because the calculation of initial probabilities has changed
187 incompatibly; the previous calculation was incorrect.
188
189 If initial probabilities are set for all states, then they should add up
190 to 1. Otherwise the sum should be <= 1. The residual probability is
191 divided up evenly between all the states for which the initial
192 probability has not been set. For example, calling
193 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
194 for the above example.
195 """
196 self.initial_prob = copy.copy(initial_prob)
197
198
199 for state in initial_prob:
200 assert state in self._state_alphabet.letters, \
201 "State %s was not found in the sequence alphabet" % state
202
203
204 num_states_not_set =\
205 len(self._state_alphabet.letters) - len(self.initial_prob)
206 if num_states_not_set < 0:
207 raise Exception("Initial probabilities can't exceed # of states")
208 prob_sum = sum(self.initial_prob.values())
209 if prob_sum > 1.0:
210 raise Exception("Total initial probability cannot exceed 1.0")
211 if num_states_not_set > 0:
212 prob = (1.0 - prob_sum) / num_states_not_set
213 for state in self._state_alphabet.letters:
214 if state not in self.initial_prob:
215 self.initial_prob[state] = prob
216
218 """Reset all probabilities to be an average value.
219
220 Resets the values of all initial probabilities and all allowed
221 transitions and all allowed emissions to be equal to 1 divided by the
222 number of possible elements.
223
224 This is useful if you just want to initialize a Markov Model to
225 starting values (ie. if you have no prior notions of what the
226 probabilities should be -- or if you are just feeling too lazy
227 to calculate them :-).
228
229 Warning 1 -- this will reset all currently set probabilities.
230
231 Warning 2 -- This just sets all probabilities for transitions and
232 emissions to total up to 1, so it doesn't ensure that the sum of
233 each set of transitions adds up to 1.
234 """
235
236 new_initial_prob = float(1) / float(len(self.transition_prob))
237 for state in self._state_alphabet.letters:
238 self.initial_prob[state] = new_initial_prob
239
240
241 new_trans_prob = float(1) / float(len(self.transition_prob))
242 for key in self.transition_prob:
243 self.transition_prob[key] = new_trans_prob
244
245
246 new_emission_prob = float(1) / float(len(self.emission_prob))
247 for key in self.emission_prob:
248 self.emission_prob[key] = new_emission_prob
249
251 """Set all initial state probabilities to a randomly generated distribution.
252 Returns the dictionary containing the initial probabilities.
253 """
254 initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
255 for state in self._state_alphabet.letters:
256 self.initial_prob[state] = initial_freqs.pop()
257
258 return self.initial_prob
259
261 """Set all allowed transition probabilities to a randomly generated distribution.
262
263 Returns the dictionary containing the transition probabilities.
264 """
265 if not self.transition_prob:
266 raise Exception("No transitions have been allowed yet. " +
267 "Allow some or all transitions by calling " +
268 "allow_transition or allow_all_transitions first.")
269
270 transitions_from = _calculate_from_transitions(self.transition_prob)
271 for from_state in transitions_from:
272 freqs = _gen_random_array(len(transitions_from[from_state]))
273 for to_state in transitions_from[from_state]:
274 self.transition_prob[(from_state, to_state)] = freqs.pop()
275
276 return self.transition_prob
277
279 """Set all allowed emission probabilities to a randomly generated distribution.
280
281 Returns the dictionary containing the emission probabilities.
282 """
283 if not self.emission_prob:
284 raise Exception("No emissions have been allowed yet. " +
285 "Allow some or all emissions.")
286
287 emissions = _calculate_emissions(self.emission_prob)
288 for state in emissions:
289 freqs = _gen_random_array(len(emissions[state]))
290 for symbol in emissions[state]:
291 self.emission_prob[(state, symbol)] = freqs.pop()
292
293 return self.emission_prob
294
304
305
306
308 """A convenience function to create transitions between all states.
309
310 By default all transitions within the alphabet are disallowed; this
311 is a way to change this to allow all possible transitions.
312 """
313
314
315 all_probs = self._all_blank(self._state_alphabet,
316 self._state_alphabet)
317
318 all_pseudo = self._all_pseudo(self._state_alphabet,
319 self._state_alphabet)
320
321
322
323 for set_key in self.transition_prob:
324 all_probs[set_key] = self.transition_prob[set_key]
325
326 for set_key in self.transition_pseudo:
327 all_pseudo[set_key] = self.transition_pseudo[set_key]
328
329
330 self.transition_prob = all_probs
331 self.transition_pseudo = all_pseudo
332
333 - def allow_transition(self, from_state, to_state, probability=None,
334 pseudocount=None):
335 """Set a transition as being possible between the two states.
336
337 probability and pseudocount are optional arguments
338 specifying the probabilities and pseudo counts for the transition.
339 If these are not supplied, then the values are set to the
340 default values.
341
342 Raises:
343 KeyError -- if the two states already have an allowed transition.
344 """
345
346 for state in [from_state, to_state]:
347 assert state in self._state_alphabet.letters, \
348 "State %s was not found in the sequence alphabet" % state
349
350
351 if (from_state, to_state) not in self.transition_prob and \
352 (from_state, to_state) not in self.transition_pseudo:
353
354 if probability is None:
355 probability = 0
356 self.transition_prob[(from_state, to_state)] = probability
357
358
359 if pseudocount is None:
360 pseudcount = self.DEFAULT_PSEUDO
361 self.transition_pseudo[(from_state, to_state)] = pseudocount
362 else:
363 raise KeyError("Transition from %s to %s is already allowed."
364 % (from_state, to_state))
365
367 """Restrict transitions between the two states.
368
369 Raises:
370 KeyError if the transition is not currently allowed.
371 """
372 try:
373 del self.transition_prob[(from_state, to_state)]
374 del self.transition_pseudo[(from_state, to_state)]
375 except KeyError:
376 raise KeyError("Transition from %s to %s is already disallowed."
377 % (from_state, to_state))
378
380 """Set the probability of a transition between two states.
381
382 Raises:
383 KeyError if the transition is not allowed.
384 """
385 if (from_state, to_state) in self.transition_prob:
386 self.transition_prob[(from_state, to_state)] = probability
387 else:
388 raise KeyError("Transition from %s to %s is not allowed."
389 % (from_state, to_state))
390
392 """Set the default pseudocount for a transition.
393
394 To avoid computational problems, it is helpful to be able to
395 set a 'default' pseudocount to start with for estimating
396 transition and emission probabilities (see p62 in Durbin et al
397 for more discussion on this. By default, all transitions have
398 a pseudocount of 1.
399
400 Raises:
401 KeyError if the transition is not allowed.
402 """
403 if (from_state, to_state) in self.transition_pseudo:
404 self.transition_pseudo[(from_state, to_state)] = count
405 else:
406 raise KeyError("Transition from %s to %s is not allowed."
407 % (from_state, to_state))
408
409
410
412 """Set the probability of a emission from a particular state.
413
414 Raises:
415 KeyError if the emission from the given state is not allowed.
416 """
417 if (seq_state, emission_state) in self.emission_prob:
418 self.emission_prob[(seq_state, emission_state)] = probability
419 else:
420 raise KeyError("Emission of %s from %s is not allowed."
421 % (emission_state, seq_state))
422
424 """Set the default pseudocount for an emission.
425
426 To avoid computational problems, it is helpful to be able to
427 set a 'default' pseudocount to start with for estimating
428 transition and emission probabilities (see p62 in Durbin et al
429 for more discussion on this. By default, all emissions have
430 a pseudocount of 1.
431
432 Raises:
433 KeyError if the emission from the given state is not allowed.
434 """
435 if (seq_state, emission_state) in self.emission_pseudo:
436 self.emission_pseudo[(seq_state, emission_state)] = count
437 else:
438 raise KeyError("Emission of %s from %s is not allowed."
439 % (emission_state, seq_state))
440
441
443 """Represent a hidden markov model that can be used for state estimation."""
444
445 - def __init__(self, initial_prob, transition_prob, emission_prob,
446 transition_pseudo, emission_pseudo):
447 """Initialize a Markov Model.
448
449 Note: You should use the MarkovModelBuilder class instead of
450 initiating this class directly.
451
452 Arguments:
453
454 o initial_prob - A dictionary of initial probabilities for all states.
455
456 o transition_prob -- A dictionary of transition probabilities for all
457 possible transitions in the sequence.
458
459 o emission_prob -- A dictionary of emission probabilities for all
460 possible emissions from the sequence states.
461
462 o transition_pseudo -- Pseudo-counts to be used for the transitions,
463 when counting for purposes of estimating transition probabilities.
464
465 o emission_pseudo -- Pseudo-counts to be used for the emissions,
466 when counting for purposes of estimating emission probabilities.
467 """
468 self.initial_prob = initial_prob
469
470 self._transition_pseudo = transition_pseudo
471 self._emission_pseudo = emission_pseudo
472
473 self.transition_prob = transition_prob
474 self.emission_prob = emission_prob
475
476
477
478
479 self._transitions_from = \
480 _calculate_from_transitions(self.transition_prob)
481
482
483
484
485 self._transitions_to = \
486 _calculate_to_transitions(self.transition_prob)
487
489 """Get the default transitions for the model.
490
491 Returns a dictionary of all of the default transitions between any
492 two letters in the sequence alphabet. The dictionary is structured
493 with keys as (letter1, letter2) and values as the starting number
494 of transitions.
495 """
496 return self._transition_pseudo
497
499 """Get the starting default emmissions for each sequence.
500
501 This returns a dictionary of the default emmissions for each
502 letter. The dictionary is structured with keys as
503 (seq_letter, emmission_letter) and values as the starting number
504 of emmissions.
505 """
506 return self._emission_pseudo
507
509 """Get all destination states to which there are transitions from the
510 state_letter source state.
511
512 This returns all letters which the given state_letter can transition
513 to. An empty list is returned if state_letter has no outgoing
514 transitions.
515 """
516 if state_letter in self._transitions_from:
517 return self._transitions_from[state_letter]
518 else:
519 return []
520
522 """Get all source states from which there are transitions to the
523 state_letter destination state.
524
525 This returns all letters which the given state_letter is reachable
526 from. An empty list is returned if state_letter is unreachable.
527 """
528 if state_letter in self._transitions_to:
529 return self._transitions_to[state_letter]
530 else:
531 return []
532
533 - def viterbi(self, sequence, state_alphabet):
534 """Calculate the most probable state path using the Viterbi algorithm.
535
536 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
537 al for a full explanation -- this is where I took my implementation
538 ideas from), to allow decoding of the state path, given a sequence
539 of emissions.
540
541 Arguments:
542
543 o sequence -- A Seq object with the emission sequence that we
544 want to decode.
545
546 o state_alphabet -- The alphabet of the possible state sequences
547 that can be generated.
548 """
549
550 log_initial = self._log_transform(self.initial_prob)
551 log_trans = self._log_transform(self.transition_prob)
552 log_emission = self._log_transform(self.emission_prob)
553
554 viterbi_probs = {}
555 pred_state_seq = {}
556 state_letters = state_alphabet.letters
557
558
559
560
561
562
563 for i in range(0, len(sequence)):
564
565 for cur_state in state_letters:
566
567 emission_part = log_emission[(cur_state, sequence[i])]
568
569 max_prob = 0
570 if i == 0:
571
572
573 max_prob = log_initial[cur_state]
574 else:
575
576 possible_state_probs = {}
577 for prev_state in self.transitions_to(cur_state):
578
579 trans_part = log_trans[(prev_state, cur_state)]
580
581
582 viterbi_part = viterbi_probs[(prev_state, i - 1)]
583 cur_prob = viterbi_part + trans_part
584
585 possible_state_probs[prev_state] = cur_prob
586
587
588 max_prob = max(possible_state_probs.values())
589
590
591 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
592
593 if i > 0:
594
595 for state in possible_state_probs:
596 if possible_state_probs[state] == max_prob:
597 pred_state_seq[(i - 1, cur_state)] = state
598 break
599
600
601
602
603 all_probs = {}
604 for state in state_letters:
605
606 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
607
608 state_path_prob = max(all_probs.values())
609
610
611 last_state = ''
612 for state in all_probs:
613 if all_probs[state] == state_path_prob:
614 last_state = state
615
616 assert last_state != '', "Didn't find the last state to trace from!"
617
618
619 traceback_seq = MutableSeq('', state_alphabet)
620
621 loop_seq = list(range(1, len(sequence)))
622 loop_seq.reverse()
623
624
625
626
627
628 state = last_state
629 traceback_seq.append(state)
630 for i in loop_seq:
631 state = pred_state_seq[(i - 1, state)]
632 traceback_seq.append(state)
633
634
635 traceback_seq.reverse()
636
637 return traceback_seq.toseq(), state_path_prob
638
662