1db32fe91f29e24ca15c58271956d228bd673951
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 # This program is free software; you can redistribute it and/or modify
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
13 import cPickle
14 import random
15 import math
16 import numpy
17 from numpy import inf
18 import os.path
19 import pdb
20 import array
21 import unittest
23 from qpalma.qpalma_main import QPalma
24 from qpalma.utils import print_prediction
26 from qpalma.Run import Run
28 from qpalma.SettingsParser import parseSettings
29 from qpalma.OutputFormat import alignment_reconstruct
30 from qpalma.sequence_utils import DataAccessWrapper,SeqSpliceInfo,reverse_complement
32 from qpalma.Lookup import LookupTable
34 jp = os.path.join
37 def createScoresForSequence(full_chromo_seq,reverse_strand=False):
38 """
39 Given a genomic sequence this function calculates random scores for all
40 ocurring splice sites.
41 """
43 acc_pos = []
44 don_pos = []
46 total_size = len(full_chromo_seq)
48 # the first/last 205 pos do not have any predictions
49 for pos,elem in enumerate(full_chromo_seq[205:-205]):
50 if full_chromo_seq[pos-2:pos] == 'ag':
51 acc_pos.append(pos)
52 if full_chromo_seq[pos:pos+2] == 'gt' or full_chromo_seq[pos:pos+2] == 'gc':
53 don_pos.append(pos)
55 # make pos 1-based
56 acc_pos = [e+1 for e in acc_pos]
57 don_pos = [e+1 for e in don_pos]
59 acc_scores = [0.0]*len(acc_pos)
60 don_scores = [0.0]*len(don_pos)
62 for idx in range(len(acc_pos)):
63 acc_scores[idx] = random.uniform(0.1,1.0)
65 for idx in range(len(don_pos)):
66 don_scores[idx] = random.uniform(0.1,1.0)
68 # recalculate indices and reverse them in order to have positions relative
69 # to positive strand
70 if reverse_strand:
71 acc_pos = [total_size-1-e for e in acc_pos]
72 acc_pos.reverse()
73 acc_scores.reverse()
75 don_pos = [total_size-1-e for e in don_pos]
76 don_pos.reverse()
77 don_scores.reverse()
79 acc_pos = array.array('I',acc_pos)
80 acc_scores = array.array('f',acc_scores)
82 don_pos = array.array('I',don_pos)
83 don_scores = array.array('f',don_scores)
85 return acc_pos,acc_scores,don_pos,don_scores
88 def saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,strand):
89 """
90 """
92 acc_score_fn = 'test_data/acc/chromo_1%s.Conf'%strand
93 acc_pos_fn = 'test_data/acc/chromo_1%s.pos'%strand
94 don_score_fn = 'test_data/don/chromo_1%s.Conf'%strand
95 don_pos_fn = 'test_data/don/chromo_1%s.pos'%strand
97 acc_scores.tofile(open(acc_score_fn, 'wb'))
98 acc_pos.tofile(open(acc_pos_fn, 'wb'))
100 don_scores.tofile(open(don_score_fn, 'wb'))
101 don_pos.tofile(open(don_pos_fn, 'wb'))
104 def create_mini_chromosome():
106 chromo_fn = 'test_data/chromo1.flat'
108 chromo_fh = open(chromo_fn)
110 full_chromo_seq = full_chromo_seq.strip()
112 print full_chromo_seq[:200]
114 # create data for forward strand
115 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq,reverse_strand=False)
116 print acc_pos[:5]
117 print don_pos[:5]
118 saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'+')
120 # create data for reverse strand
121 full_chromo_seq_rev = reverse_complement(full_chromo_seq)
123 total_size = len(full_chromo_seq_rev)
125 print full_chromo_seq_rev[:200]
126 acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq_rev,reverse_strand=True)
127 acc_pos = [total_size-1-e for e in acc_pos]
128 acc_pos.reverse()
129 print acc_pos[:5]
130 don_pos = [total_size-1-e for e in don_pos]
131 don_pos.reverse()
132 print don_pos[:5]
134 #
135 # Remember: The positions are always saved one-based.
136 #
138 #saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
140 a = 103
141 b = 255
142 print 'pos: %s'%full_chromo_seq[a:b]
143 print 'neg: %s'%full_chromo_seq_rev[a:b]
145 total_size = len(full_chromo_seq)
146 ap = total_size-b
147 bp = total_size-a
148 print 'png: %s'%reverse_complement(full_chromo_seq[ap:bp])
150 class TestSequenceUtils(unittest.TestCase):
152 def setUp(self):
153 create_mini_chromosome()
156 def check_reverse_strand_calculation(self,id,b,e,seqInfo):
158 seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
160 total_size = seqInfo.getFragmentSize(1)
161 bp = total_size - e
162 ep = total_size - b
163 seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
164 seqp = reverse_complement(seqp)
166 self.assertEqual(seq,seqp)
169 def simple_check(self,settings,seqInfo,lt1):
171 print 'Checking sequences for some intervals...'
173 intervals = [(0,10000),(545,874),(999,1234)]
175 for (b,e) in intervals:
176 self.check_reverse_strand_calculation(1,b,e,seqInfo)
178 for (b,e) in [(206,874),(545,874),(999,1234)]:
179 lt1 = LookupTable(settings)
180 _dna,_acc,_don= lt1.get_seq_and_scores(1,'-',b,e)
181 dna,acc,don = seqInfo.get_seq_and_scores(1,'-',b,e,only_seq=False,perform_checks=False)
183 self.assertEqual(dna,_dna)
184 self.assertEqual(acc,_acc)
185 self.assertEqual(don,_don)
188 def testWithArtificalData(self):
189 settings = {}
191 settings['genome_dir'] = 'test_data/'
192 settings['acceptor_scores_loc'] = 'test_data/acc'
193 settings['donor_scores_loc'] = 'test_data/don'
195 settings['genome_file_fmt'] = 'chromo%d.flat'
196 settings['splice_score_file_fmt']= 'chromo_%d%s'
198 allowed_fragments = [1]
199 settings['allowed_fragments'] = allowed_fragments
201 accessWrapper = DataAccessWrapper(settings)
202 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
203 lt1 = LookupTable(settings)
205 print 'Checking with toy data...'
206 self.simple_check(settings,seqInfo,lt1)
209 def checks():
210 settings = {}
212 settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
213 settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
214 settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
216 settings['genome_file_fmt'] = 'chr%d.dna.flat'
217 settings['splice_score_file_fmt']= 'contig_%d%s'
219 allowed_fragments = [1]
220 settings['allowed_fragments'] = allowed_fragments
222 accessWrapper = DataAccessWrapper(settings)
223 seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
224 lt1 = LookupTable(settings)
226 print 'Checking with real data...'
227 simple_check(settings,seqInfo,lt1)
230 def run():
231 print 'Creating some artifical data...'
232 create_mini_chromosome()
233 print 'Performing some checks...'
234 checks()
236 if __name__ == '__main__':