Package Bio :: Package SeqIO :: Module InsdcIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.InsdcIO

   1  # Copyright 2007-2011 by Peter Cock.  All rights reserved. 
   2  # 
   3  # This code is part of the Biopython distribution and governed by its 
   4  # license.  Please see the LICENSE file that should have been included 
   5  # as part of this package.. 
   6   
   7  """Bio.SeqIO support for the "genbank" and "embl" file formats. 
   8   
   9  You are expected to use this module via the Bio.SeqIO functions. 
  10  Note that internally this module calls Bio.GenBank to do the actual 
  11  parsing of GenBank, EMBL and IMGT files. 
  12   
  13  See also: 
  14   
  15  International Nucleotide Sequence Database Collaboration 
  16  http://www.insdc.org/ 
  17   
  18  GenBank 
  19  http://www.ncbi.nlm.nih.gov/Genbank/ 
  20   
  21  EMBL Nucleotide Sequence Database 
  22  http://www.ebi.ac.uk/embl/ 
  23   
  24  DDBJ (DNA Data Bank of Japan) 
  25  http://www.ddbj.nig.ac.jp/ 
  26   
  27  IMGT (use a variant of EMBL format with longer feature indents) 
  28  http://imgt.cines.fr/download/LIGM-DB/userman_doc.html 
  29  http://imgt.cines.fr/download/LIGM-DB/ftable_doc.html 
  30  http://www.ebi.ac.uk/imgt/hla/docs/manual.html 
  31   
  32  """ 
  33   
  34  from Bio.Seq import UnknownSeq 
  35  from Bio.GenBank.Scanner import GenBankScanner, EmblScanner, _ImgtScanner 
  36  from Bio import Alphabet 
  37  from Interfaces import SequentialSequenceWriter 
  38  from Bio import SeqFeature 
  39   
  40  from Bio._py3k import _is_int_or_long 
  41   
  42  # NOTE 
  43  # ==== 
  44  # The "brains" for parsing GenBank, EMBL and IMGT files (and any 
  45  # other flat file variants from the INSDC in future) is in 
  46  # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank) 
  47  # However, all the writing code is in this file. 
  48   
  49   
50 -def GenBankIterator(handle):
51 """Breaks up a Genbank file into SeqRecord objects. 52 53 Every section from the LOCUS line to the terminating // becomes 54 a single SeqRecord with associated annotation and features. 55 56 Note that for genomes or chromosomes, there is typically only 57 one record.""" 58 #This calls a generator function: 59 return GenBankScanner(debug=0).parse_records(handle)
60 61
62 -def EmblIterator(handle):
63 """Breaks up an EMBL file into SeqRecord objects. 64 65 Every section from the LOCUS line to the terminating // becomes 66 a single SeqRecord with associated annotation and features. 67 68 Note that for genomes or chromosomes, there is typically only 69 one record.""" 70 #This calls a generator function: 71 return EmblScanner(debug=0).parse_records(handle)
72 73
74 -def ImgtIterator(handle):
75 """Breaks up an IMGT file into SeqRecord objects. 76 77 Every section from the LOCUS line to the terminating // becomes 78 a single SeqRecord with associated annotation and features. 79 80 Note that for genomes or chromosomes, there is typically only 81 one record.""" 82 #This calls a generator function: 83 return _ImgtScanner(debug=0).parse_records(handle)
84 85
86 -def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
87 """Breaks up a Genbank file into SeqRecord objects for each CDS feature. 88 89 Every section from the LOCUS line to the terminating // can contain 90 many CDS features. These are returned as with the stated amino acid 91 translation sequence (if given). 92 """ 93 #This calls a generator function: 94 return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
95 96
97 -def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
98 """Breaks up a EMBL file into SeqRecord objects for each CDS feature. 99 100 Every section from the LOCUS line to the terminating // can contain 101 many CDS features. These are returned as with the stated amino acid 102 translation sequence (if given). 103 """ 104 #This calls a generator function: 105 return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
106 107
108 -def _insdc_feature_position_string(pos, offset=0):
109 """Build a GenBank/EMBL position string (PRIVATE). 110 111 Use offset=1 to add one to convert a start position from python counting. 112 """ 113 if isinstance(pos, SeqFeature.ExactPosition): 114 return "%i" % (pos.position + offset) 115 elif isinstance(pos, SeqFeature.WithinPosition): 116 return "(%i.%i)" % (pos.position + offset, 117 pos.position + pos.extension + offset) 118 elif isinstance(pos, SeqFeature.BetweenPosition): 119 return "(%i^%i)" % (pos.position + offset, 120 pos.position + pos.extension + offset) 121 elif isinstance(pos, SeqFeature.BeforePosition): 122 return "<%i" % (pos.position + offset) 123 elif isinstance(pos, SeqFeature.AfterPosition): 124 return ">%i" % (pos.position + offset) 125 elif isinstance(pos, SeqFeature.OneOfPosition): 126 return "one-of(%s)" \ 127 % ",".join([_insdc_feature_position_string(p, offset) 128 for p in pos.position_choices]) 129 elif isinstance(pos, SeqFeature.AbstractPosition): 130 raise NotImplementedError("Please report this as a bug in Biopython.") 131 else: 132 raise ValueError("Expected a SeqFeature position object.")
133 134
135 -def _insdc_location_string_ignoring_strand_and_subfeatures(location, rec_length):
136 if location.ref: 137 ref = "%s:" % location.ref 138 else: 139 ref = "" 140 assert not location.ref_db 141 if isinstance(location.start, SeqFeature.ExactPosition) \ 142 and isinstance(location.end, SeqFeature.ExactPosition) \ 143 and location.start.position == location.end.position: 144 #Special case, for 12:12 return 12^13 145 #(a zero length slice, meaning the point between two letters) 146 if location.end.position == rec_length: 147 #Very special case, for a between position at the end of a 148 #sequence (used on some circular genomes, Bug 3098) we have 149 #N:N so return N^1 150 return "%s%i^1" % (ref, rec_length) 151 else: 152 return "%s%i^%i" % (ref, location.end.position, 153 location.end.position + 1) 154 if isinstance(location.start, SeqFeature.ExactPosition) \ 155 and isinstance(location.end, SeqFeature.ExactPosition) \ 156 and location.start.position + 1 == location.end.position: 157 #Special case, for 11:12 return 12 rather than 12..12 158 #(a length one slice, meaning a single letter) 159 return "%s%i" % (ref, location.end.position) 160 elif isinstance(location.start, SeqFeature.UnknownPosition) \ 161 or isinstance(location.end, SeqFeature.UnknownPosition): 162 #Special case for features from SwissProt/UniProt files 163 if isinstance(location.start, SeqFeature.UnknownPosition) \ 164 and isinstance(location.end, SeqFeature.UnknownPosition): 165 #import warnings 166 #warnings.warn("Feature with unknown location") 167 #return "?" 168 raise ValueError("Feature with unknown location") 169 elif isinstance(location.start, SeqFeature.UnknownPosition): 170 #Treat the unknown start position as a BeforePosition 171 return "%s<%i..%s" \ 172 % (ref, 173 location.nofuzzy_end, 174 _insdc_feature_position_string(location.end)) 175 else: 176 #Treat the unknown end position as an AfterPosition 177 return "%s%s..>%i" \ 178 % (ref, 179 _insdc_feature_position_string(location.start), 180 location.nofuzzy_start) 181 else: 182 #Typical case, e.g. 12..15 gets mapped to 11:15 183 return ref \ 184 + _insdc_feature_position_string(location.start, +1) \ 185 + ".." + \ 186 _insdc_feature_position_string(location.end)
187 188
189 -def _insdc_feature_location_string(feature, rec_length):
190 """Build a GenBank/EMBL location string from a SeqFeature (PRIVATE). 191 192 There is a choice of how to show joins on the reverse complement strand, 193 GenBank used "complement(join(1,10),(20,100))" while EMBL used to use 194 "join(complement(20,100),complement(1,10))" instead (but appears to have 195 now adopted the GenBank convention). Notice that the order of the entries 196 is reversed! This function therefore uses the first form. In this situation 197 we expect the parent feature and the two children to all be marked as 198 strand == -1, and in the order 0:10 then 19:100. 199 200 Also need to consider dual-strand examples like these from the Arabidopsis 201 thaliana chloroplast NC_000932: join(complement(69611..69724),139856..140650) 202 gene ArthCp047, GeneID:844801 or its CDS (protein NP_051038.1 GI:7525057) 203 which is further complicated by a splice: 204 join(complement(69611..69724),139856..140087,140625..140650) 205 206 For mixed this mixed strand feature, the parent SeqFeature should have 207 no strand (either 0 or None) while the child features should have either 208 strand +1 or -1 as appropriate, and be listed in the order given here. 209 """ 210 211 if not feature.sub_features: 212 #Non-recursive. 213 #assert feature.location_operator == "", \ 214 # "%s has no subfeatures but location_operator %s" \ 215 # % (repr(feature), feature.location_operator) 216 location = _insdc_location_string_ignoring_strand_and_subfeatures( 217 feature.location, rec_length) 218 if feature.strand == -1: 219 location = "complement(%s)" % location 220 return location 221 # As noted above, treat reverse complement strand features carefully: 222 if feature.strand == -1: 223 for f in feature.sub_features: 224 if f.strand != -1: 225 raise ValueError("Inconsistent strands: %r for parent, %r for child" 226 % (feature.strand, f.strand)) 227 return "complement(%s(%s))" \ 228 % (feature.location_operator, 229 ",".join(_insdc_location_string_ignoring_strand_and_subfeatures(f.location, rec_length) 230 for f in feature.sub_features)) 231 #if feature.strand == +1: 232 # for f in feature.sub_features: 233 # assert f.strand == +1 234 #This covers typical forward strand features, and also an evil mixed strand: 235 assert feature.location_operator != "" 236 return "%s(%s)" % (feature.location_operator, 237 ",".join([_insdc_feature_location_string(f, rec_length) 238 for f in feature.sub_features]))
239 240
241 -class _InsdcWriter(SequentialSequenceWriter):
242 """Base class for GenBank and EMBL writers (PRIVATE).""" 243 MAX_WIDTH = 80 244 QUALIFIER_INDENT = 21 245 QUALIFIER_INDENT_STR = " " * QUALIFIER_INDENT 246 QUALIFIER_INDENT_TMP = " %s " # 21 if %s is empty 247 FTQUAL_NO_QUOTE = ("anticodon", "citation", "codon_start", "compare", 248 "direction", "estimated_length", "mod_base", "number", 249 "rpt_type", "rpt_unit_range", "tag_peptide", 250 "transl_except", "transl_table") 251
252 - def _write_feature_qualifier(self, key, value=None, quote=None):
253 if not value: 254 self.handle.write("%s/%s\n" % (self.QUALIFIER_INDENT_STR, key)) 255 return 256 #Quick hack with no line wrapping, may be useful for testing: 257 #self.handle.write('%s/%s="%s"\n' % (self.QUALIFIER_INDENT_STR, key, value)) 258 if quote is None: 259 #Try to mimic unwritten rules about when quotes can be left out: 260 if _is_int_or_long(value) or key in self.FTQUAL_NO_QUOTE: 261 quote = False 262 else: 263 quote = True 264 if quote: 265 line = '%s/%s="%s"' % (self.QUALIFIER_INDENT_STR, key, value) 266 else: 267 line = '%s/%s=%s' % (self.QUALIFIER_INDENT_STR, key, value) 268 if len(line) <= self.MAX_WIDTH: 269 self.handle.write(line + "\n") 270 return 271 while line.lstrip(): 272 if len(line) <= self.MAX_WIDTH: 273 self.handle.write(line + "\n") 274 return 275 #Insert line break... 276 for index in range(min(len(line) - 1, self.MAX_WIDTH), 277 self.QUALIFIER_INDENT + 1, -1): 278 if line[index] == " ": 279 break 280 if line[index] != " ": 281 #No nice place to break... 282 index = self.MAX_WIDTH 283 assert index <= self.MAX_WIDTH 284 self.handle.write(line[:index] + "\n") 285 line = self.QUALIFIER_INDENT_STR + line[index:].lstrip()
286
287 - def _wrap_location(self, location):
288 """Split a feature location into lines (break at commas).""" 289 #TODO - Rewrite this not to recurse! 290 length = self.MAX_WIDTH - self.QUALIFIER_INDENT 291 if len(location) <= length: 292 return location 293 index = location[:length].rfind(",") 294 if index == -1: 295 #No good place to split (!) 296 import warnings 297 warnings.warn("Couldn't split location:\n%s" % location) 298 return location 299 return location[:index + 1] + "\n" + \ 300 self.QUALIFIER_INDENT_STR + \ 301 self._wrap_location(location[index + 1:])
302
303 - def _write_feature(self, feature, record_length):
304 """Write a single SeqFeature object to features table.""" 305 assert feature.type, feature 306 location = _insdc_feature_location_string(feature, record_length) 307 f_type = feature.type.replace(" ", "_") 308 line = (self.QUALIFIER_INDENT_TMP % f_type)[:self.QUALIFIER_INDENT] \ 309 + self._wrap_location(location) + "\n" 310 self.handle.write(line) 311 #Now the qualifiers... 312 for key, values in feature.qualifiers.iteritems(): 313 if isinstance(values, list) or isinstance(values, tuple): 314 for value in values: 315 self._write_feature_qualifier(key, value) 316 elif values: 317 #String, int, etc 318 self._write_feature_qualifier(key, values) 319 else: 320 #e.g. a /pseudo entry 321 self._write_feature_qualifier(key)
322
323 - def _get_annotation_str(self, record, key, default=".", just_first=False):
324 """Get an annotation dictionary entry (as a string). 325 326 Some entries are lists, in which case if just_first=True the first entry 327 is returned. If just_first=False (default) this verifies there is only 328 one entry before returning it.""" 329 try: 330 answer = record.annotations[key] 331 except KeyError: 332 return default 333 if isinstance(answer, list): 334 if not just_first: 335 assert len(answer) == 1 336 return str(answer[0]) 337 else: 338 return str(answer)
339
340 - def _split_multi_line(self, text, max_len):
341 """Returns a list of strings. 342 343 Any single words which are too long get returned as a whole line 344 (e.g. URLs) without an exception or warning. 345 """ 346 #TODO - Do the line spliting while preserving white space? 347 text = text.strip() 348 if len(text) <= max_len: 349 return [text] 350 351 words = text.split() 352 text = "" 353 while words and len(text) + 1 + len(words[0]) <= max_len: 354 text += " " + words.pop(0) 355 text = text.strip() 356 #assert len(text) <= max_len 357 answer = [text] 358 while words: 359 text = words.pop(0) 360 while words and len(text) + 1 + len(words[0]) <= max_len: 361 text += " " + words.pop(0) 362 text = text.strip() 363 #assert len(text) <= max_len 364 answer.append(text) 365 assert not words 366 return answer
367
368 - def _split_contig(self, record, max_len):
369 """Returns a list of strings, splits on commas.""" 370 #TODO - Merge this with _write_multi_line method? 371 #It would need the addition of the comma splitting logic... 372 #are there any other cases where that would be sensible? 373 contig = record.annotations.get("contig", "") 374 if isinstance(contig, list) or isinstance(contig, tuple): 375 contig = "".join(contig) 376 contig = self.clean(contig) 377 answer = [] 378 while contig: 379 if len(contig) > max_len: 380 #Split lines at the commas 381 pos = contig[:max_len - 1].rfind(",") 382 if pos == -1: 383 raise ValueError("Could not break up CONTIG") 384 text, contig = contig[:pos + 1], contig[pos + 1:] 385 else: 386 text, contig = contig, "" 387 answer.append(text) 388 return answer
389 390
391 -class GenBankWriter(_InsdcWriter):
392 HEADER_WIDTH = 12 393 QUALIFIER_INDENT = 21 394
395 - def _write_single_line(self, tag, text):
396 """Used in the 'header' of each GenBank record.""" 397 assert len(tag) < self.HEADER_WIDTH 398 if len(text) > self.MAX_WIDTH - self.HEADER_WIDTH: 399 import warnings 400 from Bio import BiopythonWarning 401 warnings.warn("Annotation %r too long for %r line" % (text, tag), 402 BiopythonWarning) 403 self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH), 404 text.replace("\n", " ")))
405
406 - def _write_multi_line(self, tag, text):
407 """Used in the 'header' of each GenBank record.""" 408 #TODO - Do the line spliting while preserving white space? 409 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 410 lines = self._split_multi_line(text, max_len) 411 self._write_single_line(tag, lines[0]) 412 for line in lines[1:]: 413 self._write_single_line("", line)
414
415 - def _write_multi_entries(self, tag, text_list):
416 #used for DBLINK and any similar later line types. 417 #If the list of strings is empty, nothing is written. 418 for i, text in enumerate(text_list): 419 if i == 0: 420 self._write_single_line(tag, text) 421 else: 422 self._write_single_line("", text)
423
424 - def _get_date(self, record):
425 default = "01-JAN-1980" 426 try: 427 date = record.annotations["date"] 428 except KeyError: 429 return default 430 #Cope with a list of one string: 431 if isinstance(date, list) and len(date) == 1: 432 date = date[0] 433 #TODO - allow a Python date object 434 if not isinstance(date, basestring) or len(date) != 11 \ 435 or date[2] != "-" or date[6] != "-" \ 436 or not date[:2].isdigit() or not date[7:].isdigit() \ 437 or int(date[:2]) > 31 \ 438 or date[3:6] not in ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", 439 "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"]: 440 #TODO - Check is a valid date (e.g. not 31 Feb) 441 return default 442 return date
443
444 - def _get_data_division(self, record):
445 try: 446 division = record.annotations["data_file_division"] 447 except KeyError: 448 division = "UNK" 449 if division in ["PRI", "ROD", "MAM", "VRT", "INV", "PLN", "BCT", 450 "VRL", "PHG", "SYN", "UNA", "EST", "PAT", "STS", 451 "GSS", "HTG", "HTC", "ENV", "CON"]: 452 #Good, already GenBank style 453 # PRI - primate sequences 454 # ROD - rodent sequences 455 # MAM - other mammalian sequences 456 # VRT - other vertebrate sequences 457 # INV - invertebrate sequences 458 # PLN - plant, fungal, and algal sequences 459 # BCT - bacterial sequences [plus archea] 460 # VRL - viral sequences 461 # PHG - bacteriophage sequences 462 # SYN - synthetic sequences 463 # UNA - unannotated sequences 464 # EST - EST sequences (expressed sequence tags) 465 # PAT - patent sequences 466 # STS - STS sequences (sequence tagged sites) 467 # GSS - GSS sequences (genome survey sequences) 468 # HTG - HTGS sequences (high throughput genomic sequences) 469 # HTC - HTC sequences (high throughput cDNA sequences) 470 # ENV - Environmental sampling sequences 471 # CON - Constructed sequences 472 # 473 #(plus UNK for unknown) 474 pass 475 else: 476 #See if this is in EMBL style: 477 # Division Code 478 # ----------------- ---- 479 # Bacteriophage PHG - common 480 # Environmental Sample ENV - common 481 # Fungal FUN - map to PLN (plants + fungal) 482 # Human HUM - map to PRI (primates) 483 # Invertebrate INV - common 484 # Other Mammal MAM - common 485 # Other Vertebrate VRT - common 486 # Mus musculus MUS - map to ROD (rodent) 487 # Plant PLN - common 488 # Prokaryote PRO - map to BCT (poor name) 489 # Other Rodent ROD - common 490 # Synthetic SYN - common 491 # Transgenic TGN - ??? map to SYN ??? 492 # Unclassified UNC - map to UNK 493 # Viral VRL - common 494 # 495 #(plus XXX for submiting which we can map to UNK) 496 embl_to_gbk = {"FUN": "PLN", 497 "HUM": "PRI", 498 "MUS": "ROD", 499 "PRO": "BCT", 500 "UNC": "UNK", 501 "XXX": "UNK", 502 } 503 try: 504 division = embl_to_gbk[division] 505 except KeyError: 506 division = "UNK" 507 assert len(division) == 3 508 return division
509
510 - def _write_the_first_line(self, record):
511 """Write the LOCUS line.""" 512 513 locus = record.name 514 if not locus or locus == "<unknown name>": 515 locus = record.id 516 if not locus or locus == "<unknown id>": 517 locus = self._get_annotation_str( 518 record, "accession", just_first=True) 519 if len(locus) > 16: 520 raise ValueError("Locus identifier %r is too long" % str(locus)) 521 522 if len(record) > 99999999999: 523 #Currently GenBank only officially support up to 350000, but 524 #the length field can take eleven digits 525 raise ValueError("Sequence too long!") 526 527 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 528 a = Alphabet._get_base_alphabet(record.seq.alphabet) 529 if not isinstance(a, Alphabet.Alphabet): 530 raise TypeError("Invalid alphabet") 531 elif isinstance(a, Alphabet.ProteinAlphabet): 532 units = "aa" 533 elif isinstance(a, Alphabet.NucleotideAlphabet): 534 units = "bp" 535 else: 536 #Must be something like NucleotideAlphabet or 537 #just the generic Alphabet (default for fasta files) 538 raise ValueError("Need a Nucleotide or Protein alphabet") 539 540 #Get the molecule type 541 #TODO - record this explicitly in the parser? 542 if isinstance(a, Alphabet.ProteinAlphabet): 543 mol_type = "" 544 elif isinstance(a, Alphabet.DNAAlphabet): 545 mol_type = "DNA" 546 elif isinstance(a, Alphabet.RNAAlphabet): 547 mol_type = "RNA" 548 else: 549 #Must be something like NucleotideAlphabet or 550 #just the generic Alphabet (default for fasta files) 551 raise ValueError("Need a DNA, RNA or Protein alphabet") 552 553 division = self._get_data_division(record) 554 555 assert len(units) == 2 556 assert len(division) == 3 557 #TODO - date 558 #TODO - mol_type 559 line = "LOCUS %s %s %s %s %s %s\n" \ 560 % (locus.ljust(16), 561 str(len(record)).rjust(11), 562 units, 563 mol_type.ljust(6), 564 division, 565 self._get_date(record)) 566 assert len(line) == 79 + 1, repr(line) # plus one for new line 567 568 assert line[12:28].rstrip() == locus, \ 569 'LOCUS line does not contain the locus at the expected position:\n' + line 570 assert line[28:29] == " " 571 assert line[29:40].lstrip() == str(len(record)), \ 572 'LOCUS line does not contain the length at the expected position:\n' + line 573 574 #Tests copied from Bio.GenBank.Scanner 575 assert line[40:44] in [' bp ', ' aa '], \ 576 'LOCUS line does not contain size units at expected position:\n' + \ 577 line 578 assert line[44:47] in [' ', 'ss-', 'ds-', 'ms-'], \ 579 'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line 580 assert line[47:54].strip() == "" \ 581 or 'DNA' in line[47:54].strip() \ 582 or 'RNA' in line[47:54].strip(), \ 583 'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line 584 assert line[54:55] == ' ', \ 585 'LOCUS line does not contain space at position 55:\n' + line 586 assert line[55:63].strip() in ['', 'linear', 'circular'], \ 587 'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line 588 assert line[63:64] == ' ', \ 589 'LOCUS line does not contain space at position 64:\n' + line 590 assert line[67:68] == ' ', \ 591 'LOCUS line does not contain space at position 68:\n' + line 592 assert line[70:71] == '-', \ 593 'LOCUS line does not contain - at position 71 in date:\n' + line 594 assert line[74:75] == '-', \ 595 'LOCUS line does not contain - at position 75 in date:\n' + line 596 597 self.handle.write(line)
598
599 - def _write_references(self, record):
600 number = 0 601 for ref in record.annotations["references"]: 602 if not isinstance(ref, SeqFeature.Reference): 603 continue 604 number += 1 605 data = str(number) 606 #TODO - support more complex record reference locations? 607 if ref.location and len(ref.location) == 1: 608 a = Alphabet._get_base_alphabet(record.seq.alphabet) 609 if isinstance(a, Alphabet.ProteinAlphabet): 610 units = "residues" 611 else: 612 units = "bases" 613 data += " (%s %i to %i)" % (units, 614 ref.location[0].nofuzzy_start + 1, 615 ref.location[0].nofuzzy_end) 616 self._write_single_line("REFERENCE", data) 617 if ref.authors: 618 #We store the AUTHORS data as a single string 619 self._write_multi_line(" AUTHORS", ref.authors) 620 if ref.consrtm: 621 #We store the consortium as a single string 622 self._write_multi_line(" CONSRTM", ref.consrtm) 623 if ref.title: 624 #We store the title as a single string 625 self._write_multi_line(" TITLE", ref.title) 626 if ref.journal: 627 #We store this as a single string - holds the journal name, 628 #volume, year, and page numbers of the citation 629 self._write_multi_line(" JOURNAL", ref.journal) 630 if ref.medline_id: 631 #This line type is obsolete and was removed from the GenBank 632 #flatfile format in April 2005. Should we write it? 633 #Note this has a two space indent: 634 self._write_multi_line(" MEDLINE", ref.medline_id) 635 if ref.pubmed_id: 636 #Note this has a THREE space indent: 637 self._write_multi_line(" PUBMED", ref.pubmed_id) 638 if ref.comment: 639 self._write_multi_line(" REMARK", ref.comment)
640
641 - def _write_comment(self, record):
642 #This is a bit complicated due to the range of possible 643 #ways people might have done their annotation... 644 #Currently the parser uses a single string with newlines. 645 #A list of lines is also reasonable. 646 #A single (long) string is perhaps the most natural of all. 647 #This means we may need to deal with line wrapping. 648 comment = record.annotations["comment"] 649 if isinstance(comment, basestring): 650 lines = comment.split("\n") 651 elif isinstance(comment, list) or isinstance(comment, tuple): 652 lines = comment 653 else: 654 raise ValueError("Could not understand comment annotation") 655 self._write_multi_line("COMMENT", lines[0]) 656 for line in lines[1:]: 657 self._write_multi_line("", line)
658
659 - def _write_contig(self, record):
660 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 661 lines = self._split_contig(record, max_len) 662 self._write_single_line("CONTIG", lines[0]) 663 for text in lines[1:]: 664 self._write_single_line("", text)
665
666 - def _write_sequence(self, record):
667 #Loosely based on code from Howard Salis 668 #TODO - Force lower case? 669 LETTERS_PER_LINE = 60 670 SEQUENCE_INDENT = 9 671 672 if isinstance(record.seq, UnknownSeq): 673 #We have already recorded the length, and there is no need 674 #to record a long sequence of NNNNNNN...NNN or whatever. 675 if "contig" in record.annotations: 676 self._write_contig(record) 677 else: 678 self.handle.write("ORIGIN\n") 679 return 680 681 #Catches sequence being None: 682 data = self._get_seq_string(record).lower() 683 seq_len = len(data) 684 self.handle.write("ORIGIN\n") 685 for line_number in range(0, seq_len, LETTERS_PER_LINE): 686 self.handle.write(str(line_number + 1).rjust(SEQUENCE_INDENT)) 687 for words in range(line_number, 688 min(line_number + LETTERS_PER_LINE, seq_len), 10): 689 self.handle.write(" %s" % data[words:words + 10]) 690 self.handle.write("\n")
691
692 - def write_record(self, record):
693 """Write a single record to the output file.""" 694 handle = self.handle 695 self._write_the_first_line(record) 696 697 accession = self._get_annotation_str(record, "accession", 698 record.id.split(".", 1)[0], 699 just_first=True) 700 acc_with_version = accession 701 if record.id.startswith(accession + "."): 702 try: 703 acc_with_version = "%s.%i" \ 704 % (accession, 705 int(record.id.split(".", 1)[1])) 706 except ValueError: 707 pass 708 gi = self._get_annotation_str(record, "gi", just_first=True) 709 710 descr = record.description 711 if descr == "<unknown description>": 712 descr = "." 713 self._write_multi_line("DEFINITION", descr) 714 715 self._write_single_line("ACCESSION", accession) 716 if gi != ".": 717 self._write_single_line("VERSION", "%s GI:%s" 718 % (acc_with_version, gi)) 719 else: 720 self._write_single_line("VERSION", "%s" % (acc_with_version)) 721 722 #The NCBI only expect two types of link so far, 723 #e.g. "Project:28471" and "Trace Assembly Archive:123456" 724 #TODO - Filter the dbxrefs list to just these? 725 self._write_multi_entries("DBLINK", record.dbxrefs) 726 727 try: 728 #List of strings 729 #Keywords should be given separated with semi colons, 730 keywords = "; ".join(record.annotations["keywords"]) 731 #with a trailing period: 732 if not keywords.endswith("."): 733 keywords += "." 734 except KeyError: 735 #If no keywords, there should be just a period: 736 keywords = "." 737 self._write_multi_line("KEYWORDS", keywords) 738 739 if "segment" in record.annotations: 740 #Deal with SEGMENT line found only in segmented records, 741 #e.g. AH000819 742 segment = record.annotations["segment"] 743 if isinstance(segment, list): 744 assert len(segment) == 1, segment 745 segment = segment[0] 746 self._write_single_line("SEGMENT", segment) 747 748 self._write_multi_line("SOURCE", 749 self._get_annotation_str(record, "source")) 750 #The ORGANISM line MUST be a single line, as any continuation is the taxonomy 751 org = self._get_annotation_str(record, "organism") 752 if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH: 753 org = org[:self.MAX_WIDTH - self.HEADER_WIDTH - 4] + "..." 754 self._write_single_line(" ORGANISM", org) 755 try: 756 #List of strings 757 #Taxonomy should be given separated with semi colons, 758 taxonomy = "; ".join(record.annotations["taxonomy"]) 759 #with a trailing period: 760 if not taxonomy.endswith("."): 761 taxonomy += "." 762 except KeyError: 763 taxonomy = "." 764 self._write_multi_line("", taxonomy) 765 766 if "references" in record.annotations: 767 self._write_references(record) 768 769 if "comment" in record.annotations: 770 self._write_comment(record) 771 772 handle.write("FEATURES Location/Qualifiers\n") 773 rec_length = len(record) 774 for feature in record.features: 775 self._write_feature(feature, rec_length) 776 self._write_sequence(record) 777 handle.write("//\n")
778 779
780 -class EmblWriter(_InsdcWriter):
781 HEADER_WIDTH = 5 782 QUALIFIER_INDENT = 21 783 QUALIFIER_INDENT_STR = "FT" + " " * (QUALIFIER_INDENT - 2) 784 QUALIFIER_INDENT_TMP = "FT %s " # 21 if %s is empty 785 FEATURE_HEADER = "FH Key Location/Qualifiers\n" 786
787 - def _write_contig(self, record):
788 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 789 lines = self._split_contig(record, max_len) 790 for text in lines: 791 self._write_single_line("CO", text)
792
793 - def _write_sequence(self, record):
794 LETTERS_PER_BLOCK = 10 795 BLOCKS_PER_LINE = 6 796 LETTERS_PER_LINE = LETTERS_PER_BLOCK * BLOCKS_PER_LINE 797 POSITION_PADDING = 10 798 handle = self.handle # save looking up this multiple times 799 800 if isinstance(record.seq, UnknownSeq): 801 #We have already recorded the length, and there is no need 802 #to record a long sequence of NNNNNNN...NNN or whatever. 803 if "contig" in record.annotations: 804 self._write_contig(record) 805 else: 806 #TODO - Can the sequence just be left out as in GenBank files? 807 handle.write("SQ \n") 808 return 809 810 #Catches sequence being None 811 data = self._get_seq_string(record).lower() 812 seq_len = len(data) 813 814 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 815 a = Alphabet._get_base_alphabet(record.seq.alphabet) 816 if isinstance(a, Alphabet.DNAAlphabet): 817 #TODO - What if we have RNA? 818 a_count = data.count('A') + data.count('a') 819 c_count = data.count('C') + data.count('c') 820 g_count = data.count('G') + data.count('g') 821 t_count = data.count('T') + data.count('t') 822 other = seq_len - (a_count + c_count + g_count + t_count) 823 handle.write("SQ Sequence %i BP; %i A; %i C; %i G; %i T; %i other;\n" 824 % (seq_len, a_count, c_count, g_count, t_count, other)) 825 else: 826 handle.write("SQ \n") 827 828 for line_number in range(0, seq_len // LETTERS_PER_LINE): 829 handle.write(" ") # Just four, not five 830 for block in range(BLOCKS_PER_LINE): 831 index = LETTERS_PER_LINE * line_number + \ 832 LETTERS_PER_BLOCK * block 833 handle.write((" %s" % data[index:index + LETTERS_PER_BLOCK])) 834 handle.write(str((line_number + 1) 835 * LETTERS_PER_LINE).rjust(POSITION_PADDING)) 836 handle.write("\n") 837 if seq_len % LETTERS_PER_LINE: 838 #Final (partial) line 839 line_number = (seq_len // LETTERS_PER_LINE) 840 handle.write(" ") # Just four, not five 841 for block in range(BLOCKS_PER_LINE): 842 index = LETTERS_PER_LINE * line_number + \ 843 LETTERS_PER_BLOCK * block 844 handle.write( 845 (" %s" % data[index:index + LETTERS_PER_BLOCK]).ljust(11)) 846 handle.write(str(seq_len).rjust(POSITION_PADDING)) 847 handle.write("\n")
848
849 - def _write_single_line(self, tag, text):
850 assert len(tag) == 2 851 line = tag + " " + text 852 if len(text) > self.MAX_WIDTH: 853 import warnings 854 warnings.warn("Line %r too long" % line) 855 self.handle.write(line + "\n")
856
857 - def _write_multi_line(self, tag, text):
858 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 859 lines = self._split_multi_line(text, max_len) 860 for line in lines: 861 self._write_single_line(tag, line)
862
863 - def _write_the_first_lines(self, record):
864 """Write the ID and AC lines.""" 865 if "." in record.id and record.id.rsplit(".", 1)[1].isdigit(): 866 version = "SV " + record.id.rsplit(".", 1)[1] 867 accession = self._get_annotation_str(record, "accession", 868 record.id.rsplit(".", 1)[0], 869 just_first=True) 870 else: 871 version = "" 872 accession = self._get_annotation_str(record, "accession", 873 record.id, 874 just_first=True) 875 876 if ";" in accession: 877 raise ValueError("Cannot have semi-colon in EMBL accession, %s" 878 % repr(str(accession))) 879 if " " in accession: 880 #This is out of practicallity... might it be allowed? 881 raise ValueError("Cannot have spaces in EMBL accession, %s" 882 % repr(str(accession))) 883 884 #Get the molecule type 885 #TODO - record this explicitly in the parser? 886 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 887 a = Alphabet._get_base_alphabet(record.seq.alphabet) 888 if not isinstance(a, Alphabet.Alphabet): 889 raise TypeError("Invalid alphabet") 890 elif isinstance(a, Alphabet.DNAAlphabet): 891 mol_type = "DNA" 892 units = "BP" 893 elif isinstance(a, Alphabet.RNAAlphabet): 894 mol_type = "RNA" 895 units = "BP" 896 elif isinstance(a, Alphabet.ProteinAlphabet): 897 mol_type = "PROTEIN" 898 units = "AA" 899 else: 900 #Must be something like NucleotideAlphabet 901 raise ValueError("Need a DNA, RNA or Protein alphabet") 902 903 #Get the taxonomy division 904 division = self._get_data_division(record) 905 906 #TODO - Full ID line 907 handle = self.handle 908 #ID <1>; SV <2>; <3>; <4>; <5>; <6>; <7> BP. 909 #1. Primary accession number 910 #2. Sequence version number 911 #3. Topology: 'circular' or 'linear' 912 #4. Molecule type 913 #5. Data class 914 #6. Taxonomic division 915 #7. Sequence length 916 self._write_single_line("ID", "%s; %s; ; %s; ; %s; %i %s." 917 % (accession, version, mol_type, 918 division, len(record), units)) 919 handle.write("XX\n") 920 self._write_single_line("AC", accession + ";") 921 handle.write("XX\n")
922
923 - def _get_data_division(self, record):
924 try: 925 division = record.annotations["data_file_division"] 926 except KeyError: 927 division = "UNC" 928 if division in ["PHG", "ENV", "FUN", "HUM", "INV", "MAM", "VRT", 929 "MUS", "PLN", "PRO", "ROD", "SYN", "TGN", "UNC", 930 "VRL", "XXX"]: 931 #Good, already EMBL style 932 # Division Code 933 # ----------------- ---- 934 # Bacteriophage PHG 935 # Environmental Sample ENV 936 # Fungal FUN 937 # Human HUM 938 # Invertebrate INV 939 # Other Mammal MAM 940 # Other Vertebrate VRT 941 # Mus musculus MUS 942 # Plant PLN 943 # Prokaryote PRO 944 # Other Rodent ROD 945 # Synthetic SYN 946 # Transgenic TGN 947 # Unclassified UNC (i.e. unknown) 948 # Viral VRL 949 # 950 #(plus XXX used for submiting data to EMBL) 951 pass 952 else: 953 #See if this is in GenBank style & can be converted. 954 #Generally a problem as the GenBank groups are wider 955 #than those of EMBL. Note that GenBank use "BCT" for 956 #both bacteria and acherea thus this maps to EMBL's 957 #"PRO" nicely. 958 gbk_to_embl = {"BCT": "PRO", 959 "UNK": "UNC", 960 } 961 try: 962 division = gbk_to_embl[division] 963 except KeyError: 964 division = "UNC" 965 assert len(division) == 3 966 return division
967
968 - def _write_references(self, record):
969 #The order should be RN, RC, RP, RX, RG, RA, RT, RL 970 number = 0 971 for ref in record.annotations["references"]: 972 if not isinstance(ref, SeqFeature.Reference): 973 continue 974 number += 1 975 self._write_single_line("RN", "[%i]" % number) 976 #TODO - support for RC line (needed in parser too) 977 #TODO - support more complex record reference locations? 978 if ref.location and len(ref.location) == 1: 979 self._write_single_line( 980 "RP", "%i-%i" % (ref.location[0].nofuzzy_start + 1, 981 ref.location[0].nofuzzy_end)) 982 #TODO - record any DOI or AGRICOLA identifier in the reference object? 983 if ref.pubmed_id: 984 self._write_single_line("RX", "PUBMED; %s." % ref.pubmed_id) 985 if ref.consrtm: 986 self._write_single_line("RG", "%s" % ref.consrtm) 987 if ref.authors: 988 #We store the AUTHORS data as a single string 989 self._write_multi_line("RA", ref.authors + ";") 990 if ref.title: 991 #We store the title as a single string 992 self._write_multi_line("RT", '"%s";' % ref.title) 993 if ref.journal: 994 #We store this as a single string - holds the journal name, 995 #volume, year, and page numbers of the citation 996 self._write_multi_line("RL", ref.journal) 997 self.handle.write("XX\n")
998
999 - def _write_comment(self, record):
1000 #This is a bit complicated due to the range of possible 1001 #ways people might have done their annotation... 1002 #Currently the parser uses a single string with newlines. 1003 #A list of lines is also reasonable. 1004 #A single (long) string is perhaps the most natural of all. 1005 #This means we may need to deal with line wrapping. 1006 comment = record.annotations["comment"] 1007 if isinstance(comment, basestring): 1008 lines = comment.split("\n") 1009 elif isinstance(comment, list) or isinstance(comment, tuple): 1010 lines = comment 1011 else: 1012 raise ValueError("Could not understand comment annotation") 1013 #TODO - Merge this with the GenBank comment code? 1014 if not lines: 1015 return 1016 for line in lines: 1017 self._write_multi_line("CC", line) 1018 self.handle.write("XX\n")
1019
1020 - def write_record(self, record):
1021 """Write a single record to the output file.""" 1022 1023 handle = self.handle 1024 self._write_the_first_lines(record) 1025 1026 #PR line (0 or 1 lines only), project identifier 1027 for xref in record.dbxrefs: 1028 if xref.startswith("Project:"): 1029 self._write_single_line("PR", xref + ";") 1030 handle.write("XX\n") 1031 break 1032 1033 #TODO - DT lines (date) 1034 1035 descr = record.description 1036 if descr == "<unknown description>": 1037 descr = "." 1038 self._write_multi_line("DE", descr) 1039 handle.write("XX\n") 1040 1041 #Should this be "source" or "organism"? 1042 self._write_multi_line( 1043 "OS", self._get_annotation_str(record, "organism")) 1044 try: 1045 #List of strings 1046 taxonomy = "; ".join(record.annotations["taxonomy"]) + "." 1047 except KeyError: 1048 taxonomy = "." 1049 self._write_multi_line("OC", taxonomy) 1050 handle.write("XX\n") 1051 1052 if "references" in record.annotations: 1053 self._write_references(record) 1054 1055 if "comment" in record.annotations: 1056 self._write_comment(record) 1057 1058 handle.write(self.FEATURE_HEADER) 1059 rec_length = len(record) 1060 for feature in record.features: 1061 self._write_feature(feature, rec_length) 1062 1063 self._write_sequence(record) 1064 handle.write("//\n")
1065 1066
1067 -class ImgtWriter(EmblWriter):
1068 HEADER_WIDTH = 5 1069 QUALIFIER_INDENT = 25 # Not 21 as in EMBL 1070 QUALIFIER_INDENT_STR = "FT" + " " * (QUALIFIER_INDENT - 2) 1071 QUALIFIER_INDENT_TMP = "FT %s " # 25 if %s is empty 1072 FEATURE_HEADER = "FH Key Location/Qualifiers\n"
1073 1074 if __name__ == "__main__": 1075 print "Quick self test" 1076 import os 1077 from StringIO import StringIO 1078
1079 - def compare_record(old, new):
1080 if old.id != new.id and old.name != new.name: 1081 raise ValueError("'%s' or '%s' vs '%s' or '%s' records" 1082 % (old.id, old.name, new.id, new.name)) 1083 if len(old.seq) != len(new.seq): 1084 raise ValueError("%i vs %i" % (len(old.seq), len(new.seq))) 1085 if str(old.seq).upper() != str(new.seq).upper(): 1086 if len(old.seq) < 200: 1087 raise ValueError("'%s' vs '%s'" % (old.seq, new.seq)) 1088 else: 1089 raise ValueError( 1090 "'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100])) 1091 if old.features and new.features: 1092 return compare_features(old.features, new.features) 1093 #Just insist on at least one word in common: 1094 if (old.description or new.description) \ 1095 and not set(old.description.split()).intersection(new.description.split()): 1096 raise ValueError("%s versus %s" 1097 % (repr(old.description), repr(new.description))) 1098 #TODO - check annotation 1099 if "contig" in old.annotations: 1100 assert old.annotations["contig"] == \ 1101 new.annotations["contig"] 1102 return True
1103
1104 - def compare_records(old_list, new_list):
1105 """Check two lists of SeqRecords agree, raises a ValueError if mismatch.""" 1106 if len(old_list) != len(new_list): 1107 raise ValueError( 1108 "%i vs %i records" % (len(old_list), len(new_list))) 1109 for old, new in zip(old_list, new_list): 1110 if not compare_record(old, new): 1111 return False 1112 return True
1113
1114 - def compare_feature(old, new, ignore_sub_features=False):
1115 """Check two SeqFeatures agree.""" 1116 if old.type != new.type: 1117 raise ValueError("Type %s versus %s" % (old.type, new.type)) 1118 if old.location.nofuzzy_start != new.location.nofuzzy_start \ 1119 or old.location.nofuzzy_end != new.location.nofuzzy_end: 1120 raise ValueError("%s versus %s:\n%s\nvs:\n%s" 1121 % (old.location, new.location, str(old), str(new))) 1122 if old.strand != new.strand: 1123 raise ValueError( 1124 "Different strand:\n%s\nvs:\n%s" % (str(old), str(new))) 1125 if old.location.start != new.location.start: 1126 raise ValueError("Start %s versus %s:\n%s\nvs:\n%s" 1127 % (old.location.start, new.location.start, str(old), str(new))) 1128 if old.location.end != new.location.end: 1129 raise ValueError("End %s versus %s:\n%s\nvs:\n%s" 1130 % (old.location.end, new.location.end, str(old), str(new))) 1131 if not ignore_sub_features: 1132 if len(old.sub_features) != len(new.sub_features): 1133 raise ValueError("Different sub features") 1134 for a, b in zip(old.sub_features, new.sub_features): 1135 if not compare_feature(a, b): 1136 return False 1137 #This only checks key shared qualifiers 1138 #Would a white list be easier? 1139 #for key in ["name", "gene", "translation", "codon_table", "codon_start", "locus_tag"]: 1140 for key in set(old.qualifiers).intersection(new.qualifiers): 1141 if key in ["db_xref", "protein_id", "product", "note"]: 1142 #EMBL and GenBank files are use different references/notes/etc 1143 continue 1144 if old.qualifiers[key] != new.qualifiers[key]: 1145 raise ValueError("Qualifier mis-match for %s:\n%s\n%s" 1146 % (key, old.qualifiers[key], new.qualifiers[key])) 1147 return True
1148
1149 - def compare_features(old_list, new_list, ignore_sub_features=False):
1150 """Check two lists of SeqFeatures agree, raises a ValueError if mismatch.""" 1151 if len(old_list) != len(new_list): 1152 raise ValueError( 1153 "%i vs %i features" % (len(old_list), len(new_list))) 1154 for old, new in zip(old_list, new_list): 1155 #This assumes they are in the same order 1156 if not compare_feature(old, new, ignore_sub_features): 1157 return False 1158 return True
1159
1160 - def check_genbank_writer(records):
1161 handle = StringIO() 1162 GenBankWriter(handle).write_file(records) 1163 handle.seek(0) 1164 1165 records2 = list(GenBankIterator(handle)) 1166 assert compare_records(records, records2)
1167
1168 - def check_embl_writer(records):
1169 handle = StringIO() 1170 try: 1171 EmblWriter(handle).write_file(records) 1172 except ValueError, err: 1173 print err 1174 return 1175 handle.seek(0) 1176 1177 records2 = list(EmblIterator(handle)) 1178 assert compare_records(records, records2)
1179 1180 for filename in os.listdir("../../Tests/GenBank"): 1181 if not filename.endswith(".gbk") and not filename.endswith(".gb"): 1182 continue 1183 print filename 1184 1185 handle = open("../../Tests/GenBank/%s" % filename) 1186 records = list(GenBankIterator(handle)) 1187 handle.close() 1188 1189 check_genbank_writer(records) 1190 check_embl_writer(records) 1191 1192 for filename in os.listdir("../../Tests/EMBL"): 1193 if not filename.endswith(".embl"): 1194 continue 1195 print filename 1196 1197 handle = open("../../Tests/EMBL/%s" % filename) 1198 records = list(EmblIterator(handle)) 1199 handle.close() 1200 1201 check_genbank_writer(records) 1202 check_embl_writer(records) 1203 1204 from Bio import SeqIO 1205 for filename in os.listdir("../../Tests/SwissProt"): 1206 if not filename.startswith("sp"): 1207 continue 1208 print filename 1209 1210 handle = open("../../Tests/SwissProt/%s" % filename) 1211 records = list(SeqIO.parse(handle, "swiss")) 1212 handle.close() 1213 1214 check_genbank_writer(records) 1215