# Source Code for Module Bio.MarkovModel

```  1  # This code is part of the Biopython distribution and governed by its
3  # as part of this package.
4  #
5
6  """A state-emitting MarkovModel.
7
8  Note terminology similar to Manning and Schutze is used.
9
10
11  Functions:
12  train_bw        Train a markov model using the Baum-Welch algorithm.
13  train_visible   Train a visible markov model using MLE.
14  find_states     Find the a state sequence that explains some observations.
15
17  save            Save a MarkovModel.
18
19  Classes:
20  MarkovModel     Holds the description of a markov model
21  """
22
23  import numpy
24
25
26  try:
28  except AttributeError:
29      # Numpy versions older than 1.3 do not contain logaddexp.
30      # Once we require Numpy version 1.3 or later, we should revisit this
31      # module to see if we can simplify some of the other functions in
32      # this module.
33      import warnings
34      warnings.warn("For optimal speed, please update to Numpy version 1.3 or later (current version is %s)" % numpy.__version__)
35
37          """Implement logaddexp method if Numpy version is older than 1.3."""
38          if logy - logx > 100:
39              return logy
40          elif logx - logy > 100:
41              return logx
42          minxy = min(logx, logy)
43          return minxy + numpy.log(numpy.exp(logx - minxy) + numpy.exp(logy - minxy))
44
45
46 -def itemindex(values):
47      """Return a dictionary of values with their sequence offset as keys."""
48      d = {}
49      entries = enumerate(values[::-1])
50      n = len(values) - 1
51      for index, key in entries:
52          d[key] = n - index
53      return d
54
55
56  numpy.random.seed()
57
58  VERY_SMALL_NUMBER = 1E-300
59  LOG0 = numpy.log(VERY_SMALL_NUMBER)
60
61
62 -class MarkovModel(object):
63      """Create a state-emitting MarkovModel object."""
64
65 -    def __init__(self, states, alphabet,
66                   p_initial=None, p_transition=None, p_emission=None):
67          """Initialize the class."""
68          self.states = states
69          self.alphabet = alphabet
70          self.p_initial = p_initial
71          self.p_transition = p_transition
72          self.p_emission = p_emission
73
74 -    def __str__(self):
75          """Create a string representation of the MarkovModel object."""
76          from Bio._py3k import StringIO
77          handle = StringIO()
78          save(self, handle)
79          handle.seek(0)
81
82
84      """Read the first line and evaluate that begisn with the correct start (PRIVATE)."""
86      if not line.startswith(start):
87          raise ValueError("I expected %r but got %r" % (start, line))
88      return line
89
90
92      """Parse a file handle into a MarkovModel object."""
95      states = line.split()[1:]
96
99      alphabet = line.split()[1:]
100
101      mm = MarkovModel(states, alphabet)
102      N, M = len(states), len(alphabet)
103
104      # Load the initial probabilities.
105      mm.p_initial = numpy.zeros(N)
107      for i in range(len(states)):
108          line = _readline_and_check_start(handle, "  %s:" % states[i])
109          mm.p_initial[i] = float(line.split()[-1])
110
112      mm.p_transition = numpy.zeros((N, N))
114      for i in range(len(states)):
115          line = _readline_and_check_start(handle, "  %s:" % states[i])
116          mm.p_transition[i, :] = [float(v) for v in line.split()[1:]]
117
119      mm.p_emission = numpy.zeros((N, M))
121      for i in range(len(states)):
122          line = _readline_and_check_start(handle, "  %s:" % states[i])
123          mm.p_emission[i, :] = [float(v) for v in line.split()[1:]]
124
125      return mm
126
127
128 -def save(mm, handle):
129      """Save MarkovModel object into handle."""
130      # This will fail if there are spaces in the states or alphabet.
131      w = handle.write
132      w("STATES: %s\n" % ' '.join(mm.states))
133      w("ALPHABET: %s\n" % ' '.join(mm.alphabet))
134      w("INITIAL:\n")
135      for i in range(len(mm.p_initial)):
136          w("  %s: %g\n" % (mm.states[i], mm.p_initial[i]))
137      w("TRANSITION:\n")
138      for i in range(len(mm.p_transition)):
139          w("  %s: %s\n" % (mm.states[i], ' '.join(str(x) for x in mm.p_transition[i])))
140      w("EMISSION:\n")
141      for i in range(len(mm.p_emission)):
142          w("  %s: %s\n" % (mm.states[i], ' '.join(str(x) for x in mm.p_emission[i])))
143
144
145  # XXX allow them to specify starting points
146 -def train_bw(states, alphabet, training_data,
147               pseudo_initial=None, pseudo_transition=None, pseudo_emission=None,
148               update_fn=None,
149               ):
150      """Train a MarkovModel using the Baum-Welch algorithm.
151
152      Train a MarkovModel using the Baum-Welch algorithm.  states is a list
153      of strings that describe the names of each state.  alphabet is a
154      list of objects that indicate the allowed outputs.  training_data
155      is a list of observations.  Each observation is a list of objects
156      from the alphabet.
157
158      pseudo_initial, pseudo_transition, and pseudo_emission are
159      optional parameters that you can use to assign pseudo-counts to
160      different matrices.  They should be matrices of the appropriate
161      size that contain numbers to add to each parameter matrix, before
162      normalization.
163
164      update_fn is an optional callback that takes parameters
165      (iteration, log_likelihood).  It is called once per iteration.
166      """
167      N, M = len(states), len(alphabet)
168      if not training_data:
169          raise ValueError("No training data given.")
170      if pseudo_initial is not None:
171          pseudo_initial = numpy.asarray(pseudo_initial)
172          if pseudo_initial.shape != (N,):
173              raise ValueError("pseudo_initial not shape len(states)")
174      if pseudo_transition is not None:
175          pseudo_transition = numpy.asarray(pseudo_transition)
176          if pseudo_transition.shape != (N, N):
177              raise ValueError("pseudo_transition not shape " +
178                               "len(states) X len(states)")
179      if pseudo_emission is not None:
180          pseudo_emission = numpy.asarray(pseudo_emission)
181          if pseudo_emission.shape != (N, M):
182              raise ValueError("pseudo_emission not shape " +
183                               "len(states) X len(alphabet)")
184
185      # Training data is given as a list of members of the alphabet.
186      # Replace those with indexes into the alphabet list for easier
187      # computation.
188      training_outputs = []
189      indexes = itemindex(alphabet)
190      for outputs in training_data:
191          training_outputs.append([indexes[x] for x in outputs])
192
193      # Do some sanity checking on the outputs.
194      lengths = [len(x) for x in training_outputs]
195      if min(lengths) == 0:
196          raise ValueError("I got training data with outputs of length 0")
197
198      # Do the training with baum welch.
199      x = _baum_welch(N, M, training_outputs,
200                      pseudo_initial=pseudo_initial,
201                      pseudo_transition=pseudo_transition,
202                      pseudo_emission=pseudo_emission,
203                      update_fn=update_fn)
204      p_initial, p_transition, p_emission = x
205      return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
206
207
208  MAX_ITERATIONS = 1000
209
210
211 -def _baum_welch(N, M, training_outputs,
212                  p_initial=None, p_transition=None, p_emission=None,
213                  pseudo_initial=None, pseudo_transition=None,
214                  pseudo_emission=None, update_fn=None):
215      """Implement the Baum-Welch algorithm to evaluate unknown parameters in the MarkovModel object (PRIVATE)."""
216      if p_initial is None:
217          p_initial = _random_norm(N)
218      else:
219          p_initial = _copy_and_check(p_initial, (N,))
220
221      if p_transition is None:
222          p_transition = _random_norm((N, N))
223      else:
224          p_transition = _copy_and_check(p_transition, (N, N))
225      if p_emission is None:
226          p_emission = _random_norm((N, M))
227      else:
228          p_emission = _copy_and_check(p_emission, (N, M))
229
230      # Do all the calculations in log space to avoid underflows.
231      lp_initial = numpy.log(p_initial)
232      lp_transition = numpy.log(p_transition)
233      lp_emission = numpy.log(p_emission)
234      if pseudo_initial is not None:
235          lpseudo_initial = numpy.log(pseudo_initial)
236      else:
237          lpseudo_initial = None
238      if pseudo_transition is not None:
239          lpseudo_transition = numpy.log(pseudo_transition)
240      else:
241          lpseudo_transition = None
242      if pseudo_emission is not None:
243          lpseudo_emission = numpy.log(pseudo_emission)
244      else:
245          lpseudo_emission = None
246
247      # Iterate through each sequence of output, updating the parameters
248      # to the HMM.  Stop when the log likelihoods of the sequences
249      # stops varying.
250      prev_llik = None
251      for i in range(MAX_ITERATIONS):
252          llik = LOG0
253          for outputs in training_outputs:
254              x = _baum_welch_one(
255                  N, M, outputs,
256                  lp_initial, lp_transition, lp_emission,
257                  lpseudo_initial, lpseudo_transition, lpseudo_emission,)
258              llik += x
259          if update_fn is not None:
260              update_fn(i, llik)
261          if prev_llik is not None and numpy.fabs(prev_llik - llik) < 0.1:
262              break
263          prev_llik = llik
264      else:
265          raise RuntimeError("HMM did not converge in %d iterations"
266                             % MAX_ITERATIONS)
267
268      # Return everything back in normal space.
269      return [numpy.exp(x) for x in (lp_initial, lp_transition, lp_emission)]
270
271
272 -def _baum_welch_one(N, M, outputs,
273                      lp_initial, lp_transition, lp_emission,
274                      lpseudo_initial, lpseudo_transition, lpseudo_emission):
275      """Execute one step for Baum-Welch algorithm (PRIVATE).
276
277      Do one iteration of Baum-Welch based on a sequence of output.
278      Changes the value for lp_initial, lp_transition and lp_emission in place.
279      """
280      T = len(outputs)
281      fmat = _forward(N, T, lp_initial, lp_transition, lp_emission, outputs)
282      bmat = _backward(N, T, lp_transition, lp_emission, outputs)
283
284      # Calculate the probability of traversing each arc for any given
285      # transition.
286      lp_arc = numpy.zeros((N, N, T))
287      for t in range(T):
288          k = outputs[t]
289          lp_traverse = numpy.zeros((N, N))  # P going over one arc.
290          for i in range(N):
291              for j in range(N):
292                  # P(getting to this arc)
293                  # P(making this transition)
294                  # P(emitting this character)
295                  # P(going to the end)
296                  lp = (fmat[i][t] +
297                        lp_transition[i][j] +
298                        lp_emission[i][k] +
299                        bmat[j][t + 1])
300                  lp_traverse[i][j] = lp
301          # Normalize the probability for this time step.
302          lp_arc[:, :, t] = lp_traverse - _logsum(lp_traverse)
303
304      # Sum of all the transitions out of state i at time t.
305      lp_arcout_t = numpy.zeros((N, T))
306      for t in range(T):
307          for i in range(N):
308              lp_arcout_t[i][t] = _logsum(lp_arc[i, :, t])
309
310      # Sum of all the transitions out of state i.
311      lp_arcout = numpy.zeros(N)
312      for i in range(N):
313          lp_arcout[i] = _logsum(lp_arcout_t[i, :])
314
315      # UPDATE P_INITIAL.
316      lp_initial = lp_arcout_t[:, 0]
317      if lpseudo_initial is not None:
319          lp_initial = lp_initial - _logsum(lp_initial)
320
321      # UPDATE P_TRANSITION.  p_transition[i][j] is the sum of all the
322      # transitions from i to j, normalized by the sum of the
323      # transitions out of i.
324      for i in range(N):
325          for j in range(N):
326              lp_transition[i][j] = _logsum(lp_arc[i, j, :]) - lp_arcout[i]
327          if lpseudo_transition is not None:
329              lp_transition[i] = lp_transition[i] - _logsum(lp_transition[i])
330
331      # UPDATE P_EMISSION.  lp_emission[i][k] is the sum of all the
332      # transitions out of i when k is observed, divided by the sum of
333      # the transitions out of i.
334      for i in range(N):
335          ksum = numpy.zeros(M) + LOG0    # ksum[k] is the sum of all i with k.
336          for t in range(T):
337              k = outputs[t]
338              for j in range(N):
339                  ksum[k] = logaddexp(ksum[k], lp_arc[i, j, t])
340          ksum = ksum - _logsum(ksum)      # Normalize
341          if lpseudo_emission is not None:
343              ksum = ksum - _logsum(ksum)  # Renormalize
344          lp_emission[i, :] = ksum
345
346      # Calculate the log likelihood of the output based on the forward
347      # matrix.  Since the parameters of the HMM has changed, the log
348      # likelihoods are going to be a step behind, and we might be doing
349      # one extra iteration of training.  The alternative is to rerun
350      # the _forward algorithm and calculate from the clean one, but
351      # that may be more expensive than overshooting the training by one
352      # step.
353      return _logsum(fmat[:, T])
354
355
356 -def _forward(N, T, lp_initial, lp_transition, lp_emission, outputs):
357      """Implement forward algorithm (PRIVATE).
358
359      Calculate a Nx(T+1) matrix, where the last column is the total
360      probability of the output.
361      """
362      matrix = numpy.zeros((N, T + 1))
363
364      # Initialize the first column to be the initial values.
365      matrix[:, 0] = lp_initial
366      for t in range(1, T + 1):
367          k = outputs[t - 1]
368          for j in range(N):
369              # The probability of the state is the sum of the
370              # transitions from all the states from time t-1.
371              lprob = LOG0
372              for i in range(N):
373                  lp = matrix[i][t - 1] + lp_transition[i][j] + lp_emission[i][k]
375              matrix[j][t] = lprob
376      return matrix
377
378
379 -def _backward(N, T, lp_transition, lp_emission, outputs):
380      """Implement backward algorithm."""
381      matrix = numpy.zeros((N, T + 1))
382      for t in range(T - 1, -1, -1):
383          k = outputs[t]
384          for i in range(N):
385              # The probability of the state is the sum of the
386              # transitions from all the states from time t+1.
387              lprob = LOG0
388              for j in range(N):
389                  lp = (matrix[j][t + 1] +
390                        lp_transition[i][j] +
391                        lp_emission[i][k])
393              matrix[i][t] = lprob
394      return matrix
395
396
397 -def train_visible(states, alphabet, training_data,
398                    pseudo_initial=None, pseudo_transition=None,
399                    pseudo_emission=None):
400      """Train a visible MarkovModel using maximum likelihoood estimates for each of the parameters.
401
402      Train a visible MarkovModel using maximum likelihoood estimates
403      for each of the parameters.  states is a list of strings that
404      describe the names of each state.  alphabet is a list of objects
405      that indicate the allowed outputs.  training_data is a list of
406      (outputs, observed states) where outputs is a list of the emission
407      from the alphabet, and observed states is a list of states from
408      states.
409
410      pseudo_initial, pseudo_transition, and pseudo_emission are
411      optional parameters that you can use to assign pseudo-counts to
412      different matrices.  They should be matrices of the appropriate
413      size that contain numbers to add to each parameter matrix.
414      """
415      N, M = len(states), len(alphabet)
416      if pseudo_initial is not None:
417          pseudo_initial = numpy.asarray(pseudo_initial)
418          if pseudo_initial.shape != (N,):
419              raise ValueError("pseudo_initial not shape len(states)")
420      if pseudo_transition is not None:
421          pseudo_transition = numpy.asarray(pseudo_transition)
422          if pseudo_transition.shape != (N, N):
423              raise ValueError("pseudo_transition not shape " +
424                               "len(states) X len(states)")
425      if pseudo_emission is not None:
426          pseudo_emission = numpy.asarray(pseudo_emission)
427          if pseudo_emission.shape != (N, M):
428              raise ValueError("pseudo_emission not shape " +
429                               "len(states) X len(alphabet)")
430
431      # Training data is given as a list of members of the alphabet.
432      # Replace those with indexes into the alphabet list for easier
433      # computation.
434      training_states, training_outputs = [], []
435      states_indexes = itemindex(states)
436      outputs_indexes = itemindex(alphabet)
437      for toutputs, tstates in training_data:
438          if len(tstates) != len(toutputs):
439              raise ValueError("states and outputs not aligned")
440          training_states.append([states_indexes[x] for x in tstates])
441          training_outputs.append([outputs_indexes[x] for x in toutputs])
442
443      x = _mle(N, M, training_outputs, training_states,
444               pseudo_initial, pseudo_transition, pseudo_emission)
445      p_initial, p_transition, p_emission = x
446
447      return MarkovModel(states, alphabet, p_initial, p_transition, p_emission)
448
449
450 -def _mle(N, M, training_outputs, training_states, pseudo_initial,
451           pseudo_transition, pseudo_emission):
452      """Implement Maximum likelihood estimation algorithm (PRIVATE)."""
453      # p_initial is the probability that a sequence of states starts
454      # off with a particular one.
455      p_initial = numpy.zeros(N)
456      if pseudo_initial:
457          p_initial = p_initial + pseudo_initial
458      for states in training_states:
459          p_initial[states[0]] += 1
460      p_initial = _normalize(p_initial)
461
462      # p_transition is the probability that a state leads to the next
463      # one.  C(i,j)/C(i) where i and j are states.
464      p_transition = numpy.zeros((N, N))
465      if pseudo_transition:
466          p_transition = p_transition + pseudo_transition
467      for states in training_states:
468          for n in range(len(states) - 1):
469              i, j = states[n], states[n + 1]
470              p_transition[i, j] += 1
471      for i in range(len(p_transition)):
472          p_transition[i, :] = p_transition[i, :] / sum(p_transition[i, :])
473
474      # p_emission is the probability of an output given a state.
475      # C(s,o)|C(s) where o is an output and s is a state.
476      p_emission = numpy.zeros((N, M))
477      if pseudo_emission:
478          p_emission = p_emission + pseudo_emission
479      p_emission = numpy.ones((N, M))
480      for outputs, states in zip(training_outputs, training_states):
481          for o, s in zip(outputs, states):
482              p_emission[s, o] += 1
483      for i in range(len(p_emission)):
484          p_emission[i, :] = p_emission[i, :] / sum(p_emission[i, :])
485
486      return p_initial, p_transition, p_emission
487
488
489 -def _argmaxes(vector, allowance=None):
490      """Return indeces of the maximum values aong the vector (PRIVATE)."""
491      return [numpy.argmax(vector)]
492
493
494 -def find_states(markov_model, output):
495      """Find states in the given Markov model output.
496
497      Returns a list of (states, score) tuples.
498      """
499      mm = markov_model
500      N = len(mm.states)
501
502      # _viterbi does calculations in log space.  Add a tiny bit to the
503      # matrices so that the logs will not break.
504      lp_initial = numpy.log(mm.p_initial + VERY_SMALL_NUMBER)
505      lp_transition = numpy.log(mm.p_transition + VERY_SMALL_NUMBER)
506      lp_emission = numpy.log(mm.p_emission + VERY_SMALL_NUMBER)
507      # Change output into a list of indexes into the alphabet.
508      indexes = itemindex(mm.alphabet)
509      output = [indexes[x] for x in output]
510
511      # Run the viterbi algorithm.
512      results = _viterbi(N, lp_initial, lp_transition, lp_emission, output)
513
514      for i in range(len(results)):
515          states, score = results[i]
516          results[i] = [mm.states[x] for x in states], numpy.exp(score)
517      return results
518
519
520 -def _viterbi(N, lp_initial, lp_transition, lp_emission, output):
521      """Implement Viterbi algorithm to find most likely states for a given input (PRIVATE)."""
522      T = len(output)
523      # Store the backtrace in a NxT matrix.
524      backtrace = []    # list of indexes of states in previous timestep.
525      for i in range(N):
526          backtrace.append([None] * T)
527
528      # Store the best scores.
529      scores = numpy.zeros((N, T))
530      scores[:, 0] = lp_initial + lp_emission[:, output[0]]
531      for t in range(1, T):
532          k = output[t]
533          for j in range(N):
534              # Find the most likely place it came from.
535              i_scores = (scores[:, t - 1] +
536                          lp_transition[:, j] +
537                          lp_emission[j, k])
538              indexes = _argmaxes(i_scores)
539              scores[j, t] = i_scores[indexes[0]]
540              backtrace[j][t] = indexes
541
542      # Do the backtrace.  First, find a good place to start.  Then,
543      # we'll follow the backtrace matrix to find the list of states.
544      # In the event of ties, there may be multiple paths back through
545      # the matrix, which implies a recursive solution.  We'll simulate
546      # it by keeping our own stack.
547      in_process = []    # list of (t, states, score)
548      results = []       # return values.  list of (states, score)
549      indexes = _argmaxes(scores[:, T - 1])      # pick the first place
550      for i in indexes:
551          in_process.append((T - 1, [i], scores[i][T - 1]))
552      while in_process:
553          t, states, score = in_process.pop()
554          if t == 0:
555              results.append((states, score))
556          else:
557              indexes = backtrace[states[0]][t]
558              for i in indexes:
559                  in_process.append((t - 1, [i] + states, score))
560      return results
561
562
563 -def _normalize(matrix):
564      """Normalize matrix object (PRIVATE)."""
565      if len(matrix.shape) == 1:
566          matrix = matrix / float(sum(matrix))
567      elif len(matrix.shape) == 2:
568          # Normalize by rows.
569          for i in range(len(matrix)):
570              matrix[i, :] = matrix[i, :] / sum(matrix[i, :])
571      else:
572          raise ValueError("I cannot handle matrixes of that shape")
573      return matrix
574
575
576 -def _uniform_norm(shape):
577      """Normalize a uniform matrix (PRIVATE)."""
578      matrix = numpy.ones(shape)
579      return _normalize(matrix)
580
581
582 -def _random_norm(shape):
583      """Normalize a random matrix (PRIVATE)."""
584      matrix = numpy.random.random(shape)
585      return _normalize(matrix)
586
587
588 -def _copy_and_check(matrix, desired_shape):
589      """Copy a matrix and check its dimension. Normalize at the end (PRIVATE)."""
590      # Copy the matrix.
591      matrix = numpy.array(matrix, copy=1)
592      # Check the dimensions.
593      if matrix.shape != desired_shape:
594          raise ValueError("Incorrect dimension")
595      # Make sure it's normalized.
596      if len(matrix.shape) == 1:
597          if numpy.fabs(sum(matrix) - 1.0) > 0.01:
598              raise ValueError("matrix not normalized to 1.0")
599      elif len(matrix.shape) == 2:
600          for i in range(len(matrix)):
601              if numpy.fabs(sum(matrix[i]) - 1.0) > 0.01:
602                  raise ValueError("matrix %d not normalized to 1.0" % i)
603      else:
604          raise ValueError("I don't handle matrices > 2 dimensions")
605      return matrix
606
607
608 -def _logsum(matrix):
609      """Implement logsum for a matrix object (PRIVATE)."""
610      if len(matrix.shape) > 1:
611          vec = numpy.reshape(matrix, (numpy.product(matrix.shape),))
612      else:
613          vec = matrix
614      sum = LOG0
615      for num in vec:
617      return sum
618
619
621      """Implement a log sum for two vector objects (PRIVATE)."""
622      assert len(logvec1) == len(logvec2), "vectors aren't the same length"
623      sumvec = numpy.zeros(len(logvec1))
624      for i in range(len(logvec1)):
626      return sumvec
627
628
629 -def _exp_logsum(numbers):
630      """Return the exponential of a logsum (PRIVATE)."""
631      sum = _logsum(numbers)
632      return numpy.exp(sum)
633
