1db32fe91f29e24ca15c58271956d228bd673951
[qpalma.git] / tests / test_utils.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12
13 import cPickle
14 import random
15 import math
16 import numpy
17 from numpy import inf
18 import os.path
19 import pdb
20 import array
21 import unittest
22
23 from qpalma.qpalma_main import QPalma
24 from qpalma.utils import print_prediction
25
26 from qpalma.Run import Run
27
28 from qpalma.SettingsParser import parseSettings
29 from qpalma.OutputFormat import alignment_reconstruct
30 from qpalma.sequence_utils import DataAccessWrapper,SeqSpliceInfo,reverse_complement
31
32 from qpalma.Lookup import LookupTable
33
34 jp = os.path.join
35
36
37 def createScoresForSequence(full_chromo_seq,reverse_strand=False):
38 """
39 Given a genomic sequence this function calculates random scores for all
40 ocurring splice sites.
41 """
42
43 acc_pos = []
44 don_pos = []
45
46 total_size = len(full_chromo_seq)
47
48 # the first/last 205 pos do not have any predictions
49 for pos,elem in enumerate(full_chromo_seq[205:-205]):
50 if full_chromo_seq[pos-2:pos] == 'ag':
51 acc_pos.append(pos)
52 if full_chromo_seq[pos:pos+2] == 'gt' or full_chromo_seq[pos:pos+2] == 'gc':
53 don_pos.append(pos)
54
55 # make pos 1-based
56 acc_pos = [e+1 for e in acc_pos]
57 don_pos = [e+1 for e in don_pos]
58
59 acc_scores = [0.0]*len(acc_pos)
60 don_scores = [0.0]*len(don_pos)
61
62 for idx in range(len(acc_pos)):
63 acc_scores[idx] = random.uniform(0.1,1.0)
64
65 for idx in range(len(don_pos)):
66 don_scores[idx] = random.uniform(0.1,1.0)
67
68 # recalculate indices and reverse them in order to have positions relative
69 # to positive strand
70 if reverse_strand:
71 acc_pos = [total_size-1-e for e in acc_pos]
72 acc_pos.reverse()
73 acc_scores.reverse()
74
75 don_pos = [total_size-1-e for e in don_pos]
76 don_pos.reverse()
77 don_scores.reverse()
78
79 acc_pos = array.array('I',acc_pos)
80 acc_scores = array.array('f',acc_scores)
81
82 don_pos = array.array('I',don_pos)
83 don_scores = array.array('f',don_scores)
84
85 return acc_pos,acc_scores,don_pos,don_scores
86
87
88 def saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,strand):
89 """
90 """
91
92 acc_score_fn = 'test_data/acc/chromo_1%s.Conf'%strand
93 acc_pos_fn = 'test_data/acc/chromo_1%s.pos'%strand
94 don_score_fn = 'test_data/don/chromo_1%s.Conf'%strand
95 don_pos_fn = 'test_data/don/chromo_1%s.pos'%strand
96
97 acc_scores.tofile(open(acc_score_fn, 'wb'))
98 acc_pos.tofile(open(acc_pos_fn, 'wb'))
99
100 don_scores.tofile(open(don_score_fn, 'wb'))
101 don_pos.tofile(open(don_pos_fn, 'wb'))
102
103
104 def create_mini_chromosome():
105
106 chromo_fn = 'test_data/chromo1.flat'
107
108 chromo_fh = open(chromo_fn)
109 full_chromo_seq = chromo_fh.read()
110 full_chromo_seq = full_chromo_seq.strip()
111
112 print full_chromo_seq[:200]
113
114 # create data for forward strand
115 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq,reverse_strand=False)
116 print acc_pos[:5]
117 print don_pos[:5]
118 saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'+')
119
120 # create data for reverse strand
121 full_chromo_seq_rev = reverse_complement(full_chromo_seq)
122
123 total_size = len(full_chromo_seq_rev)
124
125 print full_chromo_seq_rev[:200]
126 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq_rev,reverse_strand=True)
127 acc_pos = [total_size-1-e for e in acc_pos]
128 acc_pos.reverse()
129 print acc_pos[:5]
130 don_pos = [total_size-1-e for e in don_pos]
131 don_pos.reverse()
132 print don_pos[:5]
133
134 #
135 # Remember: The positions are always saved one-based.
136 #
137
138 #saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
139
140 a = 103
141 b = 255
142 print 'pos: %s'%full_chromo_seq[a:b]
143 print 'neg: %s'%full_chromo_seq_rev[a:b]
144
145 total_size = len(full_chromo_seq)
146 ap = total_size-b
147 bp = total_size-a
148 print 'png: %s'%reverse_complement(full_chromo_seq[ap:bp])
149
150 class TestSequenceUtils(unittest.TestCase):
151
152 def setUp(self):
153 create_mini_chromosome()
154
155
156 def check_reverse_strand_calculation(self,id,b,e,seqInfo):
157
158 seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
159
160 total_size = seqInfo.getFragmentSize(1)
161 bp = total_size - e
162 ep = total_size - b
163 seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
164 seqp = reverse_complement(seqp)
165
166 self.assertEqual(seq,seqp)
167
168
169 def simple_check(self,settings,seqInfo,lt1):
170
171 print 'Checking sequences for some intervals...'
172
173 intervals = [(0,10000),(545,874),(999,1234)]
174
175 for (b,e) in intervals:
176 self.check_reverse_strand_calculation(1,b,e,seqInfo)
177
178 for (b,e) in [(206,874),(545,874),(999,1234)]:
179 lt1 = LookupTable(settings)
180 _dna,_acc,_don= lt1.get_seq_and_scores(1,'-',b,e)
181 dna,acc,don = seqInfo.get_seq_and_scores(1,'-',b,e,only_seq=False,perform_checks=False)
182
183 self.assertEqual(dna,_dna)
184 self.assertEqual(acc,_acc)
185 self.assertEqual(don,_don)
186
187
188 def testWithArtificalData(self):
189 settings = {}
190
191 settings['genome_dir'] = 'test_data/'
192 settings['acceptor_scores_loc'] = 'test_data/acc'
193 settings['donor_scores_loc'] = 'test_data/don'
194
195 settings['genome_file_fmt'] = 'chromo%d.flat'
196 settings['splice_score_file_fmt']= 'chromo_%d%s'
197
198 allowed_fragments = [1]
199 settings['allowed_fragments'] = allowed_fragments
200
201 accessWrapper = DataAccessWrapper(settings)
202 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
203 lt1 = LookupTable(settings)
204
205 print 'Checking with toy data...'
206 self.simple_check(settings,seqInfo,lt1)
207
208
209 def checks():
210 settings = {}
211
212 settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
213 settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
214 settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
215
216 settings['genome_file_fmt'] = 'chr%d.dna.flat'
217 settings['splice_score_file_fmt']= 'contig_%d%s'
218
219 allowed_fragments = [1]
220 settings['allowed_fragments'] = allowed_fragments
221
222 accessWrapper = DataAccessWrapper(settings)
223 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
224 lt1 = LookupTable(settings)
225
226 print 'Checking with real data...'
227 simple_check(settings,seqInfo,lt1)
228
229
230 def run():
231 print 'Creating some artifical data...'
232 create_mini_chromosome()
233 print 'Performing some checks...'
234 checks()
235
236 if __name__ == '__main__':
237 suite = unittest.TestLoader().loadTestsFromTestCase(TestSequenceUtils)
238 unittest.TextTestRunner(verbosity=2).run(suite)