Package Bio :: Package SeqIO :: Module SeqXmlIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.SeqXmlIO

  1  # Copyright 2010 by Thomas Schmitt. 
  2  # All rights reserved. 
  3  # 
  4  # This module is for reading and writing SeqXML format files as 
  5  # SeqRecord objects, and is expected to be used via the Bio.SeqIO API. 
  6  """Bio.SeqIO support for the "seqxml" file format, SeqXML. 
  7   
  8  You are expected to use this module via the Bio.SeqIO functions. 
  9   
 10  SeqXML is a lightweight XML format which is supposed be an alternative for 
 11  FASTA files. For more Information see http://www.seqXML.org and Schmitt et al 
 12  (2011), http://dx.doi.org/10.1093/bib/bbr025 
 13  """ 
 14   
 15  from __future__ import print_function 
 16   
 17  from xml.sax.saxutils import XMLGenerator 
 18  from xml.sax.xmlreader import AttributesImpl 
 19  from xml.dom import pulldom 
 20  from xml.sax import SAXParseException 
 21   
 22  from Bio._py3k import range 
 23  from Bio._py3k import basestring 
 24   
 25  from Bio import Alphabet 
 26  from Bio.Seq import Seq 
 27  from Bio.Seq import UnknownSeq 
 28  from Bio.SeqRecord import SeqRecord 
 29  from .Interfaces import SequentialSequenceWriter 
 30   
 31   
32 -class XMLRecordIterator:
33 """Base class for building iterators for record style XML formats. 34 35 It is assumed that all information for one record can be found within a 36 record element or above. Two types of methods are called when the start 37 tag of an element is reached. To receive only the attributes of an 38 element before its end tag is reached implement _attr_TAGNAME. 39 To get an element and its children as a DOM tree implement _elem_TAGNAME. 40 Everything that is part of the DOM tree will not trigger any further 41 method calls. 42 """ 43
44 - def __init__(self, handle, recordTag, namespace=None):
45 """Creating the object and initializing the XML parser.""" 46 47 self._recordTag = recordTag 48 self._namespace = namespace 49 self._events = pulldom.parse(handle)
50
51 - def __iter__(self):
52 """Iterate over the records in the XML file. 53 Returns the last parsed record.""" 54 55 record = None 56 try: 57 for event, node in self._events: 58 59 if event == "START_ELEMENT" and node.namespaceURI == self._namespace: 60 61 if node.localName == self._recordTag: 62 #create an empty SeqRecord 63 record = SeqRecord('', id='') 64 65 #call matching methods with attributes only 66 if hasattr(self, "_attr_" + node.localName): 67 getattr(self, "_attr_" + node.localName)( 68 self._attributes(node), record) 69 70 #call matching methods with DOM tree 71 if hasattr(self, "_elem_" + node.localName): 72 #read the element and all nested elements into a DOM tree 73 self._events.expandNode(node) 74 node.normalize() 75 76 getattr(self, "_elem_" + node.localName)(node, record) 77 78 elif event == "END_ELEMENT" and node.namespaceURI == self._namespace and node.localName == self._recordTag: 79 yield record 80 81 except SAXParseException as e: 82 83 if e.getLineNumber() == 1 and e.getColumnNumber() == 0: 84 #empty file 85 pass 86 else: 87 import os 88 if e.getLineNumber() == 1 and e.getColumnNumber() == 1 \ 89 and os.name == "java": 90 #empty file, see http://bugs.jython.org/issue1774 91 pass 92 else: 93 raise
94
95 - def _attributes(self, node):
96 """Return the attributes of a DOM node as dictionary.""" 97 98 return dict((node.attributes.item(i).name, node.attributes.item(i).value) 99 for i in range(node.attributes.length))
100 101
102 -class SeqXmlIterator(XMLRecordIterator):
103 """Breaks seqXML file into SeqRecords. 104 105 Assumes valid seqXML please validate beforehand.""" 106
107 - def __init__(self, handle):
108 """Create the object.""" 109 XMLRecordIterator.__init__(self, handle, "entry") 110 111 self._source = None 112 self._source_version = None 113 self._version = None 114 self._speciesName = None 115 self._ncbiTaxId = None
116
117 - def _attr_seqXML(self, attr_dict, record):
118 """Parse the document metadata.""" 119 120 if "source" in attr_dict: 121 self._source = attr_dict["source"] 122 if "sourceVersion" in attr_dict: 123 self._source_version = attr_dict["sourceVersion"] 124 if "version" in attr_dict: 125 self._version = attr_dict["seqXMLversion"] 126 if "ncbiTaxID" in attr_dict: 127 self._ncbiTaxId = attr_dict["ncbiTaxID"] 128 if "speciesName" in attr_dict: 129 self._speciesName = attr_dict["speciesName"]
130
131 - def _attr_property(self, attr_dict, record):
132 """Parse key value pair properties and store them as annotations.""" 133 134 if "name" not in attr_dict: 135 raise ValueError("Malformed property element.") 136 137 value = attr_dict.get("value", None) 138 139 if attr_dict["name"] not in record.annotations: 140 record.annotations[attr_dict["name"]] = value 141 elif isinstance(record.annotations[attr_dict["name"]], list): 142 record.annotations[attr_dict["name"]].append(value) 143 else: 144 record.annotations[attr_dict["name"]] = [ 145 record.annotations[attr_dict["name"]], value]
146
147 - def _attr_species(self, attr_dict, record):
148 """Parse the species information.""" 149 150 if "name" not in attr_dict or "ncbiTaxID" not in attr_dict: 151 raise ValueError("Malformed species element!") 152 153 #the keywords for the species annotation are taken from SwissIO 154 record.annotations["organism"] = attr_dict["name"] 155 #TODO - Should have been a list to match SwissProt parser: 156 record.annotations["ncbi_taxid"] = attr_dict["ncbiTaxID"]
157
158 - def _attr_entry(self, attr_dict, record):
159 """New entry set id and the optional entry source.""" 160 161 if "id" not in attr_dict: 162 raise ValueError("Malformed entry! Identifier is missing.") 163 164 record.id = attr_dict["id"] 165 if "source" in attr_dict: 166 record.annotations["source"] = attr_dict["source"] 167 elif self._source is not None: 168 record.annotations["source"] = self._source 169 170 #initialize entry with global species definition 171 #the keywords for the species annotation are taken from SwissIO 172 if self._ncbiTaxId is not None: 173 record.annotations["ncbi_taxid"] = self._ncbiTaxId 174 if self._speciesName is not None: 175 record.annotations["organism"] = self._speciesName
176
177 - def _elem_DNAseq(self, node, record):
178 """Parse DNA sequence.""" 179 180 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 181 raise ValueError("Sequence length should be greater than 0.") 182 183 record.seq = Seq(node.firstChild.data, Alphabet.generic_dna)
184
185 - def _elem_RNAseq(self, node, record):
186 """Parse RNA sequence.""" 187 188 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 189 raise ValueError("Sequence length should be greater than 0.") 190 191 record.seq = Seq(node.firstChild.data, Alphabet.generic_rna)
192
193 - def _elem_AAseq(self, node, record):
194 """Parse protein sequence.""" 195 196 if not (node.hasChildNodes() and len(node.firstChild.data) > 0): 197 raise ValueError("Sequence length should be greater than 0.") 198 199 record.seq = Seq(node.firstChild.data, Alphabet.generic_protein)
200
201 - def _elem_description(self, node, record):
202 """Parse the description.""" 203 204 if node.hasChildNodes() and len(node.firstChild.data) > 0: 205 record.description = node.firstChild.data
206
207 - def _attr_DBRef(self, attr_dict, record):
208 """Parse a database cross reference""" 209 210 if "source" not in attr_dict or "id" not in attr_dict: 211 raise ValueError("Invalid DB cross reference.") 212 213 if "%s:%s" % (attr_dict["source"], attr_dict["id"]) not in record.dbxrefs: 214 record.dbxrefs.append( 215 "%s:%s" % (attr_dict["source"], attr_dict["id"]))
216 217
218 -class SeqXmlWriter(SequentialSequenceWriter):
219 """Writes SeqRecords into seqXML file. 220 221 SeqXML requires the sequence alphabet be explicitly RNA, DNA or protein, 222 i.e. an instance or subclass of Bio.Alphapet.RNAAlphabet, 223 Bio.Alphapet.DNAAlphabet or Bio.Alphapet.ProteinAlphabet. 224 """ 225
226 - def __init__(self, handle, source=None, source_version=None, 227 species=None, ncbiTaxId=None):
228 """Create Object and start the xml generator.""" 229 230 SequentialSequenceWriter.__init__(self, handle) 231 232 self.xml_generator = XMLGenerator(handle, "utf-8") 233 self.xml_generator.startDocument() 234 self.source = source 235 self.source_version = source_version 236 self.species = species 237 self.ncbiTaxId = ncbiTaxId
238
239 - def write_header(self):
240 """Write root node with document metadata.""" 241 SequentialSequenceWriter.write_header(self) 242 243 attrs = {"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", 244 "xsi:noNamespaceSchemaLocation": "http://www.seqxml.org/0.4/seqxml.xsd", 245 "seqXMLversion": "0.4"} 246 247 if self.source is not None: 248 attrs["source"] = self.source 249 if self.source_version is not None: 250 attrs["sourceVersion"] = self.source_version 251 if self.species is not None: 252 if not isinstance(self.species, basestring): 253 raise TypeError("species should be of type string") 254 attrs["speciesName"] = self.species 255 if self.ncbiTaxId is not None: 256 if not isinstance(self.ncbiTaxId, (basestring, int)): 257 raise TypeError("ncbiTaxID should be of type string or int") 258 attrs["ncbiTaxID"] = self.ncbiTaxId 259 260 self.xml_generator.startElement("seqXML", AttributesImpl(attrs))
261
262 - def write_record(self, record):
263 """Write one record.""" 264 265 if not record.id or record.id == "<unknown id>": 266 raise ValueError("SeqXML requires identifier") 267 268 if not isinstance(record.id, basestring): 269 raise TypeError("Identifier should be of type string") 270 271 attrb = {"id": record.id} 272 273 if "source" in record.annotations and self.source != record.annotations["source"]: 274 if not isinstance(record.annotations["source"], basestring): 275 raise TypeError("source should be of type string") 276 attrb["source"] = record.annotations["source"] 277 278 self.xml_generator.startElement("entry", AttributesImpl(attrb)) 279 self._write_species(record) 280 self._write_description(record) 281 self._write_seq(record) 282 self._write_dbxrefs(record) 283 self._write_properties(record) 284 self.xml_generator.endElement("entry")
285 293
294 - def _write_species(self, record):
295 """Write the species if given.""" 296 297 local_ncbi_taxid = None 298 if "ncbi_taxid" in record.annotations: 299 local_ncbi_taxid = record.annotations["ncbi_taxid"] 300 if isinstance(local_ncbi_taxid, list): 301 #SwissProt parser uses a list (which could cope with chimeras) 302 if len(local_ncbi_taxid) == 1: 303 local_ncbi_taxid = local_ncbi_taxid[0] 304 elif len(local_ncbi_taxid) == 0: 305 local_ncbi_taxid = None 306 else: 307 ValueError('Multiple entries for record.annotations["ncbi_taxid"], %r' 308 % local_ncbi_taxid) 309 if "organism" in record.annotations and local_ncbi_taxid: 310 local_org = record.annotations["organism"] 311 312 if not isinstance(local_org, basestring): 313 raise TypeError("organism should be of type string") 314 315 if not isinstance(local_ncbi_taxid, (basestring, int)): 316 raise TypeError("ncbiTaxID should be of type string or int") 317 318 #The local species definition is only written if it differs from the global species definition 319 if local_org != self.species or local_ncbi_taxid != self.ncbiTaxId: 320 321 attr = {"name": local_org, 322 "ncbiTaxID": local_ncbi_taxid} 323 self.xml_generator.startElement( 324 "species", AttributesImpl(attr)) 325 self.xml_generator.endElement("species")
326
327 - def _write_description(self, record):
328 """Write the description if given.""" 329 330 if record.description: 331 332 if not isinstance(record.description, basestring): 333 raise TypeError("Description should be of type string") 334 335 description = record.description 336 if description == "<unknown description>": 337 description = "" 338 339 if len(record.description) > 0: 340 self.xml_generator.startElement( 341 "description", AttributesImpl({})) 342 self.xml_generator.characters(description) 343 self.xml_generator.endElement("description")
344
345 - def _write_seq(self, record):
346 """Write the sequence. 347 348 Note that SeqXML requires a DNA, RNA or protein alphabet. 349 """ 350 351 if isinstance(record.seq, UnknownSeq): 352 raise TypeError( 353 "Sequence type is UnknownSeq but SeqXML requires sequence") 354 355 seq = str(record.seq) 356 357 if not len(seq) > 0: 358 raise ValueError("The sequence length should be greater than 0") 359 360 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 361 alpha = Alphabet._get_base_alphabet(record.seq.alphabet) 362 if isinstance(alpha, Alphabet.RNAAlphabet): 363 seqElem = "RNAseq" 364 elif isinstance(alpha, Alphabet.DNAAlphabet): 365 seqElem = "DNAseq" 366 elif isinstance(alpha, Alphabet.ProteinAlphabet): 367 seqElem = "AAseq" 368 else: 369 raise ValueError("Need a DNA, RNA or Protein alphabet") 370 371 self.xml_generator.startElement(seqElem, AttributesImpl({})) 372 self.xml_generator.characters(seq) 373 self.xml_generator.endElement(seqElem)
374
375 - def _write_dbxrefs(self, record):
376 """Write all database cross references.""" 377 if record.dbxrefs is not None: 378 379 for dbxref in record.dbxrefs: 380 381 if not isinstance(dbxref, basestring): 382 raise TypeError("dbxrefs should be of type list of string") 383 if dbxref.find(':') < 1: 384 raise ValueError("dbxrefs should be in the form ['source:id', 'source:id' ]") 385 386 dbsource, dbid = dbxref.split(':', 1) 387 388 attr = {"source": dbsource, "id": dbid} 389 self.xml_generator.startElement("DBRef", AttributesImpl(attr)) 390 self.xml_generator.endElement("DBRef")
391
392 - def _write_properties(self, record):
393 """Write all annotations that are key value pairs with values of a primitive type or list of primitive types.""" 394 395 for key, value in record.annotations.items(): 396 397 if key not in ("organism", "ncbi_taxid", "source"): 398 399 if value is None: 400 401 attr = {"name": key} 402 self.xml_generator.startElement( 403 "property", AttributesImpl(attr)) 404 self.xml_generator.endElement("property") 405 406 elif isinstance(value, list): 407 408 for v in value: 409 if isinstance(value, (int, float, basestring)): 410 attr = {"name": key, "value": v} 411 self.xml_generator.startElement( 412 "property", AttributesImpl(attr)) 413 self.xml_generator.endElement("property") 414 415 elif isinstance(value, (int, float, basestring)): 416 417 attr = {"name": key, "value": str(value)} 418 self.xml_generator.startElement( 419 "property", AttributesImpl(attr)) 420 self.xml_generator.endElement("property")
421 422 if __name__ == "__main__": 423 print("Running quick self test") 424 425 from Bio import SeqIO 426 import sys 427 428 with open("Tests/SeqXML/protein_example.xml", "r") as fileHandle: 429 records = list(SeqIO.parse(fileHandle, "seqxml")) 430 431 from Bio._py3k import StringIO 432 stringHandle = StringIO() 433 434 SeqIO.write(records, stringHandle, "seqxml") 435 SeqIO.write(records, sys.stdout, "seqxml") 436 print("") 437 438 stringHandle.seek(0) 439 records = list(SeqIO.parse(stringHandle, "seqxml")) 440 441 SeqIO.write(records, sys.stdout, "seqxml") 442 print("") 443