
Source Code for Module Bio.HMM.Trainer

# This code is part of the Biopython distribution and governed by its
# license.  Please see the LICENSE file that should have been included
# as part of this package.
#

"""Provide trainers which estimate parameters based on training sequences.

These should be used to 'train' a Markov Model prior to actually using
it to decode state paths. When supplied training sequences and a model
to work from, these classes will estimate parameters of the model.

This aims to estimate two parameters:

- a_{kl} -- the transition probability from state k to state l, estimated
  from the number of k to l transitions in the training data.
- e_{k}(b) -- the probability that state k emits the letter b, estimated
  from the number of such emissions in the training data.

"""
# standard modules
import math

# local stuff
from .DynamicProgramming import ScaledDPAlgorithms


class TrainingSequence(object):
    """Hold a training sequence with emissions and optionally, a state path."""

    def __init__(self, emissions, state_path):
        """Initialize a training sequence.

        Arguments:
         - emissions -- A Seq object containing the sequence of emissions in
           the training sequence, and the alphabet of the sequence.
         - state_path -- A Seq object containing the sequence of states and
           the alphabet of the states. If there is no known state path, then
           the sequence of states should be an empty string.

        """
        if len(state_path) > 0:
            assert len(emissions) == len(state_path), \
                "State path does not match associated emissions."
        self.emissions = emissions
        self.states = state_path

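
# Example usage (an illustrative sketch, not part of the original module):
# building training sequences with the Alphabet-style Seq API this module
# expects. The dice alphabets below are hypothetical stand-ins for whatever
# state and emission alphabets your model uses.
#
#     from Bio.Seq import Seq
#     from Bio.Alphabet import Alphabet
#
#     class DiceStateAlphabet(Alphabet):
#         letters = ["F", "L"]  # fair and loaded dice states
#
#     class DiceRollAlphabet(Alphabet):
#         letters = ["1", "2", "3", "4", "5", "6"]
#
#     # known state path -- emissions and states must have the same length
#     known = TrainingSequence(Seq("163", DiceRollAlphabet()),
#                              Seq("FFL", DiceStateAlphabet()))
#
#     # unknown state path (e.g. for Baum-Welch) -- use an empty state Seq
#     unknown = TrainingSequence(Seq("163", DiceRollAlphabet()),
#                                Seq("", DiceStateAlphabet()))
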
class AbstractTrainer(object):
    """Provide generic functionality needed in all trainers."""

    def __init__(self, markov_model):
        """Initialize."""
        self._markov_model = markov_model

    def log_likelihood(self, probabilities):
        """Calculate the log likelihood of the training seqs.

        Arguments:
         - probabilities -- A list of the probabilities of each training
           sequence under the current parameters, calculated using the
           forward algorithm.

        """
        total_likelihood = 0
        for probability in probabilities:
            total_likelihood += math.log(probability)

        return total_likelihood

    def estimate_params(self, transition_counts, emission_counts):
        """Get a maximum likelihood estimation of transition and emission.

        Arguments:
         - transition_counts -- A dictionary with the total number of counts
           of transitions between two states.
         - emission_counts -- A dictionary with the total number of counts
           of emissions of a particular emission letter by a state letter.

        This then returns the maximum likelihood estimators for the
        transitions and emissions, estimated by formula 3.18 in
        Durbin et al::

            a_{kl} = A_{kl} / sum(A_{kl'})
            e_{k}(b) = E_{k}(b) / sum(E_{k}(b'))

        Returns:
        Transition and emission dictionaries containing the maximum
        likelihood estimators.

        """
        # now calculate the information
        ml_transitions = self.ml_estimator(transition_counts)
        ml_emissions = self.ml_estimator(emission_counts)

        return ml_transitions, ml_emissions

    def ml_estimator(self, counts):
        """Calculate the maximum likelihood estimator.

        This can calculate maximum likelihoods for both transitions
        and emissions.

        Arguments:
         - counts -- A dictionary of the counts for each item.

        See estimate_params for a description of the formula used for
        calculation.
        """
        # get an ordered list of all items
        all_ordered = sorted(counts)

        ml_estimation = {}

        # the total counts for the current letter we are on
        cur_letter = None
        cur_letter_counts = 0

        for cur_item in all_ordered:
            # if we are on a new letter (ie. the first letter of the tuple)
            if cur_item[0] != cur_letter:
                # set the new letter we are working with
                cur_letter = cur_item[0]

                # count up the total counts for this letter
                cur_letter_counts = counts[cur_item]

                # add counts for all other items with the same first letter
                cur_position = all_ordered.index(cur_item) + 1

                # keep adding while we have the same first letter or until
                # we get to the end of the ordered list
                while (cur_position < len(all_ordered) and
                       all_ordered[cur_position][0] == cur_item[0]):
                    cur_letter_counts += counts[all_ordered[cur_position]]
                    cur_position += 1
            # otherwise we've already got the total counts for this letter
            else:
                pass

            # now calculate the ml and add it to the estimation
            cur_ml = float(counts[cur_item]) / float(cur_letter_counts)
            ml_estimation[cur_item] = cur_ml

        return ml_estimation

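
# Worked example (an illustrative sketch, not part of the original module):
# ml_estimator normalizes counts over entries sharing the same first tuple
# element, so transition counts from a hypothetical two-state model become
# row-normalized probabilities:
#
#     trainer = AbstractTrainer(None)  # markov_model is unused here
#     counts = {("F", "F"): 8, ("F", "L"): 2, ("L", "F"): 3, ("L", "L"): 9}
#     trainer.ml_estimator(counts)
#     # {('F', 'F'): 0.8, ('F', 'L'): 0.2, ('L', 'F'): 0.25, ('L', 'L'): 0.75}
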
class BaumWelchTrainer(AbstractTrainer):
    """Trainer that uses the Baum-Welch algorithm to estimate parameters.

    These should be used when a training sequence for an HMM has unknown
    paths for the actual states, and you need to make an estimation of the
    model parameters from the observed emissions.

    This uses the Baum-Welch algorithm, first described in
    Baum, L.E. 1972. Inequalities 3:1-8.
    This is based on the description in 'Biological Sequence Analysis' by
    Durbin et al. in section 3.3.

    This algorithm is guaranteed to converge to a local maximum, but not
    necessarily to the global maximum, so use with care!
    """

    def __init__(self, markov_model):
        """Initialize the trainer.

        Arguments:
         - markov_model -- The model we are going to estimate parameters for.
           This should have the parameters with some initial estimates that
           we can build from.

        """
        AbstractTrainer.__init__(self, markov_model)

    def train(self, training_seqs, stopping_criteria,
              dp_method=ScaledDPAlgorithms):
        """Estimate the parameters using training sequences.

        The algorithm for this is taken from Durbin et al. p64, so this
        is a good place to go for a reference on what is going on.

        Arguments:
         - training_seqs -- A list of TrainingSequence objects to be used
           for estimating the parameters.
         - stopping_criteria -- A function, that when passed the change
           in log likelihood and the number of iterations so far, will
           indicate if we should stop the estimation iterations.
         - dp_method -- The class we should use to calculate the forward
           and backward variables by dynamic programming. By default, we
           use the scaling method.

        """
        prev_log_likelihood = None
        num_iterations = 1

        while True:
            transition_count = self._markov_model.get_blank_transitions()
            emission_count = self._markov_model.get_blank_emissions()

            # remember all of the sequence probabilities
            all_probabilities = []

            for training_seq in training_seqs:
                # calculate the forward and backward variables
                DP = dp_method(self._markov_model, training_seq)
                forward_var, seq_prob = DP.forward_algorithm()
                backward_var = DP.backward_algorithm()

                all_probabilities.append(seq_prob)

                # update the counts for transitions and emissions
                transition_count = self.update_transitions(transition_count,
                                                           training_seq,
                                                           forward_var,
                                                           backward_var,
                                                           seq_prob)
                emission_count = self.update_emissions(emission_count,
                                                       training_seq,
                                                       forward_var,
                                                       backward_var,
                                                       seq_prob)

            # update the markov model with the new probabilities
            ml_transitions, ml_emissions = \
                self.estimate_params(transition_count, emission_count)
            self._markov_model.transition_prob = ml_transitions
            self._markov_model.emission_prob = ml_emissions

            cur_log_likelihood = self.log_likelihood(all_probabilities)

            # if we have previously calculated the log likelihood (ie.
            # not the first round), see if we can finish
            if prev_log_likelihood is not None:
                # XXX log likelihoods are negatives -- am I calculating
                # the change properly, or should I use the negatives...
                # I'm not sure at all if this is right.
                log_likelihood_change = abs(abs(cur_log_likelihood) -
                                            abs(prev_log_likelihood))

                # check whether we have completed enough iterations to have
                # a good estimation
                if stopping_criteria(log_likelihood_change, num_iterations):
                    break

            # set up for another round of iterations
            prev_log_likelihood = cur_log_likelihood
            num_iterations += 1

        return self._markov_model

    def update_transitions(self, transition_counts, training_seq,
                           forward_vars, backward_vars, training_seq_prob):
        """Add the contribution of a new training sequence to the transitions.

        Arguments:
         - transition_counts -- A dictionary of the current counts for the
           transitions.
         - training_seq -- The training sequence we are working with.
         - forward_vars -- Probabilities calculated using the forward
           algorithm.
         - backward_vars -- Probabilities calculated using the backward
           algorithm.
         - training_seq_prob -- The probability of the current sequence.

        This calculates A_{kl} (the estimated transition counts from state
        k to state l) using formula 3.20 in Durbin et al.
        """
        # set up the transition and emission probabilities we are using
        transitions = self._markov_model.transition_prob
        emissions = self._markov_model.emission_prob

        # loop over the possible combinations of state path letters
        for k in training_seq.states.alphabet.letters:
            for l in self._markov_model.transitions_from(k):
                estimated_counts = 0
                # now loop over the entire training sequence
                for i in range(len(training_seq.emissions) - 1):
                    # the forward value of k at the current position
                    forward_value = forward_vars[(k, i)]

                    # the backward value of l in the next position
                    backward_value = backward_vars[(l, i + 1)]

                    # the probability of a transition from k to l
                    trans_value = transitions[(k, l)]

                    # the probability of getting the emission at the next pos
                    emm_value = emissions[(l, training_seq.emissions[i + 1])]

                    estimated_counts += (forward_value * trans_value *
                                         emm_value * backward_value)

                # update the transition approximation
                transition_counts[(k, l)] += (float(estimated_counts) /
                                              training_seq_prob)

        return transition_counts
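
    # For reference (an added note, not in the original source):
    # update_transitions above computes, for each pair of states k and l,
    # formula 3.20 from Durbin et al.:
    #
    #     A_{kl} += (1 / P(x)) * sum_i f_k(i) * a_{kl} * e_l(x_{i+1}) * b_l(i+1)
    #
    # where f and b are the forward and backward variables and P(x) is the
    # probability of the training sequence.
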
    def update_emissions(self, emission_counts, training_seq,
                         forward_vars, backward_vars, training_seq_prob):
        """Add the contribution of a new training sequence to the emissions.

        Arguments:
         - emission_counts -- A dictionary of the current counts for the
           emissions.
         - training_seq -- The training sequence we are working with.
         - forward_vars -- Probabilities calculated using the forward
           algorithm.
         - backward_vars -- Probabilities calculated using the backward
           algorithm.
         - training_seq_prob -- The probability of the current sequence.

        This calculates E_{k}(b) (the estimated emission probability for
        emission letter b from state k) using formula 3.21 in Durbin et al.
        """
        # loop over the possible combinations of state path letters
        for k in training_seq.states.alphabet.letters:
            # now loop over all of the possible emissions
            for b in training_seq.emissions.alphabet.letters:
                expected_times = 0
                # finally loop over the entire training sequence
                for i in range(len(training_seq.emissions)):
                    # only count the forward and backward probability if the
                    # emission at the position is the same as b
                    if training_seq.emissions[i] == b:
                        # f_{k}(i) b_{k}(i)
                        expected_times += (forward_vars[(k, i)] *
                                           backward_vars[(k, i)])

                # add to E_{k}(b)
                emission_counts[(k, b)] += (float(expected_times) /
                                            training_seq_prob)

        return emission_counts
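
    # For reference (an added note, not in the original source):
    # update_emissions above computes, for each state k and emission letter b,
    # formula 3.21 from Durbin et al.:
    #
    #     E_{k}(b) += (1 / P(x)) * sum_{i : x_i = b} f_k(i) * b_k(i)
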
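
# Example usage (an illustrative sketch, not part of the original module):
# Baum-Welch training from emissions alone, reusing the hypothetical dice
# alphabets sketched after TrainingSequence above. MarkovModelBuilder comes
# from Bio.HMM.MarkovModel; stop_training is a made-up stopping criteria
# function with the (log_likelihood_change, num_iterations) signature that
# train() expects.
#
#     from Bio.HMM.MarkovModel import MarkovModelBuilder
#
#     builder = MarkovModelBuilder(DiceStateAlphabet(), DiceRollAlphabet())
#     builder.allow_all_transitions()
#     builder.set_random_probabilities()
#     model = builder.get_markov_model()
#
#     def stop_training(log_likelihood_change, num_iterations):
#         """Stop when the likelihood stabilizes or after 10 rounds."""
#         return log_likelihood_change < 0.01 or num_iterations >= 10
#
#     seqs = [TrainingSequence(Seq("16266", DiceRollAlphabet()),
#                              Seq("", DiceStateAlphabet()))]
#     trained_model = BaumWelchTrainer(model).train(seqs, stop_training)

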
class KnownStateTrainer(AbstractTrainer):
    """Estimate probabilities with known state sequences.

    This should be used for direct estimation of emission and transition
    probabilities when both the state path and emission sequence are
    known for the training examples.
    """

    def __init__(self, markov_model):
        """Initialize."""
        AbstractTrainer.__init__(self, markov_model)

    def train(self, training_seqs):
        """Estimate the Markov Model parameters with known state paths.

        This trainer requires that both the state and the emissions are
        known for all of the training sequences in the list of
        TrainingSequence objects.
        This training will then count all of the transitions and emissions,
        and use this to estimate the parameters of the model.
        """
        # count up all of the transitions and emissions
        transition_counts = self._markov_model.get_blank_transitions()
        emission_counts = self._markov_model.get_blank_emissions()

        for training_seq in training_seqs:
            emission_counts = self._count_emissions(training_seq,
                                                    emission_counts)
            transition_counts = self._count_transitions(training_seq.states,
                                                        transition_counts)

        # update the markov model from the counts
        ml_transitions, ml_emissions = \
            self.estimate_params(transition_counts,
                                 emission_counts)
        self._markov_model.transition_prob = ml_transitions
        self._markov_model.emission_prob = ml_emissions

        return self._markov_model

    def _count_emissions(self, training_seq, emission_counts):
        """Add emissions from the training sequence to the current counts.

        Arguments:
         - training_seq -- A TrainingSequence with states and emissions
           to get the counts from.
         - emission_counts -- The current emission counts to add to.

        """
        for index in range(len(training_seq.emissions)):
            cur_state = training_seq.states[index]
            cur_emission = training_seq.emissions[index]

            try:
                emission_counts[(cur_state, cur_emission)] += 1
            except KeyError:
                raise KeyError("Unexpected emission (%s, %s)"
                               % (cur_state, cur_emission))
        return emission_counts

    def _count_transitions(self, state_seq, transition_counts):
        """Add transitions from the training sequence to the current counts.

        Arguments:
         - state_seq -- A Seq object with the states of the current training
           sequence.
         - transition_counts -- The current transition counts to add to.

        """
        for cur_pos in range(len(state_seq) - 1):
            cur_state = state_seq[cur_pos]
            next_state = state_seq[cur_pos + 1]

            try:
                transition_counts[(cur_state, next_state)] += 1
            except KeyError:
                raise KeyError("Unexpected transition (%s, %s)" %
                               (cur_state, next_state))

        return transition_counts
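
# Example usage (an illustrative sketch, not part of the original module):
# direct estimation when the state path is known, reusing the hypothetical
# alphabets and builder pattern from the sketches above.
#
#     builder = MarkovModelBuilder(DiceStateAlphabet(), DiceRollAlphabet())
#     builder.allow_all_transitions()
#     builder.set_equal_probabilities()
#     model = builder.get_markov_model()
#
#     seqs = [TrainingSequence(Seq("16266", DiceRollAlphabet()),
#                              Seq("FFLLF", DiceStateAlphabet()))]
#     trained_model = KnownStateTrainer(model).train(seqs)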