#!/usr/bin/env python
# -*- coding: utf-8 -*-
+#
+# This module holds all functions for queries on the dna flat files and the
+# splice score files.
+# Furthermore it contains some utility functions such as reverse complement and
+# unbracketing est strings
+#
+
import os
import pdb
-import random
import re
-import sys
import subprocess
from numpy.matlib import inf
from Genefinding import *
from genome_utils import load_genomic
+
def get_flatfile_size(filename):
cmd = 'wc -c %s | cut -f1 -d \' \'' % filename
obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
read: aac
is written in bracket notation: aa[ac]
"""
- assert len(dna_seq) == len(read_seq)
+ assert len(dna_seq) == len(read_seq),pdb.set_trace()
return "".join(map(lambda x,y: ['[%s%s]'%(x,y),x][x==y],dna_seq,read_seq))
+
def getSpliceScores(chr,strand,intervalBegin,intervalEnd,total_size=0):
"""
Now we want to use interval_query to get the predicted splice scores trained
return acc, don
-def get_seq_and_scores(chr,strand,genomicSeq_start,genomicSeq_stop,dna_flat_files):
+def get_seq_and_scores(chr,strand,genomicSeq_start,genomicSeq_stop,dna_flat_files,only_seq=False):
"""
This function expects an interval, chromosome and strand information and
returns then the genomic sequence of this interval and the associated scores.
"""
+ # because splice site scores are predicted using a window of the sequence the
+ # first and the last NO_SCORE_WINDOW_SIZE nucleotides of a genomic sequence
+ # do not have any score predictions
+ NO_SCORE_WINDOW_SIZE = 250
+
assert genomicSeq_start < genomicSeq_stop
chrom = 'chr%d' % chr
no_base = re.compile('[^acgt]')
genomicSeq = no_base.sub('n',genomicSeq)
- if strand == '+':
- intervalBegin = genomicSeq_start-100
- intervalEnd = genomicSeq_stop+100
+ if only_seq:
+ return genomicSeq
- currentAcc, currentDon = getSpliceScores(chr,strand,intervalBegin,intervalEnd)
+ fn = 'chr%d.dna.flat' % chr
+ filename = os.path.join(dna_flat_files,fn)
+ total_size = get_flatfile_size(filename)
- currentAcc = currentAcc[100:-98]
- currentAcc = currentAcc[1:]
- currentDon = currentDon[100:-100]
+ intervalBegin = genomicSeq_start-100
+ intervalEnd = genomicSeq_stop+100
- length = len(genomicSeq)
- currentAcc = currentAcc[:length]
+ # shorten the intervalEnd to the maximal size of the genomic sequence if it
+ # would exceed this
+ if intervalEnd > total_size:
+ intervalEnd = total_size-1
+ # we have assertions below checking whether every gt or ag has a splice score
+ # not equal to -inf if the gt or ag are within the first NO_SCORE_WINDOW_SIZE
+ # nucleotide then they have no scores at all
+ check_assertions = True
+ if intervalEnd > total_size - NO_SCORE_WINDOW_SIZE:
+ check_assertions = False
+
+ if intervalBegin < NO_SCORE_WINDOW_SIZE:
+ check_assertions = False
+
+ currentAcc, currentDon = getSpliceScores(chr,strand,intervalBegin,intervalEnd,total_size)
+
+ currentAcc = currentAcc[100:-98]
+ currentAcc = currentAcc[1:]
+ currentDon = currentDon[100:-100]
+
+ length = len(genomicSeq)
+
+ if strand == '+':
+ currentAcc = currentAcc[:length]
currentDon = currentDon+[-inf]*(length-len(currentDon))
ag_tuple_pos = [p for p,e in enumerate(genomicSeq) if p>1 and genomicSeq[p-1]=='a' and genomicSeq[p]=='g' ]
gt_tuple_pos = [p for p,e in enumerate(genomicSeq) if p>0 and p<len(genomicSeq)-1 and e=='g' and (genomicSeq[p+1]=='t' or genomicSeq[p+1]=='c')]
- assert ag_tuple_pos == [p for p,e in enumerate(currentAcc) if e != -inf and p > 1], pdb.set_trace()
- assert gt_tuple_pos == [p for p,e in enumerate(currentDon) if e != -inf and p > 0], pdb.set_trace()
- assert len(currentAcc) == len(currentDon)
-
- return genomicSeq, currentAcc, currentDon
-
# build reverse complement if on negative strand
if strand == '-':
- fn = 'chr%d.dna.flat' % chr
- filename = os.path.join(dna_flat_files,fn)
- end = get_flatfile_size(filename)
-
- intervalBegin = genomicSeq_start-100
- intervalEnd = genomicSeq_stop+100
-
- total_size = end
- currentAcc, currentDon = getSpliceScores(chr,strand,intervalBegin,intervalEnd,total_size)
-
- currentAcc = currentAcc[100:-98]
- currentAcc = currentAcc[1:]
- currentDon = currentDon[100:-100]
-
- length = len(genomicSeq)
currentAcc = currentAcc[:length]
currentAcc = currentAcc+[-inf]*(length-len(currentAcc))
ag_tuple_pos = [p for p,e in enumerate(genomicSeq) if p>1 and p<len(genomicSeq) and genomicSeq[p-1]=='a' and genomicSeq[p]=='g']
gt_tuple_pos = [p for p,e in enumerate(genomicSeq) if p>0 and p<len(genomicSeq)-1 and e=='g' and (genomicSeq[p+1]=='t' or genomicSeq[p+1]=='c')]
+ if check_assertions:
assert ag_tuple_pos == [p for p,e in enumerate(currentAcc) if e != -inf and p > 1], pdb.set_trace()
assert gt_tuple_pos == [p for p,e in enumerate(currentDon) if e != -inf and p > 0], pdb.set_trace()
assert len(currentAcc) == len(currentDon), pdb.set_trace()
- return genomicSeq, currentAcc, currentDon
+ return genomicSeq, currentAcc, currentDon
def checks():
print p_seq == reverse_complement(n_seq)
-
if __name__ == '__main__':
checks()