Package Bio :: Package AlignIO :: Module EmbossIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.AlignIO.EmbossIO

  1  # Copyright 2008-2016 by Peter Cock.  All rights reserved. 
  2  # 
  3  # This file is part of the Biopython distribution and governed by your 
  4  # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". 
  5  # Please see the LICENSE file that should have been included as part of this 
  6  # package. 
  7  """Bio.AlignIO support for "emboss" alignment output from EMBOSS tools. 
  8   
  9  You are expected to use this module via the Bio.AlignIO functions (or the 
 10  Bio.SeqIO functions if you want to work directly with the gapped sequences). 
 11   
 12  This module contains a parser for the EMBOSS pairs/simple file format, for 
 13  example from the alignret, water and needle tools. 
 14  """ 
 15   
 16  from __future__ import print_function 
 17   
 18  from Bio.Seq import Seq 
 19  from Bio.SeqRecord import SeqRecord 
 20  from Bio.Align import MultipleSeqAlignment 
 21  from .Interfaces import AlignmentIterator, SequentialAlignmentWriter 
 22   
 23   
24 -class EmbossWriter(SequentialAlignmentWriter):
25 """Emboss alignment writer (WORK IN PROGRESS). 26 27 Writes a simplfied version of the EMBOSS pairs/simple file format. 28 A lot of the information their tools record in their headers is not 29 available and is omitted. 30 """ 31
32 - def write_header(self):
33 handle = self.handle 34 handle.write("########################################\n") 35 handle.write("# Program: Biopython\n") 36 try: 37 handle.write("# Report_file: %s\n" % handle.name) 38 except AttributeError: 39 pass 40 handle.write("########################################\n")
41 46
47 - def write_alignment(self, alignment):
48 """Use this to write (another) single alignment to an open file.""" 49 handle = self.handle 50 handle.write("#=======================================\n") 51 handle.write("#\n") 52 handle.write("# Aligned_sequences: %i\n" % len(alignment)) 53 for i, record in enumerate(alignment): 54 handle.write("# %i: %s\n" % (i + 1, record.id)) 55 handle.write("#\n") 56 handle.write("# Length: %i\n" % alignment.get_alignment_length()) 57 handle.write("#\n") 58 handle.write("#=======================================\n") 59 handle.write("\n") 60 # ... 61 assert False
62 63
64 -class EmbossIterator(AlignmentIterator):
65 """Emboss alignment iterator. 66 67 For reading the (pairwise) alignments from EMBOSS tools in what they 68 call the "pairs" and "simple" formats. 69 """ 70 71 _header = None # for caching lines between __next__ calls 72
73 - def __next__(self):
74 75 handle = self.handle 76 77 if self._header is None: 78 line = handle.readline() 79 else: 80 # Header we saved from when we were parsing 81 # the previous alignment. 82 line = self._header 83 self._header = None 84 85 if not line: 86 raise StopIteration 87 88 while line.rstrip() != "#=======================================": 89 line = handle.readline() 90 if not line: 91 raise StopIteration 92 93 length_of_seqs = None 94 number_of_seqs = None 95 ids = [] 96 seqs = [] 97 header_dict = {} 98 99 while line[0] == "#": 100 # Read in the rest of this alignment header, 101 # try and discover the number of records expected 102 # and their length 103 parts = line[1:].split(":", 1) 104 key = parts[0].lower().strip() 105 if key == "aligned_sequences": 106 number_of_seqs = int(parts[1].strip()) 107 assert len(ids) == 0 108 # Should now expect the record identifiers... 109 for i in range(number_of_seqs): 110 line = handle.readline() 111 parts = line[1:].strip().split(":", 1) 112 assert i + 1 == int(parts[0].strip()) 113 ids.append(parts[1].strip()) 114 assert len(ids) == number_of_seqs 115 if key == "length": 116 length_of_seqs = int(parts[1].strip()) 117 118 # Parse the rest of the header 119 if key == 'identity': 120 header_dict['identity'] = int(parts[1].strip().split('/')[0]) 121 if key == 'similarity': 122 header_dict['similarity'] = int(parts[1].strip().split('/')[0]) 123 if key == 'gaps': 124 header_dict['gaps'] = int(parts[1].strip().split('/')[0]) 125 if key == 'score': 126 header_dict['score'] = float(parts[1].strip()) 127 128 # And read in another line... 129 line = handle.readline() 130 131 if number_of_seqs is None: 132 raise ValueError("Number of sequences missing!") 133 if length_of_seqs is None: 134 raise ValueError("Length of sequences missing!") 135 136 if self.records_per_alignment is not None \ 137 and self.records_per_alignment != number_of_seqs: 138 raise ValueError("Found %i records in this alignment, told to expect %i" 139 % (number_of_seqs, self.records_per_alignment)) 140 141 seqs = ["" for id in ids] 142 seq_starts = [] 143 index = 0 144 145 # Parse the seqs 146 while line: 147 if len(line) > 21: 148 id_start = line[:21].strip().split(None, 1) 149 seq_end = line[21:].strip().split(None, 1) 150 if len(id_start) == 2 and len(seq_end) == 2: 151 # identifier, seq start position, seq, seq end position 152 # (an aligned seq is broken up into multiple lines) 153 id, start = id_start 154 seq, end = seq_end 155 if start == end: 156 # Special case, either a single letter is present, 157 # or no letters at all. 158 if seq.replace("-", "") == "": 159 start = int(start) 160 end = int(end) 161 else: 162 start = int(start) - 1 163 end = int(end) 164 else: 165 assert seq.replace("-", "") != "", repr(line) 166 start = int(start) - 1 # python counting 167 end = int(end) 168 169 # The identifier is truncated... 170 assert 0 <= index and index < number_of_seqs, \ 171 "Expected index %i in range [0,%i)" \ 172 % (index, number_of_seqs) 173 assert id == ids[index] or id == ids[index][:len(id)] 174 175 if len(seq_starts) == index: 176 # Record the start 177 seq_starts.append(start) 178 179 # Check the start... 180 if start == end: 181 assert seq.replace("-", "") == "", line 182 else: 183 assert start - seq_starts[index] == len(seqs[index].replace("-", "")), \ 184 "Found %i chars so far for sequence %i (%s, %s), line says start %i:\n%s" \ 185 % (len(seqs[index].replace("-", "")), index, id, repr(seqs[index]), 186 start, line) 187 188 seqs[index] += seq 189 190 # Check the end ... 191 assert end == seq_starts[index] + len(seqs[index].replace("-", "")), \ 192 "Found %i chars so far for sequence %i (%s, %s, start=%i), file says end %i:\n%s" \ 193 % (len(seqs[index].replace("-", "")), index, id, repr(seqs[index]), 194 seq_starts[index], end, line) 195 196 index += 1 197 if index >= number_of_seqs: 198 index = 0 199 else: 200 # just a start value, this is just alignment annotation (?) 201 # print "Skipping: " + line.rstrip() 202 pass 203 elif line.strip() == "": 204 # Just a spacer? 205 pass 206 else: 207 print(line) 208 assert False 209 210 line = handle.readline() 211 if line.rstrip() == "#---------------------------------------" \ 212 or line.rstrip() == "#=======================================": 213 # End of alignment 214 self._header = line 215 break 216 217 assert index == 0 218 219 if self.records_per_alignment is not None \ 220 and self.records_per_alignment != len(ids): 221 raise ValueError("Found %i records in this alignment, told to expect %i" 222 % (len(ids), self.records_per_alignment)) 223 224 records = [] 225 for id, seq in zip(ids, seqs): 226 if len(seq) != length_of_seqs: 227 # EMBOSS 2.9.0 is known to use spaces instead of minus signs 228 # for leading gaps, and thus fails to parse. This old version 229 # is still used as of Dec 2008 behind the EBI SOAP webservice: 230 # http://www.ebi.ac.uk/Tools/webservices/wsdl/WSEmboss.wsdl 231 raise ValueError("Error parsing alignment - sequences of " 232 "different length? You could be using an " 233 "old version of EMBOSS.") 234 records.append(SeqRecord(Seq(seq, self.alphabet), 235 id=id, description=id)) 236 return MultipleSeqAlignment(records, self.alphabet, annotations=header_dict)
237