[frames] | no frames]

# Source Code for Module Bio.HMM.MarkovModel

```  1  # This code is part of the Biopython distribution and governed by its
3  # as part of this package.
4  #
5
6  """Deal with representations of Markov Models."""
7  # standard modules
8  import copy
9  import math
10  import random
11
12  # TODO - Take advantage of defaultdict once Python 2.4 is dead?
13  # from collections import defaultdict
14
15  from Bio._py3k import range
16
17  from Bio.Seq import MutableSeq
18
19
20 -def _gen_random_array(n):
21      """Return an array of n random numbers summing to 1.0 (PRIVATE)."""
22      randArray = [random.random() for i in range(n)]
23      total = sum(randArray)
24      normalizedRandArray = [x / total for x in randArray]
25
26      return normalizedRandArray
27
28
29 -def _calculate_emissions(emission_probs):
30      """Calculate which symbols can be emitted in each state."""
31      # loop over all of the state-symbol duples, mapping states to
32      # lists of emitted symbols
33      emissions = dict()
34      for state, symbol in emission_probs:
35          try:
36              emissions[state].append(symbol)
37          except KeyError:
38              emissions[state] = [symbol]
39
40      return emissions
41
42
43 -def _calculate_from_transitions(trans_probs):
44      """Calculate which 'from transitions' are allowed for each state.
45
46      This looks through all of the trans_probs, and uses this dictionary
47      to determine allowed transitions. It converts this information into
48      a dictionary, whose keys are source states and whose values are
49      lists of destination states reachable from the source state via a
50      transition.
51      """
52      transitions = dict()
53      for from_state, to_state in trans_probs:
54          try:
55              transitions[from_state].append(to_state)
56          except KeyError:
57              transitions[from_state] = [to_state]
58
59      return transitions
60
61
62 -def _calculate_to_transitions(trans_probs):
63      """Calculate which 'to transitions' are allowed for each state.
64
65      This looks through all of the trans_probs, and uses this dictionary
66      to determine allowed transitions. It converts this information into
67      a dictionary, whose keys are destination states and whose values are
68      lists of source states from which the destination is reachable via a
69      transition.
70      """
71      transitions = dict()
72      for from_state, to_state in trans_probs:
73          try:
74              transitions[to_state].append(from_state)
75          except KeyError:
76              transitions[to_state] = [from_state]
77
78      return transitions
79
80
81 -class MarkovModelBuilder(object):
82      """Interface to build up a Markov Model.
83
84      This class is designed to try to separate the task of specifying the
85      Markov Model from the actual model itself. This is in hopes of making
86      the actual Markov Model classes smaller.
87
88      So, this builder class should be used to create Markov models instead
89      of trying to initiate a Markov Model directly.
90      """
91
92      # the default pseudo counts to use
93      DEFAULT_PSEUDO = 1
94
95 -    def __init__(self, state_alphabet, emission_alphabet):
96          """Initialize a builder to create Markov Models.
97
98          Arguments:
99           - state_alphabet -- An alphabet containing all of the letters that
100             can appear in the states
101           - emission_alphabet -- An alphabet containing all of the letters for
102             states that can be emitted by the HMM.
103
104          """
105          self._state_alphabet = state_alphabet
106          self._emission_alphabet = emission_alphabet
107
108          # probabilities for the initial state, initialized by calling
109          # set_initial_probabilities (required)
110          self.initial_prob = {}
111
112          # the probabilities for transitions and emissions
113          # by default we have no transitions and all possible emissions
114          self.transition_prob = {}
115          self.emission_prob = self._all_blank(state_alphabet,
116                                               emission_alphabet)
117
118          # the default pseudocounts for transition and emission counting
119          self.transition_pseudo = {}
120          self.emission_pseudo = self._all_pseudo(state_alphabet,
121                                                  emission_alphabet)
122
123 -    def _all_blank(self, first_alphabet, second_alphabet):
124          """Return a dictionary with all counts set to zero.
125
126          This uses the letters in the first and second alphabet to create
127          a dictionary with keys of two tuples organized as
128          (letter of first alphabet, letter of second alphabet). The values
129          are all set to 0.
130          """
131          all_blank = {}
132          for first_state in first_alphabet.letters:
133              for second_state in second_alphabet.letters:
134                  all_blank[(first_state, second_state)] = 0
135
136          return all_blank
137
138 -    def _all_pseudo(self, first_alphabet, second_alphabet):
139          """Return a dictionary with all counts set to a default value.
140
141          This takes the letters in first alphabet and second alphabet and
142          creates a dictionary with keys of two tuples organized as:
143          (letter of first alphabet, letter of second alphabet). The values
144          are all set to the value of the class attribute DEFAULT_PSEUDO.
145          """
146          all_counts = {}
147          for first_state in first_alphabet.letters:
148              for second_state in second_alphabet.letters:
149                  all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
150
151          return all_counts
152
153 -    def get_markov_model(self):
154          """Return the markov model corresponding with the current parameters.
155
156          Each markov model returned by a call to this function is unique
157          (ie. they don't influence each other).
158          """
159          # user must set initial probabilities
160          if not self.initial_prob:
161              raise Exception("set_initial_probabilities must be called to " +
162                              "fully initialize the Markov model")
163
164          initial_prob = copy.deepcopy(self.initial_prob)
165          transition_prob = copy.deepcopy(self.transition_prob)
166          emission_prob = copy.deepcopy(self.emission_prob)
167          transition_pseudo = copy.deepcopy(self.transition_pseudo)
168          emission_pseudo = copy.deepcopy(self.emission_pseudo)
169
170          return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
171                                   transition_pseudo, emission_pseudo)
172
173 -    def set_initial_probabilities(self, initial_prob):
174          """Set initial state probabilities.
175
176          initial_prob is a dictionary mapping states to probabilities.
177          Suppose, for example, that the state alphabet is ['A', 'B']. Call
178          set_initial_prob({'A': 1}) to guarantee that the initial
179          state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
180          to make each initial state equally probable.
181
182          This method must now be called in order to use the Markov model
183          because the calculation of initial probabilities has changed
184          incompatibly; the previous calculation was incorrect.
185
186          If initial probabilities are set for all states, then they should add up
187          to 1. Otherwise the sum should be <= 1. The residual probability is
188          divided up evenly between all the states for which the initial
189          probability has not been set. For example, calling
190          set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
191          for the above example.
192          """
193          self.initial_prob = copy.copy(initial_prob)
194
195          # ensure that all referenced states are valid
196          for state in initial_prob:
197              assert state in self._state_alphabet.letters, \
198                     "State %s was not found in the sequence alphabet" % state
199
200          # distribute the residual probability, if any
201          num_states_not_set =\
202              len(self._state_alphabet.letters) - len(self.initial_prob)
203          if num_states_not_set < 0:
204              raise Exception("Initial probabilities can't exceed # of states")
205          prob_sum = sum(self.initial_prob.values())
206          if prob_sum > 1.0:
207              raise Exception("Total initial probability cannot exceed 1.0")
208          if num_states_not_set > 0:
209              prob = (1.0 - prob_sum) / num_states_not_set
210              for state in self._state_alphabet.letters:
211                  if state not in self.initial_prob:
212                      self.initial_prob[state] = prob
213
214 -    def set_equal_probabilities(self):
215          """Reset all probabilities to be an average value.
216
217          Resets the values of all initial probabilities and all allowed
218          transitions and all allowed emissions to be equal to 1 divided by the
219          number of possible elements.
220
221          This is useful if you just want to initialize a Markov Model to
222          starting values (ie. if you have no prior notions of what the
223          probabilities should be -- or if you are just feeling too lazy
224          to calculate them :-).
225
226          Warning 1 -- this will reset all currently set probabilities.
227
228          Warning 2 -- This just sets all probabilities for transitions and
229          emissions to total up to 1, so it doesn't ensure that the sum of
230          each set of transitions adds up to 1.
231          """
232          # set initial state probabilities
233          new_initial_prob = float(1) / float(len(self.transition_prob))
234          for state in self._state_alphabet.letters:
235              self.initial_prob[state] = new_initial_prob
236
237          # set the transitions
238          new_trans_prob = float(1) / float(len(self.transition_prob))
239          for key in self.transition_prob:
240              self.transition_prob[key] = new_trans_prob
241
242          # set the emissions
243          new_emission_prob = float(1) / float(len(self.emission_prob))
244          for key in self.emission_prob:
245              self.emission_prob[key] = new_emission_prob
246
247 -    def set_random_initial_probabilities(self):
248          """Set all initial state probabilities to a randomly generated distribution.
249
250          Returns the dictionary containing the initial probabilities.
251          """
252          initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
253          for state in self._state_alphabet.letters:
254              self.initial_prob[state] = initial_freqs.pop()
255
256          return self.initial_prob
257
258 -    def set_random_transition_probabilities(self):
259          """Set all allowed transition probabilities to a randomly generated distribution.
260
261          Returns the dictionary containing the transition probabilities.
262          """
263          if not self.transition_prob:
264              raise Exception("No transitions have been allowed yet. " +
265                              "Allow some or all transitions by calling " +
266                              "allow_transition or allow_all_transitions first.")
267
268          transitions_from = _calculate_from_transitions(self.transition_prob)
269          for from_state in transitions_from:
270              freqs = _gen_random_array(len(transitions_from[from_state]))
271              for to_state in transitions_from[from_state]:
272                  self.transition_prob[(from_state, to_state)] = freqs.pop()
273
274          return self.transition_prob
275
276 -    def set_random_emission_probabilities(self):
277          """Set all allowed emission probabilities to a randomly generated distribution.
278
279          Returns the dictionary containing the emission probabilities.
280          """
281          if not self.emission_prob:
282              raise Exception("No emissions have been allowed yet. " +
283                              "Allow some or all emissions.")
284
285          emissions = _calculate_emissions(self.emission_prob)
286          for state in emissions:
287              freqs = _gen_random_array(len(emissions[state]))
288              for symbol in emissions[state]:
289                  self.emission_prob[(state, symbol)] = freqs.pop()
290
291          return self.emission_prob
292
293 -    def set_random_probabilities(self):
294          """Set all probabilities to randomly generated numbers.
295
296          Resets probabilities of all initial states, transitions, and
297          emissions to random values.
298          """
299          self.set_random_initial_probabilities()
300          self.set_random_transition_probabilities()
301          self.set_random_emission_probabilities()
302
303      # --- functions to deal with the transitions in the sequence
304
305 -    def allow_all_transitions(self):
306          """Create transitions between all states.
307
308          By default all transitions within the alphabet are disallowed;
309          this is a convenience function to change this to allow all
310          possible transitions.
311          """
312          # first get all probabilities and pseudo counts set
313          # to the default values
314          all_probs = self._all_blank(self._state_alphabet,
315                                      self._state_alphabet)
316
317          all_pseudo = self._all_pseudo(self._state_alphabet,
318                                        self._state_alphabet)
319
320          # now set any probabilities and pseudo counts that
321          # were previously set
322          for set_key in self.transition_prob:
323              all_probs[set_key] = self.transition_prob[set_key]
324
325          for set_key in self.transition_pseudo:
326              all_pseudo[set_key] = self.transition_pseudo[set_key]
327
328          # finally reinitialize the transition probs and pseudo counts
329          self.transition_prob = all_probs
330          self.transition_pseudo = all_pseudo
331
332 -    def allow_transition(self, from_state, to_state, probability=None,
333                           pseudocount=None):
334          """Set a transition as being possible between the two states.
335
336          probability and pseudocount are optional arguments
337          specifying the probabilities and pseudo counts for the transition.
338          If these are not supplied, then the values are set to the
339          default values.
340
341          Raises:
342          KeyError -- if the two states already have an allowed transition.
343
344          """
345          # check the sanity of adding these states
346          for state in [from_state, to_state]:
347              assert state in self._state_alphabet.letters, \
348                     "State %s was not found in the sequence alphabet" % state
349
350          # ensure that the states are not already set
351          if (from_state, to_state) not in self.transition_prob and \
352             (from_state, to_state) not in self.transition_pseudo:
353              # set the initial probability
354              if probability is None:
355                  probability = 0
356              self.transition_prob[(from_state, to_state)] = probability
357
358              # set the initial pseudocounts
359              if pseudocount is None:
360                  pseudcount = self.DEFAULT_PSEUDO
361              self.transition_pseudo[(from_state, to_state)] = pseudocount
362          else:
363              raise KeyError("Transition from %s to %s is already allowed."
364                             % (from_state, to_state))
365
366 -    def destroy_transition(self, from_state, to_state):
367          """Restrict transitions between the two states.
368
369          Raises:
370          KeyError if the transition is not currently allowed.
371
372          """
373          try:
374              del self.transition_prob[(from_state, to_state)]
375              del self.transition_pseudo[(from_state, to_state)]
376          except KeyError:
377              raise KeyError("Transition from %s to %s is already disallowed."
378                             % (from_state, to_state))
379
380 -    def set_transition_score(self, from_state, to_state, probability):
381          """Set the probability of a transition between two states.
382
383          Raises:
384          KeyError if the transition is not allowed.
385
386          """
387          if (from_state, to_state) in self.transition_prob:
388              self.transition_prob[(from_state, to_state)] = probability
389          else:
390              raise KeyError("Transition from %s to %s is not allowed."
391                             % (from_state, to_state))
392
393 -    def set_transition_pseudocount(self, from_state, to_state, count):
394          """Set the default pseudocount for a transition.
395
396          To avoid computational problems, it is helpful to be able to
398          transition and emission probabilities (see p62 in Durbin et al
399          for more discussion on this. By default, all transitions have
400          a pseudocount of 1.
401
402          Raises:
403          KeyError if the transition is not allowed.
404
405          """
406          if (from_state, to_state) in self.transition_pseudo:
407              self.transition_pseudo[(from_state, to_state)] = count
408          else:
409              raise KeyError("Transition from %s to %s is not allowed."
410                             % (from_state, to_state))
411
412      # --- functions to deal with emissions from the sequence
413
414 -    def set_emission_score(self, seq_state, emission_state, probability):
415          """Set the probability of a emission from a particular state.
416
417          Raises:
418          KeyError if the emission from the given state is not allowed.
419
420          """
421          if (seq_state, emission_state) in self.emission_prob:
422              self.emission_prob[(seq_state, emission_state)] = probability
423          else:
424              raise KeyError("Emission of %s from %s is not allowed."
425                             % (emission_state, seq_state))
426
427 -    def set_emission_pseudocount(self, seq_state, emission_state, count):
428          """Set the default pseudocount for an emission.
429
430          To avoid computational problems, it is helpful to be able to
432          transition and emission probabilities (see p62 in Durbin et al
433          for more discussion on this. By default, all emissions have
434          a pseudocount of 1.
435
436          Raises:
437          KeyError if the emission from the given state is not allowed.
438
439          """
440          if (seq_state, emission_state) in self.emission_pseudo:
441              self.emission_pseudo[(seq_state, emission_state)] = count
442          else:
443              raise KeyError("Emission of %s from %s is not allowed."
444                             % (emission_state, seq_state))
445
446
447 -class HiddenMarkovModel(object):
448      """Represent a hidden markov model that can be used for state estimation."""
449
450 -    def __init__(self, initial_prob, transition_prob, emission_prob,
451                   transition_pseudo, emission_pseudo):
452          """Initialize a Markov Model.
453
454          Note: You should use the MarkovModelBuilder class instead of
455          initiating this class directly.
456
457          Arguments:
458           - initial_prob - A dictionary of initial probabilities for all states.
459           - transition_prob -- A dictionary of transition probabilities for all
460             possible transitions in the sequence.
461           - emission_prob -- A dictionary of emission probabilities for all
462             possible emissions from the sequence states.
463           - transition_pseudo -- Pseudo-counts to be used for the transitions,
464             when counting for purposes of estimating transition probabilities.
465           - emission_pseudo -- Pseudo-counts to be used for the emissions,
466             when counting for purposes of estimating emission probabilities.
467
468          """
469          self.initial_prob = initial_prob
470
471          self._transition_pseudo = transition_pseudo
472          self._emission_pseudo = emission_pseudo
473
474          self.transition_prob = transition_prob
475          self.emission_prob = emission_prob
476
477          # a dictionary of the possible transitions from each state
478          # each key is a source state, mapped to a list of the destination states
479          # that are reachable from the source state via a transition
480          self._transitions_from = \
481             _calculate_from_transitions(self.transition_prob)
482
483          # a dictionary of the possible transitions to each state
484          # each key is a destination state, mapped to a list of source states
485          # from which the destination is reachable via a transition
486          self._transitions_to = \
487             _calculate_to_transitions(self.transition_prob)
488
489 -    def get_blank_transitions(self):
490          """Get the default transitions for the model.
491
492          Returns a dictionary of all of the default transitions between any
493          two letters in the sequence alphabet. The dictionary is structured
494          with keys as (letter1, letter2) and values as the starting number
495          of transitions.
496          """
497          return self._transition_pseudo
498
499 -    def get_blank_emissions(self):
500          """Get the starting default emmissions for each sequence.
501
502          This returns a dictionary of the default emmissions for each
503          letter. The dictionary is structured with keys as
504          (seq_letter, emmission_letter) and values as the starting number
505          of emmissions.
506          """
507          return self._emission_pseudo
508
509 -    def transitions_from(self, state_letter):
510          """Get all destination states which can transition from source state_letter.
511
512          This returns all letters which the given state_letter can transition
513          to, i.e. all the destination states reachable from state_letter.
514
515          An empty list is returned if state_letter has no outgoing transitions.
516          """
517          if state_letter in self._transitions_from:
518              return self._transitions_from[state_letter]
519          else:
520              return []
521
522 -    def transitions_to(self, state_letter):
523          """Get all source states which can transition to destination state_letter.
524
525          This returns all letters which the given state_letter is reachable
526          from, i.e. all the source states which can reach state_later
527
528          An empty list is returned if state_letter is unreachable.
529          """
530          if state_letter in self._transitions_to:
531              return self._transitions_to[state_letter]
532          else:
533              return []
534
535 -    def viterbi(self, sequence, state_alphabet):
536          """Calculate the most probable state path using the Viterbi algorithm.
537
538          This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
539          al for a full explanation -- this is where I took my implementation
540          ideas from), to allow decoding of the state path, given a sequence
541          of emissions.
542
543          Arguments:
544           - sequence -- A Seq object with the emission sequence that we
545             want to decode.
546           - state_alphabet -- The alphabet of the possible state sequences
547             that can be generated.
548
549          """
550          # calculate logarithms of the initial, transition, and emission probs
551          log_initial = self._log_transform(self.initial_prob)
552          log_trans = self._log_transform(self.transition_prob)
553          log_emission = self._log_transform(self.emission_prob)
554
555          viterbi_probs = {}
556          pred_state_seq = {}
557          state_letters = state_alphabet.letters
558
559          # --- recursion
560          # loop over the training squence (i = 1 .. L)
561          # NOTE: My index numbers are one less than what is given in Durbin
562          # et al, since we are indexing the sequence going from 0 to
563          # (Length - 1) not 1 to Length, like in Durbin et al.
564          for i in range(0, len(sequence)):
565              # loop over all of the possible i-th states in the state path
566              for cur_state in state_letters:
567                  # e_{l}(x_{i})
568                  emission_part = log_emission[(cur_state, sequence[i])]
569
570                  max_prob = 0
571                  if i == 0:
572                      # for the first state, use the initial probability rather
573                      # than looking back to previous states
574                      max_prob = log_initial[cur_state]
575                  else:
576                      # loop over all possible (i-1)-th previous states
577                      possible_state_probs = {}
578                      for prev_state in self.transitions_to(cur_state):
579                          # a_{kl}
580                          trans_part = log_trans[(prev_state, cur_state)]
581
582                          # v_{k}(i - 1)
583                          viterbi_part = viterbi_probs[(prev_state, i - 1)]
584                          cur_prob = viterbi_part + trans_part
585
586                          possible_state_probs[prev_state] = cur_prob
587
588                      # calculate the viterbi probability using the max
589                      max_prob = max(possible_state_probs.values())
590
591                  # v_{k}(i)
592                  viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
593
594                  if i > 0:
595                      # get the most likely prev_state leading to cur_state
596                      for state in possible_state_probs:
597                          if possible_state_probs[state] == max_prob:
598                              pred_state_seq[(i - 1, cur_state)] = state
599                              break
600
601          # --- termination
602          # calculate the probability of the state path
603          # loop over all states
604          all_probs = {}
605          for state in state_letters:
606              # v_{k}(L)
607              all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
608
609          state_path_prob = max(all_probs.values())
610
611          # find the last pointer we need to trace back from
612          last_state = ''
613          for state in all_probs:
614              if all_probs[state] == state_path_prob:
615                  last_state = state
616
617          assert last_state != '', "Didn't find the last state to trace from!"
618
619          # --- traceback
620          traceback_seq = MutableSeq('', state_alphabet)
621
622          loop_seq = list(range(1, len(sequence)))
623          loop_seq.reverse()
624
625          # last_state is the last state in the most probable state sequence.
626          # Compute that sequence by walking backwards in time. From the i-th
627          # state in the sequence, find the (i-1)-th state as the most
628          # probable state preceding the i-th state.
629          state = last_state
630          traceback_seq.append(state)
631          for i in loop_seq:
632              state = pred_state_seq[(i - 1, state)]
633              traceback_seq.append(state)
634
635          # put the traceback sequence in the proper orientation
636          traceback_seq.reverse()
637
638          return traceback_seq.toseq(), state_path_prob
639
640 -    def _log_transform(self, probability):
641          """Return log transform of the given probability dictionary.
642
643          When calculating the Viterbi equation, add logs of probabilities rather
644          than multiplying probabilities, to avoid underflow errors. This method
645          returns a new dictionary with the same keys as the given dictionary
646          and log-transformed values.
647          """
648          log_prob = copy.copy(probability)
649          try:
650              neg_inf = float("-inf")
651          except ValueError:
652              # On Python 2.5 or older that was handled in C code,
653              # and failed on Windows XP 32bit
654              neg_inf = - 1E400
655          for key in log_prob:
656              prob = log_prob[key]
657              if prob > 0:
658                  log_prob[key] = math.log(log_prob[key])
659              else:
660                  log_prob[key] = neg_inf
661
662          return log_prob
663
<!--
expandto(location.href);
// -->

```

 Generated by Epydoc 3.0.1 on Mon Jul 10 15:17:06 2017 http://epydoc.sourceforge.net