Package Bio :: Package HMM :: Module MarkovModel
[hide private]
[frames] | no frames]

Source Code for Module Bio.HMM.MarkovModel

  1  """Deal with representations of Markov Models. 
  2  """ 
  3  # standard modules 
  4  import copy 
  5  import math 
  6  import random 
  7   
  8  #TODO - Take advantage of defaultdict once Python 2.4 is dead? 
  9  #from collections import defaultdict 
 10   
 11  # biopython 
 12  from Bio.Seq import MutableSeq 
 13   
 14   
15 -def _gen_random_array(n):
16 """ Return an array of n random numbers, where the elements of the array sum 17 to 1.0""" 18 randArray = [random.random() for i in range(n)] 19 total = sum(randArray) 20 normalizedRandArray = [x/total for x in randArray] 21 22 return normalizedRandArray
23 24
25 -def _calculate_emissions(emission_probs):
26 """Calculate which symbols can be emitted in each state 27 """ 28 # loop over all of the state-symbol duples, mapping states to 29 # lists of emitted symbols 30 emissions = dict() 31 for state, symbol in emission_probs: 32 try: 33 emissions[state].append(symbol) 34 except KeyError: 35 emissions[state] = [symbol] 36 37 return emissions
38 39
40 -def _calculate_from_transitions(trans_probs):
41 """Calculate which 'from transitions' are allowed for each state 42 43 This looks through all of the trans_probs, and uses this dictionary 44 to determine allowed transitions. It converts this information into 45 a dictionary, whose keys are source states and whose values are 46 lists of destination states reachable from the source state via a 47 transition. 48 """ 49 transitions = dict() 50 for from_state, to_state in trans_probs: 51 try: 52 transitions[from_state].append(to_state) 53 except KeyError: 54 transitions[from_state] = [to_state] 55 56 return transitions
57 58
59 -def _calculate_to_transitions(trans_probs):
60 """Calculate which 'to transitions' are allowed for each state 61 62 This looks through all of the trans_probs, and uses this dictionary 63 to determine allowed transitions. It converts this information into 64 a dictionary, whose keys are destination states and whose values are 65 lists of source states from which the destination is reachable via a 66 transition. 67 """ 68 transitions = dict() 69 for from_state, to_state in trans_probs: 70 try: 71 transitions[to_state].append(from_state) 72 except KeyError: 73 transitions[to_state] = [from_state] 74 75 return transitions
76 77
78 -class MarkovModelBuilder(object):
79 """Interface to build up a Markov Model. 80 81 This class is designed to try to separate the task of specifying the 82 Markov Model from the actual model itself. This is in hopes of making 83 the actual Markov Model classes smaller. 84 85 So, this builder class should be used to create Markov models instead 86 of trying to initiate a Markov Model directly. 87 """ 88 # the default pseudo counts to use 89 DEFAULT_PSEUDO = 1 90
91 - def __init__(self, state_alphabet, emission_alphabet):
92 """Initialize a builder to create Markov Models. 93 94 Arguments: 95 96 o state_alphabet -- An alphabet containing all of the letters that 97 can appear in the states 98 99 o emission_alphabet -- An alphabet containing all of the letters for 100 states that can be emitted by the HMM. 101 """ 102 self._state_alphabet = state_alphabet 103 self._emission_alphabet = emission_alphabet 104 105 # probabilities for the initial state, initialized by calling 106 # set_initial_probabilities (required) 107 self.initial_prob = {} 108 109 # the probabilities for transitions and emissions 110 # by default we have no transitions and all possible emissions 111 self.transition_prob = {} 112 self.emission_prob = self._all_blank(state_alphabet, 113 emission_alphabet) 114 115 # the default pseudocounts for transition and emission counting 116 self.transition_pseudo = {} 117 self.emission_pseudo = self._all_pseudo(state_alphabet, 118 emission_alphabet)
119
120 - def _all_blank(self, first_alphabet, second_alphabet):
121 """Return a dictionary with all counts set to zero. 122 123 This uses the letters in the first and second alphabet to create 124 a dictionary with keys of two tuples organized as 125 (letter of first alphabet, letter of second alphabet). The values 126 are all set to 0. 127 """ 128 all_blank = {} 129 for first_state in first_alphabet.letters: 130 for second_state in second_alphabet.letters: 131 all_blank[(first_state, second_state)] = 0 132 133 return all_blank
134
135 - def _all_pseudo(self, first_alphabet, second_alphabet):
136 """Return a dictionary with all counts set to a default value. 137 138 This takes the letters in first alphabet and second alphabet and 139 creates a dictionary with keys of two tuples organized as: 140 (letter of first alphabet, letter of second alphabet). The values 141 are all set to the value of the class attribute DEFAULT_PSEUDO. 142 """ 143 all_counts = {} 144 for first_state in first_alphabet.letters: 145 for second_state in second_alphabet.letters: 146 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO 147 148 return all_counts
149
150 - def get_markov_model(self):
151 """Return the markov model corresponding with the current parameters. 152 153 Each markov model returned by a call to this function is unique 154 (ie. they don't influence each other). 155 """ 156 157 # user must set initial probabilities 158 if not self.initial_prob: 159 raise Exception("set_initial_probabilities must be called to " + 160 "fully initialize the Markov model") 161 162 initial_prob = copy.deepcopy(self.initial_prob) 163 transition_prob = copy.deepcopy(self.transition_prob) 164 emission_prob = copy.deepcopy(self.emission_prob) 165 transition_pseudo = copy.deepcopy(self.transition_pseudo) 166 emission_pseudo = copy.deepcopy(self.emission_pseudo) 167 168 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob, 169 transition_pseudo, emission_pseudo)
170
171 - def set_initial_probabilities(self, initial_prob):
172 """Set initial state probabilities. 173 174 initial_prob is a dictionary mapping states to probabilities. 175 Suppose, for example, that the state alphabet is ['A', 'B']. Call 176 set_initial_prob({'A': 1}) to guarantee that the initial 177 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5}) 178 to make each initial state equally probable. 179 180 This method must now be called in order to use the Markov model 181 because the calculation of initial probabilities has changed 182 incompatibly; the previous calculation was incorrect. 183 184 If initial probabilities are set for all states, then they should add up 185 to 1. Otherwise the sum should be <= 1. The residual probability is 186 divided up evenly between all the states for which the initial 187 probability has not been set. For example, calling 188 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5, 189 for the above example. 190 """ 191 self.initial_prob = copy.copy(initial_prob) 192 193 # ensure that all referenced states are valid 194 for state in initial_prob.iterkeys(): 195 assert state in self._state_alphabet.letters, \ 196 "State %s was not found in the sequence alphabet" % state 197 198 # distribute the residual probability, if any 199 num_states_not_set =\ 200 len(self._state_alphabet.letters) - len(self.initial_prob) 201 if num_states_not_set < 0: 202 raise Exception("Initial probabilities can't exceed # of states") 203 prob_sum = sum(self.initial_prob.values()) 204 if prob_sum > 1.0: 205 raise Exception("Total initial probability cannot exceed 1.0") 206 if num_states_not_set > 0: 207 prob = (1.0 - prob_sum) / num_states_not_set 208 for state in self._state_alphabet.letters: 209 if not state in self.initial_prob: 210 self.initial_prob[state] = prob
211
212 - def set_equal_probabilities(self):
213 """Reset all probabilities to be an average value. 214 215 Resets the values of all initial probabilities and all allowed 216 transitions and all allowed emissions to be equal to 1 divided by the 217 number of possible elements. 218 219 This is useful if you just want to initialize a Markov Model to 220 starting values (ie. if you have no prior notions of what the 221 probabilities should be -- or if you are just feeling too lazy 222 to calculate them :-). 223 224 Warning 1 -- this will reset all currently set probabilities. 225 226 Warning 2 -- This just sets all probabilities for transitions and 227 emissions to total up to 1, so it doesn't ensure that the sum of 228 each set of transitions adds up to 1. 229 """ 230 231 # set initial state probabilities 232 new_initial_prob = float(1) / float(len(self.transition_prob)) 233 for state in self._state_alphabet.letters: 234 self.initial_prob[state] = new_initial_prob 235 236 # set the transitions 237 new_trans_prob = float(1) / float(len(self.transition_prob)) 238 for key in self.transition_prob: 239 self.transition_prob[key] = new_trans_prob 240 241 # set the emissions 242 new_emission_prob = float(1) / float(len(self.emission_prob)) 243 for key in self.emission_prob: 244 self.emission_prob[key] = new_emission_prob
245
247 """Set all initial state probabilities to a randomly generated distribution. 248 Returns the dictionary containing the initial probabilities. 249 """ 250 initial_freqs = _gen_random_array(len(self._state_alphabet.letters)) 251 for state in self._state_alphabet.letters: 252 self.initial_prob[state] = initial_freqs.pop() 253 254 return self.initial_prob
255
257 """Set all allowed transition probabilities to a randomly generated distribution. 258 Returns the dictionary containing the transition probabilities. 259 """ 260 261 if not self.transition_prob: 262 raise Exception("No transitions have been allowed yet. " + 263 "Allow some or all transitions by calling " + 264 "allow_transition or allow_all_transitions first.") 265 266 transitions_from = _calculate_from_transitions(self.transition_prob) 267 for from_state in transitions_from.keys(): 268 freqs = _gen_random_array(len(transitions_from[from_state])) 269 for to_state in transitions_from[from_state]: 270 self.transition_prob[(from_state, to_state)] = freqs.pop() 271 272 return self.transition_prob
273
275 """Set all allowed emission probabilities to a randomly generated 276 distribution. Returns the dictionary containing the emission 277 probabilities. 278 """ 279 280 if not self.emission_prob: 281 raise Exception("No emissions have been allowed yet. " + 282 "Allow some or all emissions.") 283 284 emissions = _calculate_emissions(self.emission_prob) 285 for state in emissions.iterkeys(): 286 freqs = _gen_random_array(len(emissions[state])) 287 for symbol in emissions[state]: 288 self.emission_prob[(state, symbol)] = freqs.pop() 289 290 return self.emission_prob
291
292 - def set_random_probabilities(self):
293 """Set all probabilities to randomly generated numbers. 294 295 Resets probabilities of all initial states, transitions, and 296 emissions to random values. 297 """ 298 self.set_random_initial_probabilities() 299 self.set_random_transition_probabilities() 300 self.set_random_emission_probabilities()
301 302 # --- functions to deal with the transitions in the sequence 303
304 - def allow_all_transitions(self):
305 """A convenience function to create transitions between all states. 306 307 By default all transitions within the alphabet are disallowed; this 308 is a way to change this to allow all possible transitions. 309 """ 310 # first get all probabilities and pseudo counts set 311 # to the default values 312 all_probs = self._all_blank(self._state_alphabet, 313 self._state_alphabet) 314 315 all_pseudo = self._all_pseudo(self._state_alphabet, 316 self._state_alphabet) 317 318 # now set any probabilities and pseudo counts that 319 # were previously set 320 for set_key in self.transition_prob: 321 all_probs[set_key] = self.transition_prob[set_key] 322 323 for set_key in self.transition_pseudo: 324 all_pseudo[set_key] = self.transition_pseudo[set_key] 325 326 # finally reinitialize the transition probs and pseudo counts 327 self.transition_prob = all_probs 328 self.transition_pseudo = all_pseudo
329
330 - def allow_transition(self, from_state, to_state, probability = None, 331 pseudocount = None):
332 """Set a transition as being possible between the two states. 333 334 probability and pseudocount are optional arguments 335 specifying the probabilities and pseudo counts for the transition. 336 If these are not supplied, then the values are set to the 337 default values. 338 339 Raises: 340 KeyError -- if the two states already have an allowed transition. 341 """ 342 # check the sanity of adding these states 343 for state in [from_state, to_state]: 344 assert state in self._state_alphabet.letters, \ 345 "State %s was not found in the sequence alphabet" % state 346 347 # ensure that the states are not already set 348 if (from_state, to_state) not in self.transition_prob and \ 349 (from_state, to_state) not in self.transition_pseudo: 350 # set the initial probability 351 if probability is None: 352 probability = 0 353 self.transition_prob[(from_state, to_state)] = probability 354 355 # set the initial pseudocounts 356 if pseudocount is None: 357 pseudcount = self.DEFAULT_PSEUDO 358 self.transition_pseudo[(from_state, to_state)] = pseudocount 359 else: 360 raise KeyError("Transition from %s to %s is already allowed." 361 % (from_state, to_state))
362
363 - def destroy_transition(self, from_state, to_state):
364 """Restrict transitions between the two states. 365 366 Raises: 367 KeyError if the transition is not currently allowed. 368 """ 369 try: 370 del self.transition_prob[(from_state, to_state)] 371 del self.transition_pseudo[(from_state, to_state)] 372 except KeyError: 373 raise KeyError("Transition from %s to %s is already disallowed." 374 % (from_state, to_state))
375
376 - def set_transition_score(self, from_state, to_state, probability):
377 """Set the probability of a transition between two states. 378 379 Raises: 380 KeyError if the transition is not allowed. 381 """ 382 if (from_state, to_state) in self.transition_prob: 383 self.transition_prob[(from_state, to_state)] = probability 384 else: 385 raise KeyError("Transition from %s to %s is not allowed." 386 % (from_state, to_state))
387
388 - def set_transition_pseudocount(self, from_state, to_state, count):
389 """Set the default pseudocount for a transition. 390 391 To avoid computational problems, it is helpful to be able to 392 set a 'default' pseudocount to start with for estimating 393 transition and emission probabilities (see p62 in Durbin et al 394 for more discussion on this. By default, all transitions have 395 a pseudocount of 1. 396 397 Raises: 398 KeyError if the transition is not allowed. 399 """ 400 if (from_state, to_state) in self.transition_pseudo: 401 self.transition_pseudo[(from_state, to_state)] = count 402 else: 403 raise KeyError("Transition from %s to %s is not allowed." 404 % (from_state, to_state))
405 406 # --- functions to deal with emissions from the sequence 407
408 - def set_emission_score(self, seq_state, emission_state, probability):
409 """Set the probability of a emission from a particular state. 410 411 Raises: 412 KeyError if the emission from the given state is not allowed. 413 """ 414 if (seq_state, emission_state) in self.emission_prob: 415 self.emission_prob[(seq_state, emission_state)] = probability 416 else: 417 raise KeyError("Emission of %s from %s is not allowed." 418 % (emission_state, seq_state))
419
420 - def set_emission_pseudocount(self, seq_state, emission_state, count):
421 """Set the default pseudocount for an emission. 422 423 To avoid computational problems, it is helpful to be able to 424 set a 'default' pseudocount to start with for estimating 425 transition and emission probabilities (see p62 in Durbin et al 426 for more discussion on this. By default, all emissions have 427 a pseudocount of 1. 428 429 Raises: 430 KeyError if the emission from the given state is not allowed. 431 """ 432 if (seq_state, emission_state) in self.emission_pseudo: 433 self.emission_pseudo[(seq_state, emission_state)] = count 434 else: 435 raise KeyError("Emission of %s from %s is not allowed." 436 % (emission_state, seq_state))
437 438
439 -class HiddenMarkovModel(object):
440 """Represent a hidden markov model that can be used for state estimation. 441 """
442 - def __init__(self, initial_prob, transition_prob, emission_prob, 443 transition_pseudo, emission_pseudo):
444 """Initialize a Markov Model. 445 446 Note: You should use the MarkovModelBuilder class instead of 447 initiating this class directly. 448 449 Arguments: 450 451 o initial_prob - A dictionary of initial probabilities for all states. 452 453 o transition_prob -- A dictionary of transition probabilities for all 454 possible transitions in the sequence. 455 456 o emission_prob -- A dictionary of emission probabilities for all 457 possible emissions from the sequence states. 458 459 o transition_pseudo -- Pseudo-counts to be used for the transitions, 460 when counting for purposes of estimating transition probabilities. 461 462 o emission_pseudo -- Pseudo-counts to be used for the emissions, 463 when counting for purposes of estimating emission probabilities. 464 """ 465 466 self.initial_prob = initial_prob 467 468 self._transition_pseudo = transition_pseudo 469 self._emission_pseudo = emission_pseudo 470 471 self.transition_prob = transition_prob 472 self.emission_prob = emission_prob 473 474 # a dictionary of the possible transitions from each state 475 # each key is a source state, mapped to a list of the destination states 476 # that are reachable from the source state via a transition 477 self._transitions_from = \ 478 _calculate_from_transitions(self.transition_prob) 479 480 # a dictionary of the possible transitions to each state 481 # each key is a destination state, mapped to a list of source states 482 # from which the destination is reachable via a transition 483 self._transitions_to = \ 484 _calculate_to_transitions(self.transition_prob)
485
486 - def get_blank_transitions(self):
487 """Get the default transitions for the model. 488 489 Returns a dictionary of all of the default transitions between any 490 two letters in the sequence alphabet. The dictionary is structured 491 with keys as (letter1, letter2) and values as the starting number 492 of transitions. 493 """ 494 return self._transition_pseudo
495
496 - def get_blank_emissions(self):
497 """Get the starting default emmissions for each sequence. 498 499 This returns a dictionary of the default emmissions for each 500 letter. The dictionary is structured with keys as 501 (seq_letter, emmission_letter) and values as the starting number 502 of emmissions. 503 """ 504 return self._emission_pseudo
505
506 - def transitions_from(self, state_letter):
507 """Get all destination states to which there are transitions from the 508 state_letter source state. 509 510 This returns all letters which the given state_letter can transition 511 to. An empty list is returned if state_letter has no outgoing 512 transitions. 513 """ 514 if state_letter in self._transitions_from: 515 return self._transitions_from[state_letter] 516 else: 517 return []
518
519 - def transitions_to(self, state_letter):
520 """Get all source states from which there are transitions to the 521 state_letter destination state. 522 523 This returns all letters which the given state_letter is reachable 524 from. An empty list is returned if state_letter is unreachable. 525 """ 526 if state_letter in self._transitions_to: 527 return self._transitions_to[state_letter] 528 else: 529 return []
530
531 - def viterbi(self, sequence, state_alphabet):
532 """Calculate the most probable state path using the Viterbi algorithm. 533 534 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et 535 al for a full explanation -- this is where I took my implementation 536 ideas from), to allow decoding of the state path, given a sequence 537 of emissions. 538 539 Arguments: 540 541 o sequence -- A Seq object with the emission sequence that we 542 want to decode. 543 544 o state_alphabet -- The alphabet of the possible state sequences 545 that can be generated. 546 """ 547 548 # calculate logarithms of the initial, transition, and emission probs 549 log_initial = self._log_transform(self.initial_prob) 550 log_trans = self._log_transform(self.transition_prob) 551 log_emission = self._log_transform(self.emission_prob) 552 553 viterbi_probs = {} 554 pred_state_seq = {} 555 state_letters = state_alphabet.letters 556 557 # --- recursion 558 # loop over the training squence (i = 1 .. L) 559 # NOTE: My index numbers are one less than what is given in Durbin 560 # et al, since we are indexing the sequence going from 0 to 561 # (Length - 1) not 1 to Length, like in Durbin et al. 562 for i in range(0, len(sequence)): 563 # loop over all of the possible i-th states in the state path 564 for cur_state in state_letters: 565 # e_{l}(x_{i}) 566 emission_part = log_emission[(cur_state, sequence[i])] 567 568 max_prob = 0 569 if i == 0: 570 # for the first state, use the initial probability rather 571 # than looking back to previous states 572 max_prob = log_initial[cur_state] 573 else: 574 # loop over all possible (i-1)-th previous states 575 possible_state_probs = {} 576 for prev_state in self.transitions_to(cur_state): 577 # a_{kl} 578 trans_part = log_trans[(prev_state, cur_state)] 579 580 # v_{k}(i - 1) 581 viterbi_part = viterbi_probs[(prev_state, i - 1)] 582 cur_prob = viterbi_part + trans_part 583 584 possible_state_probs[prev_state] = cur_prob 585 586 # calculate the viterbi probability using the max 587 max_prob = max(possible_state_probs.values()) 588 589 # v_{k}(i) 590 viterbi_probs[(cur_state, i)] = (emission_part + max_prob) 591 592 if i > 0: 593 # get the most likely prev_state leading to cur_state 594 for state in possible_state_probs: 595 if possible_state_probs[state] == max_prob: 596 pred_state_seq[(i - 1, cur_state)] = state 597 break 598 599 # --- termination 600 # calculate the probability of the state path 601 # loop over all states 602 all_probs = {} 603 for state in state_letters: 604 # v_{k}(L) 605 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)] 606 607 state_path_prob = max(all_probs.values()) 608 609 # find the last pointer we need to trace back from 610 last_state = '' 611 for state in all_probs: 612 if all_probs[state] == state_path_prob: 613 last_state = state 614 615 assert last_state != '', "Didn't find the last state to trace from!" 616 617 # --- traceback 618 traceback_seq = MutableSeq('', state_alphabet) 619 620 loop_seq = range(1, len(sequence)) 621 loop_seq.reverse() 622 623 # last_state is the last state in the most probable state sequence. 624 # Compute that sequence by walking backwards in time. From the i-th 625 # state in the sequence, find the (i-1)-th state as the most 626 # probable state preceding the i-th state. 627 state = last_state 628 traceback_seq.append(state) 629 for i in loop_seq: 630 state = pred_state_seq[(i - 1, state)] 631 traceback_seq.append(state) 632 633 # put the traceback sequence in the proper orientation 634 traceback_seq.reverse() 635 636 return traceback_seq.toseq(), state_path_prob
637
638 - def _log_transform(self, probability):
639 """Return log transform of the given probability dictionary. 640 641 When calculating the Viterbi equation, add logs of probabilities rather 642 than multiplying probabilities, to avoid underflow errors. This method 643 returns a new dictionary with the same keys as the given dictionary 644 and log-transformed values. 645 """ 646 log_prob = copy.copy(probability) 647 try: 648 neg_inf = float("-inf") 649 except ValueError: 650 #On Python 2.5 or older that was handled in C code, 651 #and failed on Windows XP 32bit 652 neg_inf = - 1E400 653 for key in log_prob: 654 prob = log_prob[key] 655 if prob > 0: 656 log_prob[key] = math.log(log_prob[key]) 657 else: 658 log_prob[key] = neg_inf 659 660 return log_prob
661