+ extended test cases
[qpalma.git] / qpalma / Lookup.py
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
5 #
6 # Written (W) 2008 Fabio De Bona
7 # Copyright (C) 2008 Max-Planck-Society
8
9 import sys
10 import os
11 import pdb
12 from numpy import inf
13
14 from qpalma.sequence_utils import SeqSpliceInfo,get_flat_file_size,DataAccessWrapper,reverse_complement
15
16
17 class LookupTable:
18 """
19 Speed up genomic and splice site score queries by prefetching the sequences
20 and the scores once for all chromosomes.
21 """
22
23 def __init__(self,settings):
24 chromo_list = settings['allowed_fragments']
25 accessWrapper = DataAccessWrapper(settings)
26 self.seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
27
28 # take care that it also works say if we get a chromo_list [1,3,5]
29 # those are mapped then to 0,1,2
30 self.chromo_list = chromo_list
31 self.chromo_map = {}
32 for idx,elem in enumerate(chromo_list):
33 self.chromo_map[elem] = idx
34
35 print 'Chromo mapping is %s' % str(self.chromo_map)
36
37 self.genomicSequences = {}
38 self.acceptorScores = {}
39 self.donorScores = {}
40
41 self.genomicSequences = [0]*(len(self.chromo_list))
42 self.acceptorScoresPos = [0]*(len(self.chromo_list))
43 self.donorScoresPos = [0]*(len(self.chromo_list))
44 self.acceptorScoresNeg = [0]*(len(self.chromo_list))
45 self.donorScoresNeg = [0]*(len(self.chromo_list))
46
47 for chrId in self.chromo_list:
48 self.prefetch_seq_and_scores(chrId)
49
50
51 def get_seq_and_scores(self,chromo,strand,_start,_stop):
52 assert chromo in self.chromo_list
53
54 if strand == '-':
55 total_size = self.seqInfo.getFragmentSize(chromo)
56 startP = total_size - _stop
57 stopP = total_size - _start
58 stop = stopP
59 start = startP
60
61 assert start <= stop
62
63 chromo_idx = self.chromo_map[chromo]
64
65 currentSequence = self.genomicSequences[chromo_idx][start:stop]
66 if strand == '+':
67 currentAcceptorScores = self.acceptorScoresPos[chromo_idx][start:stop]
68 currentDonorScores = self.donorScoresPos[chromo_idx][start:stop]
69 elif strand == '-':
70 currentAcceptorScores = self.acceptorScoresNeg[chromo_idx][_start:_stop]
71 currentDonorScores = self.donorScoresNeg[chromo_idx][_start:_stop]
72
73 if strand == '-':
74 currentSequence = reverse_complement(currentSequence)
75
76 return currentSequence, currentAcceptorScores, currentDonorScores
77
78
79 def prefetch_seq_and_scores(self,chromo):
80 """
81 This function expects an interval, chromosome and strand information and
82 returns then the genomic sequence of this interval and the associated scores.
83 """
84 assert chromo in self.chromo_list
85
86
87 genomicSeq_start = 0
88 genomicSeq_stop = self.seqInfo.getFragmentSize(chromo)
89
90 print 'lt total_size %d' % self.seqInfo.getFragmentSize(chromo)
91
92 strand = '+'
93 genomicSeq,currentAcc,currentDon = self.seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop)
94 #print len(currentAcc),len(currentDon)
95 genomicSeq = genomicSeq.lower()
96
97 chromo_idx = self.chromo_map[chromo]
98
99 self.genomicSequences[chromo_idx] = genomicSeq
100 self.acceptorScoresPos[chromo_idx] = currentAcc
101 self.donorScoresPos[chromo_idx] = currentDon
102
103 strand = '-'
104 genomicSeq,currentAcc,currentDon = self.seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop)
105
106 genomicSeq = genomicSeq.lower()
107
108 newCurrentAcc = [-inf]
109 newCurrentAcc.extend(currentAcc[:-1])
110 currentAcc = newCurrentAcc
111
112 newCurrentDon = [-inf]
113 newCurrentDon.extend(currentDon[:-1])
114 #pdb.set_trace()
115 currentDon = newCurrentDon
116
117 self.acceptorScoresNeg[chromo_idx] = currentAcc
118 self.donorScoresNeg[chromo_idx] = currentDon