1
2
3
4
5
6 """Deal with representations of Markov Models."""
7
8 import copy
9 import math
10 import random
11
12
13
14
15 from Bio._py3k import range
16
17 from Bio.Seq import MutableSeq
18
19
21 """Return an array of n random numbers summing to 1.0 (PRIVATE)."""
22 randArray = [random.random() for i in range(n)]
23 total = sum(randArray)
24 normalizedRandArray = [x / total for x in randArray]
25
26 return normalizedRandArray
27
28
30 """Calculate which symbols can be emitted in each state."""
31
32
33 emissions = dict()
34 for state, symbol in emission_probs:
35 try:
36 emissions[state].append(symbol)
37 except KeyError:
38 emissions[state] = [symbol]
39
40 return emissions
41
42
44 """Calculate which 'from transitions' are allowed for each state.
45
46 This looks through all of the trans_probs, and uses this dictionary
47 to determine allowed transitions. It converts this information into
48 a dictionary, whose keys are source states and whose values are
49 lists of destination states reachable from the source state via a
50 transition.
51 """
52 transitions = dict()
53 for from_state, to_state in trans_probs:
54 try:
55 transitions[from_state].append(to_state)
56 except KeyError:
57 transitions[from_state] = [to_state]
58
59 return transitions
60
61
63 """Calculate which 'to transitions' are allowed for each state.
64
65 This looks through all of the trans_probs, and uses this dictionary
66 to determine allowed transitions. It converts this information into
67 a dictionary, whose keys are destination states and whose values are
68 lists of source states from which the destination is reachable via a
69 transition.
70 """
71 transitions = dict()
72 for from_state, to_state in trans_probs:
73 try:
74 transitions[to_state].append(from_state)
75 except KeyError:
76 transitions[to_state] = [from_state]
77
78 return transitions
79
80
82 """Interface to build up a Markov Model.
83
84 This class is designed to try to separate the task of specifying the
85 Markov Model from the actual model itself. This is in hopes of making
86 the actual Markov Model classes smaller.
87
88 So, this builder class should be used to create Markov models instead
89 of trying to initiate a Markov Model directly.
90 """
91
92
93 DEFAULT_PSEUDO = 1
94
95 - def __init__(self, state_alphabet, emission_alphabet):
96 """Initialize a builder to create Markov Models.
97
98 Arguments:
99 - state_alphabet -- An alphabet containing all of the letters that
100 can appear in the states
101 - emission_alphabet -- An alphabet containing all of the letters for
102 states that can be emitted by the HMM.
103
104 """
105 self._state_alphabet = state_alphabet
106 self._emission_alphabet = emission_alphabet
107
108
109
110 self.initial_prob = {}
111
112
113
114 self.transition_prob = {}
115 self.emission_prob = self._all_blank(state_alphabet,
116 emission_alphabet)
117
118
119 self.transition_pseudo = {}
120 self.emission_pseudo = self._all_pseudo(state_alphabet,
121 emission_alphabet)
122
123 - def _all_blank(self, first_alphabet, second_alphabet):
124 """Return a dictionary with all counts set to zero.
125
126 This uses the letters in the first and second alphabet to create
127 a dictionary with keys of two tuples organized as
128 (letter of first alphabet, letter of second alphabet). The values
129 are all set to 0.
130 """
131 all_blank = {}
132 for first_state in first_alphabet.letters:
133 for second_state in second_alphabet.letters:
134 all_blank[(first_state, second_state)] = 0
135
136 return all_blank
137
138 - def _all_pseudo(self, first_alphabet, second_alphabet):
139 """Return a dictionary with all counts set to a default value.
140
141 This takes the letters in first alphabet and second alphabet and
142 creates a dictionary with keys of two tuples organized as:
143 (letter of first alphabet, letter of second alphabet). The values
144 are all set to the value of the class attribute DEFAULT_PSEUDO.
145 """
146 all_counts = {}
147 for first_state in first_alphabet.letters:
148 for second_state in second_alphabet.letters:
149 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
150
151 return all_counts
152
154 """Return the markov model corresponding with the current parameters.
155
156 Each markov model returned by a call to this function is unique
157 (ie. they don't influence each other).
158 """
159
160 if not self.initial_prob:
161 raise Exception("set_initial_probabilities must be called to " +
162 "fully initialize the Markov model")
163
164 initial_prob = copy.deepcopy(self.initial_prob)
165 transition_prob = copy.deepcopy(self.transition_prob)
166 emission_prob = copy.deepcopy(self.emission_prob)
167 transition_pseudo = copy.deepcopy(self.transition_pseudo)
168 emission_pseudo = copy.deepcopy(self.emission_pseudo)
169
170 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
171 transition_pseudo, emission_pseudo)
172
174 """Set initial state probabilities.
175
176 initial_prob is a dictionary mapping states to probabilities.
177 Suppose, for example, that the state alphabet is ['A', 'B']. Call
178 set_initial_prob({'A': 1}) to guarantee that the initial
179 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
180 to make each initial state equally probable.
181
182 This method must now be called in order to use the Markov model
183 because the calculation of initial probabilities has changed
184 incompatibly; the previous calculation was incorrect.
185
186 If initial probabilities are set for all states, then they should add up
187 to 1. Otherwise the sum should be <= 1. The residual probability is
188 divided up evenly between all the states for which the initial
189 probability has not been set. For example, calling
190 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
191 for the above example.
192 """
193 self.initial_prob = copy.copy(initial_prob)
194
195
196 for state in initial_prob:
197 assert state in self._state_alphabet.letters, \
198 "State %s was not found in the sequence alphabet" % state
199
200
201 num_states_not_set =\
202 len(self._state_alphabet.letters) - len(self.initial_prob)
203 if num_states_not_set < 0:
204 raise Exception("Initial probabilities can't exceed # of states")
205 prob_sum = sum(self.initial_prob.values())
206 if prob_sum > 1.0:
207 raise Exception("Total initial probability cannot exceed 1.0")
208 if num_states_not_set > 0:
209 prob = (1.0 - prob_sum) / num_states_not_set
210 for state in self._state_alphabet.letters:
211 if state not in self.initial_prob:
212 self.initial_prob[state] = prob
213
215 """Reset all probabilities to be an average value.
216
217 Resets the values of all initial probabilities and all allowed
218 transitions and all allowed emissions to be equal to 1 divided by the
219 number of possible elements.
220
221 This is useful if you just want to initialize a Markov Model to
222 starting values (ie. if you have no prior notions of what the
223 probabilities should be -- or if you are just feeling too lazy
224 to calculate them :-).
225
226 Warning 1 -- this will reset all currently set probabilities.
227
228 Warning 2 -- This just sets all probabilities for transitions and
229 emissions to total up to 1, so it doesn't ensure that the sum of
230 each set of transitions adds up to 1.
231 """
232
233 new_initial_prob = float(1) / float(len(self.transition_prob))
234 for state in self._state_alphabet.letters:
235 self.initial_prob[state] = new_initial_prob
236
237
238 new_trans_prob = float(1) / float(len(self.transition_prob))
239 for key in self.transition_prob:
240 self.transition_prob[key] = new_trans_prob
241
242
243 new_emission_prob = float(1) / float(len(self.emission_prob))
244 for key in self.emission_prob:
245 self.emission_prob[key] = new_emission_prob
246
248 """Set all initial state probabilities to a randomly generated distribution.
249
250 Returns the dictionary containing the initial probabilities.
251 """
252 initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
253 for state in self._state_alphabet.letters:
254 self.initial_prob[state] = initial_freqs.pop()
255
256 return self.initial_prob
257
259 """Set all allowed transition probabilities to a randomly generated distribution.
260
261 Returns the dictionary containing the transition probabilities.
262 """
263 if not self.transition_prob:
264 raise Exception("No transitions have been allowed yet. " +
265 "Allow some or all transitions by calling " +
266 "allow_transition or allow_all_transitions first.")
267
268 transitions_from = _calculate_from_transitions(self.transition_prob)
269 for from_state in transitions_from:
270 freqs = _gen_random_array(len(transitions_from[from_state]))
271 for to_state in transitions_from[from_state]:
272 self.transition_prob[(from_state, to_state)] = freqs.pop()
273
274 return self.transition_prob
275
277 """Set all allowed emission probabilities to a randomly generated distribution.
278
279 Returns the dictionary containing the emission probabilities.
280 """
281 if not self.emission_prob:
282 raise Exception("No emissions have been allowed yet. " +
283 "Allow some or all emissions.")
284
285 emissions = _calculate_emissions(self.emission_prob)
286 for state in emissions:
287 freqs = _gen_random_array(len(emissions[state]))
288 for symbol in emissions[state]:
289 self.emission_prob[(state, symbol)] = freqs.pop()
290
291 return self.emission_prob
292
302
303
304
306 """Create transitions between all states.
307
308 By default all transitions within the alphabet are disallowed;
309 this is a convenience function to change this to allow all
310 possible transitions.
311 """
312
313
314 all_probs = self._all_blank(self._state_alphabet,
315 self._state_alphabet)
316
317 all_pseudo = self._all_pseudo(self._state_alphabet,
318 self._state_alphabet)
319
320
321
322 for set_key in self.transition_prob:
323 all_probs[set_key] = self.transition_prob[set_key]
324
325 for set_key in self.transition_pseudo:
326 all_pseudo[set_key] = self.transition_pseudo[set_key]
327
328
329 self.transition_prob = all_probs
330 self.transition_pseudo = all_pseudo
331
332 - def allow_transition(self, from_state, to_state, probability=None,
333 pseudocount=None):
334 """Set a transition as being possible between the two states.
335
336 probability and pseudocount are optional arguments
337 specifying the probabilities and pseudo counts for the transition.
338 If these are not supplied, then the values are set to the
339 default values.
340
341 Raises:
342 KeyError -- if the two states already have an allowed transition.
343
344 """
345
346 for state in [from_state, to_state]:
347 assert state in self._state_alphabet.letters, \
348 "State %s was not found in the sequence alphabet" % state
349
350
351 if (from_state, to_state) not in self.transition_prob and \
352 (from_state, to_state) not in self.transition_pseudo:
353
354 if probability is None:
355 probability = 0
356 self.transition_prob[(from_state, to_state)] = probability
357
358
359 if pseudocount is None:
360 pseudcount = self.DEFAULT_PSEUDO
361 self.transition_pseudo[(from_state, to_state)] = pseudocount
362 else:
363 raise KeyError("Transition from %s to %s is already allowed."
364 % (from_state, to_state))
365
367 """Restrict transitions between the two states.
368
369 Raises:
370 KeyError if the transition is not currently allowed.
371
372 """
373 try:
374 del self.transition_prob[(from_state, to_state)]
375 del self.transition_pseudo[(from_state, to_state)]
376 except KeyError:
377 raise KeyError("Transition from %s to %s is already disallowed."
378 % (from_state, to_state))
379
381 """Set the probability of a transition between two states.
382
383 Raises:
384 KeyError if the transition is not allowed.
385
386 """
387 if (from_state, to_state) in self.transition_prob:
388 self.transition_prob[(from_state, to_state)] = probability
389 else:
390 raise KeyError("Transition from %s to %s is not allowed."
391 % (from_state, to_state))
392
394 """Set the default pseudocount for a transition.
395
396 To avoid computational problems, it is helpful to be able to
397 set a 'default' pseudocount to start with for estimating
398 transition and emission probabilities (see p62 in Durbin et al
399 for more discussion on this. By default, all transitions have
400 a pseudocount of 1.
401
402 Raises:
403 KeyError if the transition is not allowed.
404
405 """
406 if (from_state, to_state) in self.transition_pseudo:
407 self.transition_pseudo[(from_state, to_state)] = count
408 else:
409 raise KeyError("Transition from %s to %s is not allowed."
410 % (from_state, to_state))
411
412
413
415 """Set the probability of a emission from a particular state.
416
417 Raises:
418 KeyError if the emission from the given state is not allowed.
419
420 """
421 if (seq_state, emission_state) in self.emission_prob:
422 self.emission_prob[(seq_state, emission_state)] = probability
423 else:
424 raise KeyError("Emission of %s from %s is not allowed."
425 % (emission_state, seq_state))
426
428 """Set the default pseudocount for an emission.
429
430 To avoid computational problems, it is helpful to be able to
431 set a 'default' pseudocount to start with for estimating
432 transition and emission probabilities (see p62 in Durbin et al
433 for more discussion on this. By default, all emissions have
434 a pseudocount of 1.
435
436 Raises:
437 KeyError if the emission from the given state is not allowed.
438
439 """
440 if (seq_state, emission_state) in self.emission_pseudo:
441 self.emission_pseudo[(seq_state, emission_state)] = count
442 else:
443 raise KeyError("Emission of %s from %s is not allowed."
444 % (emission_state, seq_state))
445
446
448 """Represent a hidden markov model that can be used for state estimation."""
449
450 - def __init__(self, initial_prob, transition_prob, emission_prob,
451 transition_pseudo, emission_pseudo):
452 """Initialize a Markov Model.
453
454 Note: You should use the MarkovModelBuilder class instead of
455 initiating this class directly.
456
457 Arguments:
458 - initial_prob - A dictionary of initial probabilities for all states.
459 - transition_prob -- A dictionary of transition probabilities for all
460 possible transitions in the sequence.
461 - emission_prob -- A dictionary of emission probabilities for all
462 possible emissions from the sequence states.
463 - transition_pseudo -- Pseudo-counts to be used for the transitions,
464 when counting for purposes of estimating transition probabilities.
465 - emission_pseudo -- Pseudo-counts to be used for the emissions,
466 when counting for purposes of estimating emission probabilities.
467
468 """
469 self.initial_prob = initial_prob
470
471 self._transition_pseudo = transition_pseudo
472 self._emission_pseudo = emission_pseudo
473
474 self.transition_prob = transition_prob
475 self.emission_prob = emission_prob
476
477
478
479
480 self._transitions_from = \
481 _calculate_from_transitions(self.transition_prob)
482
483
484
485
486 self._transitions_to = \
487 _calculate_to_transitions(self.transition_prob)
488
490 """Get the default transitions for the model.
491
492 Returns a dictionary of all of the default transitions between any
493 two letters in the sequence alphabet. The dictionary is structured
494 with keys as (letter1, letter2) and values as the starting number
495 of transitions.
496 """
497 return self._transition_pseudo
498
500 """Get the starting default emmissions for each sequence.
501
502 This returns a dictionary of the default emmissions for each
503 letter. The dictionary is structured with keys as
504 (seq_letter, emmission_letter) and values as the starting number
505 of emmissions.
506 """
507 return self._emission_pseudo
508
510 """Get all destination states which can transition from source state_letter.
511
512 This returns all letters which the given state_letter can transition
513 to, i.e. all the destination states reachable from state_letter.
514
515 An empty list is returned if state_letter has no outgoing transitions.
516 """
517 if state_letter in self._transitions_from:
518 return self._transitions_from[state_letter]
519 else:
520 return []
521
523 """Get all source states which can transition to destination state_letter.
524
525 This returns all letters which the given state_letter is reachable
526 from, i.e. all the source states which can reach state_later
527
528 An empty list is returned if state_letter is unreachable.
529 """
530 if state_letter in self._transitions_to:
531 return self._transitions_to[state_letter]
532 else:
533 return []
534
535 - def viterbi(self, sequence, state_alphabet):
536 """Calculate the most probable state path using the Viterbi algorithm.
537
538 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
539 al for a full explanation -- this is where I took my implementation
540 ideas from), to allow decoding of the state path, given a sequence
541 of emissions.
542
543 Arguments:
544 - sequence -- A Seq object with the emission sequence that we
545 want to decode.
546 - state_alphabet -- The alphabet of the possible state sequences
547 that can be generated.
548
549 """
550
551 log_initial = self._log_transform(self.initial_prob)
552 log_trans = self._log_transform(self.transition_prob)
553 log_emission = self._log_transform(self.emission_prob)
554
555 viterbi_probs = {}
556 pred_state_seq = {}
557 state_letters = state_alphabet.letters
558
559
560
561
562
563
564 for i in range(0, len(sequence)):
565
566 for cur_state in state_letters:
567
568 emission_part = log_emission[(cur_state, sequence[i])]
569
570 max_prob = 0
571 if i == 0:
572
573
574 max_prob = log_initial[cur_state]
575 else:
576
577 possible_state_probs = {}
578 for prev_state in self.transitions_to(cur_state):
579
580 trans_part = log_trans[(prev_state, cur_state)]
581
582
583 viterbi_part = viterbi_probs[(prev_state, i - 1)]
584 cur_prob = viterbi_part + trans_part
585
586 possible_state_probs[prev_state] = cur_prob
587
588
589 max_prob = max(possible_state_probs.values())
590
591
592 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
593
594 if i > 0:
595
596 for state in possible_state_probs:
597 if possible_state_probs[state] == max_prob:
598 pred_state_seq[(i - 1, cur_state)] = state
599 break
600
601
602
603
604 all_probs = {}
605 for state in state_letters:
606
607 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
608
609 state_path_prob = max(all_probs.values())
610
611
612 last_state = ''
613 for state in all_probs:
614 if all_probs[state] == state_path_prob:
615 last_state = state
616
617 assert last_state != '', "Didn't find the last state to trace from!"
618
619
620 traceback_seq = MutableSeq('', state_alphabet)
621
622 loop_seq = list(range(1, len(sequence)))
623 loop_seq.reverse()
624
625
626
627
628
629 state = last_state
630 traceback_seq.append(state)
631 for i in loop_seq:
632 state = pred_state_seq[(i - 1, state)]
633 traceback_seq.append(state)
634
635
636 traceback_seq.reverse()
637
638 return traceback_seq.toseq(), state_path_prob
639
663