Strand specific direction means that \QP assumes that the reads are already in
their true orientation and the qualities as well.
+
+\subsection{Training file format}
+
+The read input files for \QP contain the read sequences with their quality as
+well as some information from the first mapping / seed region finding. The
+format of the file containing the mapped short reads is as follows. Each line
+corresponds to one short read. Each line has six tab-separated entries, namely:
+
+\begin{enumerate}
+\item unique read id
+\item chromosome/contig id
+\item strand
+\item beginning of sequence fragment
+\item end of sequence fragment
+\item read sequence (in strand specific direction) with alignment information (see below)
+\item read quality (in strand specific direction)
+\item beginning of $1^{st}$ exon
+\item end of $1^{st}$ exon
+\item beginning of $2^{nd}$ exon
+\item end of $2^{nd}$ exon
+\end{enumerate}
+
+Strand specific direction means that \QP assumes that the reads are already in
+their true orientation and the qualities as well.
+\\ \noindent
+Alignment information means that an alignment of a read to a genomic sequence A
+mismatch is encoded as $[AG]$ if $A$ is on the sequence and $G$ on the read
+side. A gap is denoted by $[-X]$ resp. $[X-]$ denotinge a gap on the sequence
+resp. read side with $X \in {A,C,G,T,N}$.
+
+
\subsection{Splice Scores}
As mentioned before the splice site scores where generated using a tool
#roche_454_range =
+def processQuality(raw_qualities,prb_offset,quality_interval,perform_checks):
+ """
+ In order to save some space we use a signed char to store the
+ qualities. Each quality element can range as follows: -128 <= elem <= 127
+ """
+
+ q_values = map(lambda x: ord(x)-prb_offset,raw_qualities)
+
+ if perform_checks:
+ for entry in q_values:
+ assert quality_interval[0] <= entry <= quality_interval[1], 'Error: Your read has invalid quality values: %d' % entry
+
+ return array.array('b',q_values)
+
+
+def checkExons(dna,exons,readAlignment,exampleKey):
+ """
+
+ """
+
+ fetched_dna_subseq = dna[exons[0,0]:exons[0,1]] + dna[exons[1,0]:exons[1,1]]
+
+ donor_elem = dna[exons[0,1]:exons[0,1]+2]
+ acceptor_elem = dna[exons[1,0]-2:exons[1,0]]
+
+ if not ( donor_elem == 'gt' or donor_elem == 'gc' ):
+ print 'invalid donor in example %d'% exampleKey
+ return False
+
+ if not ( acceptor_elem == 'ag' ):
+ print 'invalid acceptor in example %d'% exampleKey
+ return False
+
+ read = unbracket_seq(readAlignment)
+ read = read.replace('-','')
+
+ assert len(fetched_dna_subseq) == len(read), pdb.set_trace()
+
+ return True
+
+
def generateTrainingDataset(settings):
"""
This function creates a training dataset.
+
+
+ Create lower case original ("bracketed") reads
+
"""
+
dataset = []
+ half_window_size = settings['half_window_size']
+
+ instance_counter = 0
+
+ accessWrapper = DataAccessWrapper(settings)
+ seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+
+ for line in open(map_file):
+ line = line.strip()
+ if line.startswith('#') or line == '':
+ continue
+
+ if instance_counter > 0 and instance_counter % 5000 == 0:
+ print 'processed %d examples' % instance_counter
+
+ slist = line.split()
+
+ id = int(slist[0])
+ chromo = int(slist[1])
+
+ # for the time being we only consider chromosomes 1-5
+ if not chromo in settings['allowed_fragments']:
+ continue
+
+ # convert D/P strand info to +/-
+ strand = ['-','+'][slist[2] == 'D']
+
+ seqBeginning = int(slist[3])
+ seqEnd = int(slist[4])
+
+ readAlignment = slist[5]
+ prb = processQuality(slist[6],prb_offset,quality_intervals,perform_checks)
+
+ exons = numpy.mat([0,0,0,0]).reshape((2,2))
+
+ exons[0,0] = int(slist[7])
+ exons[0,1] = int(slist[8])
+ exons[1,0] = int(slist[9])
+ exons[1,1] = int(slist[10])
+
+ dna = seqInfo.get_seq_and_scores(chr,strand,up_cut,down_cut,only_seq=True)
+ relative_exons = exons - up_cut
+
+ assert checkExons(dna,relative_exons,readAlignment,id)
+
+ dataset.setdefault(id, []).append((currentSeqInfo,readAlignment,[prb],exons))
+
saveData('training',dataset,settings)
def addToDataset(map_file,dataset,settings):
assert os.path.exists(map_file), 'Error: Can not find map file'
+ perform_checks = settings['perform_checks']
+
prb_offset = settings['prb_offset']
# This tuple specifies an interval for valid Illumina Genome Analyzer quality values
dna_seq = seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop,True)
- # In order to save some space we use a signed char to store the
- # qualities. Each quality element can range as follows: -128 <= elem <= 127
-
- q_values = map(lambda x: ord(x)-prb_offset,slist[5])
-
- if settings['perform_checks']:
- for entry in q_values:
- assert quality_interval[0] <= entry <= quality_interval[1], 'Error: Your read has invalid quality values: %d' % entry
-
- prb = array.array('b',q_values)
-
+ prb = processQuality(slist[5],prb_offset,quality_interval,perform_checks)
+
currentSeqInfo = (id,chromo,strand,genomicSeq_start,genomicSeq_stop)
- currentQualities = [prb]
# As one single read can have several vmatch matches we store all these
# matches under the unique id of the read
- dataset.setdefault(id, []).append((currentSeqInfo,read_seq,currentQualities))
+ dataset.setdefault(id, []).append((currentSeqInfo,read_seq,[prb]))
instance_counter += 1
vmatch is spliced an should be then newly aligned using QPalma or not.
"""
- def __init__(self,run_fname,data_fname,param_fname,result_fname,settings):
+ def __init__(self,data_fname,param_fname,result_fname,settings):
"""
We need a run object holding information about the nr. of support points
etc.
"""
- run = cPickle.load(open(run_fname))
- self.run = run
-
start = cpu()
self.all_remapped_reads = BracketWrapper(data_fname)
stop = cpu()
self.param = cPickle.load(open(param_fname))
# Set the parameters such as limits penalties for the Plifs
- [h,d,a,mmatrix,qualityPlifs] = set_param_palma(self.param,True,run)
+ [h,d,a,mmatrix,qualityPlifs] = set_param_palma(self.param,True,settings)
self.h = h
self.d = d
# id = int(id)
# self.original_reads[id] = seq
- lengthSP = run['numLengthSuppPoints']
- donSP = run['numDonSuppPoints']
- accSP = run['numAccSuppPoints']
- mmatrixSP = run['matchmatrixRows']*run['matchmatrixCols']
- numq = run['numQualSuppPoints']
- totalQualSP = run['totalQualSuppPoints']
+ lengthSP = settings['numLengthSuppPoints']
+ donSP = settings['numDonSuppPoints']
+ accSP = settings['numAccSuppPoints']
+ mmatrixSP = settings['matchmatrixRows']*settings['matchmatrixCols']
+ numq = settings['numQualSuppPoints']
+ totalQualSP = settings['totalQualSuppPoints']
- currentPhi = zeros((run['numFeatures'],1))
+ currentPhi = zeros((settings['numFeatures'],1))
currentPhi[0:lengthSP] = mat(h.penalties[:]).reshape(lengthSP,1)
currentPhi[lengthSP:lengthSP+donSP] = mat(d.penalties[:]).reshape(donSP,1)
currentPhi[lengthSP+donSP:lengthSP+donSP+accSP] = mat(a.penalties[:]).reshape(accSP,1)
"""
This method...
"""
- run = self.run
ctr = 0
unspliced_ctr = 0
donor/acceptor positions.
"""
- run = self.run
-
id = location['id']
chr = location['chr']
pos = location['pos']
#print original_est, original_est_cut
- score = computeSpliceAlignScoreWithQuality(original_est_cut, quality, qualityPlifs, run, self.currentPhi)
+ score = computeSpliceAlignScoreWithQuality(original_est_cut, quality, qualityPlifs, settings, self.currentPhi)
score += AcceptorScore + IntronScore + DonorScore + self.spliced_bias
alternativeScores.append(score)
#print original_est, original_est_cut
- score = computeSpliceAlignScoreWithQuality(original_est_cut, quality, qualityPlifs, run, self.currentPhi)
+ score = computeSpliceAlignScoreWithQuality(original_est_cut, quality, qualityPlifs, settings, self.currentPhi)
score += AcceptorScore + IntronScore + DonorScore + self.spliced_bias
alternativeScores.append(score)
"""
start = cpu()
- run = self.run
-
# Lets start calculation
dna, exons, est, original_est, quality, acc_supp, don_supp = alignment
- score = computeSpliceAlignScoreWithQuality(original_est, quality, self.qualityPlifs, run, self.currentPhi)
+ score = computeSpliceAlignScoreWithQuality(original_est, quality, self.qualityPlifs, settings, self.currentPhi)
stop = cpu()
self.calcAlignmentScoreTime += stop-start
except:
print 'Error: %s has to be an float!'%parameter
+ settings['numFeatures'] = settings['numLengthSuppPoints'] + settings['numAccSuppPoints'] + settings['numDonSuppPoints']\
+ + settings['matchmatrixCols']*settings['matchmatrixRows'] + settings['numQualPlifs']*settings['numQualSuppPoints']
+
return settings
#run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
#param_fname = jp(run_dir,'param_526.pickle')
- param_fname = self.settings['prediction_parameter_fn']
+ param_fname = self.settings['prediction_param_fn']
#run_fname = jp(run_dir,'run_obj.pickle')
- run_fname = self.settings['run_fn']
#result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
result_dir = self.settings['approximation_dir']
result_fname = jp(result_dir,'map.vm.part_%d'%idx)
self.result_files.append(result_fname)
- current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
+ current_job = KybJob(gridtools.ApproximationTaskStarter,[data_fname,param_fname,result_fname,self.settings])
current_job.h_vmem = '25.0G'
#current_job.express = 'True'
combine_files([combined_fn,self.settings['spliced_reads_fn']],'map.vm')
-def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname,settings):
- ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname,settings)
+def ApproximationTaskStarter(data_fname,param_fname,result_fname,settings):
+ ph1 = PipelineHeuristic(data_fname,param_fname,result_fname,settings)
ph1.filter()
return 'finished filtering set %s.' % data_fname
def __init__(self):
ClusterTask.__init__(self)
+
def CreateJobs(self):
+ """
+
+ """
+
+ jp = os.path.join
+
+ dataset_fn = self.settings['training_dataset_fn']
+ training_keys = cPickle.load(open(self.settings['training_dataset_keys_fn']))
+
+ print 'Found %d keys for training.' % len(training_keys)
+
+ set_name = 'training_set'
+
+ current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,dataset_fn,training_keys,set_name])
+ current_job.h_vmem = '2.0G'
+ current_job.express = 'True'
+
+ print "job #1: ", current_job.nativeSpecification
+
+ self.functionJobs.append(current_job)
+
+ print 'Got %d job(s)' % len(self.functionJobs)
+
+
+ def collectResults(self):
pass
- #cPickle.dump(settings,open(jp(,'training_settings.pickle''run_obj.pickle','w+'))
+
+def TrainingTaskStarter(settings,dataset_fn,training_keys,set_name):
+ accessWrapper = DataAccessWrapper(settings)
+ seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+ qp = QPalma(seqInfo)
+ qp.init_training(dataset_fn,training_keys,settings,set_name)
+ return 'finished prediction of set %s.' % set_name
class PostprocessingTask(ClusterTask):
jp = os.path.join
dotProd = lambda x,y: (x.T * y)[0,0]
+
class SpliceSiteException:
pass
-def getData(training_set,exampleKey,settings):
- """ This function... """
+def preprocessExample(training_set,exampleKey,seqInfo,settings):
+ """
+ This function...
+ """
- currentSeqInfo,original_est,currentQualities,currentExons = training_set[exampleKey]
+ currentSeqInfo,originalRead,currentQualities,currentExons = training_set[exampleKey]
id,chr,strand,up_cut,down_cut = currentSeqInfo
- est = original_est
- est = "".join(est)
- est = est.lower()
- est = unbracket_est(est)
- est = est.replace('-','')
-
- est_len = len(est)
-
- original_est = "".join(original_est)
- original_est = original_est.lower()
-
- dna_flat_files = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/'
- dna, acc_supp, don_supp = get_seq_and_scores(chr,strand,up_cut,down_cut,dna_flat_files)
+ read = unbracket_seq(originalRead)
+ read = read.replace('-','')
- #currentDNASeq, currentAcc, currentDon = seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop)
+ dna, acc_supp, don_supp = seqInfo.get_seq_and_scores(chr,strand,up_cut,down_cut)
- original_exons = currentExons
- exons = original_exons - (up_cut-1)
- exons[0,0] -= 1
- exons[1,0] -= 1
+ exons = currentExons - up_cut
if exons.shape == (2,2):
fetched_dna_subseq = dna[exons[0,0]:exons[0,1]] + dna[exons[1,0]:exons[1,1]]
print 'invalid acceptor in example %d'% exampleKey
raise SpliceSiteException
- assert len(fetched_dna_subseq) == len(est), pdb.set_trace()
+ assert len(fetched_dna_subseq) == len(read), pdb.set_trace()
+
+ return dna,read,acc_supp,don_supp,exons,originalRead,currentQualities
+
+
+def performAlignment(dna,read,quality,mmatrix,donor,acceptor,ps,qualityPlifs,current_num_path,prediction_mode,settings):
+ """
+ Given the needed input this method calls the QPalma C module which
+ calculates a dynamic programming in order to obtain an alignment
+ """
+
+ prb = QPalmaDP.createDoubleArrayFromList(quality)
+ chastity = QPalmaDP.createDoubleArrayFromList([.0]*len(read))
+
+ matchmatrix = QPalmaDP.createDoubleArrayFromList(mmatrix.flatten().tolist()[0])
+ mm_len = settings['matchmatrixRows']*settings['matchmatrixCols']
+
+ donor = QPalmaDP.createDoubleArrayFromList(donor)
+ acceptor = QPalmaDP.createDoubleArrayFromList(acceptor)
+
+ # Create the alignment object representing the interface to the C/C++ code.
+ currentAlignment = QPalmaDP.Alignment(settings['numQualPlifs'],settings['numQualSuppPoints'], settings['enable_quality_scores'])
+
+ c_qualityPlifs = QPalmaDP.createPenaltyArrayFromList([elem.convert2SWIG() for elem in qualityPlifs])
+
+ # calculates SpliceAlign, EstAlign, weightMatch, Gesamtscores, dnaest
+ currentAlignment.myalign( current_num_path, dna, len(dna),\
+ read, len(read), prb, chastity, ps, matchmatrix, mm_len, donor, len(donor),\
+ acceptor, len(acceptor), c_qualityPlifs, settings['remove_duplicate_scores'],\
+ settings['print_matrix'] )
+
+ if prediction_mode:
+ # part that is only needed for prediction
+ result_len = currentAlignment.getResultLength()
+ dna_array,read_array = currentAlignment.getAlignmentArraysNew()
+ else:
+ dna_array = None
+ read_array = None
+
+ newSpliceAlign, newEstAlign, newWeightMatch, newDPScores, newQualityPlifsFeatures =\
+ currentAlignment.getAlignmentResultsNew()
+
+ return newSpliceAlign, newEstAlign, newWeightMatch, newDPScores,\
+ newQualityPlifsFeatures, dna_array, read_array
- return dna,est,acc_supp,don_supp,exons,original_est,currentQualities
class QPalma:
self.logfh.flush()
- def do_alignment(self,dna,est,quality,mmatrix,donor,acceptor,ps,qualityPlifs,current_num_path,prediction_mode,settings):
- """
- Given the needed input this method calls the QPalma C module which
- calculates a dynamic programming in order to obtain an alignment
- """
-
- dna_len = len(dna)
- est_len = len(est)
-
- prb = QPalmaDP.createDoubleArrayFromList(quality)
- chastity = QPalmaDP.createDoubleArrayFromList([.0]*est_len)
-
- matchmatrix = QPalmaDP.createDoubleArrayFromList(mmatrix.flatten().tolist()[0])
- mm_len = settings['matchmatrixRows']*settings['matchmatrixCols']
-
- d_len = len(donor)
- donor = QPalmaDP.createDoubleArrayFromList(donor)
- a_len = len(acceptor)
- acceptor = QPalmaDP.createDoubleArrayFromList(acceptor)
-
- # Create the alignment object representing the interface to the C/C++ code.
- currentAlignment = QPalmaDP.Alignment(settings['numQualPlifs'],settings['numQualSuppPoints'], settings['enable_quality_scores'])
- c_qualityPlifs = QPalmaDP.createPenaltyArrayFromList([elem.convert2SWIG() for elem in qualityPlifs])
- # calculates SpliceAlign, EstAlign, weightMatch, Gesamtscores, dnaest
- currentAlignment.myalign( current_num_path, dna, dna_len,\
- est, est_len, prb, chastity, ps, matchmatrix, mm_len, donor, d_len,\
- acceptor, a_len, c_qualityPlifs, settings['remove_duplicate_scores'],\
- settings['print_matrix'] )
-
- if prediction_mode:
- # part that is only needed for prediction
- result_len = currentAlignment.getResultLength()
- dna_array,est_array = currentAlignment.getAlignmentArraysNew()
- else:
- dna_array = None
- est_array = None
-
- newSpliceAlign, newEstAlign, newWeightMatch, newDPScores, newQualityPlifsFeatures =\
- currentAlignment.getAlignmentResultsNew()
-
- return newSpliceAlign, newEstAlign, newWeightMatch, newDPScores,\
- newQualityPlifsFeatures, dna_array, est_array
-
-
- def init_train(self,settings,run_name):
- full_working_path = jp(settings['alignment_dir'],run_name)
+ def init_training(self,dataset_fn,training_keys,settings,set_name):
+ full_working_path = jp(settings['training_dir'],run_name)
#assert not os.path.exists(full_working_path)
if not os.path.exists(full_working_path):
self.logfh = open('_qpalma_train.log','w+')
- train(settings)
+ training_set = cPickle.load(open(dataset_fn))
+
+ self.train(training_set,settings,set_name)
- def train(self,settings,training_set):
+ def train(self,training_set,settings,set_name):
"""
The mainloop for training.
"""
try:
dna,est,acc_supp,don_supp,exons,original_est,currentQualities =\
- getData(training_set,example_key,settings)
+ preprocessExample(training_set,example_key,self.seqInfo,settings)
except SpliceSiteException:
continue
if elem != -inf:
acc_supp[idx] = 0.0
-
# Berechne die Parameter des wirklichen Alignments (but with untrained d,a,h ...)
if settings['enable_quality_scores']:
trueSpliceAlign, trueWeightMatch, trueWeightQuality ,dna_calc =\
newSpliceAlign, newEstAlign, newWeightMatch, newDPScores,\
newQualityPlifsFeatures, unneeded1, unneeded2 =\
- self.do_alignment(dna,est,quality,mmatrix,donor,acceptor,ps,qualityPlifs,num_path[exampleIdx],False,settings)
+ performAlignment(dna,est,quality,mmatrix,donor,acceptor,ps,qualityPlifs,num_path[exampleIdx],False,settings)
mm_len = settings['matchmatrixRows']*settings['matchmatrixCols']
newSpliceAlign = newSpliceAlign.reshape(num_path[exampleIdx],len(dna))
# end of one example processing
- # call solver every nth example //added constraint
+ # call solver every nth example / added constraint
if exampleIdx != 0 and exampleIdx % numConstPerRound == 0:
objValue,w,self.slacks = solver.solve()
solver_call_ctr += 1
[h,d,a,mmatrix,qualityPlifs] =\
set_param_palma(param,self.ARGS.train_with_intronlengthinformation,settings)
- ##############################################
- # end of one iteration through all examples #
- ##############################################
-
+ # end of one iteration through all examples
self.plog("suboptimal rounds %d\n" %suboptimal_example)
if self.noImprovementCtr == numExamples*2:
newSpliceAlign, newEstAlign, newWeightMatch, newDPScores,\
newQualityPlifsFeatures, dna_array, read_array =\
- self.do_alignment(dna,read,quality,mmatrix,donor,acceptor,ps,qualityPlifs,1,True,settings)
+ performAlignment(dna,read,quality,mmatrix,donor,acceptor,ps,qualityPlifs,1,True,settings)
mm_len = settings['matchmatrixRows']*settings['matchmatrixCols']
# Before creating a candidate spliced read dataset we have to first filter
# the matches from the first seed finding run.
- #approx_task = ApproximationTask(self.settings)
- #approx_task.CreateJobs()
- #approx_task.Submit()
- #approx_task.CheckIfTaskFinished()
+ approx_task = ApproximationTask(self.settings)
+ approx_task.CreateJobs()
+ approx_task.Submit()
+ approx_task.CheckIfTaskFinished()
# After filtering combine the filtered matches from the first run and the
# found matches from the second run to a full dataset
jp = os.path.join
-pos_chr1 = 'ccctaaaccctaaaccctaaaccctaaacctctgaatccttaatccctaaatccctaaatctttaaatcctacatccatgaatccctaaatacctaattccctaaacccgaaaccggtttctctggttgaaaatcattgtgtatataatgataattttatcgtttttatgtaattgcttattgttgtgtgtagattttttaaaaatatcatttgaggtcaatacaaatcctatttcttgtggttttctttccttcacttagctatggatggtttatcttcatttgttatattggatacaagctttgctacgatctacatttgggaatgtgagtctcttattgtaaccttagggttggtttatctcaagaatcttattaattgtttggactgtttatgtttggacatttattgtcattcttactcctttgtggaaatgtttgttctatcaatttatcttttgtgggaaaattatttagttgtagggatgaagtctttcttcgttgttgttacgcttgtcatctcatctctcaatgatatgggatggtcctttagcatttat'+'x'*205
-neg_chr1 = ''
+class TestQPalmaTraining(unittest.TestCase):
-acc_p = [217, 262, 302, 333, 352, 369, 478, 484, 492, 554]
-don_p = [217, 239, 242, 261, 271, 285, 301, 306, 328, 332, 342, 353, 357, 382, 391, 397, 412, 429, 437, 441, 461, 477, 480, 491, 501, 504, 507, 512, 516, 545, 553]
+ def setUp(self):
-acc_n = [229, 235, 246, 251, 256, 261, 276, 301, 306, 313, 333, 335, 346, 362, 371, 388, 417, 421, 424, 443, 455, 492, 496, 512, 520, 525, 527, 547]
-don_n = [224, 257, 262, 298, 302, 307, 310, 317, 346, 389, 404, 422, 511, 513, 554]
+ self.settings = parseSettings('testcase.conf')
-def check_createScoresForSequence():
- print 'Positive strand:'
- createScoresForSequence(pos_chr1,reverse_strand=False)
- print acc_p
- print don_p
+ def testInitialization(self):
+ pass
- print 'Negative strand:'
- filename = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/chr1.dna.flat.neg'
- neg_chr1 = open(filename).read().strip()
- print len(neg_chr1)
- createScoresForSequence(neg_chr1,reverse_strand=True)
- print acc_n
- print don_n
-
-def createScoresForSequence(full_chromo_seq,reverse_strand=False):
- """
- Given a genomic sequence this function calculates random scores for all
- ocurring splice sites.
- """
-
- acc_pos = []
- don_pos = []
-
- total_size = len(full_chromo_seq)
-
- # the first/last 205 pos do not have any predictions
- for pos,elem in enumerate(full_chromo_seq):
- if pos < 205 or pos > total_size-205:
- continue
-
- if full_chromo_seq[pos-2:pos] == 'ag':
- acc_pos.append(pos)
- if full_chromo_seq[pos:pos+2] == 'gt' or full_chromo_seq[pos:pos+2] == 'gc':
- don_pos.append(pos)
-
- acc_scores = [0.0]*len(acc_pos)
- don_scores = [0.0]*len(don_pos)
-
- for idx in range(len(acc_pos)):
- acc_scores[idx] = random.uniform(0.1,1.0)
+ def test_preprocessExample(self):
- for idx in range(len(don_pos)):
- don_scores[idx] = random.uniform(0.1,1.0)
-
- # recalculate indices and reverse them in order to have positions relative
- # to positive strand
- if reverse_strand:
- acc_pos = [total_size-1-e for e in acc_pos]
- acc_pos.reverse()
- acc_scores.reverse()
-
- don_pos = [total_size-1-e for e in don_pos]
- don_pos.reverse()
- don_scores.reverse()
-
- # make pos 1-based
- acc_pos = [e+1 for e in acc_pos]
- don_pos = [e+1 for e in don_pos]
+ currentSeqInfo = ()
+ originalRead =
+ currentQualities =
+ currentExons =
- #print acc_pos[:10]
- #print don_pos[:10]
+ training_set[1] = (currentSeqInfo,originalRead,currentQualities,currentExons)
- acc_pos = array.array('I',acc_pos)
- acc_scores = array.array('f',acc_scores)
+ dna,read,acc_supp,don_supp,exons,originalRead,currentQualities =\
+ preprocessExample(training_set,1,seqInfo,settings)
- don_pos = array.array('I',don_pos)
- don_scores = array.array('f',don_scores)
- return acc_pos,acc_scores,don_pos,don_scores
-
-
-def saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,strand):
- """
- """
+ def test_performAlignment(self):
+ """
+ """
+ #performAlignment(dna,read,quality,mmatrix,donor,acceptor,ps,qualityPlifs,current_num_path,prediction_mode,settings)
+ pass
- acc_score_fn = 'test_data/acc/chromo_1%s.Conf'%strand
- acc_pos_fn = 'test_data/acc/chromo_1%s.pos'%strand
- don_score_fn = 'test_data/don/chromo_1%s.Conf'%strand
- don_pos_fn = 'test_data/don/chromo_1%s.pos'%strand
- acc_scores.tofile(open(acc_score_fn, 'wb'))
- acc_pos.tofile(open(acc_pos_fn, 'wb'))
- don_scores.tofile(open(don_score_fn, 'wb'))
- don_pos.tofile(open(don_pos_fn, 'wb'))
+ def test_train(self):
+ """
+ This function
+ """
-
-def create_mini_chromosome():
-
- chromo_fn = 'test_data/chromo1.flat'
-
- chromo_fh = open(chromo_fn)
- full_chromo_seq = chromo_fh.read()
- full_chromo_seq = full_chromo_seq.strip()
-
- print full_chromo_seq[:200]
-
- # create data for forward strand
- acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq,reverse_strand=False)
- print acc_pos[:5]
- print don_pos[:5]
- saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'+')
-
- # create data for reverse strand
- full_chromo_seq_rev = reverse_complement(full_chromo_seq)
-
- total_size = len(full_chromo_seq_rev)
-
- print full_chromo_seq_rev[:200]
- acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq_rev,reverse_strand=True)
- saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
+ set_name = 'test_train'
+
+ qp = QPalma(seqInfo,True)
+ qp.train(self.training_set,self.settings,set_name)
-def test_rev_comp():
- get_seq_and_scores(self,chromo,strand,genomicSeq_start,genomicSeq_stop,only_seq=False,perform_checks=True)
class TestQPalmaPrediction(unittest.TestCase):
"""
print 'Problem counter is %d' % qp.problem_ctr
-def check_reverse_strand_calculation(id,b,e,seqInfo):
- total_size = seqInfo.getFragmentSize(1)
- bp = total_size - e
- ep = total_size - b
-
- seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
- seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
- seqp = reverse_complement(seqp)
-
- res1 = (seq == seqp)
- #print seq
- #print seqp
-
- seq,acc,don = seqInfo.get_seq_and_scores(id,'+',b,e,only_seq=False,perform_checks=False)
- seq,acc,don = seqInfo.get_seq_and_scores(id,'-',bp,ep,only_seq=False,perform_checks=False)
- #seqp = reverse_complement(seq)
-
- res2 = (seq == seqp)
- print seq
- print seqp
-
- return res1,res2
-
-
-def check_example(chromo,strand,b,e,seqInfo,lt1):
- dna,acc,don = seqInfo.get_seq_and_scores(1,strand,b,e)
-
- if lt1 != None:
- _dna,_acc,_don= lt1.get_seq_and_scores(chromo,strand,b,e)
- else:
- _dna,_acc,_don = seqInfo.get_seq_and_scores(chromo,strand,b,e)
-
- print 'Current interval: (%d,%d), current strand: %s'%(b,e,strand)
- print 'Results for dna,acc,don: %s %s %s'%(str(dna==_dna),str(acc==_acc),str(don==_don))
-
- if dna != _dna:
- print dna[:20]
- print _dna[:20]
-
- if acc != _acc or don != _don:
- print [p for p,e in enumerate(acc) if e != -inf][:10]
- print [p for p,e in enumerate(_acc) if e != -inf][:10]
- print [p for p,e in enumerate(don) if e != -inf][:10]
- print [p for p,e in enumerate(_don) if e != -inf][:10]
-
-
-def simple_check(settings,seqInfo,lt1):
-
- print 'Checking sequences for some intervals...'
-
- intervals = [(0,10000),(545,874),(999,1234)]
-
- chromo = 1
- total_size = seqInfo.getFragmentSize(chromo)
-
- #for strand in ['+','-']:
- for strand in ['-']:
- for (b,e) in [(206,874),(545,874),(999,1234),(1000,total_size-1000),(3,total_size-3),(0,total_size)]:
- check_example(chromo,strand,b,e,seqInfo,lt1)
-
- #for (b,e) in intervals:
- # r1,r2 = check_reverse_strand_calculation(1,b,e,seqInfo)
- # print b,e
- # print 'Rev strand calculation: %s %s'%(str(r1),str(r2))
-
-
-def checks():
- settings = {}
-
- settings['genome_dir'] = 'test_data/'
- settings['acceptor_scores_loc'] = 'test_data/acc'
- settings['donor_scores_loc'] = 'test_data/don'
-
- settings['genome_file_fmt'] = 'chromo%d.flat'
- settings['splice_score_file_fmt']= 'chromo_%d%s'
-
- allowed_fragments = [1]
- settings['allowed_fragments'] = allowed_fragments
-
- accessWrapper = DataAccessWrapper(settings)
- seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
-
- lt1 = None
- lt1 = LookupTable(settings)
-
- print 'Checking with toy data...'
- simple_check(settings,seqInfo,lt1)
-
- settings = {}
-
- settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
- settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
- settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
-
- settings['genome_file_fmt'] = 'chr%d.dna.flat'
- settings['splice_score_file_fmt']= 'contig_%d%s'
-
- allowed_fragments = [1]
- settings['allowed_fragments'] = allowed_fragments
-
- accessWrapper = DataAccessWrapper(settings)
- seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
-
- lt1 = None
- lt1 = LookupTable(settings)
-
- print 'Checking with real data...'
- simple_check(settings,seqInfo,lt1)
-
-
-def run():
- print 'Creating some artifical data...'
- create_mini_chromosome()
- print 'Performing some checks...'
- checks()
-
if __name__ == '__main__':
- run()
- #check_createScoresForSequence()
- #suite = unittest.TestLoader().loadTestsFromTestCase(TestQPalmaPrediction)
- #unittest.TextTestRunner(verbosity=2).run(suite)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestQPalmaTraining)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestQPalmaPrediction)
+ unittest.TextTestRunner(verbosity=2).run(suite)
import unittest
from test_approximation import TestApproximation
-from test_qpalma import TestQPalmaPrediction
-from test_utils import TestSequenceUtils
+#from test_qpalma import TestQPalmaPrediction
+from test_utils import TestQPalmaPrediction
SUCCESS = True
import cPickle
import random
+import array
+import sys
import math
import numpy
from numpy import inf
import array
import unittest
-from qpalma.qpalma_main import QPalma
-from qpalma.utils import print_prediction
+from qpalma.qpalma_main import preprocessExample
-from qpalma.Run import Run
+from qpalma.utils import print_prediction
from qpalma.SettingsParser import parseSettings
from qpalma.OutputFormat import alignment_reconstruct
from qpalma.Lookup import LookupTable
+from qpalma.DatasetUtils import checkExons,processQuality
+
+
jp = os.path.join
+pos_chr1 = 'ccctaaaccctaaaccctaaaccctaaacctctgaatccttaatccctaaatccctaaatctttaaatcctacatccatgaatccctaaatacctaattccctaaacccgaaaccggtttctctggttgaaaatcattgtgtatataatgataattttatcgtttttatgtaattgcttattgttgtgtgtagattttttaaaaatatcatttgaggtcaatacaaatcctatttcttgtggttttctttccttcacttagctatggatggtttatcttcatttgttatattggatacaagctttgctacgatctacatttgggaatgtgagtctcttattgtaaccttagggttggtttatctcaagaatcttattaattgtttggactgtttatgtttggacatttattgtcattcttactcctttgtggaaatgtttgttctatcaatttatcttttgtgggaaaattatttagttgtagggatgaagtctttcttcgttgttgttacgcttgtcatctcatctctcaatgatatgggatggtcctttagcatttat'+'x'*205
+
+neg_chr1 = ''
+
+acc_p = [217, 262, 302, 333, 352, 369, 478, 484, 492, 554]
+don_p = [217, 239, 242, 261, 271, 285, 301, 306, 328, 332, 342, 353, 357, 382, 391, 397, 412, 429, 437, 441, 461, 477, 480, 491, 501, 504, 507, 512, 516, 545, 553]
+
+acc_n = [229, 235, 246, 251, 256, 261, 276, 301, 306, 313, 333, 335, 346, 362, 371, 388, 417, 421, 424, 443, 455, 492, 496, 512, 520, 525, 527, 547]
+don_n = [224, 257, 262, 298, 302, 307, 310, 317, 346, 389, 404, 422, 511, 513, 554]
+
+
+def check_createScoresForSequence():
+ print 'Positive strand:'
+ createScoresForSequence(pos_chr1,reverse_strand=False)
+ print acc_p
+ print don_p
+
+ print 'Negative strand:'
+ filename = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome/chr1.dna.flat.neg'
+ neg_chr1 = open(filename).read().strip()
+ print len(neg_chr1)
+ createScoresForSequence(neg_chr1,reverse_strand=True)
+ print acc_n
+ print don_n
+
def createScoresForSequence(full_chromo_seq,reverse_strand=False):
"""
total_size = len(full_chromo_seq)
# the first/last 205 pos do not have any predictions
- for pos,elem in enumerate(full_chromo_seq[205:-205]):
+ for pos,elem in enumerate(full_chromo_seq):
+ if pos < 205 or pos > total_size-205:
+ continue
+
if full_chromo_seq[pos-2:pos] == 'ag':
acc_pos.append(pos)
if full_chromo_seq[pos:pos+2] == 'gt' or full_chromo_seq[pos:pos+2] == 'gc':
don_pos.append(pos)
- # make pos 1-based
- acc_pos = [e+1 for e in acc_pos]
- don_pos = [e+1 for e in don_pos]
-
acc_scores = [0.0]*len(acc_pos)
don_scores = [0.0]*len(don_pos)
don_pos.reverse()
don_scores.reverse()
+ # make pos 1-based
+ acc_pos = [e+1 for e in acc_pos]
+ don_pos = [e+1 for e in don_pos]
+
+ #print acc_pos[:10]
+ #print don_pos[:10]
+
acc_pos = array.array('I',acc_pos)
acc_scores = array.array('f',acc_scores)
print full_chromo_seq_rev[:200]
acc_pos,acc_scores,don_pos,don_scores = createScoresForSequence(full_chromo_seq_rev,reverse_strand=True)
- acc_pos = [total_size-1-e for e in acc_pos]
- acc_pos.reverse()
- print acc_pos[:5]
- don_pos = [total_size-1-e for e in don_pos]
- don_pos.reverse()
- print don_pos[:5]
+ saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
- #
- # Remember: The positions are always saved one-based.
- #
- #saveSpliceInfo(acc_pos,acc_scores,don_pos,don_scores,'-')
+def test_rev_comp():
+ get_seq_and_scores(self,chromo,strand,genomicSeq_start,genomicSeq_stop,only_seq=False,perform_checks=True)
- a = 103
- b = 255
- print 'pos: %s'%full_chromo_seq[a:b]
- print 'neg: %s'%full_chromo_seq_rev[a:b]
- total_size = len(full_chromo_seq)
- ap = total_size-b
- bp = total_size-a
- print 'png: %s'%reverse_complement(full_chromo_seq[ap:bp])
+class TestQPalmaPrediction(unittest.TestCase):
+ """
+ This class...
+ """
-class TestSequenceUtils(unittest.TestCase):
+
+ def _setUp(self):
+ self.prediction_set = {}
- def setUp(self):
- create_mini_chromosome()
+ # chr1 + 20-120
+ read = 'catctatgcaacagcattacagtgatcaccggcccaaaaaacctgtgtctggggttttgcctgatgatagcagtgatactgaaactggatcaatggtaag'
+ currentQualities = [[40]*len(read)]
+ id = 3
+ chromo = 1
+ strand = '+'
- def check_reverse_strand_calculation(self,id,b,e,seqInfo):
-
- seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
+ genomicSeq_start = 3500
+ genomicSeq_stop = 6500
+ print 'Position: ',
+ print genomicSeq_start,genomicSeq_stop
- total_size = seqInfo.getFragmentSize(1)
- bp = total_size - e
- ep = total_size - b
- seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
- seqp = reverse_complement(seqp)
+ currentSeqInfo = id,chromo,strand,genomicSeq_start,genomicSeq_stop
- self.assertEqual(seq,seqp)
+ example = (currentSeqInfo,read,currentQualities)
+ self.prediction_set[id] = [example]
+ # chr1 - 5000-5100
+ read = 'ctgtgtatctggttgctcaatatgctcgccggaaaatgaagatcatggatgctgtgagttctccttattgttcattatcaaactgatatgagtttctgat'
+ currentQualities = [[40]*len(read)]
- def simple_check(self,settings,seqInfo,lt1):
+ id = 4
+ chromo = 1
+ strand = '-'
- print 'Checking sequences for some intervals...'
-
- intervals = [(0,10000),(545,874),(999,1234)]
+ total_size = 30432563
+
+ genomicSeq_start = total_size - 6500
+ genomicSeq_stop = total_size - 3500
+ print 'Position: ',
+ print genomicSeq_start,genomicSeq_stop
+
+ currentSeqInfo = id,chromo,strand,genomicSeq_start,genomicSeq_stop
+
+ example = (currentSeqInfo,read,currentQualities)
+ self.prediction_set[id] = [example]
+
+
+ def testAlignments(self):
+
+ settings = parseSettings('testcase.conf')
+
+ print self.prediction_set
+ for example_key in self.prediction_set.keys():
+ print 'Current example %d' % example_key
+
+ for example in self.prediction_set[example_key]:
+ print example
+ print 'size'
+ print len(example)
+
+ accessWrapper = DataAccessWrapper(settings)
+ seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+
+ qp = QPalma(seqInfo,True)
+ allPredictions = qp.predict(self.prediction_set,settings)
+
+ for current_prediction in allPredictions:
+ align_str = print_prediction(current_prediction)
+ print align_str
- for (b,e) in intervals:
- self.check_reverse_strand_calculation(1,b,e,seqInfo)
+ id = current_prediction['id']
+ seq = current_prediction['read']
+ dna = current_prediction['dna']
+ chromo = current_prediction['chr']
+ strand = current_prediction['strand']
+ start_pos = current_prediction['start_pos']
+ predExons = current_prediction['predExons']
- for (b,e) in [(206,874),(545,874),(999,1234)]:
- lt1 = LookupTable(settings)
- _dna,_acc,_don= lt1.get_seq_and_scores(1,'-',b,e)
- dna,acc,don = seqInfo.get_seq_and_scores(1,'-',b,e,only_seq=False,perform_checks=False)
+ numExons = int(math.ceil(len(predExons) / 2))
+
+ print alignment_reconstruct(current_prediction,numExons)
+ print id,start_pos,predExons
- self.assertEqual(dna,_dna)
- self.assertEqual(acc,_acc)
- self.assertEqual(don,_don)
+ print 'Problem counter is %d' % qp.problem_ctr
- def testWithArtificalData(self):
+ def _testAlignments(self):
+ run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
+
+ run = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
+ run['name'] = 'test_run'
+ run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/sandbox/testcases'
+
+ param_fn = jp(run_dir,'param_526.pickle')
+ param = cPickle.load(open(param_fn))
+
+ print self.prediction_set
+ for example_key in self.prediction_set.keys():
+ print 'Current example %d' % example_key
+
+ for example in self.prediction_set[example_key]:
+ print example
+ print 'size'
+ print len(example)
+
+ # fetch the data needed
settings = {}
- settings['genome_dir'] = 'test_data/'
- settings['acceptor_scores_loc'] = 'test_data/acc'
- settings['donor_scores_loc'] = 'test_data/don'
+ settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
+ settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
+ settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
- settings['genome_file_fmt'] = 'chromo%d.flat'
- settings['splice_score_file_fmt']= 'chromo_%d%s'
+ settings['genome_file_fmt'] = 'chr%d.dna.flat'
+ settings['splice_score_file_fmt']= 'contig_%d%s'
allowed_fragments = [1]
settings['allowed_fragments'] = allowed_fragments
accessWrapper = DataAccessWrapper(settings)
- seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
- lt1 = LookupTable(settings)
+ seqInfo = SeqSpliceInfo(accessWrapper,allowed_fragments)
+
+ qp = QPalma(run,seqInfo,True)
+ allPredictions = qp.predict(self.prediction_set,param)
+
+ for current_prediction in allPredictions:
+ align_str = print_prediction(current_prediction)
+ print align_str
+
+ id = current_prediction['id']
+ seq = current_prediction['read']
+ dna = current_prediction['dna']
+ chromo = current_prediction['chr']
+ strand = current_prediction['strand']
+ start_pos = current_prediction['start_pos']
+ predExons = current_prediction['predExons']
+
+ numExons = int(math.ceil(len(predExons) / 2))
+
+ print alignment_reconstruct(current_prediction,numExons)
+ print id,start_pos,predExons
+
+ print 'Problem counter is %d' % qp.problem_ctr
+
+
+def check_reverse_strand_calculation(id,b,e,seqInfo):
+ total_size = seqInfo.getFragmentSize(1)
+ bp = total_size - e
+ ep = total_size - b
+
+ seq,acc,don = seqInfo.get_seq_and_scores(id,'-',b,e,only_seq=False,perform_checks=False)
+ seqp,acc,don = seqInfo.get_seq_and_scores(id,'+',bp,ep,only_seq=False,perform_checks=False)
+ seqp = reverse_complement(seqp)
- print 'Checking with toy data...'
- self.simple_check(settings,seqInfo,lt1)
+ res1 = (seq == seqp)
+ #print seq
+ #print seqp
+
+ seq,acc,don = seqInfo.get_seq_and_scores(id,'+',b,e,only_seq=False,perform_checks=False)
+ seq,acc,don = seqInfo.get_seq_and_scores(id,'-',bp,ep,only_seq=False,perform_checks=False)
+ #seqp = reverse_complement(seq)
+
+ res2 = (seq == seqp)
+ print seq
+ print seqp
+
+ return res1,res2
+
+
+def check_example(chromo,strand,b,e,seqInfo,lt1):
+ dna,acc,don = seqInfo.get_seq_and_scores(1,strand,b,e)
+
+ if lt1 != None:
+ _dna,_acc,_don= lt1.get_seq_and_scores(chromo,strand,b,e)
+ else:
+ _dna,_acc,_don = seqInfo.get_seq_and_scores(chromo,strand,b,e)
+
+ print 'Current interval: (%d,%d), current strand: %s'%(b,e,strand)
+ print 'Results for dna,acc,don: %s %s %s'%(str(dna==_dna),str(acc==_acc),str(don==_don))
+
+ if dna != _dna:
+ print dna[:20]
+ print _dna[:20]
+
+ if acc != _acc or don != _don:
+ print [p for p,e in enumerate(acc) if e != -inf][:10]
+ print [p for p,e in enumerate(_acc) if e != -inf][:10]
+ print [p for p,e in enumerate(don) if e != -inf][:10]
+ print [p for p,e in enumerate(_don) if e != -inf][:10]
+
+
+def simple_check(settings,seqInfo,lt1):
+
+ print 'Checking sequences for some intervals...'
+
+ intervals = [(0,10000),(545,874),(999,1234)]
+
+ chromo = 1
+ total_size = seqInfo.getFragmentSize(chromo)
+
+ #for strand in ['+','-']:
+ for strand in ['-']:
+ for (b,e) in [(206,874),(545,874),(999,1234),(1000,total_size-1000),(3,total_size-3),(0,total_size)]:
+ check_example(chromo,strand,b,e,seqInfo,lt1)
+
+ #for (b,e) in intervals:
+ # r1,r2 = check_reverse_strand_calculation(1,b,e,seqInfo)
+ # print b,e
+ # print 'Rev strand calculation: %s %s'%(str(r1),str(r2))
def checks():
settings = {}
+ settings['genome_dir'] = 'test_data/'
+ settings['acceptor_scores_loc'] = 'test_data/acc'
+ settings['donor_scores_loc'] = 'test_data/don'
+
+ settings['genome_file_fmt'] = 'chromo%d.flat'
+ settings['splice_score_file_fmt']= 'chromo_%d%s'
+
+ allowed_fragments = [1]
+ settings['allowed_fragments'] = allowed_fragments
+
+ accessWrapper = DataAccessWrapper(settings)
+ seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+
+ lt1 = None
+ lt1 = LookupTable(settings)
+
+ print 'Checking with toy data...'
+ simple_check(settings,seqInfo,lt1)
+
+ settings = {}
+
settings['genome_dir'] = '/fml/ag-raetsch/share/projects/genomes/A_thaliana_best/genome'
settings['acceptor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/acc'
settings['donor_scores_loc'] = '/fml/ag-raetsch/home/fabio/tmp/interval_query_files/don'
accessWrapper = DataAccessWrapper(settings)
seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+
+ lt1 = None
lt1 = LookupTable(settings)
print 'Checking with real data...'
print 'Performing some checks...'
checks()
+
+class TestDatasetUtils(unittest.TestCase):
+ """
+ Here we check ...
+ """
+
+ def setUp(self):
+
+ # set some data for the first test
+
+ self.raw_qualities = [ 'hhh'\
+ ]
+
+ self.quality_results = [ [40,40,40]\
+ ]
+
+ self.quality_results = [array.array('b',e) for e in self.quality_results]
+
+ # now some data for the second test
+
+ self.dna_fragments = [ 'ggggttttgtccccagaaaaaggg'\
+ ]
+
+ self.readAlignments = [ 'ttttaaaaa'\
+ ]
+
+ self.exons = [ numpy.mat([4,8,16,21]).reshape((2,2))\
+ ]
+
+ self.results = [ True\
+ ]
+
+ def test_processQuality(self):
+
+ quality_interval = (-5,40)
+
+ perform_checks = True
+
+ for prb_offset in [64]:
+ for raw_quality in self.raw_qualities:
+ for quality_result in self.quality_results:
+
+ result = processQuality(raw_quality,prb_offset,quality_interval,perform_checks)
+ self.assertEqual(quality_result,result)
+
+
+ def test_checkExons(self):
+
+ for idx in range(len(self.dna_fragments)):
+ dna = self.dna_fragments[idx]
+ readAlignment = self.readAlignments[idx]
+
+ exons = self.exons[idx]
+ gt = self.results[idx]
+
+ result = checkExons(dna,exons,readAlignment,1)
+ self.assertEqual(gt,result)
+
+
+
+
if __name__ == '__main__':
- suite = unittest.TestLoader().loadTestsFromTestCase(TestSequenceUtils)
+ #run()
+ #check_createScoresForSequence()
+ #suite = unittest.TestLoader().loadTestsFromTestCase(TestQPalmaPrediction)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestDatasetUtils)
unittest.TextTestRunner(verbosity=2).run(suite)