+ changed parameter handling a bit (input files command line args and not inside...
[qpalma.git] / tests / test_utils.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12
13 import cPickle
14 import random
15 import array
16 import sys
17 import math
18 import numpy
19 from numpy import inf
20 import os.path
21 import pdb
22 import array
23 import unittest
24
25 from qpalma.qpalma_main import preprocessExample
26
27 from qpalma.utils import print_prediction
28
29 from qpalma.SettingsParser import parseSettings
30 from qpalma.OutputFormat import alignment_reconstruct
31 from qpalma.sequence_utils import DataAccessWrapper,SeqSpliceInfo,reverse_complement
32
33 from qpalma.Lookup import LookupTable
34
35 from qpalma.DatasetUtils import checkExons,processQuality
36
37
38 jp = os.path.join
39
40 pos_chr1 = 'ccctaaaccctaaaccctaaaccctaaacctctgaatccttaatccctaaatccctaaatctttaaatcctacatccatgaatccctaaatacctaattccctaaacccgaaaccggtttctctggttgaaaatcattgtgtatataatgataattttatcgtttttatgtaattgcttattgttgtgtgtagattttttaaaaatatcatttgaggtcaatacaaatcctatttcttgtggttttctttccttcacttagctatggatggtttatcttcatttgttatattggatacaagctttgctacgatctacatttgggaatgtgagtctcttattgtaaccttagggttggtttatctcaagaatcttattaattgtttggactgtttatgtttggacatttattgtcattcttactcctttgtggaaatgtttgttctatcaatttatcttttgtgggaaaattatttagttgtagggatgaagtctttcttcgttgttgttacgcttgtcatctcatctctcaatgatatgggatggtcctttagcatttat'+'x'*205
41
42 neg_chr1 = ''
43
44 acc_p = [217, 262, 302, 333, 352, 369, 478, 484, 492, 554]
45 don_p = [217, 239, 242, 261, 271, 285, 301, 306, 328, 332, 342, 353, 357, 382, 391, 397, 412, 429, 437, 441, 461, 477, 480, 491, 501, 504, 507, 512, 516, 545, 553]
46
47 acc_n = [229, 235, 246, 251, 256, 261, 276, 301, 306, 313, 333, 335, 346, 362, 371, 388, 417, 421, 424, 443, 455, 492, 496, 512, 520, 525, 527, 547]
48 don_n = [224, 257, 262, 298, 302, 307, 310, 317, 346, 389, 404, 422, 511, 513, 554]
49
50
51 def check_createScoresForSequence():
52 print 'Positive strand:'
53 createScoresForSequence(pos_chr1,reverse_strand=False)
54 print acc_p
55 print don_p
56
57 print 'Negative strand:'
58 filename = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/chr1.dna.flat.neg'
59 neg_chr1 = open(filename).read().strip()
60 print len(neg_chr1)
61 createScoresForSequence(neg_chr1,reverse_strand=True)
62 print acc_n
63 print don_n
64
65
66 def createScoresForSequence(full_chromo_seq,reverse_strand=False):
67 """
68 Given a genomic sequence this function calculates random scores for all
69 ocurring splice sites.
70 """
71
72 acc_pos = []
73 don_pos = []
74
75 total_size = len(full_chromo_seq)
76
77 # the first/last 205 pos do not have any predictions
78 for pos,elem in enumerate(full_chromo_seq):
79 if pos < 205 or pos > total_size-205:
80 continue
81
82 if full_chromo_seq[pos-2:pos] == 'ag':
83 acc_pos.append(pos)
84 if full_chromo_seq[pos:pos+2] == 'gt' or full_chromo_seq[pos:pos+2] == 'gc':
85 don_pos.append(pos)
86
87 acc_scores = [0.0]*len(acc_pos)
88 don_scores = [0.0]*len(don_pos)
89
90 for idx in range(len(acc_pos)):
91 acc_scores[idx] = random.uniform(0.1,1.0)
92
93 for idx in range(len(don_pos)):
94 don_scores[idx] = random.uniform(0.1,1.0)
95
96 # recalculate indices and reverse them in order to have positions relative
97 # to positive strand
98 if reverse_strand:
99 acc_pos = [total_size-1-e for e in acc_pos]
100 acc_pos.reverse()
101 acc_scores.reverse()
102
103 don_pos = [total_size-1-e for e in don_pos]
104 don_pos.reverse()
105 don_scores.reverse()
106
107 # make pos 1-based
108 acc_pos = [e+1 for e in acc_pos]
109 don_pos = [e+1 for e in don_pos]
110
111 #print acc_pos[:10]
112 #print don_pos[:10]
113
114 acc_pos = array.array('I',acc_pos)
115 acc_scores = array.array('f',acc_scores)
116
117 don_pos = array.array('I',don_pos)
118 don_scores = array.array('f',don_scores)
119
120 return acc_pos,acc_scores,don_pos,don_scores
121
122
123 def saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,strand):
124 """
125 """
126
127 acc_score_fn = 'test_data/acc/chromo_1%s.Conf'%strand
128 acc_pos_fn = 'test_data/acc/chromo_1%s.pos'%strand
129 don_score_fn = 'test_data/don/chromo_1%s.Conf'%strand
130 don_pos_fn = 'test_data/don/chromo_1%s.pos'%strand
131
132 acc_scores.tofile(open(acc_score_fn, 'wb'))
133 acc_pos.tofile(open(acc_pos_fn, 'wb'))
134
135 don_scores.tofile(open(don_score_fn, 'wb'))
136 don_pos.tofile(open(don_pos_fn, 'wb'))
137
138
139 def create_mini_chromosome():
140
141 chromo_fn = 'test_data/chromo1.flat'
142
143 chromo_fh = open(chromo_fn)
144 full_chromo_seq = chromo_fh.read()
145 full_chromo_seq = full_chromo_seq.strip()
146
147 print full_chromo_seq[:200]
148
149 # create data for forward strand
150 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq,reverse_strand=False)
151 print acc_pos[:5]
152 print don_pos[:5]
153 saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'+')
154
155 # create data for reverse strand
156 full_chromo_seq_rev = reverse_complement(full_chromo_seq)
157
158 total_size = len(full_chromo_seq_rev)
159
160 print full_chromo_seq_rev[:200]
161 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq_rev,reverse_strand=True)
162 saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
163
164
165 def test_rev_comp():
166 get_seq_and_scores(self,chromo,strand,genomicSeq_start,genomicSeq_stop,only_seq=False,perform_checks=True)
167
168
169 class TestQPalmaPrediction(unittest.TestCase):
170 """
171 This class...
172 """
173
174
175 def _setUp(self):
176 self.prediction_set = {}
177
178 # chr1 + 20-120
179 read = 'catctatgcaacagcattacagtgatcaccggcccaaaaaacctgtgtctggggttttgcctgatgatagcagtgatactgaaactggatcaatggtaag'
180 currentQualities = [[40]*len(read)]
181
182 id = 3
183 chromo = 1
184 strand = '+'
185
186 genomicSeq_start = 3500
187 genomicSeq_stop = 6500
188 print 'Position: ',
189 print genomicSeq_start,genomicSeq_stop
190
191 currentSeqInfo = id,chromo,strand,genomicSeq_start,genomicSeq_stop
192
193 example = (currentSeqInfo,read,currentQualities)
194 self.prediction_set[id] = [example]
195
196 # chr1 - 5000-5100
197 read = 'ctgtgtatctggttgctcaatatgctcgccggaaaatgaagatcatggatgctgtgagttctccttattgttcattatcaaactgatatgagtttctgat'
198 currentQualities = [[40]*len(read)]
199
200 id = 4
201 chromo = 1
202 strand = '-'
203
204 total_size = 30432563
205
206 genomicSeq_start = total_size - 6500
207 genomicSeq_stop = total_size - 3500
208 print 'Position: ',
209 print genomicSeq_start,genomicSeq_stop
210
211 currentSeqInfo = id,chromo,strand,genomicSeq_start,genomicSeq_stop
212
213 example = (currentSeqInfo,read,currentQualities)
214 self.prediction_set[id] = [example]
215
216
217 def testAlignments(self):
218
219 settings = parseSettings('testcase.conf')
220
221 print self.prediction_set
222 for example_key in self.prediction_set.keys():
223 print 'Current example %d' % example_key
224
225 for example in self.prediction_set[example_key]:
226 print example
227 print 'size'
228 print len(example)
229
230 accessWrapper = DataAccessWrapper(settings)
231 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
232
233 qp = QPalma(seqInfo,True)
234 allPredictions = qp.predict(self.prediction_set,settings)
235
236 for current_prediction in allPredictions:
237 align_str = print_prediction(current_prediction)
238 print align_str
239
240 id = current_prediction['id']
241 seq = current_prediction['read']
242 dna = current_prediction['dna']
243 chromo = current_prediction['chr']
244 strand = current_prediction['strand']
245 start_pos = current_prediction['start_pos']
246 predExons = current_prediction['predExons']
247
248 numExons = int(math.ceil(len(predExons) / 2))
249
250 print alignment_reconstruct(current_prediction,numExons)
251 print id,start_pos,predExons
252
253 print 'Problem counter is %d' % qp.problem_ctr
254
255
256 def _testAlignments(self):
257 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
258
259 run = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
260 run['name'] = 'test_run'
261 run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/sandbox/testcases'
262
263 param_fn = jp(run_dir,'param_526.pickle')
264 param = cPickle.load(open(param_fn))
265
266 print self.prediction_set
267 for example_key in self.prediction_set.keys():
268 print 'Current example %d' % example_key
269
270 for example in self.prediction_set[example_key]:
271 print example
272 print 'size'
273 print len(example)
274
275 # fetch the data needed
276 settings = {}
277
278 settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
279 settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
280 settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
281
282 settings['genome_file_fmt'] = 'chr%d.dna.flat'
283 settings['splice_score_file_fmt']= 'contig_%d%s'
284
285 allowed_fragments = [1]
286 settings['allowed_fragments'] = allowed_fragments
287
288 accessWrapper = DataAccessWrapper(settings)
289 seqInfo = SeqSpliceInfo(accessWrapper,allowed_fragments)
290
291 qp = QPalma(run,seqInfo,True)
292 allPredictions = qp.predict(self.prediction_set,param)
293
294 for current_prediction in allPredictions:
295 align_str = print_prediction(current_prediction)
296 print align_str
297
298 id = current_prediction['id']
299 seq = current_prediction['read']
300 dna = current_prediction['dna']
301 chromo = current_prediction['chr']
302 strand = current_prediction['strand']
303 start_pos = current_prediction['start_pos']
304 predExons = current_prediction['predExons']
305
306 numExons = int(math.ceil(len(predExons) / 2))
307
308 print alignment_reconstruct(current_prediction,numExons)
309 print id,start_pos,predExons
310
311 print 'Problem counter is %d' % qp.problem_ctr
312
313
314 def check_reverse_strand_calculation(id,b,e,seqInfo):
315 total_size = seqInfo.getFragmentSize(1)
316 bp = total_size - e
317 ep = total_size - b
318
319 seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
320 seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
321 seqp = reverse_complement(seqp)
322
323 res1 = (seq == seqp)
324 #print seq
325 #print seqp
326
327 seq,acc,don = seqInfo.get_seq_and_scores(id,'+',b,e,only_seq=False,perform_checks=False)
328 seq,acc,don = seqInfo.get_seq_and_scores(id,'-',bp,ep,only_seq=False,perform_checks=False)
329 #seqp = reverse_complement(seq)
330
331 res2 = (seq == seqp)
332 print seq
333 print seqp
334
335 return res1,res2
336
337
338 def check_example(chromo,strand,b,e,seqInfo,lt1):
339 dna,acc,don = seqInfo.get_seq_and_scores(1,strand,b,e)
340
341 if lt1 != None:
342 _dna,_acc,_don= lt1.get_seq_and_scores(chromo,strand,b,e)
343 else:
344 _dna,_acc,_don = seqInfo.get_seq_and_scores(chromo,strand,b,e)
345
346 print 'Current interval: (%d,%d), current strand: %s'%(b,e,strand)
347 print 'Results for dna,acc,don: %s %s %s'%(str(dna==_dna),str(acc==_acc),str(don==_don))
348
349 if dna != _dna:
350 print dna[:20]
351 print _dna[:20]
352
353 if acc != _acc or don != _don:
354 print [p for p,e in enumerate(acc) if e != -inf][:10]
355 print [p for p,e in enumerate(_acc) if e != -inf][:10]
356 print [p for p,e in enumerate(don) if e != -inf][:10]
357 print [p for p,e in enumerate(_don) if e != -inf][:10]
358
359
360 def simple_check(settings,seqInfo,lt1):
361
362 print 'Checking sequences for some intervals...'
363
364 intervals = [(0,10000),(545,874),(999,1234)]
365
366 chromo = 1
367 total_size = seqInfo.getFragmentSize(chromo)
368
369 #for strand in ['+','-']:
370 for strand in ['-']:
371 for (b,e) in [(206,874),(545,874),(999,1234),(1000,total_size-1000),(3,total_size-3),(0,total_size)]:
372 check_example(chromo,strand,b,e,seqInfo,lt1)
373
374 #for (b,e) in intervals:
375 # r1,r2 = check_reverse_strand_calculation(1,b,e,seqInfo)
376 # print b,e
377 # print 'Rev strand calculation: %s %s'%(str(r1),str(r2))
378
379
380 def checks():
381 settings = {}
382
383 settings['genome_dir'] = 'test_data/'
384 settings['acceptor_scores_loc'] = 'test_data/acc'
385 settings['donor_scores_loc'] = 'test_data/don'
386
387 settings['genome_file_fmt'] = 'chromo%d.flat'
388 settings['splice_score_file_fmt']= 'chromo_%d%s'
389
390 allowed_fragments = [1]
391 settings['allowed_fragments'] = allowed_fragments
392
393 accessWrapper = DataAccessWrapper(settings)
394 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
395
396 lt1 = None
397 lt1 = LookupTable(settings)
398
399 print 'Checking with toy data...'
400 simple_check(settings,seqInfo,lt1)
401
402 settings = {}
403
404 settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
405 settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
406 settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
407
408 settings['genome_file_fmt'] = 'chr%d.dna.flat'
409 settings['splice_score_file_fmt']= 'contig_%d%s'
410
411 allowed_fragments = [1]
412 settings['allowed_fragments'] = allowed_fragments
413
414 accessWrapper = DataAccessWrapper(settings)
415 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
416
417 lt1 = None
418 lt1 = LookupTable(settings)
419
420 print 'Checking with real data...'
421 simple_check(settings,seqInfo,lt1)
422
423
424 def run():
425 print 'Creating some artifical data...'
426 create_mini_chromosome()
427 print 'Performing some checks...'
428 checks()
429
430
431 class TestDatasetUtils(unittest.TestCase):
432 """
433 Here we check ...
434 """
435
436 def setUp(self):
437
438 # set some data for the first test
439
440 self.raw_qualities = [ 'hhh'\
441 ]
442
443 self.quality_results = [ [40,40,40]\
444 ]
445
446 self.quality_results = [array.array('b',e) for e in self.quality_results]
447
448 # now some data for the second test
449
450 self.dna_fragments = [ 'ggggttttgtccccagaaaaaggg'\
451 ]
452
453 self.readAlignments = [ 'ttttaaaaa'\
454 ]
455
456 self.exons = [ numpy.mat([4,8,16,21]).reshape((2,2))\
457 ]
458
459 self.results = [ True\
460 ]
461
462 def test_processQuality(self):
463
464 quality_interval = (-5,40)
465
466 perform_checks = True
467
468 for prb_offset in [64]:
469 for raw_quality in self.raw_qualities:
470 for quality_result in self.quality_results:
471
472 result = processQuality(raw_quality,prb_offset,quality_interval,perform_checks)
473 self.assertEqual(quality_result,result)
474
475
476 def test_checkExons(self):
477
478 for idx in range(len(self.dna_fragments)):
479 dna = self.dna_fragments[idx]
480 readAlignment = self.readAlignments[idx]
481
482 exons = self.exons[idx]
483 gt = self.results[idx]
484
485 result = checkExons(dna,exons,readAlignment,1)
486 self.assertEqual(gt,result)
487
488
489
490
491 if __name__ == '__main__':
492 #run()
493 #check_createScoresForSequence()
494 #suite = unittest.TestLoader().loadTestsFromTestCase(TestQPalmaPrediction)
495 suite = unittest.TestLoader().loadTestsFromTestCase(TestDatasetUtils)
496 unittest.TextTestRunner(verbosity=2).run(suite)