[frames] | no frames]

# Source Code for Module Bio.HMM.MarkovModel

```  1  """Deal with representations of Markov Models.
2  """
3  # standard modules
4  import copy
5  import math
6  import random
7
8  #TODO - Take advantage of defaultdict once Python 2.4 is dead?
9  #from collections import defaultdict
10
11  # biopython
12  from Bio.Seq import MutableSeq
13
14
15 -def _gen_random_array(n):
16      """ Return an array of n random numbers, where the elements of the array sum
17      to 1.0"""
18      randArray = [random.random() for i in range(n)]
19      total = sum(randArray)
20      normalizedRandArray = [x/total for x in randArray]
21
22      return normalizedRandArray
23
24
25 -def _calculate_emissions(emission_probs):
26      """Calculate which symbols can be emitted in each state
27      """
28      # loop over all of the state-symbol duples, mapping states to
29      # lists of emitted symbols
30      emissions = dict()
31      for state, symbol in emission_probs:
32          try:
33              emissions[state].append(symbol)
34          except KeyError:
35              emissions[state] = [symbol]
36
37      return emissions
38
39
40 -def _calculate_from_transitions(trans_probs):
41      """Calculate which 'from transitions' are allowed for each state
42
43      This looks through all of the trans_probs, and uses this dictionary
44      to determine allowed transitions. It converts this information into
45      a dictionary, whose keys are source states and whose values are
46      lists of destination states reachable from the source state via a
47      transition.
48      """
49      transitions = dict()
50      for from_state, to_state in trans_probs:
51          try:
52              transitions[from_state].append(to_state)
53          except KeyError:
54              transitions[from_state] = [to_state]
55
56      return transitions
57
58
59 -def _calculate_to_transitions(trans_probs):
60      """Calculate which 'to transitions' are allowed for each state
61
62      This looks through all of the trans_probs, and uses this dictionary
63      to determine allowed transitions. It converts this information into
64      a dictionary, whose keys are destination states and whose values are
65      lists of source states from which the destination is reachable via a
66      transition.
67      """
68      transitions = dict()
69      for from_state, to_state in trans_probs:
70          try:
71              transitions[to_state].append(from_state)
72          except KeyError:
73              transitions[to_state] = [from_state]
74
75      return transitions
76
77
78 -class MarkovModelBuilder(object):
79      """Interface to build up a Markov Model.
80
81      This class is designed to try to separate the task of specifying the
82      Markov Model from the actual model itself. This is in hopes of making
83      the actual Markov Model classes smaller.
84
85      So, this builder class should be used to create Markov models instead
86      of trying to initiate a Markov Model directly.
87      """
88      # the default pseudo counts to use
89      DEFAULT_PSEUDO = 1
90
91 -    def __init__(self, state_alphabet, emission_alphabet):
92          """Initialize a builder to create Markov Models.
93
94          Arguments:
95
96          o state_alphabet -- An alphabet containing all of the letters that
97          can appear in the states
98
99          o emission_alphabet -- An alphabet containing all of the letters for
100          states that can be emitted by the HMM.
101          """
102          self._state_alphabet = state_alphabet
103          self._emission_alphabet = emission_alphabet
104
105          # probabilities for the initial state, initialized by calling
106          # set_initial_probabilities (required)
107          self.initial_prob = {}
108
109          # the probabilities for transitions and emissions
110          # by default we have no transitions and all possible emissions
111          self.transition_prob = {}
112          self.emission_prob = self._all_blank(state_alphabet,
113                                               emission_alphabet)
114
115          # the default pseudocounts for transition and emission counting
116          self.transition_pseudo = {}
117          self.emission_pseudo = self._all_pseudo(state_alphabet,
118                                                  emission_alphabet)
119
120 -    def _all_blank(self, first_alphabet, second_alphabet):
121          """Return a dictionary with all counts set to zero.
122
123          This uses the letters in the first and second alphabet to create
124          a dictionary with keys of two tuples organized as
125          (letter of first alphabet, letter of second alphabet). The values
126          are all set to 0.
127          """
128          all_blank = {}
129          for first_state in first_alphabet.letters:
130              for second_state in second_alphabet.letters:
131                  all_blank[(first_state, second_state)] = 0
132
133          return all_blank
134
135 -    def _all_pseudo(self, first_alphabet, second_alphabet):
136          """Return a dictionary with all counts set to a default value.
137
138          This takes the letters in first alphabet and second alphabet and
139          creates a dictionary with keys of two tuples organized as:
140          (letter of first alphabet, letter of second alphabet). The values
141          are all set to the value of the class attribute DEFAULT_PSEUDO.
142          """
143          all_counts = {}
144          for first_state in first_alphabet.letters:
145              for second_state in second_alphabet.letters:
146                  all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
147
148          return all_counts
149
150 -    def get_markov_model(self):
151          """Return the markov model corresponding with the current parameters.
152
153          Each markov model returned by a call to this function is unique
154          (ie. they don't influence each other).
155          """
156
157          # user must set initial probabilities
158          if not self.initial_prob:
159              raise Exception("set_initial_probabilities must be called to " +
160                              "fully initialize the Markov model")
161
162          initial_prob = copy.deepcopy(self.initial_prob)
163          transition_prob = copy.deepcopy(self.transition_prob)
164          emission_prob = copy.deepcopy(self.emission_prob)
165          transition_pseudo = copy.deepcopy(self.transition_pseudo)
166          emission_pseudo = copy.deepcopy(self.emission_pseudo)
167
168          return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
169                                   transition_pseudo, emission_pseudo)
170
171 -    def set_initial_probabilities(self, initial_prob):
172          """Set initial state probabilities.
173
174          initial_prob is a dictionary mapping states to probabilities.
175          Suppose, for example, that the state alphabet is ['A', 'B']. Call
176          set_initial_prob({'A': 1}) to guarantee that the initial
177          state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
178          to make each initial state equally probable.
179
180          This method must now be called in order to use the Markov model
181          because the calculation of initial probabilities has changed
182          incompatibly; the previous calculation was incorrect.
183
184          If initial probabilities are set for all states, then they should add up
185          to 1. Otherwise the sum should be <= 1. The residual probability is
186          divided up evenly between all the states for which the initial
187          probability has not been set. For example, calling
188          set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
189          for the above example.
190          """
191          self.initial_prob = copy.copy(initial_prob)
192
193          # ensure that all referenced states are valid
194          for state in initial_prob.iterkeys():
195              assert state in self._state_alphabet.letters, \
196                     "State %s was not found in the sequence alphabet" % state
197
198          # distribute the residual probability, if any
199          num_states_not_set =\
200              len(self._state_alphabet.letters) - len(self.initial_prob)
201          if num_states_not_set < 0:
202              raise Exception("Initial probabilities can't exceed # of states")
203          prob_sum = sum(self.initial_prob.values())
204          if prob_sum > 1.0:
205              raise Exception("Total initial probability cannot exceed 1.0")
206          if num_states_not_set > 0:
207              prob = (1.0 - prob_sum) / num_states_not_set
208              for state in self._state_alphabet.letters:
209                  if not state in self.initial_prob:
210                      self.initial_prob[state] = prob
211
212 -    def set_equal_probabilities(self):
213          """Reset all probabilities to be an average value.
214
215          Resets the values of all initial probabilities and all allowed
216          transitions and all allowed emissions to be equal to 1 divided by the
217          number of possible elements.
218
219          This is useful if you just want to initialize a Markov Model to
220          starting values (ie. if you have no prior notions of what the
221          probabilities should be -- or if you are just feeling too lazy
222          to calculate them :-).
223
224          Warning 1 -- this will reset all currently set probabilities.
225
226          Warning 2 -- This just sets all probabilities for transitions and
227          emissions to total up to 1, so it doesn't ensure that the sum of
228          each set of transitions adds up to 1.
229          """
230
231          # set initial state probabilities
232          new_initial_prob = float(1) / float(len(self.transition_prob))
233          for state in self._state_alphabet.letters:
234              self.initial_prob[state] = new_initial_prob
235
236          # set the transitions
237          new_trans_prob = float(1) / float(len(self.transition_prob))
238          for key in self.transition_prob:
239              self.transition_prob[key] = new_trans_prob
240
241          # set the emissions
242          new_emission_prob = float(1) / float(len(self.emission_prob))
243          for key in self.emission_prob:
244              self.emission_prob[key] = new_emission_prob
245
246 -    def set_random_initial_probabilities(self):
247          """Set all initial state probabilities to a randomly generated distribution.
248          Returns the dictionary containing the initial probabilities.
249          """
250          initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
251          for state in self._state_alphabet.letters:
252              self.initial_prob[state] = initial_freqs.pop()
253
254          return self.initial_prob
255
256 -    def set_random_transition_probabilities(self):
257          """Set all allowed transition probabilities to a randomly generated distribution.
258          Returns the dictionary containing the transition probabilities.
259          """
260
261          if not self.transition_prob:
262              raise Exception("No transitions have been allowed yet. " +
263                              "Allow some or all transitions by calling " +
264                              "allow_transition or allow_all_transitions first.")
265
266          transitions_from = _calculate_from_transitions(self.transition_prob)
267          for from_state in transitions_from.keys():
268              freqs = _gen_random_array(len(transitions_from[from_state]))
269              for to_state in transitions_from[from_state]:
270                  self.transition_prob[(from_state, to_state)] = freqs.pop()
271
272          return self.transition_prob
273
274 -    def set_random_emission_probabilities(self):
275          """Set all allowed emission probabilities to a randomly generated
276          distribution.  Returns the dictionary containing the emission
277          probabilities.
278          """
279
280          if not self.emission_prob:
281              raise Exception("No emissions have been allowed yet. " +
282                              "Allow some or all emissions.")
283
284          emissions = _calculate_emissions(self.emission_prob)
285          for state in emissions.iterkeys():
286              freqs = _gen_random_array(len(emissions[state]))
287              for symbol in emissions[state]:
288                  self.emission_prob[(state, symbol)] = freqs.pop()
289
290          return self.emission_prob
291
292 -    def set_random_probabilities(self):
293          """Set all probabilities to randomly generated numbers.
294
295          Resets probabilities of all initial states, transitions, and
296          emissions to random values.
297          """
298          self.set_random_initial_probabilities()
299          self.set_random_transition_probabilities()
300          self.set_random_emission_probabilities()
301
302      # --- functions to deal with the transitions in the sequence
303
304 -    def allow_all_transitions(self):
305          """A convenience function to create transitions between all states.
306
307          By default all transitions within the alphabet are disallowed; this
308          is a way to change this to allow all possible transitions.
309          """
310          # first get all probabilities and pseudo counts set
311          # to the default values
312          all_probs = self._all_blank(self._state_alphabet,
313                                      self._state_alphabet)
314
315          all_pseudo = self._all_pseudo(self._state_alphabet,
316                                        self._state_alphabet)
317
318          # now set any probabilities and pseudo counts that
319          # were previously set
320          for set_key in self.transition_prob:
321              all_probs[set_key] = self.transition_prob[set_key]
322
323          for set_key in self.transition_pseudo:
324              all_pseudo[set_key] = self.transition_pseudo[set_key]
325
326          # finally reinitialize the transition probs and pseudo counts
327          self.transition_prob = all_probs
328          self.transition_pseudo = all_pseudo
329
330 -    def allow_transition(self, from_state, to_state, probability = None,
331                           pseudocount = None):
332          """Set a transition as being possible between the two states.
333
334          probability and pseudocount are optional arguments
335          specifying the probabilities and pseudo counts for the transition.
336          If these are not supplied, then the values are set to the
337          default values.
338
339          Raises:
340          KeyError -- if the two states already have an allowed transition.
341          """
342          # check the sanity of adding these states
343          for state in [from_state, to_state]:
344              assert state in self._state_alphabet.letters, \
345                     "State %s was not found in the sequence alphabet" % state
346
347          # ensure that the states are not already set
348          if (from_state, to_state) not in self.transition_prob and \
349             (from_state, to_state) not in self.transition_pseudo:
350              # set the initial probability
351              if probability is None:
352                  probability = 0
353              self.transition_prob[(from_state, to_state)] = probability
354
355              # set the initial pseudocounts
356              if pseudocount is None:
357                  pseudcount = self.DEFAULT_PSEUDO
358              self.transition_pseudo[(from_state, to_state)] = pseudocount
359          else:
360              raise KeyError("Transition from %s to %s is already allowed."
361                             % (from_state, to_state))
362
363 -    def destroy_transition(self, from_state, to_state):
364          """Restrict transitions between the two states.
365
366          Raises:
367          KeyError if the transition is not currently allowed.
368          """
369          try:
370              del self.transition_prob[(from_state, to_state)]
371              del self.transition_pseudo[(from_state, to_state)]
372          except KeyError:
373              raise KeyError("Transition from %s to %s is already disallowed."
374                             % (from_state, to_state))
375
376 -    def set_transition_score(self, from_state, to_state, probability):
377          """Set the probability of a transition between two states.
378
379          Raises:
380          KeyError if the transition is not allowed.
381          """
382          if (from_state, to_state) in self.transition_prob:
383              self.transition_prob[(from_state, to_state)] = probability
384          else:
385              raise KeyError("Transition from %s to %s is not allowed."
386                             % (from_state, to_state))
387
388 -    def set_transition_pseudocount(self, from_state, to_state, count):
389          """Set the default pseudocount for a transition.
390
391          To avoid computational problems, it is helpful to be able to
393          transition and emission probabilities (see p62 in Durbin et al
394          for more discussion on this. By default, all transitions have
395          a pseudocount of 1.
396
397          Raises:
398          KeyError if the transition is not allowed.
399          """
400          if (from_state, to_state) in self.transition_pseudo:
401              self.transition_pseudo[(from_state, to_state)] = count
402          else:
403              raise KeyError("Transition from %s to %s is not allowed."
404                             % (from_state, to_state))
405
406      # --- functions to deal with emissions from the sequence
407
408 -    def set_emission_score(self, seq_state, emission_state, probability):
409          """Set the probability of a emission from a particular state.
410
411          Raises:
412          KeyError if the emission from the given state is not allowed.
413          """
414          if (seq_state, emission_state) in self.emission_prob:
415              self.emission_prob[(seq_state, emission_state)] = probability
416          else:
417              raise KeyError("Emission of %s from %s is not allowed."
418                             % (emission_state, seq_state))
419
420 -    def set_emission_pseudocount(self, seq_state, emission_state, count):
421          """Set the default pseudocount for an emission.
422
423          To avoid computational problems, it is helpful to be able to
425          transition and emission probabilities (see p62 in Durbin et al
426          for more discussion on this. By default, all emissions have
427          a pseudocount of 1.
428
429          Raises:
430          KeyError if the emission from the given state is not allowed.
431          """
432          if (seq_state, emission_state) in self.emission_pseudo:
433              self.emission_pseudo[(seq_state, emission_state)] = count
434          else:
435              raise KeyError("Emission of %s from %s is not allowed."
436                             % (emission_state, seq_state))
437
438
439 -class HiddenMarkovModel(object):
440      """Represent a hidden markov model that can be used for state estimation.
441      """
442 -    def __init__(self, initial_prob, transition_prob, emission_prob,
443                   transition_pseudo, emission_pseudo):
444          """Initialize a Markov Model.
445
446          Note: You should use the MarkovModelBuilder class instead of
447          initiating this class directly.
448
449          Arguments:
450
451          o initial_prob - A dictionary of initial probabilities for all states.
452
453          o transition_prob -- A dictionary of transition probabilities for all
454          possible transitions in the sequence.
455
456          o emission_prob -- A dictionary of emission probabilities for all
457          possible emissions from the sequence states.
458
459          o transition_pseudo -- Pseudo-counts to be used for the transitions,
460          when counting for purposes of estimating transition probabilities.
461
462          o emission_pseudo -- Pseudo-counts to be used for the emissions,
463          when counting for purposes of estimating emission probabilities.
464          """
465
466          self.initial_prob = initial_prob
467
468          self._transition_pseudo = transition_pseudo
469          self._emission_pseudo = emission_pseudo
470
471          self.transition_prob = transition_prob
472          self.emission_prob = emission_prob
473
474          # a dictionary of the possible transitions from each state
475          # each key is a source state, mapped to a list of the destination states
476          # that are reachable from the source state via a transition
477          self._transitions_from = \
478             _calculate_from_transitions(self.transition_prob)
479
480          # a dictionary of the possible transitions to each state
481          # each key is a destination state, mapped to a list of source states
482          # from which the destination is reachable via a transition
483          self._transitions_to = \
484             _calculate_to_transitions(self.transition_prob)
485
486 -    def get_blank_transitions(self):
487          """Get the default transitions for the model.
488
489          Returns a dictionary of all of the default transitions between any
490          two letters in the sequence alphabet. The dictionary is structured
491          with keys as (letter1, letter2) and values as the starting number
492          of transitions.
493          """
494          return self._transition_pseudo
495
496 -    def get_blank_emissions(self):
497          """Get the starting default emmissions for each sequence.
498
499          This returns a dictionary of the default emmissions for each
500          letter. The dictionary is structured with keys as
501          (seq_letter, emmission_letter) and values as the starting number
502          of emmissions.
503          """
504          return self._emission_pseudo
505
506 -    def transitions_from(self, state_letter):
507          """Get all destination states to which there are transitions from the
508          state_letter source state.
509
510          This returns all letters which the given state_letter can transition
511          to. An empty list is returned if state_letter has no outgoing
512          transitions.
513          """
514          if state_letter in self._transitions_from:
515              return self._transitions_from[state_letter]
516          else:
517              return []
518
519 -    def transitions_to(self, state_letter):
520          """Get all source states from which there are transitions to the
521          state_letter destination state.
522
523          This returns all letters which the given state_letter is reachable
524          from. An empty list is returned if state_letter is unreachable.
525          """
526          if state_letter in self._transitions_to:
527              return self._transitions_to[state_letter]
528          else:
529              return []
530
531 -    def viterbi(self, sequence, state_alphabet):
532          """Calculate the most probable state path using the Viterbi algorithm.
533
534          This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
535          al for a full explanation -- this is where I took my implementation
536          ideas from), to allow decoding of the state path, given a sequence
537          of emissions.
538
539          Arguments:
540
541          o sequence -- A Seq object with the emission sequence that we
542          want to decode.
543
544          o state_alphabet -- The alphabet of the possible state sequences
545          that can be generated.
546          """
547
548          # calculate logarithms of the initial, transition, and emission probs
549          log_initial = self._log_transform(self.initial_prob)
550          log_trans = self._log_transform(self.transition_prob)
551          log_emission = self._log_transform(self.emission_prob)
552
553          viterbi_probs = {}
554          pred_state_seq = {}
555          state_letters = state_alphabet.letters
556
557          # --- recursion
558          # loop over the training squence (i = 1 .. L)
559          # NOTE: My index numbers are one less than what is given in Durbin
560          # et al, since we are indexing the sequence going from 0 to
561          # (Length - 1) not 1 to Length, like in Durbin et al.
562          for i in range(0, len(sequence)):
563              # loop over all of the possible i-th states in the state path
564              for cur_state in state_letters:
565                  # e_{l}(x_{i})
566                  emission_part = log_emission[(cur_state, sequence[i])]
567
568                  max_prob = 0
569                  if i == 0:
570                      # for the first state, use the initial probability rather
571                      # than looking back to previous states
572                      max_prob = log_initial[cur_state]
573                  else:
574                      # loop over all possible (i-1)-th previous states
575                      possible_state_probs = {}
576                      for prev_state in self.transitions_to(cur_state):
577                          # a_{kl}
578                          trans_part = log_trans[(prev_state, cur_state)]
579
580                          # v_{k}(i - 1)
581                          viterbi_part = viterbi_probs[(prev_state, i - 1)]
582                          cur_prob = viterbi_part + trans_part
583
584                          possible_state_probs[prev_state] = cur_prob
585
586                      # calculate the viterbi probability using the max
587                      max_prob = max(possible_state_probs.values())
588
589                  # v_{k}(i)
590                  viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
591
592                  if i > 0:
593                      # get the most likely prev_state leading to cur_state
594                      for state in possible_state_probs:
595                          if possible_state_probs[state] == max_prob:
596                              pred_state_seq[(i - 1, cur_state)] = state
597                              break
598
599          # --- termination
600          # calculate the probability of the state path
601          # loop over all states
602          all_probs = {}
603          for state in state_letters:
604              # v_{k}(L)
605              all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
606
607          state_path_prob = max(all_probs.values())
608
609          # find the last pointer we need to trace back from
610          last_state = ''
611          for state in all_probs:
612              if all_probs[state] == state_path_prob:
613                  last_state = state
614
615          assert last_state != '', "Didn't find the last state to trace from!"
616
617          # --- traceback
618          traceback_seq = MutableSeq('', state_alphabet)
619
620          loop_seq = range(1, len(sequence))
621          loop_seq.reverse()
622
623          # last_state is the last state in the most probable state sequence.
624          # Compute that sequence by walking backwards in time. From the i-th
625          # state in the sequence, find the (i-1)-th state as the most
626          # probable state preceding the i-th state.
627          state = last_state
628          traceback_seq.append(state)
629          for i in loop_seq:
630              state = pred_state_seq[(i - 1, state)]
631              traceback_seq.append(state)
632
633          # put the traceback sequence in the proper orientation
634          traceback_seq.reverse()
635
636          return traceback_seq.toseq(), state_path_prob
637
638 -    def _log_transform(self, probability):
639          """Return log transform of the given probability dictionary.
640
641          When calculating the Viterbi equation, add logs of probabilities rather
642          than multiplying probabilities, to avoid underflow errors. This method
643          returns a new dictionary with the same keys as the given dictionary
644          and log-transformed values.
645          """
646          log_prob = copy.copy(probability)
647          try:
648              neg_inf = float("-inf")
649          except ValueError:
650              #On Python 2.5 or older that was handled in C code,
651              #and failed on Windows XP 32bit
652              neg_inf = - 1E400
653          for key in log_prob:
654              prob = log_prob[key]
655              if prob > 0:
656                  log_prob[key] = math.log(log_prob[key])
657              else:
658                  log_prob[key] = neg_inf
659
660          return log_prob
661
<!--
expandto(location.href);
// -->

```

 Generated by Epydoc 3.0.1 on Tue Feb 5 18:03:08 2013 http://epydoc.sourceforge.net