+ added dataset generation function for training set
author Fabio <fabio@congo.fml.local>
Thu, 11 Sep 2008 21:34:38 +0000 (23:34 +0200)
committer Fabio <fabio@congo.fml.local>
Thu, 11 Sep 2008 21:34:38 +0000 (23:34 +0200)
+ added result collection function for approximation step

qpalma/DatasetUtils.py
qpalma/gridtools.py
qpalma/utils.py
scripts/qpalma_pipeline.py

diff --git a/qpalma/DatasetUtils.py b/qpalma/DatasetUtils.py
index 09d241c..b98de77 100644
--- a/qpalma/DatasetUtils.py
+++ b/qpalma/DatasetUtils.py
@@ -11,13 +11,24 @@ import cPickle
 import os
 import pdb
 
-#from parsers import *
 from sequence_utils import SeqSpliceInfo,DataAccessWrapper,get_flat_file_size,reverse_complement,unbracket_seq,create_bracket_seq,reconstruct_dna_seq
 
+illumina_ga_range = (-5,40)
+#roche_454_range   =
 
-def generateDataset(global_settings):
+
+def generateTrainingDataset(settings):
+   """
+   This function creates a training dataset.
    """
-   This function is responsible for the dataset generation for the matches of
+   dataset = []
+
+   saveData('training',dataset,settings)
+
+
+def generatePredictionDataset(settings):
+   """
+   This function generates the prediction dataset from the matches of
    the second vmatch run and the heuristic-filtered matches of the first run.
 
    An example in our dataset consists of:
@@ -40,19 +51,21 @@ def generateDataset(global_settings):
    'id'. This is because one read can be found several times on the genome.
    """
 
+   #map_file = settings['map_file']
+   map_file = jp(settings['approximation_dir'],'map.vm')
    assert os.path.exists(map_file), 'Error: Can not find map file'
-   print map_2nd_file
-   assert os.path.exists(map_2nd_file), 'Error: Can not find map_2nd file'
-   self.map_file = map_file
-   self.map_2nd_file = map_2nd_file
 
    dataset = {}
 
    prb_offset = 64
 
+   # This tuple specifies the interval of valid Illumina Genome Analyzer quality values
+   if settings['platform'] == 'IGA':
+      quality_interval = illumina_ga_range
+   else:
+      assert False, 'Error: Unknown platform: %s' % settings['platform']
+
   # around each vmatch hit we cut out a window of `half_window_size` bases
   # each up-/downstream.
-   half_window_size = 1500
+   half_window_size = settings['half_window_size']
 
    instance_counter = 0
 
@@ -70,7 +83,7 @@ def generateDataset(global_settings):
       pos      = int(slist[2])
 
      # we only consider the chromosomes/fragments allowed by the settings
-      if not chromo in range(1,6):
+      if chromo not in settings['allowed_fragments']:
          continue
 
       # convert D/P strand info to +/-
@@ -79,14 +92,10 @@ def generateDataset(global_settings):
       # QPalma uses lowercase characters
       bracket_seq = slist[4].lower()
       read_seq = unbracket_seq(bracket_seq)
-      reconstructed_dna_seq = reconstruct_dna_seq(bracket_seq)
-
-      if strand == '-' and first_round:
-         read_seq = reverse_complement(read_seq)
-         #reconstructed_dna_seq = reverse_complement(reconstructed_dna_seq)
 
+      # in the prediction step the reads are no longer bracketed
       assert not '[' in read_seq and not ']' in read_seq
-      
+
      # we use an area of +/- `half_window_size` nucleotides around the seed position
      if pos > half_window_size+1:
         us_offset = half_window_size
@@ -103,22 +112,16 @@ def generateDataset(global_settings):
 
       dna_seq = seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop,True)
       
-      #if strand == '-':
-      #   dna_part = dna_seq[us_offset-len(read_seq):us_offset]
-      #else:
-      #   dna_part = dna_seq[us_offset:us_offset+len(read_seq)]
-
-      ## check whether everything is ok
-      #   try:
-      #      currentDNASeq, currentAcc, currentDon = seqInfo.get_seq_and_scores(chromo,strand,genomicSeq_start,genomicSeq_stop)
-      #   except:
-      #      problem_ctr += 1
-      #      continue
-
       # In order to save some space we use a signed char to store the
       # qualities. Each quality element can range as follows: -128 <= elem <= 127
       
-      prb = array.array('b',map(lambda x: ord(x)-self.prb_offset,slist[5]))
+      q_values = map(lambda x: ord(x)-prb_offset,slist[5])
+
+      if settings['perform_checks']:
+         for entry in q_values:
+            assert quality_interval[0] <= entry <= quality_interval[1], 'Error: Your read has invalid quality values: %d' % entry
+
+      prb = array.array('b',q_values)
 
       currentSeqInfo = (id,chromo,strand,genomicSeq_start,genomicSeq_stop)
       currentQualities = [prb]
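Condensed into a standalone sketch, the quality handling above works as follows. The offset and the Illumina GA range are taken from this file; the example quality string is made up:

   import array

   prb_offset        = 64
   illumina_ga_range = (-5,40)

   qual_str = 'hhhZPKhhh'                    # hypothetical ASCII-encoded qualities
   q_values = map(lambda x: ord(x)-prb_offset, qual_str)

   # every decoded value must lie in the valid range for the platform
   for entry in q_values:
      assert illumina_ga_range[0] <= entry <= illumina_ga_range[1]

   # signed char array: each element satisfies -128 <= elem <= 127
   prb = array.array('b',q_values)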
@@ -131,9 +134,15 @@ def generateDataset(global_settings):
 
    print 'Total examples processed: %d' % instance_counter
 
-   ddir = global_settings['dataset_dir']
-   dataset_fn        = jp(ddir,'data.pickle')
-   dataset_keys_fn   = jp(ddir,'data.keys.pickle')
+   saveData('prediction',dataset,settings)
+
+
+def saveData(prefix,dataset,settings):
+   """
+   """
+   ddir = settings['dataset_dir']
+   dataset_fn        = jp(ddir,'%s_data.pickle'%prefix)
+   dataset_keys_fn   = jp(ddir,'%s_data.keys.pickle'%prefix)
 
    assert not os.path.exists(dataset_fn), 'The data_file already exists!'
    assert not os.path.exists(dataset_keys_fn), 'The data_keys file already exists!'
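The dump itself is truncated in this hunk; assuming jp is os.path.join (as its usage throughout this diff suggests) and cPickle is used as imported at the top of the file, the remainder of saveData presumably looks roughly like this sketch:

   import os
   import cPickle
   from os.path import join as jp      # assumption: jp is os.path.join

   def saveData(prefix,dataset,settings):
      ddir = settings['dataset_dir']
      dataset_fn      = jp(ddir,'%s_data.pickle'%prefix)
      dataset_keys_fn = jp(ddir,'%s_data.keys.pickle'%prefix)

      assert not os.path.exists(dataset_fn), 'The data_file already exists!'
      assert not os.path.exists(dataset_keys_fn), 'The data_keys file already exists!'

      # the dumps below are an assumption, not code shown in this commit
      cPickle.dump(dataset,open(dataset_fn,'wb'),2)
      # the training dataset is a plain list, so only dump keys for dicts
      keys = dataset.keys() if isinstance(dataset,dict) else []
      cPickle.dump(keys,open(dataset_keys_fn,'wb'),2)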
diff --git a/qpalma/gridtools.py b/qpalma/gridtools.py
index 2733390..34301c5 100644
--- a/qpalma/gridtools.py
+++ b/qpalma/gridtools.py
@@ -79,6 +79,10 @@ class ClusterTask(Thread):
       pass
 
 
+   def collectResults(self):
+      # hook for subclasses: called once all submitted jobs have finished
+      pass
+
+
    def CheckIfTaskFinished(self):
       """
       This function is responsible for checking whether the submitted jobs were
@@ -95,6 +99,7 @@ class ClusterTask(Thread):
       #for (i, job) in enumerate(retjobs):
       #   print "Job #", i, "- ret: ", job.ret
       #print '--------------'
+      self.collectResults()
 
 
 
@@ -127,10 +132,13 @@ class ApproximationTask(ClusterTask):
 
       original_map_fname = self.global_settings['read_ascii_data_fn']
       split_file(original_map_fname,result_dir,num_splits)
+   
+      self.result_files = []
 
       for idx in range(0,num_splits):
          data_fname     = jp(result_dir,'map.part_%d'%idx)
          result_fname   = jp(result_dir,'map.vm.part_%d.heuristic'%idx)
+         self.result_files.append(result_fname)
 
          current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.global_settings])
          current_job.h_vmem = '25.0G'
@@ -141,6 +149,13 @@ class ApproximationTask(ClusterTask):
          self.functionJobs.append(current_job)
 
 
+   def collectResults(self):
+      result_dir  = self.global_settings['approximation_dir']
+      combined_fn = jp(result_dir,'map.vm.spliced')
+      combine_files(self.result_files,combined_fn)
+      combine_files([combined_fn,self.global_settings['spliced_reads_fn']],jp(result_dir,'map.vm'))
+
+
 def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname,settings):
    ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname,settings)
    ph1.filter()
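The new collectResults plumbing is a template-method hook: ClusterTask drives the job lifecycle and invokes the hook once everything has finished, and subclasses such as ApproximationTask override it to merge their partial results. Reduced to the bare pattern, independent of the real grid machinery, the idea is:

   class ClusterTask(object):
      def collectResults(self):
         pass                                    # base class: nothing to collect

      def CheckIfTaskFinished(self):
         # in the real class: poll the submitted grid jobs until all are done
         self.collectResults()                   # then let subclasses post-process

   class ApproximationTask(ClusterTask):
      def __init__(self,result_files):
         self.result_files = result_files

      def collectResults(self):
         print 'combining %d partial result files' % len(self.result_files)

   ApproximationTask(['part_0','part_1']).CheckIfTaskFinished()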
diff --git a/qpalma/utils.py b/qpalma/utils.py
index 6fb55c2..38d4558 100644
--- a/qpalma/utils.py
+++ b/qpalma/utils.py
@@ -166,6 +166,20 @@ def get_alignment(_newSpliceAlign,_newEstAlign, dna_array, est_array):
 
 ##########
 
+def combine_files(partial_files,new_file):
+   """
+   Concatenates the contents of the given partial files into new_file.
+   """
+   try:
+      fh = open(new_file,'w')
+   except IOError:
+      print 'Problem while trying to open file: %s' % new_file
+      sys.exit(1)
+
+   for pfile in partial_files:
+      for line in open(pfile):
+         fh.write(line)
+
+   fh.close()
+
+
 def split_file(filename,result_dir,num_parts):
    """
    Splits a file of n lines into num_parts parts
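Together with split_file this makes the approximation step a split/process/merge round trip. A hypothetical usage, assuming the parts are named '<name>.part_<idx>' inside the result directory as the gridtools code above suggests:

   split_file('map.vm','/tmp/parts',4)              # assumed to write /tmp/parts/map.vm.part_0 .. .part_3
   parts = ['/tmp/parts/map.vm.part_%d' % idx for idx in range(4)]
   # ... each part is processed by its own grid job ...
   combine_files(parts,'/tmp/parts/map.vm.merged')  # concatenate the partial results again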
diff --git a/scripts/qpalma_pipeline.py b/scripts/qpalma_pipeline.py
index 26d30e2..5e4bea2 100644
--- a/scripts/qpalma_pipeline.py
+++ b/scripts/qpalma_pipeline.py
@@ -21,6 +21,8 @@ import sys
 from qpalma.gridtools import ApproximationTask,PreprocessingTask
 from qpalma.gridtools import AlignmentTask,PostprocessingTask
 
+from qpalma.DatasetUtils import generatePredictionDataset
+
 from SettingsParser import parseSettings
 
 
@@ -58,9 +60,8 @@ class System:
       pre_task = TrainingPreprocessingTask(self.global_settings)
       pre_task.createJobs()
       pre_task.submit() 
-      while pre_task.checkIfTaskFinished() == False:
-         sleep(20)
-      
+      pre_task.CheckIfTaskFinished()
+
 
    def prediction(self):
       """
@@ -80,10 +81,11 @@ class System:
       # After filtering combine the filtered matches from the first run and the
       # found matches from the second run to a full dataset
 
-      pre_task = PreprocessingTask(self.global_settings)
-      pre_task.CreateJobs()
-      pre_task.Submit() 
-      pre_task.CheckIfTaskFinished()
+      generatePredictionDataset(self.global_settings)
+      #pre_task = PreprocessingTask(self.global_settings)
+      #pre_task.CreateJobs()
+      #pre_task.Submit() 
+      #pre_task.CheckIfTaskFinished()
 
       sys.exit(0)
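Taken together, the prediction path now chains the pieces added in this commit: the approximation jobs each write a partial result file, collectResults concatenates them and merges in the already spliced reads to produce map.vm in the approximation directory, and generatePredictionDataset picks up exactly that file. A schematic of the flow with stubbed names, not literal pipeline code:

   def generatePredictionDataset(settings):
      print 'reading %s/map.vm' % settings['approximation_dir']

   class ApproximationTask(object):
      def __init__(self,settings):
         self.settings = settings
      def CheckIfTaskFinished(self):
         # in the real class this waits for the jobs, then calls collectResults()
         print 'writing %s/map.vm' % self.settings['approximation_dir']

   settings = {'approximation_dir': '/tmp/qpalma/approximation'}
   ApproximationTask(settings).CheckIfTaskFinished()   # produces map.vm
   generatePredictionDataset(settings)                 # consumes map.vm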