Package Bio :: Package SeqIO :: Module SeqXmlIO
[hide private]
[frames] | [no frames]

Source Code for Module Bio.SeqIO.SeqXmlIO

  1  # Copyright 2010 by Thomas Schmitt. 
  2  # All rights reserved. 
  3  # 
  4  # This module is for reading and writing SeqXML format files as 
  5  # SeqRecord objects, and is expected to be used via the Bio.SeqIO API. 
  6  """Bio.SeqIO support for the "seqxml" file format, SeqXML. 
  7   
  8  You are expected to use this module via the Bio.SeqIO functions. 
  9   
 10  SeqXML is a lightweight XML format which is supposed to be an alternative for 
 11  FASTA files. For more information see http://www.seqXML.org and Schmitt et al 
 12  (2011), http://dx.doi.org/10.1093/bib/bbr025 
 13  """ 
 14   
 15  from __future__ import print_function 
 16   
 17  from xml.sax.saxutils import XMLGenerator 
 18  from xml.sax.xmlreader import AttributesImpl 
 19  from xml.dom import pulldom 
 20  from xml.sax import SAXParseException 
 21   
 22  from Bio._py3k import range 
 23  from Bio._py3k import basestring 
 24   
 25  from Bio import Alphabet 
 26  from Bio.Seq import Seq 
 27  from Bio.Seq import UnknownSeq 
 28  from Bio.SeqRecord import SeqRecord 
 29  from .Interfaces import SequentialSequenceWriter 
 30   
 31   
class XMLRecordIterator(object):
    """Base class for building iterators for record style XML formats.

    All information belonging to one record is expected to live inside the
    record element or above it. Subclasses hook into the parse by defining
    methods named after the element's local name:

    - ``_attr_TAGNAME(attr_dict, record)`` is called on the start tag and
      receives only the element's attributes.
    - ``_elem_TAGNAME(node, record)`` is called on the start tag after the
      element and all of its children have been expanded into a DOM tree.
      Anything inside that tree triggers no further method calls.
    """

    def __init__(self, handle, recordTag, namespace=None):
        """Store the record tag and namespace and start the pull parser."""
        self._events = pulldom.parse(handle)
        self._namespace = namespace
        self._recordTag = recordTag

    # TODO: Implement __next__ in order for Python to treat this class as
    # an iterator and not just as an iterable. The SequenceIterator API
    # expects base implementation of __iter__ to call __next__ internally.

    def __iter__(self):
        """Iterate over the records in the XML file."""
        current = None
        try:
            for event, node in self._events:
                # Only element boundaries in our namespace are of interest.
                if event not in ("START_ELEMENT", "END_ELEMENT"):
                    continue
                if node.namespaceURI != self._namespace:
                    continue

                tag = node.localName

                if event == "END_ELEMENT":
                    # The record element has been closed: hand it out.
                    if tag == self._recordTag:
                        yield current
                    continue

                # From here on we are handling a START_ELEMENT event.
                if tag == self._recordTag:
                    # Start from an empty SeqRecord for every record element.
                    current = SeqRecord('', id='')

                # Hook that only wants the attributes of the element.
                attr_hook = "_attr_" + tag
                if hasattr(self, attr_hook):
                    getattr(self, attr_hook)(self._attributes(node), current)

                # Hook that wants the element plus everything nested in it.
                elem_hook = "_elem_" + tag
                if hasattr(self, elem_hook):
                    self._events.expandNode(node)
                    node.normalize()
                    getattr(self, elem_hook)(node, current)

        except SAXParseException as err:
            line = err.getLineNumber()
            column = err.getColumnNumber()

            if line == 1 and column == 0:
                # empty file
                return

            import os
            if line == 1 and column == 1 and os.name == "java":
                # empty file, see http://bugs.jython.org/issue1774
                return

            raise

    def _attributes(self, node):
        """Return the attributes of a DOM node as dictionary."""
        attribute_map = node.attributes
        result = {}
        for index in range(attribute_map.length):
            attribute = attribute_map.item(index)
            result[attribute.name] = attribute.value
        return result
100 101
class SeqXmlIterator(XMLRecordIterator):
    """Breaks seqXML file into SeqRecords.

    Assumes valid seqXML please validate beforehand.
    """

    def __init__(self, handle):
        """Create the object, using "entry" as the record element."""
        XMLRecordIterator.__init__(self, handle, "entry")

        # Document-level metadata read from the seqXML root element.
        self._source = None
        self._source_version = None
        self._version = None
        self._speciesName = None
        self._ncbiTaxId = None

    def _attr_seqXML(self, attr_dict, record):
        """Parse the document metadata from the root element attributes."""
        if "source" in attr_dict:
            self._source = attr_dict["source"]
        if "sourceVersion" in attr_dict:
            self._source_version = attr_dict["sourceVersion"]
        # The format version is stored under "seqXMLversion"; test for the
        # same key that is read so a missing attribute is simply skipped.
        if "seqXMLversion" in attr_dict:
            self._version = attr_dict["seqXMLversion"]
        if "ncbiTaxID" in attr_dict:
            self._ncbiTaxId = attr_dict["ncbiTaxID"]
        if "speciesName" in attr_dict:
            self._speciesName = attr_dict["speciesName"]

    def _attr_property(self, attr_dict, record):
        """Parse key value pair properties and store them as annotations.

        The first value for a name is stored as is; further values for the
        same name turn the annotation into a list of values. A property
        without a "value" attribute is stored as None.

        Raises ValueError if the "name" attribute is missing.
        """
        if "name" not in attr_dict:
            raise ValueError("Malformed property element.")

        name = attr_dict["name"]
        value = attr_dict.get("value")
        annotations = record.annotations

        if name not in annotations:
            annotations[name] = value
        elif isinstance(annotations[name], list):
            annotations[name].append(value)
        else:
            annotations[name] = [annotations[name], value]

    def _attr_species(self, attr_dict, record):
        """Parse the species information.

        Raises ValueError unless both "name" and "ncbiTaxID" are present.
        """
        if "name" not in attr_dict or "ncbiTaxID" not in attr_dict:
            raise ValueError("Malformed species element!")

        # the keywords for the species annotation are taken from SwissIO
        record.annotations["organism"] = attr_dict["name"]
        # TODO - Should have been a list to match SwissProt parser:
        record.annotations["ncbi_taxid"] = attr_dict["ncbiTaxID"]

    def _attr_entry(self, attr_dict, record):
        """New entry set id and the optional entry source.

        Raises ValueError if the "id" attribute is missing.
        """
        if "id" not in attr_dict:
            raise ValueError("Malformed entry! Identifier is missing.")

        record.id = attr_dict["id"]
        # An entry-level source takes precedence over the document source.
        if "source" in attr_dict:
            record.annotations["source"] = attr_dict["source"]
        elif self._source is not None:
            record.annotations["source"] = self._source

        # initialize entry with global species definition
        # the keywords for the species annotation are taken from SwissIO
        if self._ncbiTaxId is not None:
            record.annotations["ncbi_taxid"] = self._ncbiTaxId
        if self._speciesName is not None:
            record.annotations["organism"] = self._speciesName

    def _set_sequence(self, node, record, alphabet):
        """Store the text of a sequence element on the record.

        Raises ValueError if the element has no children or its first
        child holds an empty string.
        """
        if not (node.hasChildNodes() and len(node.firstChild.data) > 0):
            raise ValueError("Sequence length should be greater than 0.")

        record.seq = Seq(node.firstChild.data, alphabet)

    def _elem_DNAseq(self, node, record):
        """Parse DNA sequence."""
        self._set_sequence(node, record, Alphabet.generic_dna)

    def _elem_RNAseq(self, node, record):
        """Parse RNA sequence."""
        self._set_sequence(node, record, Alphabet.generic_rna)

    def _elem_AAseq(self, node, record):
        """Parse protein sequence."""
        self._set_sequence(node, record, Alphabet.generic_protein)

    def _elem_description(self, node, record):
        """Parse the description; an empty element leaves it untouched."""
        if node.hasChildNodes() and len(node.firstChild.data) > 0:
            record.description = node.firstChild.data

    def _attr_DBRef(self, attr_dict, record):
        """Parse a database cross reference.

        The reference is stored as "source:id" in record.dbxrefs, skipping
        duplicates. Raises ValueError if either attribute is missing.
        """
        if "source" not in attr_dict or "id" not in attr_dict:
            raise ValueError("Invalid DB cross reference.")

        dbxref = "%s:%s" % (attr_dict["source"], attr_dict["id"])
        if dbxref not in record.dbxrefs:
            record.dbxrefs.append(dbxref)
208 209
class SeqXmlWriter(SequentialSequenceWriter):
    """Writes SeqRecords into seqXML file.

    SeqXML requires the sequence alphabet be explicitly RNA, DNA or protein,
    i.e. an instance or subclass of Bio.Alphabet.RNAAlphabet,
    Bio.Alphabet.DNAAlphabet or Bio.Alphabet.ProteinAlphabet.
    """

    def __init__(self, handle, source=None, source_version=None,
                 species=None, ncbiTaxId=None):
        """Create Object and start the xml generator.

        The optional arguments are document-level metadata which
        write_header places on the root element.
        """
        SequentialSequenceWriter.__init__(self, handle)

        self.xml_generator = XMLGenerator(handle, "utf-8")
        self.xml_generator.startDocument()
        self.source = source
        self.source_version = source_version
        self.species = species
        self.ncbiTaxId = ncbiTaxId

    @staticmethod
    def _as_text(value):
        """Return strings unchanged and everything else as str(value).

        XML attribute values must be strings for XMLGenerator to quote them.
        """
        if isinstance(value, basestring):
            return value
        return str(value)

    def _write_empty_element(self, name, attrs):
        """Write an element that has attributes but no content."""
        self.xml_generator.startElement(name, AttributesImpl(attrs))
        self.xml_generator.endElement(name)

    def write_header(self):
        """Write root node with document metadata.

        Raises TypeError if species is not a string or if ncbiTaxId is
        neither a string nor an int.
        """
        SequentialSequenceWriter.write_header(self)

        attrs = {"xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
                 "xsi:noNamespaceSchemaLocation": "http://www.seqxml.org/0.4/seqxml.xsd",
                 "seqXMLversion": "0.4"}

        if self.source is not None:
            attrs["source"] = self.source
        if self.source_version is not None:
            attrs["sourceVersion"] = self.source_version
        if self.species is not None:
            if not isinstance(self.species, basestring):
                raise TypeError("species should be of type string")
            attrs["speciesName"] = self.species
        if self.ncbiTaxId is not None:
            if not isinstance(self.ncbiTaxId, (basestring, int)):
                raise TypeError("ncbiTaxID should be of type string or int")
            # An int is accepted above, so convert it before it is quoted.
            attrs["ncbiTaxID"] = self._as_text(self.ncbiTaxId)

        self.xml_generator.startElement("seqXML", AttributesImpl(attrs))

    def write_record(self, record):
        """Write one record.

        Raises ValueError if the record has no usable identifier and
        TypeError if the identifier or the source annotation is not a string.
        """
        if not record.id or record.id == "<unknown id>":
            raise ValueError("SeqXML requires identifier")

        if not isinstance(record.id, basestring):
            raise TypeError("Identifier should be of type string")

        attrb = {"id": record.id}

        # The entry source is only written if it differs from the global one.
        if "source" in record.annotations and self.source != record.annotations["source"]:
            if not isinstance(record.annotations["source"], basestring):
                raise TypeError("source should be of type string")
            attrb["source"] = record.annotations["source"]

        self.xml_generator.startElement("entry", AttributesImpl(attrb))
        self._write_species(record)
        self._write_description(record)
        self._write_seq(record)
        self._write_dbxrefs(record)
        self._write_properties(record)
        self.xml_generator.endElement("entry")

    def _write_species(self, record):
        """Write the species if given.

        Raises ValueError if the "ncbi_taxid" annotation is a list with more
        than one entry, and TypeError if the organism or taxonomy id has an
        unsupported type.
        """
        local_ncbi_taxid = None
        if "ncbi_taxid" in record.annotations:
            local_ncbi_taxid = record.annotations["ncbi_taxid"]
            if isinstance(local_ncbi_taxid, list):
                # SwissProt parser uses a list (which could cope with chimeras)
                if len(local_ncbi_taxid) == 1:
                    local_ncbi_taxid = local_ncbi_taxid[0]
                elif len(local_ncbi_taxid) == 0:
                    local_ncbi_taxid = None
                else:
                    raise ValueError('Multiple entries for record.annotations["ncbi_taxid"], %r'
                                     % local_ncbi_taxid)
        if "organism" in record.annotations and local_ncbi_taxid:
            local_org = record.annotations["organism"]

            if not isinstance(local_org, basestring):
                raise TypeError("organism should be of type string")

            if not isinstance(local_ncbi_taxid, (basestring, int)):
                raise TypeError("ncbiTaxID should be of type string or int")

            # The local species definition is only written if it differs from the global species definition
            if local_org != self.species or local_ncbi_taxid != self.ncbiTaxId:
                attr = {"name": local_org,
                        "ncbiTaxID": self._as_text(local_ncbi_taxid)}
                self._write_empty_element("species", attr)

    def _write_description(self, record):
        """Write the description if given.

        The "<unknown description>" placeholder counts as no description.
        Raises TypeError if the description is not a string.
        """
        if record.description:

            if not isinstance(record.description, basestring):
                raise TypeError("Description should be of type string")

            description = record.description
            if description == "<unknown description>":
                description = ""

            # Test the normalised text so the placeholder writes no element.
            if len(description) > 0:
                self.xml_generator.startElement(
                    "description", AttributesImpl({}))
                self.xml_generator.characters(description)
                self.xml_generator.endElement("description")

    def _write_seq(self, record):
        """Write the sequence.

        Note that SeqXML requires a DNA, RNA or protein alphabet.

        Raises TypeError for an UnknownSeq and ValueError for an empty
        sequence or an alphabet that is not DNA, RNA or protein.
        """
        if isinstance(record.seq, UnknownSeq):
            raise TypeError(
                "Sequence type is UnknownSeq but SeqXML requires sequence")

        seq = str(record.seq)

        if not len(seq) > 0:
            raise ValueError("The sequence length should be greater than 0")

        # Get the base alphabet (underneath any Gapped or StopCodon encoding)
        alpha = Alphabet._get_base_alphabet(record.seq.alphabet)
        if isinstance(alpha, Alphabet.RNAAlphabet):
            seqElem = "RNAseq"
        elif isinstance(alpha, Alphabet.DNAAlphabet):
            seqElem = "DNAseq"
        elif isinstance(alpha, Alphabet.ProteinAlphabet):
            seqElem = "AAseq"
        else:
            raise ValueError("Need a DNA, RNA or Protein alphabet")

        self.xml_generator.startElement(seqElem, AttributesImpl({}))
        self.xml_generator.characters(seq)
        self.xml_generator.endElement(seqElem)

    def _write_dbxrefs(self, record):
        """Write all database cross references.

        Each reference must be a "source:id" string with a non-empty source;
        it is split at the first colon. Raises TypeError for a non-string
        entry and ValueError for a string that is not in that form.
        """
        if record.dbxrefs is not None:

            for dbxref in record.dbxrefs:

                if not isinstance(dbxref, basestring):
                    raise TypeError("dbxrefs should be of type list of string")
                if dbxref.find(':') < 1:
                    raise ValueError("dbxrefs should be in the form ['source:id', 'source:id' ]")

                dbsource, dbid = dbxref.split(':', 1)

                self._write_empty_element(
                    "DBRef", {"source": dbsource, "id": dbid})

    def _write_properties(self, record):
        """Write annotations whose values are primitives or lists of them.

        The "organism", "ncbi_taxid" and "source" annotations are skipped
        because they are written elsewhere. A None value (alone or inside a
        list) is written as a property without a "value" attribute. Values
        of any other unsupported type are silently skipped.
        """
        primitive = (int, float, basestring)

        for key, value in record.annotations.items():

            if key in ("organism", "ncbi_taxid", "source"):
                continue

            # Treat a scalar as a one-item list so both share one code path.
            items = value if isinstance(value, list) else [value]

            for item in items:
                if item is None:
                    self._write_empty_element("property", {"name": key})
                elif isinstance(item, primitive):
                    # Check each item (not the enclosing list) and make
                    # sure the attribute value is a string.
                    self._write_empty_element(
                        "property",
                        {"name": key, "value": self._as_text(item)})
406