Package Bio :: Package SeqIO :: Module AbiIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.AbiIO

  1  # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com) 
  2  # Revisions copyright 2011, 2014 by Peter Cock. 
  3  # This code is part of the Biopython distribution and governed by its 
  4  # license. Please see the LICENSE file that should have been included 
  5  # as part of this package. 
  6   
  7  """Bio.SeqIO parser for the ABI format. 
  8   
  9  ABI is the format used by Applied Biosystem's sequencing machines to store 
 10  sequencing results. 
 11   
 12  For more details on the format specification, visit: 
 13  http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf 
 14   
 15  """ 
 16   
 17  __docformat__ = "restructuredtext en" 
 18   
 19  import datetime 
 20  import struct 
 21   
 22  from os.path import basename 
 23   
 24  from Bio import Alphabet 
 25  from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna 
 26  from Bio.Seq import Seq 
 27  from Bio.SeqRecord import SeqRecord 
 28   
 29  from Bio._py3k import _bytes_to_string, _as_bytes 
 30  from Bio._py3k import zip 
 31   
 32  # dictionary for determining which tags goes into SeqRecord annotation 
 33  # each key is tag_name + tag_number 
 34  # if a tag entry needs to be added, just add its key and its key 
 35  # for the annotations dictionary as the value 
 36  _EXTRACT = { 
 37      'TUBE1': 'sample_well', 
 38      'DySN1': 'dye', 
 39      'GTyp1': 'polymer', 
 40      'MODL1': 'machine_model', 
 41  } 
 42  # dictionary for tags that require preprocessing before use in creating 
 43  # seqrecords 
 44  _SPCTAGS = [ 
 45      'PBAS2',    # base-called sequence 
 46      'PCON2',    # quality values of base-called sequence 
 47      'SMPL1',    # sample id inputted before sequencing run 
 48      'RUND1',    # run start date 
 49      'RUND2',    # run finish date 
 50      'RUNT1',    # run start time 
 51      'RUNT2',    # run finish time 
 52  ] 
 53  # dictionary for data unpacking format 
 54  _BYTEFMT = { 
 55      1: 'b',     # byte 
 56      2: 's',     # char 
 57      3: 'H',     # word 
 58      4: 'h',     # short 
 59      5: 'i',     # long 
 60      6: '2i',    # rational, legacy unsupported 
 61      7: 'f',     # float 
 62      8: 'd',     # double 
 63      10: 'h2B',  # date 
 64      11: '4B',   # time 
 65      12: '2i2b',  # thumb 
 66      13: 'B',    # bool 
 67      14: '2h',   # point, legacy unsupported 
 68      15: '4h',   # rect, legacy unsupported 
 69      16: '2i',   # vPoint, legacy unsupported 
 70      17: '4i',   # vRect, legacy unsupported 
 71      18: 's',    # pString 
 72      19: 's',    # cString 
 73      20: '2i',   # tag, legacy unsupported 
 74  } 
 75  # header data structure (exluding 4 byte ABIF marker) 
 76  _HEADFMT = '>H4sI2H3I' 
 77  # directory data structure 
 78  _DIRFMT = '>4sI2H4I' 
 79   
 80   
81 -def AbiIterator(handle, alphabet=None, trim=False):
82 """Iterator for the Abi file format. 83 """ 84 # raise exception is alphabet is not dna 85 if alphabet is not None: 86 if isinstance(Alphabet._get_base_alphabet(alphabet), 87 Alphabet.ProteinAlphabet): 88 raise ValueError( 89 "Invalid alphabet, ABI files do not hold proteins.") 90 if isinstance(Alphabet._get_base_alphabet(alphabet), 91 Alphabet.RNAAlphabet): 92 raise ValueError("Invalid alphabet, ABI files do not hold RNA.") 93 94 # raise exception if handle mode is not 'rb' 95 if hasattr(handle, 'mode'): 96 if set('rb') != set(handle.mode.lower()): 97 raise ValueError("ABI files has to be opened in 'rb' mode.") 98 99 # check if input file is a valid Abi file 100 handle.seek(0) 101 marker = handle.read(4) 102 if not marker: 103 # handle empty file gracefully 104 raise StopIteration 105 if marker != _as_bytes('ABIF'): 106 raise IOError('File should start ABIF, not %r' % marker) 107 108 # dirty hack for handling time information 109 times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', } 110 111 # initialize annotations 112 annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT))) 113 114 # parse header and extract data from directories 115 header = struct.unpack(_HEADFMT, 116 handle.read(struct.calcsize(_HEADFMT))) 117 118 raw = dict() 119 for tag_name, tag_number, tag_data in _abi_parse_header(header, handle): 120 # stop iteration if all desired tags have been extracted 121 # 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3, 122 # and seq, qual, id 123 # todo 124 125 key = tag_name + str(tag_number) 126 127 # TODO - Why not store the raw data in bytes, not as strings? 128 raw[key] = tag_data 129 130 # PBAS2 is base-called sequence 131 if key == 'PBAS2': 132 seq = tag_data 133 ambigs = 'KYWMRS' 134 if alphabet is None: 135 if set(seq).intersection(ambigs): 136 alphabet = ambiguous_dna 137 else: 138 alphabet = unambiguous_dna 139 # PCON2 is quality values of base-called sequence 140 elif key == 'PCON2': 141 qual = [ord(val) for val in tag_data] 142 # SMPL1 is sample id entered before sequencing run 143 elif key == 'SMPL1': 144 sample_id = tag_data 145 elif key in times: 146 times[key] = tag_data 147 else: 148 # extract sequence annotation as defined in _EXTRACT 149 if key in _EXTRACT: 150 annot[_EXTRACT[key]] = tag_data 151 152 # set time annotations 153 annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1']) 154 annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2']) 155 156 # raw data (for advanced end users benefit) 157 annot['abif_raw'] = raw 158 159 # use the file name as SeqRecord.name if available 160 try: 161 file_name = basename(handle.name).replace('.ab1', '') 162 except: 163 file_name = "" 164 165 record = SeqRecord(Seq(seq, alphabet), 166 id=sample_id, name=file_name, 167 description='', 168 annotations=annot, 169 letter_annotations={'phred_quality': qual}) 170 171 if not trim: 172 yield record 173 else: 174 yield _abi_trim(record)
175 176
177 -def _AbiTrimIterator(handle):
178 """Iterator for the Abi file format that yields trimmed SeqRecord objects. 179 """ 180 return AbiIterator(handle, trim=True)
181 182
183 -def _abi_parse_header(header, handle):
184 """Generator that returns directory contents. 185 """ 186 # header structure (after ABIF marker): 187 # file version, tag name, tag number, 188 # element type code, element size, number of elements 189 # data size, data offset, handle (not file handle) 190 head_elem_size = header[4] 191 head_elem_num = header[5] 192 head_offset = header[7] 193 index = 0 194 195 while index < head_elem_num: 196 start = head_offset + index * head_elem_size 197 # add directory offset to tuple 198 # to handle directories with data size <= 4 bytes 199 handle.seek(start) 200 dir_entry = struct.unpack(_DIRFMT, 201 handle.read(struct.calcsize(_DIRFMT))) + (start,) 202 index += 1 203 # only parse desired dirs 204 key = _bytes_to_string(dir_entry[0]) 205 key += str(dir_entry[1]) 206 if key in (list(_EXTRACT) + _SPCTAGS): 207 tag_name = _bytes_to_string(dir_entry[0]) 208 tag_number = dir_entry[1] 209 elem_code = dir_entry[2] 210 elem_num = dir_entry[4] 211 data_size = dir_entry[5] 212 data_offset = dir_entry[6] 213 tag_offset = dir_entry[8] 214 # if data size <= 4 bytes, data is stored inside tag 215 # so offset needs to be changed 216 if data_size <= 4: 217 data_offset = tag_offset + 20 218 handle.seek(data_offset) 219 data = handle.read(data_size) 220 yield tag_name, tag_number, \ 221 _parse_tag_data(elem_code, elem_num, data)
222 223
224 -def _abi_trim(seq_record):
225 """Trims the sequence using Richard Mott's modified trimming algorithm. 226 227 Arguments: 228 - seq_record - SeqRecord object to be trimmed. 229 230 Trimmed bases are determined from their segment score, which is a 231 cumulative sum of each base's score. Base scores are calculated from 232 their quality values. 233 234 More about the trimming algorithm: 235 http://www.phrap.org/phredphrap/phred.html 236 http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html 237 """ 238 239 start = False # flag for starting position of trimmed sequence 240 segment = 20 # minimum sequence length 241 trim_start = 0 # init start index 242 cutoff = 0.05 # default cutoff value for calculating base score 243 244 if len(seq_record) <= segment: 245 return seq_record 246 else: 247 # calculate base score 248 score_list = [cutoff - (10 ** (qual / -10.0)) for qual in 249 seq_record.letter_annotations['phred_quality']] 250 251 # calculate cummulative score 252 # if cummulative value < 0, set it to 0 253 # first value is set to 0, because of the assumption that 254 # the first base will always be trimmed out 255 cummul_score = [0] 256 for i in range(1, len(score_list)): 257 score = cummul_score[-1] + score_list[i] 258 if score < 0: 259 cummul_score.append(0) 260 else: 261 cummul_score.append(score) 262 if not start: 263 # trim_start = value when cummulative score is first > 0 264 trim_start = i 265 start = True 266 267 # trim_finish = index of highest cummulative score, 268 # marking the end of sequence segment with highest cummulative score 269 trim_finish = cummul_score.index(max(cummul_score)) 270 271 return seq_record[trim_start:trim_finish]
272 273
274 -def _parse_tag_data(elem_code, elem_num, raw_data):
275 """Returns single data value. 276 277 Arguments: 278 - elem_code - What kind of data 279 - elem_num - How many data points 280 - raw_data - abi file object from which the tags would be unpacked 281 """ 282 if elem_code in _BYTEFMT: 283 # because '>1s' unpack differently from '>s' 284 if elem_num == 1: 285 num = '' 286 else: 287 num = str(elem_num) 288 fmt = '>' + num + _BYTEFMT[elem_code] 289 290 assert len(raw_data) == struct.calcsize(fmt) 291 data = struct.unpack(fmt, raw_data) 292 293 # no need to use tuple if len(data) == 1 294 # also if data is date / time 295 if elem_code not in [10, 11] and len(data) == 1: 296 data = data[0] 297 298 # account for different data types 299 if elem_code == 2: 300 return _bytes_to_string(data) 301 elif elem_code == 10: 302 return str(datetime.date(*data)) 303 elif elem_code == 11: 304 return str(datetime.time(*data[:3])) 305 elif elem_code == 13: 306 return bool(data) 307 elif elem_code == 18: 308 return _bytes_to_string(data[1:]) 309 elif elem_code == 19: 310 return _bytes_to_string(data[:-1]) 311 else: 312 return data 313 else: 314 return None
315 316 if __name__ == '__main__': 317 pass 318