Package Bio :: Package PopGen :: Package FDist :: Module Async
[hide private]
[frames | no frames]

Source Code for Module Bio.PopGen.FDist.Async

  1  # Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Asynchronous execution of Fdist and splitting of loads. 
  7   
  8  FDistAsync allows for the execution of FDist. 
  9   
 10  SplitFDist splits a single Fdist execution in several, taking advantage 
 11  of multi-core architectures. 
 12  """ 
 13   
 14  from __future__ import print_function 
 15   
 16  import os 
 17  import shutil 
 18  import threading 
 19  from time import sleep 
 20  from Bio.PopGen.Async import Local 
 21  from Bio.PopGen.FDist.Controller import FDistController 
 22   
 23   
class FDistAsync(FDistController):
    """FDist controller whose runs can be dispatched as asynchronous jobs."""

    def __init__(self, fdist_dir="", ext=None):
        """Constructor.

        Parameters:
        fdist_dir - Where fdist can be found, if = "", then it
                    should be on the path.
        ext - Extension of binary names (e.g. nothing on Unix,
              ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Run FDist from a dictionary of parameters.

        The usual Fdist arguments are read from ``parameters`` and
        forwarded to a "normal" ``run_fdist`` call. This is normally
        executed inside a separate thread.

        Parameters:
        parameters - Dictionary of settings. 'npops', 'nsamples', 'fst'
                     and 'sample_size' are mandatory (KeyError if
                     missing); 'mut', 'num_sims', 'data_dir',
                     'is_dominant', 'theta', 'beta' and 'max_freq' are
                     optional and fall back to defaults.
        input_files - Not used by this method.

        Returns a tuple (fst, output_files), where fst is whatever
        ``run_fdist`` returned and output_files maps 'out.dat' to a file
        object opened for reading on the out.dat of the data directory.
        The file is not closed here.
        """
        # Mandatory settings, looked up in the same order as always so the
        # first missing key is the one reported.
        npops, nsamples, fst, sample_size = [
            parameters[key]
            for key in ('npops', 'nsamples', 'fst', 'sample_size')
        ]

        # Optional settings and their defaults.
        fetch = parameters.get
        data_dir = fetch('data_dir', '.')

        estimated_fst = self.run_fdist(
            npops, nsamples, fst, sample_size,
            fetch('mut', 0), fetch('num_sims', 20000), data_dir,
            fetch('is_dominant', False), fetch('theta', 0.06),
            fetch('beta', (0.25, 0.25)), fetch('max_freq', 0.99))

        out_dat_path = data_dir + os.sep + 'out.dat'
        return estimated_fst, {'out.dat': open(out_dat_path, 'r')}
64 65
class SplitFDist(object):
    """Splits a FDist run.

    The idea is to split a certain number of simulations in smaller
    numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
    allows to run simulations in parallel, thus taking advantage
    of multi-core CPUs.

    Each SplitFDist object can only be used to run a single FDist
    simulation.
    """

    def __init__(self, report_fun=None,
                 num_thr=2, split_size=1000, fdist_dir='', ext=None):
        """Constructor.

        Parameters:
        report_fun - Function that is called when a single packet is
                     run, it should have a single parameter: Fst.
        num_thr - Number of desired threads, typically the number
                  of cores.
        split_size - Size that a full simulation will be split in.
        fdist_dir - Passed to FDistAsync as the fdist location.
        ext - Binary extension name (e.g. nothing on Unix, '.exe' on
              Windows).
        """
        # ``async`` is a reserved word from Python 3.7 on, so the executor
        # cannot be stored as ``self.async`` any more.
        self._async = Local.Local(num_thr)
        self._async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        # Keep the historical attribute name reachable (plain attribute
        # access on old interpreters, getattr(obj, 'async') on new ones).
        setattr(self, 'async', self._async)
        self.report_fun = report_fun
        self.split_size = split_size

    # There might be races when reporting...
    def monitor(self, poll_interval=1):
        """Monitors and reports (using report_fun) execution.

        Every time a partial simulation ends, its out.dat is appended to
        the out.dat of the main data directory, its part directory is
        removed and report_fun (if any) is called with its Fst.
        Returns when nothing is waiting, running or done any more.

        IMPORTANT: monitor calls can be concurrent with other
        events, ie, a task might end while report_fun is being
        called. This means that report_fun should consider that
        other events might be happening while it is running (it
        can call acquire/release if necessary).

        Parameters:
        poll_interval - Seconds to sleep between two polls of the
                        finished jobs (defaults to 1).
        """
        pool = self._async
        lock = pool.access_ds
        while True:
            sleep(poll_interval)

            # Snapshot the finished job ids so the dictionary can change
            # while the jobs are being processed.
            lock.acquire()
            try:
                finished = list(pool.done.keys())
            finally:
                lock.release()

            for job_id in finished:
                lock.acquire()
                try:
                    fst, files = pool.done[job_id]
                    del pool.done[job_id]
                    part_output = files['out.dat']
                    try:
                        merged_path = os.path.join(self.data_dir, 'out.dat')
                        with open(merged_path, 'a') as merged:
                            merged.writelines(part_output.readlines())
                    finally:
                        # Never leak the handle of the part, even if the
                        # merge fails.
                        part_output.close()
                finally:
                    lock.release()

                # The part directory is not needed once merged.
                part_dir = self.parts[job_id]
                for entry in os.listdir(part_dir):
                    os.remove(os.path.join(part_dir, entry))
                os.rmdir(part_dir)

                # Reported outside the lock, see the note above.
                if self.report_fun:
                    self.report_fun(fst)

            # Decide under the lock, but always give the lock back before
            # leaving: breaking out while holding it would block every
            # later acquire() forever.
            lock.acquire()
            try:
                all_finished = (len(pool.waiting) == 0 and
                                len(pool.running) == 0 and
                                len(pool.done) == 0)
            finally:
                lock.release()
            if all_finished:
                break

    def acquire(self):
        """Allows the external acquisition of the lock.
        """
        self._async.access_ds.acquire()

    def release(self):
        """Allows the external release of the lock.
        """
        self._async.access_ds.release()

    # You can only run a fdist case at a time
    def run_fdist(self, npops, nsamples, fst, sample_size,
                  mut=0, num_sims=20000, data_dir='.',
                  is_dominant=False, theta=0.06, beta=(0.25, 0.25),
                  max_freq=0.99):
        """Runs FDist.

        Parameters can be seen on FDistController.run_fdist.

        It will split a single execution in several parts and
        create separated data directories (data_dir/0, data_dir/1, ...).
        Every part runs split_size simulations; if num_sims is not a
        multiple of split_size a last, smaller, part runs the remainder
        so that num_sims simulations are run in total.

        The call blocks until every part has finished and has been
        merged by monitor.
        """
        whole_parts, remainder = divmod(num_sims, self.split_size)
        part_sizes = [self.split_size] * whole_parts
        if remainder:
            # Do not silently drop the simulations that do not fill a
            # whole packet.
            part_sizes.append(remainder)

        self.parts = {}
        self.data_dir = data_dir
        for index, part_sims in enumerate(part_sizes):
            full_path = os.path.join(data_dir, str(index))
            try:
                os.mkdir(full_path)
            except OSError:
                # Its ok, if it is already there; anything else (e.g.
                # permissions) is a real error.
                if not os.path.isdir(full_path):
                    raise
            if "ss_file" in os.listdir(data_dir):
                shutil.copy(os.path.join(data_dir, "ss_file"), full_path)
            job_id = self._async.run_program('fdist', {
                'npops': npops,
                'nsamples': nsamples,
                'fst': fst,
                'sample_size': sample_size,
                'mut': mut,
                'num_sims': part_sims,
                'data_dir': full_path,
                'is_dominant': is_dominant,
                'theta': theta,
                'beta': beta,
                'max_freq': max_freq,
            }, {})
            self.parts[job_id] = full_path

        # Thread(target=...).run() never started a thread: it executed
        # monitor in the calling thread. Call it directly, which keeps the
        # blocking behaviour callers rely on.
        self.monitor()
179