Package Bio :: Package AlignIO :: Module ClustalIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.AlignIO.ClustalIO

  1  # Copyright 2006-2016 by Peter Cock.  All rights reserved. 
  2  # 
  3  # This file is part of the Biopython distribution and governed by your 
  4  # choice of the "Biopython License Agreement" or the "BSD 3-Clause License". 
  5  # Please see the LICENSE file that should have been included as part of this 
  6  # package. 
  7  """Bio.AlignIO support for "clustal" output from CLUSTAL W and other tools. 
  8   
  9  You are expected to use this module via the Bio.AlignIO functions (or the 
 10  Bio.SeqIO functions if you want to work directly with the gapped sequences). 
 11  """ 
 12   
 13  from __future__ import print_function 
 14   
 15  from Bio.Seq import Seq 
 16  from Bio.SeqRecord import SeqRecord 
 17  from Bio.Align import MultipleSeqAlignment 
 18  from .Interfaces import AlignmentIterator, SequentialAlignmentWriter 
 19   
 20   
21 -class ClustalWriter(SequentialAlignmentWriter):
22 """Clustalw alignment writer.""" 23
24 - def write_alignment(self, alignment):
25 """Use this to write (another) single alignment to an open file.""" 26 if len(alignment) == 0: 27 raise ValueError("Must have at least one sequence") 28 if alignment.get_alignment_length() == 0: 29 # This doubles as a check for an alignment object 30 raise ValueError("Non-empty sequences are required") 31 32 # Old versions of the parser in Bio.Clustalw used a ._version property, 33 try: 34 version = str(alignment._version) 35 except AttributeError: 36 version = "" 37 if not version: 38 version = '1.81' 39 if version.startswith("2."): 40 # e.g. 2.0.x 41 output = "CLUSTAL %s multiple sequence alignment\n\n\n" % version 42 else: 43 # e.g. 1.81 or 1.83 44 output = "CLUSTAL X (%s) multiple sequence alignment\n\n\n" % version 45 46 cur_char = 0 47 max_length = len(alignment[0]) 48 49 if max_length <= 0: 50 raise ValueError("Non-empty sequences are required") 51 52 if "clustal_consensus" in alignment.column_annotations: 53 star_info = alignment.column_annotations["clustal_consensus"] 54 elif hasattr(alignment, "_star_info"): 55 # This was originally stored by Bio.Clustalw as ._star_info 56 star_info = alignment._star_info 57 else: 58 star_info = None 59 60 # keep displaying sequences until we reach the end 61 while cur_char != max_length: 62 # calculate the number of sequences to show, which will 63 # be less if we are at the end of the sequence 64 if (cur_char + 50) > max_length: 65 show_num = max_length - cur_char 66 else: 67 show_num = 50 68 69 # go through all of the records and print out the sequences 70 # when we output, we do a nice 80 column output, although this 71 # may result in truncation of the ids. 72 for record in alignment: 73 # Make sure we don't get any spaces in the record 74 # identifier when output in the file by replacing 75 # them with underscores: 76 line = record.id[0:30].replace(" ", "_").ljust(36) 77 line += str(record.seq[cur_char:(cur_char + show_num)]) 78 output += line + "\n" 79 80 # now we need to print out the star info, if we've got it 81 if star_info: 82 output += (" " * 36) + star_info[cur_char:(cur_char + show_num)] + "\n" 83 84 output += "\n" 85 cur_char += show_num 86 87 # Want a trailing blank new line in case the output is concatenated 88 self.handle.write(output + "\n")
89 90
91 -class ClustalIterator(AlignmentIterator):
92 """Clustalw alignment iterator.""" 93 94 _header = None # for caching lines between __next__ calls 95
96 - def __next__(self):
97 handle = self.handle 98 99 if self._header is None: 100 line = handle.readline() 101 else: 102 # Header we saved from when we were parsing 103 # the previous alignment. 104 line = self._header 105 self._header = None 106 107 if not line: 108 raise StopIteration 109 110 # Whitelisted headers we know about 111 known_headers = ['CLUSTAL', 'PROBCONS', 'MUSCLE', 'MSAPROBS', 'Kalign'] 112 if line.strip().split()[0] not in known_headers: 113 raise ValueError("%s is not a known CLUSTAL header: %s" % 114 (line.strip().split()[0], 115 ", ".join(known_headers))) 116 117 # find the clustal version in the header line 118 version = None 119 for word in line.split(): 120 if word[0] == '(' and word[-1] == ')': 121 word = word[1:-1] 122 if word[0] in '0123456789': 123 version = word 124 break 125 126 # There should be two blank lines after the header line 127 line = handle.readline() 128 while line.strip() == "": 129 line = handle.readline() 130 131 # If the alignment contains entries with the same sequence 132 # identifier (not a good idea - but seems possible), then this 133 # dictionary based parser will merge their sequences. Fix this? 134 ids = [] 135 seqs = [] 136 consensus = "" 137 seq_cols = None # Used to extract the consensus 138 139 # Use the first block to get the sequence identifiers 140 while True: 141 if line[0] != " " and line.strip() != "": 142 # Sequences identifier... 143 fields = line.rstrip().split() 144 145 # We expect there to be two fields, there can be an optional 146 # "sequence number" field containing the letter count. 147 if len(fields) < 2 or len(fields) > 3: 148 raise ValueError("Could not parse line:\n%s" % line) 149 150 ids.append(fields[0]) 151 seqs.append(fields[1]) 152 153 # Record the sequence position to get the consensus 154 if seq_cols is None: 155 start = len(fields[0]) + \ 156 line[len(fields[0]):].find(fields[1]) 157 end = start + len(fields[1]) 158 seq_cols = slice(start, end) 159 del start, end 160 assert fields[1] == line[seq_cols] 161 162 if len(fields) == 3: 163 # This MAY be an old style file with a letter count... 164 try: 165 letters = int(fields[2]) 166 except ValueError: 167 raise ValueError("Could not parse line, " 168 "bad sequence number:\n%s" % line) 169 if len(fields[1].replace("-", "")) != letters: 170 raise ValueError("Could not parse line, " 171 "invalid sequence number:\n%s" % line) 172 elif line[0] == " ": 173 # Sequence consensus line... 174 assert len(ids) == len(seqs) 175 assert len(ids) > 0 176 assert seq_cols is not None 177 consensus = line[seq_cols] 178 assert not line[:seq_cols.start].strip() 179 assert not line[seq_cols.stop:].strip() 180 # Check for blank line (or end of file) 181 line = handle.readline() 182 assert line.strip() == "" 183 break 184 else: 185 # No consensus 186 break 187 line = handle.readline() 188 if not line: 189 break # end of file 190 191 assert line.strip() == "" 192 assert seq_cols is not None 193 194 # Confirm all same length 195 for s in seqs: 196 assert len(s) == len(seqs[0]) 197 if consensus: 198 assert len(consensus) == len(seqs[0]) 199 200 # Loop over any remaining blocks... 201 done = False 202 while not done: 203 # There should be a blank line between each block. 204 # Also want to ignore any consensus line from the 205 # previous block. 206 while (not line) or line.strip() == "": 207 line = handle.readline() 208 if not line: 209 break # end of file 210 if not line: 211 break # end of file 212 213 if line.split(None, 1)[0] in known_headers: 214 # Found concatenated alignment. 215 done = True 216 self._header = line 217 break 218 219 for i in range(len(ids)): 220 if line[0] == " ": 221 raise ValueError("Unexpected line:\n%s" % repr(line)) 222 fields = line.rstrip().split() 223 224 # We expect there to be two fields, there can be an optional 225 # "sequence number" field containing the letter count. 226 if len(fields) < 2 or len(fields) > 3: 227 raise ValueError("Could not parse line:\n%s" % repr(line)) 228 229 if fields[0] != ids[i]: 230 raise ValueError("Identifiers out of order? " 231 "Got '%s' but expected '%s'" 232 % (fields[0], ids[i])) 233 234 if fields[1] != line[seq_cols]: 235 start = len(fields[0]) + \ 236 line[len(fields[0]):].find(fields[1]) 237 if start != seq_cols.start: 238 raise ValueError('Old location %s -> %i:XX' % (seq_cols, start)) 239 end = start + len(fields[1]) 240 seq_cols = slice(start, end) 241 del start, end 242 243 # Append the sequence 244 seqs[i] += fields[1] 245 assert len(seqs[i]) == len(seqs[0]) 246 247 if len(fields) == 3: 248 # This MAY be an old style file with a letter count... 249 try: 250 letters = int(fields[2]) 251 except ValueError: 252 raise ValueError("Could not parse line, " 253 "bad sequence number:\n%s" % 254 line) 255 if len(seqs[i].replace("-", "")) != letters: 256 raise ValueError("Could not parse line, " 257 "invalid sequence number:\n%s" % line) 258 259 # Read in the next line 260 line = handle.readline() 261 # There should now be a consensus line 262 if consensus: 263 assert line[0] == " " 264 assert seq_cols is not None 265 consensus += line[seq_cols] 266 assert len(consensus) == len(seqs[0]) 267 assert not line[:seq_cols.start].strip() 268 assert not line[seq_cols.stop:].strip() 269 # Read in the next line 270 line = handle.readline() 271 272 assert len(ids) == len(seqs) 273 if len(seqs) == 0 or len(seqs[0]) == 0: 274 raise StopIteration 275 276 if self.records_per_alignment is not None and \ 277 self.records_per_alignment != len(ids): 278 raise ValueError("Found %i records in this alignment, " 279 "told to expect %i" 280 % (len(ids), self.records_per_alignment)) 281 282 records = (SeqRecord(Seq(s, self.alphabet), id=i, description=i) 283 for (i, s) in zip(ids, seqs)) 284 alignment = MultipleSeqAlignment(records, self.alphabet) 285 # TODO - Handle alignment annotation better, for now 286 # mimic the old parser in Bio.Clustalw 287 if version: 288 alignment._version = version 289 if consensus: 290 alignment_length = len(seqs[0]) 291 if len(consensus) != alignment_length: 292 raise ValueError("Alignment length is %i, consensus length is %i, '%s'" 293 % (alignment_length, len(consensus), consensus)) 294 alignment.column_annotations["clustal_consensus"] = consensus 295 # For backward compatibility prior to .column_annotations: 296 alignment._star_info = consensus 297 return alignment
298