Package Bio :: Package SeqIO :: Module SeqXmlIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.SeqXmlIO

  1  # Copyright 2010 by Thomas Schmitt. 
  2  # All rights reserved. 
  3  # 
  4  # This module is for reading and writing SeqXML format files as 
  5  # SeqRecord objects, and is expected to be used via the Bio.SeqIO API. 
  6  """Bio.SeqIO support for the "seqxml" file format, SeqXML. 
  7   
  8  You are expected to use this module via the Bio.SeqIO functions. 
  9   
 10  SeqXML is a lightweight XML format which is supposed to be an alternative for 
 11  FASTA files. For more information see http://www.seqXML.org and Schmitt et al 
 12  (2011), http://dx.doi.org/10.1093/bib/bbr025 
 13  """ 
 14   
 15  from __future__ import print_function 
 16   
 17  from xml.sax.saxutils import XMLGenerator 
 18  from xml.sax.xmlreader import AttributesImpl 
 19  from xml.dom import pulldom 
 20  from xml.sax import SAXParseException 
 21   
 22  from Bio._py3k import range 
 23  from Bio._py3k import basestring 
 24   
 25  from Bio import Alphabet 
 26  from Bio.Seq import Seq 
 27  from Bio.Seq import UnknownSeq 
 28  from Bio.SeqRecord import SeqRecord 
 29  from .Interfaces import SequentialSequenceWriter 
 30   
 31   
class XMLRecordIterator(object):
    """Base class for iterators over record-style XML formats.

    All information belonging to one record is assumed to live inside the
    record element or above it. When the start tag of an element is
    reached, up to two handler methods are looked up by the element's
    local name:

    - ``_attr_TAGNAME(attributes, record)`` receives only the attributes
      of the element, before its end tag is reached.
    - ``_elem_TAGNAME(node, record)`` receives the element and its
      children as a DOM tree. Everything that is part of that DOM tree
      does not trigger any further handler calls.
    """

    def __init__(self, handle, recordTag, namespace=None):
        """Create the object and initialize the XML pull parser.

        Arguments:
         - handle - input handed straight to ``pulldom.parse``.
         - recordTag - local name of the element delimiting one record.
         - namespace - namespace URI an element must carry to be
           dispatched (default None).
        """
        self._recordTag = recordTag
        self._namespace = namespace
        self._events = pulldom.parse(handle)

    # TODO: Implement __next__ in order for Python to treat this class as
    # an iterator and not just as an iterable. The SequenceIterator API
    # expects base implementation of __iter__ to call __next__ internally.

    def __iter__(self):
        """Iterate over the records in the XML file.

        Yields the current record each time the end tag of the record
        element is reached. A parse error reported at the very start of
        the input is treated as an empty file and ends the iteration
        silently; any other parse error is re-raised.
        """
        record = None
        try:
            for event, node in self._events:
                if event not in ("START_ELEMENT", "END_ELEMENT"):
                    continue
                if node.namespaceURI != self._namespace:
                    continue

                tag = node.localName

                if event == "END_ELEMENT":
                    if tag == self._recordTag:
                        yield record
                    continue

                if tag == self._recordTag:
                    # start every record from an empty SeqRecord
                    record = SeqRecord("", id="")

                # attribute-only handler runs first
                attr_handler = getattr(self, "_attr_" + tag, None)
                if attr_handler is not None:
                    attr_handler(self._attributes(node), record)

                # DOM handler: pull the element and everything nested in
                # it into a DOM tree before handing it over
                elem_handler = getattr(self, "_elem_" + tag, None)
                if elem_handler is not None:
                    self._events.expandNode(node)
                    node.normalize()
                    elem_handler(node, record)

        except SAXParseException as err:
            import os

            position = (err.getLineNumber(), err.getColumnNumber())
            empty_file = position == (1, 0)
            # empty file on Jython, see http://bugs.jython.org/issue1774
            empty_file_jython = position == (1, 1) and os.name == "java"
            if not (empty_file or empty_file_jython):
                raise

    def _attributes(self, node):
        """Return the attributes of a DOM node as dictionary."""
        attribute_map = node.attributes
        result = {}
        for index in range(attribute_map.length):
            attribute = attribute_map.item(index)
            result[attribute.name] = attribute.value
        return result
100 101
class SeqXmlIterator(XMLRecordIterator):
    """Break a seqXML file into SeqRecords.

    Assumes valid seqXML; please validate beforehand.
    """

    def __init__(self, handle):
        """Create the object, using ``entry`` as the record element."""
        XMLRecordIterator.__init__(self, handle, "entry")

        # Document-wide values filled in from the root element attributes.
        self._source = None
        self._source_version = None
        self._version = None
        self._speciesName = None
        self._ncbiTaxId = None

    def _attr_seqXML(self, attr_dict, record):
        """Parse the document metadata from the root element attributes."""
        if "source" in attr_dict:
            self._source = attr_dict["source"]
        if "sourceVersion" in attr_dict:
            self._source_version = attr_dict["sourceVersion"]
        # Test for the same key that is read. Checking "version" here
        # skipped the attribute when only "seqXMLversion" was present and
        # raised KeyError when only "version" was present.
        if "seqXMLversion" in attr_dict:
            self._version = attr_dict["seqXMLversion"]
        if "ncbiTaxID" in attr_dict:
            self._ncbiTaxId = attr_dict["ncbiTaxID"]
        if "speciesName" in attr_dict:
            self._speciesName = attr_dict["speciesName"]

    def _attr_property(self, attr_dict, record):
        """Parse key value pair properties and store them as annotations.

        A name seen once is stored as a plain value; a repeated name is
        collected into a list. A property without a ``value`` attribute
        is stored as None.

        Raises ValueError if the ``name`` attribute is missing.
        """
        if "name" not in attr_dict:
            raise ValueError("Malformed property element.")

        name = attr_dict["name"]
        value = attr_dict.get("value")
        annotations = record.annotations

        if name not in annotations:
            annotations[name] = value
        elif isinstance(annotations[name], list):
            annotations[name].append(value)
        else:
            annotations[name] = [annotations[name], value]

    def _attr_species(self, attr_dict, record):
        """Parse the species information.

        Raises ValueError unless both ``name`` and ``ncbiTaxID`` are given.
        """
        if "name" not in attr_dict or "ncbiTaxID" not in attr_dict:
            raise ValueError("Malformed species element!")

        # the keywords for the species annotation are taken from SwissIO
        record.annotations["organism"] = attr_dict["name"]
        # TODO - Should have been a list to match SwissProt parser:
        record.annotations["ncbi_taxid"] = attr_dict["ncbiTaxID"]

    def _attr_entry(self, attr_dict, record):
        """Start a new entry: set its id and the optional entry source.

        Raises ValueError if the ``id`` attribute is missing.
        """
        if "id" not in attr_dict:
            raise ValueError("Malformed entry! Identifier is missing.")

        record.id = attr_dict["id"]
        if "source" in attr_dict:
            record.annotations["source"] = attr_dict["source"]
        elif self._source is not None:
            # fall back to the document-wide source
            record.annotations["source"] = self._source

        # initialize entry with global species definition
        # the keywords for the species annotation are taken from SwissIO
        if self._ncbiTaxId is not None:
            record.annotations["ncbi_taxid"] = self._ncbiTaxId
        if self._speciesName is not None:
            record.annotations["organism"] = self._speciesName

    def _sequence_text(self, node):
        """Return the text of a sequence element.

        Raises ValueError if the element has no children or its first
        child holds an empty string.
        """
        if not (node.hasChildNodes() and len(node.firstChild.data) > 0):
            raise ValueError("Sequence length should be greater than 0.")
        return node.firstChild.data

    def _elem_DNAseq(self, node, record):
        """Parse DNA sequence."""
        record.seq = Seq(self._sequence_text(node), Alphabet.generic_dna)

    def _elem_RNAseq(self, node, record):
        """Parse RNA sequence."""
        record.seq = Seq(self._sequence_text(node), Alphabet.generic_rna)

    def _elem_AAseq(self, node, record):
        """Parse protein sequence."""
        record.seq = Seq(self._sequence_text(node), Alphabet.generic_protein)

    def _elem_description(self, node, record):
        """Parse the description; an empty element leaves it unchanged."""
        if node.hasChildNodes() and len(node.firstChild.data) > 0:
            record.description = node.firstChild.data

    def _attr_DBRef(self, attr_dict, record):
        """Parse a database cross reference.

        The reference is stored as ``source:id`` in ``record.dbxrefs``
        unless that exact string is already present.

        Raises ValueError unless both ``source`` and ``id`` are given.
        """
        if "source" not in attr_dict or "id" not in attr_dict:
            raise ValueError("Invalid DB cross reference.")

        dbxref = "%s:%s" % (attr_dict["source"], attr_dict["id"])
        if dbxref not in record.dbxrefs:
            record.dbxrefs.append(dbxref)
207 208
209 -class SeqXmlWriter(SequentialSequenceWriter):
210 """Writes SeqRecords into seqXML file. 211 212 SeqXML requires the sequence alphabet be explicitly RNA, DNA or protein, 213 i.e. an instance or subclass of Bio.Alphabet.RNAAlphabet, 214 Bio.Alphabet.DNAAlphabet or Bio.Alphabet.ProteinAlphabet. 215 """ 216
def __init__(self, handle, source=None, source_version=None,
             species=None, ncbiTaxId=None):
    """Create the writer and start the XML document on ``handle``.

    Arguments:
     - handle - output handle passed to the base writer and to the
       XML generator.
     - source, source_version - optional document-level source metadata.
     - species, ncbiTaxId - optional document-level species definition.
    """
    SequentialSequenceWriter.__init__(self, handle)

    # Document-level metadata, used later when the header is written.
    self.source = source
    self.source_version = source_version
    self.species = species
    self.ncbiTaxId = ncbiTaxId

    # Open the XML document straight away; output is encoded as UTF-8.
    generator = XMLGenerator(handle, "utf-8")
    generator.startDocument()
    self.xml_generator = generator
228
def write_header(self):
    """Write the root ``seqXML`` element with the document metadata.

    Optional attributes (source, sourceVersion, speciesName, ncbiTaxID)
    are only written when the corresponding value is not None.

    Raises TypeError if ``species`` is not a string, or if ``ncbiTaxId``
    is neither a string nor an int.
    """
    SequentialSequenceWriter.write_header(self)

    attrs = {"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
             "xsi:noNamespaceSchemaLocation": "http://www.seqxml.org/0.4/seqxml.xsd",
             "seqXMLversion": "0.4"}

    if self.source is not None:
        attrs["source"] = self.source
    if self.source_version is not None:
        attrs["sourceVersion"] = self.source_version
    if self.species is not None:
        if not isinstance(self.species, basestring):
            raise TypeError("species should be of type string")
        attrs["speciesName"] = self.species
    if self.ncbiTaxId is not None:
        if not isinstance(self.ncbiTaxId, (basestring, int)):
            raise TypeError("ncbiTaxID should be of type string or int")
        # An int is accepted by the check above, but XMLGenerator needs
        # string attribute values, so convert before writing.
        attrs["ncbiTaxID"] = str(self.ncbiTaxId)

    self.xml_generator.startElement("seqXML", AttributesImpl(attrs))
251
def write_record(self, record):
    """Write one record as an ``entry`` element.

    The entry carries the record id and, when the record's ``source``
    annotation differs from the writer's document-level source, a
    ``source`` attribute. Species, description, sequence, database
    cross references and properties are written inside it, in that order.

    Raises ValueError if the record has no usable id, and TypeError if
    the id or the ``source`` annotation is not a string.
    """
    identifier = record.id
    if not identifier or identifier == "<unknown id>":
        raise ValueError("SeqXML requires identifier")
    if not isinstance(identifier, basestring):
        raise TypeError("Identifier should be of type string")

    entry_attrs = {"id": identifier}

    annotations = record.annotations
    if "source" in annotations:
        entry_source = annotations["source"]
        # only record-specific sources are written on the entry itself
        if self.source != entry_source:
            if not isinstance(entry_source, basestring):
                raise TypeError("source should be of type string")
            entry_attrs["source"] = entry_source

    generator = self.xml_generator
    generator.startElement("entry", AttributesImpl(entry_attrs))
    self._write_species(record)
    self._write_description(record)
    self._write_seq(record)
    self._write_dbxrefs(record)
    self._write_properties(record)
    generator.endElement("entry")
274 281
282 - def _write_species(self, record):
283 """Write the species if given.""" 284 local_ncbi_taxid = None 285 if "ncbi_taxid" in record.annotations: 286 local_ncbi_taxid = record.annotations["ncbi_taxid"] 287 if isinstance(local_ncbi_taxid, list): 288 # SwissProt parser uses a list (which could cope with chimeras) 289 if len(local_ncbi_taxid) == 1: 290 local_ncbi_taxid = local_ncbi_taxid[0] 291 elif len(local_ncbi_taxid) == 0: 292 local_ncbi_taxid = None 293 else: 294 ValueError('Multiple entries for record.annotations["ncbi_taxid"], %r' 295 % local_ncbi_taxid) 296 if "organism" in record.annotations and local_ncbi_taxid: 297 local_org = record.annotations["organism"] 298 299 if not isinstance(local_org, basestring): 300 raise TypeError("organism should be of type string") 301 302 if not isinstance(local_ncbi_taxid, (basestring, int)): 303 raise TypeError("ncbiTaxID should be of type string or int") 304 305 # The local species definition is only written if it differs from the global species definition 306 if local_org != self.species or local_ncbi_taxid != self.ncbiTaxId: 307 308 attr = {"name": local_org, 309 "ncbiTaxID": local_ncbi_taxid} 310 self.xml_generator.startElement( 311 "species", AttributesImpl(attr)) 312 self.xml_generator.endElement("species")
313
314 - def _write_description(self, record):
315 """Write the description if given.""" 316 if record.description: 317 318 if not isinstance(record.description, basestring): 319 raise TypeError("Description should be of type string") 320 321 description = record.description 322 if description == "<unknown description>": 323 description = "" 324 325 if len(record.description) > 0: 326 self.xml_generator.startElement( 327 "description", AttributesImpl({})) 328 self.xml_generator.characters(description) 329 self.xml_generator.endElement("description")
330
def _write_seq(self, record):
    """Write the sequence element of a record.

    Note that SeqXML requires a DNA, RNA or protein alphabet; the
    element name (``RNAseq``, ``DNAseq`` or ``AAseq``) is chosen from it.

    Raises TypeError for an UnknownSeq, and ValueError for an empty
    sequence or an alphabet that is none of the three.
    """
    if isinstance(record.seq, UnknownSeq):
        raise TypeError(
            "Sequence type is UnknownSeq but SeqXML requires sequence")

    sequence = str(record.seq)
    if len(sequence) == 0:
        raise ValueError("The sequence length should be greater than 0")

    # Get the base alphabet (underneath any Gapped or StopCodon encoding)
    base_alphabet = Alphabet._get_base_alphabet(record.seq.alphabet)

    element_by_alphabet = (
        (Alphabet.RNAAlphabet, "RNAseq"),
        (Alphabet.DNAAlphabet, "DNAseq"),
        (Alphabet.ProteinAlphabet, "AAseq"),
    )
    for alphabet_class, element_name in element_by_alphabet:
        if isinstance(base_alphabet, alphabet_class):
            break
    else:
        raise ValueError("Need a DNA, RNA or Protein alphabet")

    generator = self.xml_generator
    generator.startElement(element_name, AttributesImpl({}))
    generator.characters(sequence)
    generator.endElement(element_name)
359
360 - def _write_dbxrefs(self, record):
361 """Write all database cross references.""" 362 if record.dbxrefs is not None: 363 364 for dbxref in record.dbxrefs: 365 366 if not isinstance(dbxref, basestring): 367 raise TypeError("dbxrefs should be of type list of string") 368 if dbxref.find(':') < 1: 369 raise ValueError("dbxrefs should be in the form ['source:id', 'source:id' ]") 370 371 dbsource, dbid = dbxref.split(':', 1) 372 373 attr = {"source": dbsource, "id": dbid} 374 self.xml_generator.startElement("DBRef", AttributesImpl(attr)) 375 self.xml_generator.endElement("DBRef")
376
377 - def _write_properties(self, record):
378 """Write all annotations that are key value pairs with values of a primitive type or list of primitive types.""" 379 for key, value in record.annotations.items(): 380 381 if key not in ("organism", "ncbi_taxid", "source"): 382 383 if value is None: 384 385 attr = {"name": key} 386 self.xml_generator.startElement( 387 "property", AttributesImpl(attr)) 388 self.xml_generator.endElement("property") 389 390 elif isinstance(value, list): 391 392 for v in value: 393 if isinstance(value, (int, float, basestring)): 394 attr = {"name": key, "value": v} 395 self.xml_generator.startElement( 396 "property", AttributesImpl(attr)) 397 self.xml_generator.endElement("property") 398 399 elif isinstance(value, (int, float, basestring)): 400 401 attr = {"name": key, "value": str(value)} 402 self.xml_generator.startElement( 403 "property", AttributesImpl(attr)) 404 self.xml_generator.endElement("property")
405