Package Bio :: Package HMM :: Module MarkovModel
[hide private]
[frames] | no frames]

Source Code for Module Bio.HMM.MarkovModel

  1  # This code is part of the Biopython distribution and governed by its 
  2  # license.  Please see the LICENSE file that should have been included 
  3  # as part of this package. 
  4  # 
  5   
  6  """Deal with representations of Markov Models. 
  7  """ 
  8  # standard modules 
  9  import copy 
 10  import math 
 11  import random 
 12   
 13  # TODO - Take advantage of defaultdict once Python 2.4 is dead? 
 14  # from collections import defaultdict 
 15   
 16  from Bio._py3k import range 
 17   
 18  from Bio.Seq import MutableSeq 
 19   
 20   
21 -def _gen_random_array(n):
22 """ Return an array of n random numbers, where the elements of the array sum 23 to 1.0""" 24 randArray = [random.random() for i in range(n)] 25 total = sum(randArray) 26 normalizedRandArray = [x/total for x in randArray] 27 28 return normalizedRandArray
29 30
31 -def _calculate_emissions(emission_probs):
32 """Calculate which symbols can be emitted in each state 33 """ 34 # loop over all of the state-symbol duples, mapping states to 35 # lists of emitted symbols 36 emissions = dict() 37 for state, symbol in emission_probs: 38 try: 39 emissions[state].append(symbol) 40 except KeyError: 41 emissions[state] = [symbol] 42 43 return emissions
44 45
46 -def _calculate_from_transitions(trans_probs):
47 """Calculate which 'from transitions' are allowed for each state 48 49 This looks through all of the trans_probs, and uses this dictionary 50 to determine allowed transitions. It converts this information into 51 a dictionary, whose keys are source states and whose values are 52 lists of destination states reachable from the source state via a 53 transition. 54 """ 55 transitions = dict() 56 for from_state, to_state in trans_probs: 57 try: 58 transitions[from_state].append(to_state) 59 except KeyError: 60 transitions[from_state] = [to_state] 61 62 return transitions
63 64
65 -def _calculate_to_transitions(trans_probs):
66 """Calculate which 'to transitions' are allowed for each state 67 68 This looks through all of the trans_probs, and uses this dictionary 69 to determine allowed transitions. It converts this information into 70 a dictionary, whose keys are destination states and whose values are 71 lists of source states from which the destination is reachable via a 72 transition. 73 """ 74 transitions = dict() 75 for from_state, to_state in trans_probs: 76 try: 77 transitions[to_state].append(from_state) 78 except KeyError: 79 transitions[to_state] = [from_state] 80 81 return transitions
82 83
84 -class MarkovModelBuilder(object):
85 """Interface to build up a Markov Model. 86 87 This class is designed to try to separate the task of specifying the 88 Markov Model from the actual model itself. This is in hopes of making 89 the actual Markov Model classes smaller. 90 91 So, this builder class should be used to create Markov models instead 92 of trying to initiate a Markov Model directly. 93 """ 94 # the default pseudo counts to use 95 DEFAULT_PSEUDO = 1 96
97 - def __init__(self, state_alphabet, emission_alphabet):
98 """Initialize a builder to create Markov Models. 99 100 Arguments: 101 102 o state_alphabet -- An alphabet containing all of the letters that 103 can appear in the states 104 105 o emission_alphabet -- An alphabet containing all of the letters for 106 states that can be emitted by the HMM. 107 """ 108 self._state_alphabet = state_alphabet 109 self._emission_alphabet = emission_alphabet 110 111 # probabilities for the initial state, initialized by calling 112 # set_initial_probabilities (required) 113 self.initial_prob = {} 114 115 # the probabilities for transitions and emissions 116 # by default we have no transitions and all possible emissions 117 self.transition_prob = {} 118 self.emission_prob = self._all_blank(state_alphabet, 119 emission_alphabet) 120 121 # the default pseudocounts for transition and emission counting 122 self.transition_pseudo = {} 123 self.emission_pseudo = self._all_pseudo(state_alphabet, 124 emission_alphabet)
125
126 - def _all_blank(self, first_alphabet, second_alphabet):
127 """Return a dictionary with all counts set to zero. 128 129 This uses the letters in the first and second alphabet to create 130 a dictionary with keys of two tuples organized as 131 (letter of first alphabet, letter of second alphabet). The values 132 are all set to 0. 133 """ 134 all_blank = {} 135 for first_state in first_alphabet.letters: 136 for second_state in second_alphabet.letters: 137 all_blank[(first_state, second_state)] = 0 138 139 return all_blank
140
141 - def _all_pseudo(self, first_alphabet, second_alphabet):
142 """Return a dictionary with all counts set to a default value. 143 144 This takes the letters in first alphabet and second alphabet and 145 creates a dictionary with keys of two tuples organized as: 146 (letter of first alphabet, letter of second alphabet). The values 147 are all set to the value of the class attribute DEFAULT_PSEUDO. 148 """ 149 all_counts = {} 150 for first_state in first_alphabet.letters: 151 for second_state in second_alphabet.letters: 152 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO 153 154 return all_counts
155
156 - def get_markov_model(self):
157 """Return the markov model corresponding with the current parameters. 158 159 Each markov model returned by a call to this function is unique 160 (ie. they don't influence each other). 161 """ 162 163 # user must set initial probabilities 164 if not self.initial_prob: 165 raise Exception("set_initial_probabilities must be called to " + 166 "fully initialize the Markov model") 167 168 initial_prob = copy.deepcopy(self.initial_prob) 169 transition_prob = copy.deepcopy(self.transition_prob) 170 emission_prob = copy.deepcopy(self.emission_prob) 171 transition_pseudo = copy.deepcopy(self.transition_pseudo) 172 emission_pseudo = copy.deepcopy(self.emission_pseudo) 173 174 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob, 175 transition_pseudo, emission_pseudo)
176
177 - def set_initial_probabilities(self, initial_prob):
178 """Set initial state probabilities. 179 180 initial_prob is a dictionary mapping states to probabilities. 181 Suppose, for example, that the state alphabet is ['A', 'B']. Call 182 set_initial_prob({'A': 1}) to guarantee that the initial 183 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5}) 184 to make each initial state equally probable. 185 186 This method must now be called in order to use the Markov model 187 because the calculation of initial probabilities has changed 188 incompatibly; the previous calculation was incorrect. 189 190 If initial probabilities are set for all states, then they should add up 191 to 1. Otherwise the sum should be <= 1. The residual probability is 192 divided up evenly between all the states for which the initial 193 probability has not been set. For example, calling 194 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5, 195 for the above example. 196 """ 197 self.initial_prob = copy.copy(initial_prob) 198 199 # ensure that all referenced states are valid 200 for state in initial_prob: 201 assert state in self._state_alphabet.letters, \ 202 "State %s was not found in the sequence alphabet" % state 203 204 # distribute the residual probability, if any 205 num_states_not_set =\ 206 len(self._state_alphabet.letters) - len(self.initial_prob) 207 if num_states_not_set < 0: 208 raise Exception("Initial probabilities can't exceed # of states") 209 prob_sum = sum(self.initial_prob.values()) 210 if prob_sum > 1.0: 211 raise Exception("Total initial probability cannot exceed 1.0") 212 if num_states_not_set > 0: 213 prob = (1.0 - prob_sum) / num_states_not_set 214 for state in self._state_alphabet.letters: 215 if state not in self.initial_prob: 216 self.initial_prob[state] = prob
217
218 - def set_equal_probabilities(self):
219 """Reset all probabilities to be an average value. 220 221 Resets the values of all initial probabilities and all allowed 222 transitions and all allowed emissions to be equal to 1 divided by the 223 number of possible elements. 224 225 This is useful if you just want to initialize a Markov Model to 226 starting values (ie. if you have no prior notions of what the 227 probabilities should be -- or if you are just feeling too lazy 228 to calculate them :-). 229 230 Warning 1 -- this will reset all currently set probabilities. 231 232 Warning 2 -- This just sets all probabilities for transitions and 233 emissions to total up to 1, so it doesn't ensure that the sum of 234 each set of transitions adds up to 1. 235 """ 236 237 # set initial state probabilities 238 new_initial_prob = float(1) / float(len(self.transition_prob)) 239 for state in self._state_alphabet.letters: 240 self.initial_prob[state] = new_initial_prob 241 242 # set the transitions 243 new_trans_prob = float(1) / float(len(self.transition_prob)) 244 for key in self.transition_prob: 245 self.transition_prob[key] = new_trans_prob 246 247 # set the emissions 248 new_emission_prob = float(1) / float(len(self.emission_prob)) 249 for key in self.emission_prob: 250 self.emission_prob[key] = new_emission_prob
251
253 """Set all initial state probabilities to a randomly generated distribution. 254 Returns the dictionary containing the initial probabilities. 255 """ 256 initial_freqs = _gen_random_array(len(self._state_alphabet.letters)) 257 for state in self._state_alphabet.letters: 258 self.initial_prob[state] = initial_freqs.pop() 259 260 return self.initial_prob
261
263 """Set all allowed transition probabilities to a randomly generated distribution. 264 Returns the dictionary containing the transition probabilities. 265 """ 266 267 if not self.transition_prob: 268 raise Exception("No transitions have been allowed yet. " + 269 "Allow some or all transitions by calling " + 270 "allow_transition or allow_all_transitions first.") 271 272 transitions_from = _calculate_from_transitions(self.transition_prob) 273 for from_state in transitions_from: 274 freqs = _gen_random_array(len(transitions_from[from_state])) 275 for to_state in transitions_from[from_state]: 276 self.transition_prob[(from_state, to_state)] = freqs.pop() 277 278 return self.transition_prob
279
281 """Set all allowed emission probabilities to a randomly generated 282 distribution. Returns the dictionary containing the emission 283 probabilities. 284 """ 285 286 if not self.emission_prob: 287 raise Exception("No emissions have been allowed yet. " + 288 "Allow some or all emissions.") 289 290 emissions = _calculate_emissions(self.emission_prob) 291 for state in emissions: 292 freqs = _gen_random_array(len(emissions[state])) 293 for symbol in emissions[state]: 294 self.emission_prob[(state, symbol)] = freqs.pop() 295 296 return self.emission_prob
297
298 - def set_random_probabilities(self):
299 """Set all probabilities to randomly generated numbers. 300 301 Resets probabilities of all initial states, transitions, and 302 emissions to random values. 303 """ 304 self.set_random_initial_probabilities() 305 self.set_random_transition_probabilities() 306 self.set_random_emission_probabilities()
307 308 # --- functions to deal with the transitions in the sequence 309
310 - def allow_all_transitions(self):
311 """A convenience function to create transitions between all states. 312 313 By default all transitions within the alphabet are disallowed; this 314 is a way to change this to allow all possible transitions. 315 """ 316 # first get all probabilities and pseudo counts set 317 # to the default values 318 all_probs = self._all_blank(self._state_alphabet, 319 self._state_alphabet) 320 321 all_pseudo = self._all_pseudo(self._state_alphabet, 322 self._state_alphabet) 323 324 # now set any probabilities and pseudo counts that 325 # were previously set 326 for set_key in self.transition_prob: 327 all_probs[set_key] = self.transition_prob[set_key] 328 329 for set_key in self.transition_pseudo: 330 all_pseudo[set_key] = self.transition_pseudo[set_key] 331 332 # finally reinitialize the transition probs and pseudo counts 333 self.transition_prob = all_probs 334 self.transition_pseudo = all_pseudo
335
336 - def allow_transition(self, from_state, to_state, probability=None, 337 pseudocount=None):
338 """Set a transition as being possible between the two states. 339 340 probability and pseudocount are optional arguments 341 specifying the probabilities and pseudo counts for the transition. 342 If these are not supplied, then the values are set to the 343 default values. 344 345 Raises: 346 KeyError -- if the two states already have an allowed transition. 347 """ 348 # check the sanity of adding these states 349 for state in [from_state, to_state]: 350 assert state in self._state_alphabet.letters, \ 351 "State %s was not found in the sequence alphabet" % state 352 353 # ensure that the states are not already set 354 if (from_state, to_state) not in self.transition_prob and \ 355 (from_state, to_state) not in self.transition_pseudo: 356 # set the initial probability 357 if probability is None: 358 probability = 0 359 self.transition_prob[(from_state, to_state)] = probability 360 361 # set the initial pseudocounts 362 if pseudocount is None: 363 pseudcount = self.DEFAULT_PSEUDO 364 self.transition_pseudo[(from_state, to_state)] = pseudocount 365 else: 366 raise KeyError("Transition from %s to %s is already allowed." 367 % (from_state, to_state))
368
369 - def destroy_transition(self, from_state, to_state):
370 """Restrict transitions between the two states. 371 372 Raises: 373 KeyError if the transition is not currently allowed. 374 """ 375 try: 376 del self.transition_prob[(from_state, to_state)] 377 del self.transition_pseudo[(from_state, to_state)] 378 except KeyError: 379 raise KeyError("Transition from %s to %s is already disallowed." 380 % (from_state, to_state))
381
382 - def set_transition_score(self, from_state, to_state, probability):
383 """Set the probability of a transition between two states. 384 385 Raises: 386 KeyError if the transition is not allowed. 387 """ 388 if (from_state, to_state) in self.transition_prob: 389 self.transition_prob[(from_state, to_state)] = probability 390 else: 391 raise KeyError("Transition from %s to %s is not allowed." 392 % (from_state, to_state))
393
394 - def set_transition_pseudocount(self, from_state, to_state, count):
395 """Set the default pseudocount for a transition. 396 397 To avoid computational problems, it is helpful to be able to 398 set a 'default' pseudocount to start with for estimating 399 transition and emission probabilities (see p62 in Durbin et al 400 for more discussion on this. By default, all transitions have 401 a pseudocount of 1. 402 403 Raises: 404 KeyError if the transition is not allowed. 405 """ 406 if (from_state, to_state) in self.transition_pseudo: 407 self.transition_pseudo[(from_state, to_state)] = count 408 else: 409 raise KeyError("Transition from %s to %s is not allowed." 410 % (from_state, to_state))
411 412 # --- functions to deal with emissions from the sequence 413
414 - def set_emission_score(self, seq_state, emission_state, probability):
415 """Set the probability of a emission from a particular state. 416 417 Raises: 418 KeyError if the emission from the given state is not allowed. 419 """ 420 if (seq_state, emission_state) in self.emission_prob: 421 self.emission_prob[(seq_state, emission_state)] = probability 422 else: 423 raise KeyError("Emission of %s from %s is not allowed." 424 % (emission_state, seq_state))
425
426 - def set_emission_pseudocount(self, seq_state, emission_state, count):
427 """Set the default pseudocount for an emission. 428 429 To avoid computational problems, it is helpful to be able to 430 set a 'default' pseudocount to start with for estimating 431 transition and emission probabilities (see p62 in Durbin et al 432 for more discussion on this. By default, all emissions have 433 a pseudocount of 1. 434 435 Raises: 436 KeyError if the emission from the given state is not allowed. 437 """ 438 if (seq_state, emission_state) in self.emission_pseudo: 439 self.emission_pseudo[(seq_state, emission_state)] = count 440 else: 441 raise KeyError("Emission of %s from %s is not allowed." 442 % (emission_state, seq_state))
443 444
445 -class HiddenMarkovModel(object):
446 """Represent a hidden markov model that can be used for state estimation. 447 """
448 - def __init__(self, initial_prob, transition_prob, emission_prob, 449 transition_pseudo, emission_pseudo):
450 """Initialize a Markov Model. 451 452 Note: You should use the MarkovModelBuilder class instead of 453 initiating this class directly. 454 455 Arguments: 456 457 o initial_prob - A dictionary of initial probabilities for all states. 458 459 o transition_prob -- A dictionary of transition probabilities for all 460 possible transitions in the sequence. 461 462 o emission_prob -- A dictionary of emission probabilities for all 463 possible emissions from the sequence states. 464 465 o transition_pseudo -- Pseudo-counts to be used for the transitions, 466 when counting for purposes of estimating transition probabilities. 467 468 o emission_pseudo -- Pseudo-counts to be used for the emissions, 469 when counting for purposes of estimating emission probabilities. 470 """ 471 472 self.initial_prob = initial_prob 473 474 self._transition_pseudo = transition_pseudo 475 self._emission_pseudo = emission_pseudo 476 477 self.transition_prob = transition_prob 478 self.emission_prob = emission_prob 479 480 # a dictionary of the possible transitions from each state 481 # each key is a source state, mapped to a list of the destination states 482 # that are reachable from the source state via a transition 483 self._transitions_from = \ 484 _calculate_from_transitions(self.transition_prob) 485 486 # a dictionary of the possible transitions to each state 487 # each key is a destination state, mapped to a list of source states 488 # from which the destination is reachable via a transition 489 self._transitions_to = \ 490 _calculate_to_transitions(self.transition_prob)
491
492 - def get_blank_transitions(self):
493 """Get the default transitions for the model. 494 495 Returns a dictionary of all of the default transitions between any 496 two letters in the sequence alphabet. The dictionary is structured 497 with keys as (letter1, letter2) and values as the starting number 498 of transitions. 499 """ 500 return self._transition_pseudo
501
502 - def get_blank_emissions(self):
503 """Get the starting default emmissions for each sequence. 504 505 This returns a dictionary of the default emmissions for each 506 letter. The dictionary is structured with keys as 507 (seq_letter, emmission_letter) and values as the starting number 508 of emmissions. 509 """ 510 return self._emission_pseudo
511
512 - def transitions_from(self, state_letter):
513 """Get all destination states to which there are transitions from the 514 state_letter source state. 515 516 This returns all letters which the given state_letter can transition 517 to. An empty list is returned if state_letter has no outgoing 518 transitions. 519 """ 520 if state_letter in self._transitions_from: 521 return self._transitions_from[state_letter] 522 else: 523 return []
524
525 - def transitions_to(self, state_letter):
526 """Get all source states from which there are transitions to the 527 state_letter destination state. 528 529 This returns all letters which the given state_letter is reachable 530 from. An empty list is returned if state_letter is unreachable. 531 """ 532 if state_letter in self._transitions_to: 533 return self._transitions_to[state_letter] 534 else: 535 return []
536
537 - def viterbi(self, sequence, state_alphabet):
538 """Calculate the most probable state path using the Viterbi algorithm. 539 540 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et 541 al for a full explanation -- this is where I took my implementation 542 ideas from), to allow decoding of the state path, given a sequence 543 of emissions. 544 545 Arguments: 546 547 o sequence -- A Seq object with the emission sequence that we 548 want to decode. 549 550 o state_alphabet -- The alphabet of the possible state sequences 551 that can be generated. 552 """ 553 554 # calculate logarithms of the initial, transition, and emission probs 555 log_initial = self._log_transform(self.initial_prob) 556 log_trans = self._log_transform(self.transition_prob) 557 log_emission = self._log_transform(self.emission_prob) 558 559 viterbi_probs = {} 560 pred_state_seq = {} 561 state_letters = state_alphabet.letters 562 563 # --- recursion 564 # loop over the training squence (i = 1 .. L) 565 # NOTE: My index numbers are one less than what is given in Durbin 566 # et al, since we are indexing the sequence going from 0 to 567 # (Length - 1) not 1 to Length, like in Durbin et al. 568 for i in range(0, len(sequence)): 569 # loop over all of the possible i-th states in the state path 570 for cur_state in state_letters: 571 # e_{l}(x_{i}) 572 emission_part = log_emission[(cur_state, sequence[i])] 573 574 max_prob = 0 575 if i == 0: 576 # for the first state, use the initial probability rather 577 # than looking back to previous states 578 max_prob = log_initial[cur_state] 579 else: 580 # loop over all possible (i-1)-th previous states 581 possible_state_probs = {} 582 for prev_state in self.transitions_to(cur_state): 583 # a_{kl} 584 trans_part = log_trans[(prev_state, cur_state)] 585 586 # v_{k}(i - 1) 587 viterbi_part = viterbi_probs[(prev_state, i - 1)] 588 cur_prob = viterbi_part + trans_part 589 590 possible_state_probs[prev_state] = cur_prob 591 592 # calculate the viterbi probability using the max 593 max_prob = max(possible_state_probs.values()) 594 595 # v_{k}(i) 596 viterbi_probs[(cur_state, i)] = (emission_part + max_prob) 597 598 if i > 0: 599 # get the most likely prev_state leading to cur_state 600 for state in possible_state_probs: 601 if possible_state_probs[state] == max_prob: 602 pred_state_seq[(i - 1, cur_state)] = state 603 break 604 605 # --- termination 606 # calculate the probability of the state path 607 # loop over all states 608 all_probs = {} 609 for state in state_letters: 610 # v_{k}(L) 611 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)] 612 613 state_path_prob = max(all_probs.values()) 614 615 # find the last pointer we need to trace back from 616 last_state = '' 617 for state in all_probs: 618 if all_probs[state] == state_path_prob: 619 last_state = state 620 621 assert last_state != '', "Didn't find the last state to trace from!" 622 623 # --- traceback 624 traceback_seq = MutableSeq('', state_alphabet) 625 626 loop_seq = list(range(1, len(sequence))) 627 loop_seq.reverse() 628 629 # last_state is the last state in the most probable state sequence. 630 # Compute that sequence by walking backwards in time. From the i-th 631 # state in the sequence, find the (i-1)-th state as the most 632 # probable state preceding the i-th state. 633 state = last_state 634 traceback_seq.append(state) 635 for i in loop_seq: 636 state = pred_state_seq[(i - 1, state)] 637 traceback_seq.append(state) 638 639 # put the traceback sequence in the proper orientation 640 traceback_seq.reverse() 641 642 return traceback_seq.toseq(), state_path_prob
643
644 - def _log_transform(self, probability):
645 """Return log transform of the given probability dictionary. 646 647 When calculating the Viterbi equation, add logs of probabilities rather 648 than multiplying probabilities, to avoid underflow errors. This method 649 returns a new dictionary with the same keys as the given dictionary 650 and log-transformed values. 651 """ 652 log_prob = copy.copy(probability) 653 try: 654 neg_inf = float("-inf") 655 except ValueError: 656 # On Python 2.5 or older that was handled in C code, 657 # and failed on Windows XP 32bit 658 neg_inf = - 1E400 659 for key in log_prob: 660 prob = log_prob[key] 661 if prob > 0: 662 log_prob[key] = math.log(log_prob[key]) 663 else: 664 log_prob[key] = neg_inf 665 666 return log_prob
667