Package Bio :: Package AlignIO :: Module EmbossIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.AlignIO.EmbossIO

  1  # Copyright 2008-2016 by Peter Cock.  All rights reserved. 
  2  # 
  3  # This file is part of the Biopython distribution and governed by your 
  4  # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". 
  5  # Please see the LICENSE file that should have been included as part of this 
  6  # package. 
  7  """Bio.AlignIO support for "emboss" alignment output from EMBOSS tools. 
  8   
  9  You are expected to use this module via the Bio.AlignIO functions (or the 
 10  Bio.SeqIO functions if you want to work directly with the gapped sequences). 
 11   
 12  This module contains a parser for the EMBOSS pairs/simple file format, for 
 13  example from the alignret, water and needle tools. 
 14  """ 
 15   
 16  from __future__ import print_function 
 17   
 18  from Bio.Seq import Seq 
 19  from Bio.SeqRecord import SeqRecord 
 20  from Bio.Align import MultipleSeqAlignment 
 21  from .Interfaces import AlignmentIterator, SequentialAlignmentWriter 
 22   
 23   
24 -class EmbossWriter(SequentialAlignmentWriter):
25 """Emboss alignment writer (WORK IN PROGRESS). 26 27 Writes a simplfied version of the EMBOSS pairs/simple file format. 28 A lot of the information their tools record in their headers is not 29 available and is omitted. 30 """ 31
32 - def write_header(self):
33 handle = self.handle 34 handle.write("########################################\n") 35 handle.write("# Program: Biopython\n") 36 try: 37 handle.write("# Report_file: %s\n" % handle.name) 38 except AttributeError: 39 pass 40 handle.write("########################################\n")
41 46
47 - def write_alignment(self, alignment):
48 """Use this to write (another) single alignment to an open file.""" 49 handle = self.handle 50 handle.write("#=======================================\n") 51 handle.write("#\n") 52 handle.write("# Aligned_sequences: %i\n" % len(alignment)) 53 for i, record in enumerate(alignment): 54 handle.write("# %i: %s\n" % (i + 1, record.id)) 55 handle.write("#\n") 56 handle.write("# Length: %i\n" % alignment.get_alignment_length()) 57 handle.write("#\n") 58 handle.write("#=======================================\n") 59 handle.write("\n") 60 # ... 61 assert False
62 63
64 -class EmbossIterator(AlignmentIterator):
65 """Emboss alignment iterator. 66 67 For reading the (pairwise) alignments from EMBOSS tools in what they 68 call the "pairs" and "simple" formats. 69 """ 70 71 _header = None # for caching lines between __next__ calls 72
73 - def __next__(self):
74 75 handle = self.handle 76 77 if self._header is None: 78 line = handle.readline() 79 else: 80 # Header we saved from when we were parsing 81 # the previous alignment. 82 line = self._header 83 self._header = None 84 85 if not line: 86 raise StopIteration 87 88 while line.rstrip() != "#=======================================": 89 line = handle.readline() 90 if not line: 91 raise StopIteration 92 93 length_of_seqs = None 94 number_of_seqs = None 95 ids = [] 96 seqs = [] 97 header_dict = {} 98 99 while line[0] == "#": 100 # Read in the rest of this alignment header, 101 # try and discover the number of records expected 102 # and their length 103 parts = line[1:].split(":", 1) 104 key = parts[0].lower().strip() 105 if key == "aligned_sequences": 106 number_of_seqs = int(parts[1].strip()) 107 assert len(ids) == 0 108 # Should now expect the record identifiers... 109 for i in range(number_of_seqs): 110 line = handle.readline() 111 parts = line[1:].strip().split(":", 1) 112 assert i + 1 == int(parts[0].strip()) 113 ids.append(parts[1].strip()) 114 assert len(ids) == number_of_seqs 115 if key == "length": 116 length_of_seqs = int(parts[1].strip()) 117 118 # Parse the rest of the header 119 if key == 'identity': 120 header_dict['identity'] = int(parts[1].strip().split('/')[0]) 121 if key == 'similarity': 122 header_dict['similarity'] = int(parts[1].strip().split('/')[0]) 123 if key == 'gaps': 124 header_dict['gaps'] = int(parts[1].strip().split('/')[0]) 125 if key == 'score': 126 header_dict['score'] = float(parts[1].strip()) 127 128 # And read in another line... 129 line = handle.readline() 130 131 if number_of_seqs is None: 132 raise ValueError("Number of sequences missing!") 133 if length_of_seqs is None: 134 raise ValueError("Length of sequences missing!") 135 136 if self.records_per_alignment is not None \ 137 and self.records_per_alignment != number_of_seqs: 138 raise ValueError("Found %i records in this alignment, told to expect %i" 139 % (number_of_seqs, self.records_per_alignment)) 140 141 seqs = ["" for id in ids] 142 seq_starts = [] 143 index = 0 144 145 # Parse the seqs 146 while line: 147 if len(line) > 21: 148 id_start = line[:21].strip().split(None, 1) 149 seq_end = line[21:].strip().split(None, 1) 150 if len(id_start) == 2 and len(seq_end) == 2: 151 # identifier, seq start position, seq, seq end position 152 # (an aligned seq is broken up into multiple lines) 153 id, start = id_start 154 seq, end = seq_end 155 if start >= end: 156 # Special case, either a single letter is present, 157 # or no letters at all. 158 if seq.replace("-", "") == "": 159 start = int(start) 160 end = int(end) 161 else: 162 start = int(start) - 1 163 end = int(end) 164 else: 165 assert seq.replace("-", "") != "", repr(line) 166 start = int(start) - 1 # python counting 167 end = int(end) 168 169 if index < 0 or index >= number_of_seqs: 170 raise ValueError("Expected index %i in range [0,%i)" 171 % (index, number_of_seqs)) 172 # The identifier is truncated... 173 assert id == ids[index] or id == ids[index][:len(id)] 174 175 if len(seq_starts) == index: 176 # Record the start 177 seq_starts.append(start) 178 179 # Check the start... 180 if start >= end: 181 assert seq.replace("-", "") == "", line 182 elif start - seq_starts[index] != len(seqs[index].replace("-", "")): 183 raise ValueError("Found %i chars so far for sequence %i (%s, %s), line says start %i:\n%s" 184 % (len(seqs[index].replace("-", "")), index, id, repr(seqs[index]), 185 start, line)) 186 seqs[index] += seq 187 188 # Check the end ... 189 if end != seq_starts[index] + len(seqs[index].replace("-", "")): 190 raise ValueError( 191 "Found %i chars so far for sequence %i (%s, %s, start=%i), file says end %i:\n%s" 192 % (len(seqs[index].replace("-", "")), index, id, repr(seqs[index]), 193 seq_starts[index], end, line)) 194 195 index += 1 196 if index >= number_of_seqs: 197 index = 0 198 else: 199 # just a start value, this is just alignment annotation (?) 200 # print "Skipping: " + line.rstrip() 201 pass 202 elif line.strip() == "": 203 # Just a spacer? 204 pass 205 else: 206 print(line) 207 assert False 208 209 line = handle.readline() 210 if line.rstrip() == "#---------------------------------------" \ 211 or line.rstrip() == "#=======================================": 212 # End of alignment 213 self._header = line 214 break 215 216 assert index == 0 217 218 if self.records_per_alignment is not None \ 219 and self.records_per_alignment != len(ids): 220 raise ValueError("Found %i records in this alignment, told to expect %i" 221 % (len(ids), self.records_per_alignment)) 222 223 records = [] 224 for id, seq in zip(ids, seqs): 225 if len(seq) != length_of_seqs: 226 # EMBOSS 2.9.0 is known to use spaces instead of minus signs 227 # for leading gaps, and thus fails to parse. This old version 228 # is still used as of Dec 2008 behind the EBI SOAP webservice: 229 # http://www.ebi.ac.uk/Tools/webservices/wsdl/WSEmboss.wsdl 230 raise ValueError("Error parsing alignment - sequences of " 231 "different length? You could be using an " 232 "old version of EMBOSS.") 233 records.append(SeqRecord(Seq(seq, self.alphabet), 234 id=id, description=id)) 235 return MultipleSeqAlignment(records, self.alphabet, annotations=header_dict)
236