Package Bio :: Package PopGen :: Package FDist :: Module Async
[hide private]
[frames] | [no frames]

Source Code for Module Bio.PopGen.FDist.Async

  1  # Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Asynchronous execution of Fdist and splitting of loads.
  7
  8  FDistAsync allows the asynchronous execution of FDist.
  9
 10  SplitFDist splits a single Fdist execution into several, taking advantage
 11  of multi-core architectures.
 12  """ 
 13   
 14  import os 
 15  import shutil 
 16  import threading 
 17  from time import sleep 
 18  from Bio.PopGen.Async import Local 
 19  from Bio.PopGen.FDist.Controller import FDistController 
 20   
# Declares that docstrings in this module use reStructuredText markup
# (language: English); presumably consumed by documentation tools such
# as epydoc.
__docformat__ = "restructuredtext en"
 22   
 23   
class FDistAsync(FDistController):
    """Asynchronous FDist execution.

    Wraps FDistController so that a run can be described by a parameter
    dictionary (see run_job) and its output handed back as open files.
    """

    def __init__(self, fdist_dir="", ext=None):
        """Constructor.

        Parameters:

        - fdist_dir - Where fdist can be found, if = "", then it
          should be on the path.
        - ext - Extension of binary names (e.g. nothing on Unix,
          ".exe" on Windows)
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Runs FDist asynchronously.

        Gets typical Fdist parameters from a dictionary and makes a
        "normal" call to run_fdist. This is run, normally, inside a
        separate thread.

        Parameters:

        - parameters - dict with the mandatory keys 'npops', 'nsamples',
          'fst' and 'sample_size'; the optional keys (and their defaults)
          are 'mut' (0), 'num_sims' (20000), 'data_dir' ('.'),
          'is_dominant' (False), 'theta' (0.06), 'beta' ((0.25, 0.25))
          and 'max_freq' (0.99).
        - input_files - not used.

        Returns a tuple (fst, files): fst is the value returned by
        run_fdist, and files maps 'out.dat' to the data_dir/out.dat file
        opened for reading. The caller is responsible for closing it.
        """
        option = parameters.get
        data_dir = option('data_dir', '.')
        result_fst = self.run_fdist(
            parameters['npops'], parameters['nsamples'],
            parameters['fst'], parameters['sample_size'],
            option('mut', 0), option('num_sims', 20000), data_dir,
            option('is_dominant', False), option('theta', 0.06),
            option('beta', (0.25, 0.25)), option('max_freq', 0.99))
        out_dat = open(data_dir + os.sep + 'out.dat', 'r')
        return result_fst, {'out.dat': out_dat}


class SplitFDist(object):
    """Splits a FDist run.

    The idea is to split a certain number of simulations in smaller
    numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
    allows to run simulations in parallel, thus taking advantage
    of multi-core CPUs.

    Each SplitFDist object can only be used to run a single FDist
    simulation.
    """

    def __init__(self, report_fun=None,
                 num_thr=2, split_size=1000, fdist_dir='', ext=None,
                 poll_interval=1):
        """Constructor.

        Parameters:

        - report_fun - Function that is called when a single packet is
          run, it should have a single parameter: Fst.
        - num_thr - Number of desired threads, typically the number
          of cores.
        - split_size - Number of simulations in each packet a full
          simulation is split into.
        - fdist_dir - Where fdist can be found (passed to FDistAsync).
        - ext - Binary extension name (e.g. nothing on Unix, '.exe' on
          Windows).
        - poll_interval - Seconds monitor sleeps between checks for
          finished packets (default 1).
        """
        # Not named "async": that is a reserved keyword since Python 3.7,
        # so "self.async" does not even compile there.
        self.async_obj = Local.Local(num_thr)
        self.async_obj.hooks['fdist'] = FDistAsync(fdist_dir, ext)
        self.report_fun = report_fun
        self.split_size = split_size
        self.poll_interval = poll_interval

    def monitor(self):
        """Monitors and reports (using report_fun) execution.

        Every poll_interval seconds, each finished packet is processed:
        its out.dat is appended to data_dir/out.dat, its directory is
        removed and report_fun (if set) is called with its Fst. Returns
        once no packet is waiting, running or done-but-unprocessed.

        IMPORTANT: monitor calls can be concurrent with other events,
        i.e. a task might end while report_fun is being called.
        report_fun is called without the lock held, so it should take
        into account that other events might be happening while it runs
        (it can call acquire/release if necessary).
        """
        async_obj = self.async_obj
        lock = async_obj.access_ds
        while True:
            sleep(self.poll_interval)
            lock.acquire()
            try:
                finished = list(async_obj.done.keys())  # snapshot under lock
            finally:
                lock.release()
            for job_id in finished:
                lock.acquire()
                try:
                    fst, files = async_obj.done.pop(job_id)
                    self._append_output(files['out.dat'])
                finally:
                    lock.release()
                # Drops the whole packet directory (out.dat, copied ss_file...).
                shutil.rmtree(self.parts[job_id])
                if self.report_fun:
                    self.report_fun(fst)
            lock.acquire()
            try:
                all_finished = (not async_obj.waiting and
                                not async_obj.running and
                                not async_obj.done)
            finally:
                # Always release before leaving the loop; breaking out with
                # the lock held would deadlock every later acquire().
                lock.release()
            if all_finished:
                break

    def _append_output(self, out_dat):
        """Append an open packet out.dat to data_dir/out.dat, then close it."""
        try:
            with open(os.path.join(self.data_dir, 'out.dat'), 'a') as merged:
                merged.writelines(out_dat.readlines())
        finally:
            out_dat.close()

    def acquire(self):
        """Allows the external acquisition of the lock.
        """
        self.async_obj.access_ds.acquire()

    def release(self):
        """Allows the external release of the lock.
        """
        self.async_obj.access_ds.release()

    # You can only run a fdist case at a time
    def run_fdist(self, npops, nsamples, fst, sample_size,
                  mut=0, num_sims=20000, data_dir='.',
                  is_dominant=False, theta=0.06, beta=(0.25, 0.25),
                  max_freq=0.99):
        """Runs FDist.

        Parameters can be seen on FDistController.run_fdist.

        It will split a single execution in several parts and
        create separated data directories (data_dir/0, data_dir/1, ...).
        If num_sims is not a multiple of split_size, a final, smaller
        part runs the remaining simulations so that exactly num_sims are
        run. If data_dir contains an ss_file, it is copied into every
        part directory. Returns when all parts have finished and been
        merged (see monitor).
        """
        num_full, remainder = divmod(max(num_sims, 0), self.split_size)
        part_sizes = [self.split_size] * num_full
        if remainder > 0:
            part_sizes.append(remainder)
        self.parts = {}
        self.data_dir = data_dir
        ss_file = os.path.join(data_dir, "ss_file")
        has_ss_file = os.path.isfile(ss_file)  # loop-invariant
        for index, part_sims in enumerate(part_sizes):
            full_path = os.path.join(data_dir, str(index))
            try:
                os.mkdir(full_path)
            except OSError:
                # Only "already exists" is acceptable; re-raise anything else.
                if not os.path.isdir(full_path):
                    raise
            if has_ss_file:
                shutil.copy(ss_file, full_path)
            job_id = self.async_obj.run_program('fdist', {
                'npops': npops,
                'nsamples': nsamples,
                'fst': fst,
                'sample_size': sample_size,
                'mut': mut,
                'num_sims': part_sims,
                'data_dir': full_path,
                'is_dominant': is_dominant,
                'theta': theta,
                'beta': beta,
                'max_freq': max_freq
            }, {})
            self.parts[job_id] = full_path
        # Runs in the calling thread and blocks until every part is done;
        # this is exactly what the former Thread(...).run() call did.
        self.monitor()