Package Bio :: Package HMM :: Module MarkovModel
[hide private]
[frames] | no frames]

Source Code for Module Bio.HMM.MarkovModel

  1  # This code is part of the Biopython distribution and governed by its 
  2  # license.  Please see the LICENSE file that should have been included 
  3  # as part of this package. 
  4  # 
  5   
  6  """Deal with representations of Markov Models. 
  7  """ 
  8  # standard modules 
  9  import copy 
 10  import math 
 11  import random 
 12   
 13  # TODO - Take advantage of defaultdict once Python 2.4 is dead? 
 14  # from collections import defaultdict 
 15   
 16  from Bio._py3k import range 
 17   
 18  from Bio.Seq import MutableSeq 
 19   
 20   
21 -def _gen_random_array(n):
22 """ Return an array of n random numbers, where the elements of the array sum 23 to 1.0""" 24 randArray = [random.random() for i in range(n)] 25 total = sum(randArray) 26 normalizedRandArray = [x / total for x in randArray] 27 28 return normalizedRandArray
29 30
31 -def _calculate_emissions(emission_probs):
32 """Calculate which symbols can be emitted in each state 33 """ 34 # loop over all of the state-symbol duples, mapping states to 35 # lists of emitted symbols 36 emissions = dict() 37 for state, symbol in emission_probs: 38 try: 39 emissions[state].append(symbol) 40 except KeyError: 41 emissions[state] = [symbol] 42 43 return emissions
44 45
46 -def _calculate_from_transitions(trans_probs):
47 """Calculate which 'from transitions' are allowed for each state 48 49 This looks through all of the trans_probs, and uses this dictionary 50 to determine allowed transitions. It converts this information into 51 a dictionary, whose keys are source states and whose values are 52 lists of destination states reachable from the source state via a 53 transition. 54 """ 55 transitions = dict() 56 for from_state, to_state in trans_probs: 57 try: 58 transitions[from_state].append(to_state) 59 except KeyError: 60 transitions[from_state] = [to_state] 61 62 return transitions
63 64
65 -def _calculate_to_transitions(trans_probs):
66 """Calculate which 'to transitions' are allowed for each state 67 68 This looks through all of the trans_probs, and uses this dictionary 69 to determine allowed transitions. It converts this information into 70 a dictionary, whose keys are destination states and whose values are 71 lists of source states from which the destination is reachable via a 72 transition. 73 """ 74 transitions = dict() 75 for from_state, to_state in trans_probs: 76 try: 77 transitions[to_state].append(from_state) 78 except KeyError: 79 transitions[to_state] = [from_state] 80 81 return transitions
82 83
84 -class MarkovModelBuilder(object):
85 """Interface to build up a Markov Model. 86 87 This class is designed to try to separate the task of specifying the 88 Markov Model from the actual model itself. This is in hopes of making 89 the actual Markov Model classes smaller. 90 91 So, this builder class should be used to create Markov models instead 92 of trying to initiate a Markov Model directly. 93 """ 94 # the default pseudo counts to use 95 DEFAULT_PSEUDO = 1 96
97 - def __init__(self, state_alphabet, emission_alphabet):
98 """Initialize a builder to create Markov Models. 99 100 Arguments: 101 102 o state_alphabet -- An alphabet containing all of the letters that 103 can appear in the states 104 105 o emission_alphabet -- An alphabet containing all of the letters for 106 states that can be emitted by the HMM. 107 """ 108 self._state_alphabet = state_alphabet 109 self._emission_alphabet = emission_alphabet 110 111 # probabilities for the initial state, initialized by calling 112 # set_initial_probabilities (required) 113 self.initial_prob = {} 114 115 # the probabilities for transitions and emissions 116 # by default we have no transitions and all possible emissions 117 self.transition_prob = {} 118 self.emission_prob = self._all_blank(state_alphabet, 119 emission_alphabet) 120 121 # the default pseudocounts for transition and emission counting 122 self.transition_pseudo = {} 123 self.emission_pseudo = self._all_pseudo(state_alphabet, 124 emission_alphabet)
125
126 - def _all_blank(self, first_alphabet, second_alphabet):
127 """Return a dictionary with all counts set to zero. 128 129 This uses the letters in the first and second alphabet to create 130 a dictionary with keys of two tuples organized as 131 (letter of first alphabet, letter of second alphabet). The values 132 are all set to 0. 133 """ 134 all_blank = {} 135 for first_state in first_alphabet.letters: 136 for second_state in second_alphabet.letters: 137 all_blank[(first_state, second_state)] = 0 138 139 return all_blank
140
141 - def _all_pseudo(self, first_alphabet, second_alphabet):
142 """Return a dictionary with all counts set to a default value. 143 144 This takes the letters in first alphabet and second alphabet and 145 creates a dictionary with keys of two tuples organized as: 146 (letter of first alphabet, letter of second alphabet). The values 147 are all set to the value of the class attribute DEFAULT_PSEUDO. 148 """ 149 all_counts = {} 150 for first_state in first_alphabet.letters: 151 for second_state in second_alphabet.letters: 152 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO 153 154 return all_counts
155
156 - def get_markov_model(self):
157 """Return the markov model corresponding with the current parameters. 158 159 Each markov model returned by a call to this function is unique 160 (ie. they don't influence each other). 161 """ 162 # user must set initial probabilities 163 if not self.initial_prob: 164 raise Exception("set_initial_probabilities must be called to " + 165 "fully initialize the Markov model") 166 167 initial_prob = copy.deepcopy(self.initial_prob) 168 transition_prob = copy.deepcopy(self.transition_prob) 169 emission_prob = copy.deepcopy(self.emission_prob) 170 transition_pseudo = copy.deepcopy(self.transition_pseudo) 171 emission_pseudo = copy.deepcopy(self.emission_pseudo) 172 173 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob, 174 transition_pseudo, emission_pseudo)
175
176 - def set_initial_probabilities(self, initial_prob):
177 """Set initial state probabilities. 178 179 initial_prob is a dictionary mapping states to probabilities. 180 Suppose, for example, that the state alphabet is ['A', 'B']. Call 181 set_initial_prob({'A': 1}) to guarantee that the initial 182 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5}) 183 to make each initial state equally probable. 184 185 This method must now be called in order to use the Markov model 186 because the calculation of initial probabilities has changed 187 incompatibly; the previous calculation was incorrect. 188 189 If initial probabilities are set for all states, then they should add up 190 to 1. Otherwise the sum should be <= 1. The residual probability is 191 divided up evenly between all the states for which the initial 192 probability has not been set. For example, calling 193 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5, 194 for the above example. 195 """ 196 self.initial_prob = copy.copy(initial_prob) 197 198 # ensure that all referenced states are valid 199 for state in initial_prob: 200 assert state in self._state_alphabet.letters, \ 201 "State %s was not found in the sequence alphabet" % state 202 203 # distribute the residual probability, if any 204 num_states_not_set =\ 205 len(self._state_alphabet.letters) - len(self.initial_prob) 206 if num_states_not_set < 0: 207 raise Exception("Initial probabilities can't exceed # of states") 208 prob_sum = sum(self.initial_prob.values()) 209 if prob_sum > 1.0: 210 raise Exception("Total initial probability cannot exceed 1.0") 211 if num_states_not_set > 0: 212 prob = (1.0 - prob_sum) / num_states_not_set 213 for state in self._state_alphabet.letters: 214 if state not in self.initial_prob: 215 self.initial_prob[state] = prob
216
217 - def set_equal_probabilities(self):
218 """Reset all probabilities to be an average value. 219 220 Resets the values of all initial probabilities and all allowed 221 transitions and all allowed emissions to be equal to 1 divided by the 222 number of possible elements. 223 224 This is useful if you just want to initialize a Markov Model to 225 starting values (ie. if you have no prior notions of what the 226 probabilities should be -- or if you are just feeling too lazy 227 to calculate them :-). 228 229 Warning 1 -- this will reset all currently set probabilities. 230 231 Warning 2 -- This just sets all probabilities for transitions and 232 emissions to total up to 1, so it doesn't ensure that the sum of 233 each set of transitions adds up to 1. 234 """ 235 # set initial state probabilities 236 new_initial_prob = float(1) / float(len(self.transition_prob)) 237 for state in self._state_alphabet.letters: 238 self.initial_prob[state] = new_initial_prob 239 240 # set the transitions 241 new_trans_prob = float(1) / float(len(self.transition_prob)) 242 for key in self.transition_prob: 243 self.transition_prob[key] = new_trans_prob 244 245 # set the emissions 246 new_emission_prob = float(1) / float(len(self.emission_prob)) 247 for key in self.emission_prob: 248 self.emission_prob[key] = new_emission_prob
249
251 """Set all initial state probabilities to a randomly generated distribution. 252 Returns the dictionary containing the initial probabilities. 253 """ 254 initial_freqs = _gen_random_array(len(self._state_alphabet.letters)) 255 for state in self._state_alphabet.letters: 256 self.initial_prob[state] = initial_freqs.pop() 257 258 return self.initial_prob
259
261 """Set all allowed transition probabilities to a randomly generated distribution. 262 263 Returns the dictionary containing the transition probabilities. 264 """ 265 if not self.transition_prob: 266 raise Exception("No transitions have been allowed yet. " + 267 "Allow some or all transitions by calling " + 268 "allow_transition or allow_all_transitions first.") 269 270 transitions_from = _calculate_from_transitions(self.transition_prob) 271 for from_state in transitions_from: 272 freqs = _gen_random_array(len(transitions_from[from_state])) 273 for to_state in transitions_from[from_state]: 274 self.transition_prob[(from_state, to_state)] = freqs.pop() 275 276 return self.transition_prob
277
279 """Set all allowed emission probabilities to a randomly generated distribution. 280 281 Returns the dictionary containing the emission probabilities. 282 """ 283 if not self.emission_prob: 284 raise Exception("No emissions have been allowed yet. " + 285 "Allow some or all emissions.") 286 287 emissions = _calculate_emissions(self.emission_prob) 288 for state in emissions: 289 freqs = _gen_random_array(len(emissions[state])) 290 for symbol in emissions[state]: 291 self.emission_prob[(state, symbol)] = freqs.pop() 292 293 return self.emission_prob
294
295 - def set_random_probabilities(self):
296 """Set all probabilities to randomly generated numbers. 297 298 Resets probabilities of all initial states, transitions, and 299 emissions to random values. 300 """ 301 self.set_random_initial_probabilities() 302 self.set_random_transition_probabilities() 303 self.set_random_emission_probabilities()
304 305 # --- functions to deal with the transitions in the sequence 306
307 - def allow_all_transitions(self):
308 """A convenience function to create transitions between all states. 309 310 By default all transitions within the alphabet are disallowed; this 311 is a way to change this to allow all possible transitions. 312 """ 313 # first get all probabilities and pseudo counts set 314 # to the default values 315 all_probs = self._all_blank(self._state_alphabet, 316 self._state_alphabet) 317 318 all_pseudo = self._all_pseudo(self._state_alphabet, 319 self._state_alphabet) 320 321 # now set any probabilities and pseudo counts that 322 # were previously set 323 for set_key in self.transition_prob: 324 all_probs[set_key] = self.transition_prob[set_key] 325 326 for set_key in self.transition_pseudo: 327 all_pseudo[set_key] = self.transition_pseudo[set_key] 328 329 # finally reinitialize the transition probs and pseudo counts 330 self.transition_prob = all_probs 331 self.transition_pseudo = all_pseudo
332
333 - def allow_transition(self, from_state, to_state, probability=None, 334 pseudocount=None):
335 """Set a transition as being possible between the two states. 336 337 probability and pseudocount are optional arguments 338 specifying the probabilities and pseudo counts for the transition. 339 If these are not supplied, then the values are set to the 340 default values. 341 342 Raises: 343 KeyError -- if the two states already have an allowed transition. 344 """ 345 # check the sanity of adding these states 346 for state in [from_state, to_state]: 347 assert state in self._state_alphabet.letters, \ 348 "State %s was not found in the sequence alphabet" % state 349 350 # ensure that the states are not already set 351 if (from_state, to_state) not in self.transition_prob and \ 352 (from_state, to_state) not in self.transition_pseudo: 353 # set the initial probability 354 if probability is None: 355 probability = 0 356 self.transition_prob[(from_state, to_state)] = probability 357 358 # set the initial pseudocounts 359 if pseudocount is None: 360 pseudcount = self.DEFAULT_PSEUDO 361 self.transition_pseudo[(from_state, to_state)] = pseudocount 362 else: 363 raise KeyError("Transition from %s to %s is already allowed." 364 % (from_state, to_state))
365
366 - def destroy_transition(self, from_state, to_state):
367 """Restrict transitions between the two states. 368 369 Raises: 370 KeyError if the transition is not currently allowed. 371 """ 372 try: 373 del self.transition_prob[(from_state, to_state)] 374 del self.transition_pseudo[(from_state, to_state)] 375 except KeyError: 376 raise KeyError("Transition from %s to %s is already disallowed." 377 % (from_state, to_state))
378
379 - def set_transition_score(self, from_state, to_state, probability):
380 """Set the probability of a transition between two states. 381 382 Raises: 383 KeyError if the transition is not allowed. 384 """ 385 if (from_state, to_state) in self.transition_prob: 386 self.transition_prob[(from_state, to_state)] = probability 387 else: 388 raise KeyError("Transition from %s to %s is not allowed." 389 % (from_state, to_state))
390
391 - def set_transition_pseudocount(self, from_state, to_state, count):
392 """Set the default pseudocount for a transition. 393 394 To avoid computational problems, it is helpful to be able to 395 set a 'default' pseudocount to start with for estimating 396 transition and emission probabilities (see p62 in Durbin et al 397 for more discussion on this. By default, all transitions have 398 a pseudocount of 1. 399 400 Raises: 401 KeyError if the transition is not allowed. 402 """ 403 if (from_state, to_state) in self.transition_pseudo: 404 self.transition_pseudo[(from_state, to_state)] = count 405 else: 406 raise KeyError("Transition from %s to %s is not allowed." 407 % (from_state, to_state))
408 409 # --- functions to deal with emissions from the sequence 410
411 - def set_emission_score(self, seq_state, emission_state, probability):
412 """Set the probability of a emission from a particular state. 413 414 Raises: 415 KeyError if the emission from the given state is not allowed. 416 """ 417 if (seq_state, emission_state) in self.emission_prob: 418 self.emission_prob[(seq_state, emission_state)] = probability 419 else: 420 raise KeyError("Emission of %s from %s is not allowed." 421 % (emission_state, seq_state))
422
423 - def set_emission_pseudocount(self, seq_state, emission_state, count):
424 """Set the default pseudocount for an emission. 425 426 To avoid computational problems, it is helpful to be able to 427 set a 'default' pseudocount to start with for estimating 428 transition and emission probabilities (see p62 in Durbin et al 429 for more discussion on this. By default, all emissions have 430 a pseudocount of 1. 431 432 Raises: 433 KeyError if the emission from the given state is not allowed. 434 """ 435 if (seq_state, emission_state) in self.emission_pseudo: 436 self.emission_pseudo[(seq_state, emission_state)] = count 437 else: 438 raise KeyError("Emission of %s from %s is not allowed." 439 % (emission_state, seq_state))
440 441
442 -class HiddenMarkovModel(object):
443 """Represent a hidden markov model that can be used for state estimation.""" 444
445 - def __init__(self, initial_prob, transition_prob, emission_prob, 446 transition_pseudo, emission_pseudo):
447 """Initialize a Markov Model. 448 449 Note: You should use the MarkovModelBuilder class instead of 450 initiating this class directly. 451 452 Arguments: 453 454 o initial_prob - A dictionary of initial probabilities for all states. 455 456 o transition_prob -- A dictionary of transition probabilities for all 457 possible transitions in the sequence. 458 459 o emission_prob -- A dictionary of emission probabilities for all 460 possible emissions from the sequence states. 461 462 o transition_pseudo -- Pseudo-counts to be used for the transitions, 463 when counting for purposes of estimating transition probabilities. 464 465 o emission_pseudo -- Pseudo-counts to be used for the emissions, 466 when counting for purposes of estimating emission probabilities. 467 """ 468 self.initial_prob = initial_prob 469 470 self._transition_pseudo = transition_pseudo 471 self._emission_pseudo = emission_pseudo 472 473 self.transition_prob = transition_prob 474 self.emission_prob = emission_prob 475 476 # a dictionary of the possible transitions from each state 477 # each key is a source state, mapped to a list of the destination states 478 # that are reachable from the source state via a transition 479 self._transitions_from = \ 480 _calculate_from_transitions(self.transition_prob) 481 482 # a dictionary of the possible transitions to each state 483 # each key is a destination state, mapped to a list of source states 484 # from which the destination is reachable via a transition 485 self._transitions_to = \ 486 _calculate_to_transitions(self.transition_prob)
487
488 - def get_blank_transitions(self):
489 """Get the default transitions for the model. 490 491 Returns a dictionary of all of the default transitions between any 492 two letters in the sequence alphabet. The dictionary is structured 493 with keys as (letter1, letter2) and values as the starting number 494 of transitions. 495 """ 496 return self._transition_pseudo
497
498 - def get_blank_emissions(self):
499 """Get the starting default emmissions for each sequence. 500 501 This returns a dictionary of the default emmissions for each 502 letter. The dictionary is structured with keys as 503 (seq_letter, emmission_letter) and values as the starting number 504 of emmissions. 505 """ 506 return self._emission_pseudo
507
508 - def transitions_from(self, state_letter):
509 """Get all destination states to which there are transitions from the 510 state_letter source state. 511 512 This returns all letters which the given state_letter can transition 513 to. An empty list is returned if state_letter has no outgoing 514 transitions. 515 """ 516 if state_letter in self._transitions_from: 517 return self._transitions_from[state_letter] 518 else: 519 return []
520
521 - def transitions_to(self, state_letter):
522 """Get all source states from which there are transitions to the 523 state_letter destination state. 524 525 This returns all letters which the given state_letter is reachable 526 from. An empty list is returned if state_letter is unreachable. 527 """ 528 if state_letter in self._transitions_to: 529 return self._transitions_to[state_letter] 530 else: 531 return []
532
533 - def viterbi(self, sequence, state_alphabet):
534 """Calculate the most probable state path using the Viterbi algorithm. 535 536 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et 537 al for a full explanation -- this is where I took my implementation 538 ideas from), to allow decoding of the state path, given a sequence 539 of emissions. 540 541 Arguments: 542 543 o sequence -- A Seq object with the emission sequence that we 544 want to decode. 545 546 o state_alphabet -- The alphabet of the possible state sequences 547 that can be generated. 548 """ 549 # calculate logarithms of the initial, transition, and emission probs 550 log_initial = self._log_transform(self.initial_prob) 551 log_trans = self._log_transform(self.transition_prob) 552 log_emission = self._log_transform(self.emission_prob) 553 554 viterbi_probs = {} 555 pred_state_seq = {} 556 state_letters = state_alphabet.letters 557 558 # --- recursion 559 # loop over the training squence (i = 1 .. L) 560 # NOTE: My index numbers are one less than what is given in Durbin 561 # et al, since we are indexing the sequence going from 0 to 562 # (Length - 1) not 1 to Length, like in Durbin et al. 563 for i in range(0, len(sequence)): 564 # loop over all of the possible i-th states in the state path 565 for cur_state in state_letters: 566 # e_{l}(x_{i}) 567 emission_part = log_emission[(cur_state, sequence[i])] 568 569 max_prob = 0 570 if i == 0: 571 # for the first state, use the initial probability rather 572 # than looking back to previous states 573 max_prob = log_initial[cur_state] 574 else: 575 # loop over all possible (i-1)-th previous states 576 possible_state_probs = {} 577 for prev_state in self.transitions_to(cur_state): 578 # a_{kl} 579 trans_part = log_trans[(prev_state, cur_state)] 580 581 # v_{k}(i - 1) 582 viterbi_part = viterbi_probs[(prev_state, i - 1)] 583 cur_prob = viterbi_part + trans_part 584 585 possible_state_probs[prev_state] = cur_prob 586 587 # calculate the viterbi probability using the max 588 max_prob = max(possible_state_probs.values()) 589 590 # v_{k}(i) 591 viterbi_probs[(cur_state, i)] = (emission_part + max_prob) 592 593 if i > 0: 594 # get the most likely prev_state leading to cur_state 595 for state in possible_state_probs: 596 if possible_state_probs[state] == max_prob: 597 pred_state_seq[(i - 1, cur_state)] = state 598 break 599 600 # --- termination 601 # calculate the probability of the state path 602 # loop over all states 603 all_probs = {} 604 for state in state_letters: 605 # v_{k}(L) 606 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)] 607 608 state_path_prob = max(all_probs.values()) 609 610 # find the last pointer we need to trace back from 611 last_state = '' 612 for state in all_probs: 613 if all_probs[state] == state_path_prob: 614 last_state = state 615 616 assert last_state != '', "Didn't find the last state to trace from!" 617 618 # --- traceback 619 traceback_seq = MutableSeq('', state_alphabet) 620 621 loop_seq = list(range(1, len(sequence))) 622 loop_seq.reverse() 623 624 # last_state is the last state in the most probable state sequence. 625 # Compute that sequence by walking backwards in time. From the i-th 626 # state in the sequence, find the (i-1)-th state as the most 627 # probable state preceding the i-th state. 628 state = last_state 629 traceback_seq.append(state) 630 for i in loop_seq: 631 state = pred_state_seq[(i - 1, state)] 632 traceback_seq.append(state) 633 634 # put the traceback sequence in the proper orientation 635 traceback_seq.reverse() 636 637 return traceback_seq.toseq(), state_path_prob
638
639 - def _log_transform(self, probability):
640 """Return log transform of the given probability dictionary. 641 642 When calculating the Viterbi equation, add logs of probabilities rather 643 than multiplying probabilities, to avoid underflow errors. This method 644 returns a new dictionary with the same keys as the given dictionary 645 and log-transformed values. 646 """ 647 log_prob = copy.copy(probability) 648 try: 649 neg_inf = float("-inf") 650 except ValueError: 651 # On Python 2.5 or older that was handled in C code, 652 # and failed on Windows XP 32bit 653 neg_inf = - 1E400 654 for key in log_prob: 655 prob = log_prob[key] 656 if prob > 0: 657 log_prob[key] = math.log(log_prob[key]) 658 else: 659 log_prob[key] = neg_inf 660 661 return log_prob
662