+ added configuration file parsing and checking functions
author     Fabio <fabio@congo.fml.local>
           Wed, 10 Sep 2008 14:20:27 +0000 (16:20 +0200)
committer  Fabio <fabio@congo.fml.local>
           Wed, 10 Sep 2008 14:20:27 +0000 (16:20 +0200)
doc/qpalma.tex
qpalma/gridtools.py
scripts/qpalma_pipeline.py

diff --git a/doc/qpalma.tex b/doc/qpalma.tex
index 24799dd..3e0abbb 100644
--- a/doc/qpalma.tex
+++ b/doc/qpalma.tex
@@ -37,10 +37,21 @@ alignment specific options.
 The project results directory (\emph{result\_dir}) then contains the subdirectories
 \begin{itemize}
 \item \emph{mapping} with subdirs main and spliced
-\item \emph{alignment}  with subdirs for the different parameters and \emph{heuristic}
+\item \emph{alignment} with subdirs for the different parameters and \emph{heuristic}
 \item \emph{remapping}
 \end{itemize}
 
+
+The variable ``result\_dir'' specifies where all of QPalma's data resides.
+This directory contains the following subdirectories:
+\begin{itemize}
+\item preprocessing
+\item approximation
+\item prediction
+\item postprocessing
+\item training
+\end{itemize}
+
 %
 %
 %
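
For illustration only: given the plain ``key = value'' settings parser
introduced in scripts/qpalma_pipeline.py below, a minimal settings file could
look as follows (all paths are hypothetical):

    # QPalma settings -- blank lines and lines starting with '#' are ignored
    result_dir         = /fml/ag-raetsch/home/fabio/qpalma_test_run
    read_ascii_data_fn = /fml/ag-raetsch/home/fabio/data/map.vm
    num_splits         = 50
    global_log_fn      = /fml/ag-raetsch/home/fabio/qpalma.log
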
diff --git a/qpalma/gridtools.py b/qpalma/gridtools.py
index f46ed3b..63c44f1 100644
--- a/qpalma/gridtools.py
+++ b/qpalma/gridtools.py
@@ -38,50 +38,49 @@ class ClusterTask(Thread):
    Every task creates a status file.
    All cluster jobs then submit their exit status to this status file.
 
-   Job / status
+   Every cluster task subclass should override/implement the methods:
+
+   1. __init__
+   2. CreateJobs
+   3. TaskStarter
 
-   1 success
-   2 failed
-   3 waiting
-   
-   means then job with id 1 terminated with success.
    """
    
    def __init__(self,global_settings):
       self.sleep_time = 0
 
       # this list stores the cluster/local jobs objects
-      self.functionJobs = []
+      self.functionJobs    = []
 
+      # this object stores the configuration
       self.global_settings = global_settings
 
-      #self.allJobIds = {}
-
 
-   def createJobs(self):
+   def CreateJobs(self):
+      """
+      This method creates the list of jobs stored in self.functionJobs. It is
+      only virtual in this base class and has to be implemented specifically
+      for each cluster task.
+      """
       pass
 
 
-   def submit(self):
+   def Submit(self):
+      """
+      After the jobs have been created, this method submits them to the cluster.
+      """
       for current_job in self.functionJobs:
          self.sid, self.jobids = submit_jobs([current_job])
 
-   def restart(self,id):
+
+   def Restart(self,id):
       pass
 
 
-   def checkIfTaskFinished(self):
+   def CheckIfTaskFinished(self):
       """
-      currentJobStatus = {}
-      for line in open(self.statusFile):
-         id,status = line.split()
-         currentJobStatus[id] = status
-
-      for key in self.allJobIds.keys():
-         try:
-            self.currentJobStatus[key]
-         except:
-            self.currentJobStatus[key] = 'no response'
+      This function is responsible for checking whether the submitted jobs were
+      completed successfully.
       """
 
       print 'checking whether finished'
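
For illustration only, a minimal sketch of the subclass contract described in
the ClusterTask docstring above; LineCountTask and the module grid_linecount
are invented, the KybJob pattern follows the real tasks below:

    class LineCountTask(ClusterTask):

       def __init__(self,global_settings):
          ClusterTask.__init__(self,global_settings)

       def CreateJobs(self):
          # one cluster job per (hypothetical) input file; as in the real
          # tasks, the payload is a module-level TaskStarter function
          for idx in range(2):
             self.functionJobs.append(KybJob(grid_linecount.TaskStarter,['reads.part_%d' % idx]))

       def TaskStarter(fname):
          # executed on the cluster node
          return 'counted %d lines in %s' % (len(open(fname).readlines()),fname)
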
@@ -99,37 +98,37 @@ class ClusterTask(Thread):
 
 class ApproximationTask(ClusterTask):
    """
+   This task represents the first step towards a valid QPalma dataset.
+   It runs an approximate QPalma model on the putative unspliced reads to
+   identify true spliced reads.
    """
 
    def __init__(self,global_settings):
       ClusterTask.__init__(self,global_settings)
 
 
-   def g_heuristic(run_fname,data_fname,param_fname,result_fname):
-      ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
-      ph1.filter()
-
-      return 'finished filtering set %s.' % data_fname
-
-   def createJobs(self):
+   def CreateJobs(self):
+      """
+      """
       num_splits = int(self.global_settings['num_splits'])   # settings values are strings
 
       run_dir  = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
       param_fname    = jp(run_dir,'param_526.pickle')
+      # param_fname = self.global_settings['prediction_parameter_fn']
       run_fname      = jp(run_dir,'run_obj.pickle')
+      # run_fname = self.global_settings['run_fn']
 
-      data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+      #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+      result_dir = self.global_settings['approximation_dir']
 
       original_map_fname = self.global_settings['read_ascii_data_fn']
-      split_file(original_map_fname,data_dir,num_splits)
-
-      functionJobs=[]
+      split_file(original_map_fname,result_dir,num_splits)
 
       for idx in range(0,num_splits):
-         data_fname     = jp(data_dir,'map.part_%d'%idx)
-         result_fname   = jp(data_dir,'map.vm.part_%d.heuristic'%idx)
+         data_fname     = jp(result_dir,'map.part_%d'%idx)
+         result_fname   = jp(result_dir,'map.vm.part_%d.heuristic'%idx)
 
-         current_job = KybJob(grid_heuristic.g_heuristic,[run_fname,data_fname,param_fname,result_fname])
+         current_job = KybJob(grid_heuristic.TaskStarter,[run_fname,data_fname,param_fname,result_fname])
          current_job.h_vmem = '25.0G'
          #current_job.express = 'True'
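
split_file itself is not part of this diff; judging from how its output files
are consumed above (result_dir/map.part_<idx>), a minimal line-based sketch
might look like this:

    def split_file(fname,out_dir,num_parts):
       # distribute the lines of fname round-robin over
       # out_dir/map.part_0 .. out_dir/map.part_<num_parts-1>
       parts = [open(jp(out_dir,'map.part_%d' % idx),'w') for idx in range(num_parts)]
       for ctr,line in enumerate(open(fname)):
          parts[ctr % num_parts].write(line)
       for part in parts:
          part.close()
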
 
@@ -138,6 +137,11 @@ class ApproximationTask(ClusterTask):
          self.functionJobs.append(current_job)
 
 
+   def TaskStarter(run_fname,data_fname,param_fname,result_fname):
+      ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
+      ph1.filter()
+
+      return 'finished filtering set %s.' % data_fname
 
 
 class PreprocessingTask(ClusterTask):
@@ -155,27 +159,7 @@ class AlignmentTask(ClusterTask):
       ClusterTask.__init__(self,global_settings)
 
 
-
-
-   def makeJobs(run,dataset_fn,chunks,param):
-      """
-      """
-
-      jobs=[]
-
-      for c_name,current_chunk in chunks:
-         current_job = KybJob(grid_predict.g_predict,[run,dataset_fn,current_chunk,param,c_name])
-         current_job.h_vmem = '20.0G'
-         #current_job.express = 'True'
-
-         print "job #1: ", current_job.nativeSpecification
-
-         jobs.append(current_job)
-
-      return jobs
-
-
-   def create_and_submit():
+   def CreateJobs(self):
       """
       Split the prediction keys of the dataset into chunks and create one
       cluster job per chunk.
       """
@@ -206,7 +190,14 @@ class AlignmentTask(ClusterTask):
             c_name = 'chunk_%d' % idx
             chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
 
-      self.functionJobs = makeJobs(run,dataset_fn,chunks,param)
+      for c_name,current_chunk in chunks:
+         current_job = KybJob(grid_predict.TaskStarter,[run,dataset_fn,current_chunk,param,c_name])
+         current_job.h_vmem = '20.0G'
+         #current_job.express = 'True'
+
+         print "job #1: ", current_job.nativeSpecification
+
+         self.functionJobs.append(current_job)
 
       sum = 0
       for size in [len(elem) for name,elem in chunks]:
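
The computation of the slice boundaries is cut off in this hunk; one way to
produce the (c_name,chunk) pairs consumed by the loop above is the following
sketch, where num_chunks is hypothetical:

    chunks = []
    chunk_size = len(prediction_keys) / num_chunks + 1   # Python 2 integer division
    for idx in range(num_chunks):
       begin,end = idx*chunk_size, min((idx+1)*chunk_size,len(prediction_keys))
       chunks.append(('chunk_%d' % idx, prediction_keys[begin:end]))
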
@@ -215,7 +206,7 @@ class AlignmentTask(ClusterTask):
       print 'Got %d job(s)' % len(self.functionJobs)
 
 
-   def g_predict(run,dataset_fn,prediction_keys,param,set_name):
+   def TaskStarter(run,dataset_fn,prediction_keys,param,set_name):
       """
      
       """
@@ -228,6 +219,9 @@ class AlignmentTask(ClusterTask):
 
 
 class TrainingTask(ClusterTask):
+   """
+   This class represents the cluster task of training QPalma.
+   """
 
    def __init__(self,global_settings):
       ClusterTask.__init__(self,global_settings)
@@ -235,18 +229,18 @@ class TrainingTask(ClusterTask):
 
 class PostprocessingTask(ClusterTask):
    """
-
+   After QPalma has predicted the alignments, this task postprocesses the data.
    """
 
    def __init__(self,global_settings):
       ClusterTask.__init__(self,global_settings)
 
 
-   def g_alignment(chunk_fn,result_fn):
+   def TaskStarter(chunk_fn,result_fn):
       create_alignment_file(chunk_fn,result_fn)
 
 
-   def createJobs(self):
+   def CreateJobs(self):
       run_dir     = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
       result_dir  = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
 
@@ -264,6 +258,34 @@ class PostprocessingTask(ClusterTask):
          result_fn   = jp(result_dir,'%s.align_remap'%chunk_name)
          chunk_fn = jp(run_dir,chunk_fn) 
 
+         current_job = KybJob(grid_alignment.TaskStarter,[chunk_fn,result_fn])
+         current_job.h_vmem = '15.0G'
+         current_job.express = 'True'
+
+         print "job #1: ", current_job.nativeSpecification
+
+
+class DummyTask(ClusterTask):
+   """
+   This class represents a dummy task to make debugging easier.
+   """
+   def __init__(self,global_settings):
+      ClusterTask.__init__(self,global_settings)
+
+
+   def TaskStarter(param):
+      # dummy payload: do nothing on the node, just report back
+      return 'dummy task finished (%s)' % param
+
+
+   def CreateJobs(self):
+      run_dir     = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
+      result_dir  = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
+
+      # assumption: the origin of the chunk file names is not defined in this
+      # commit; one plausible choice is the contents of run_dir
+      chunks_fn = os.listdir(run_dir)
+
+      for chunk_fn in chunks_fn:
+         chunk_name  = chunk_fn[:chunk_fn.find('.')]
+         result_fn   = jp(result_dir,'%s.align_remap'%chunk_name)
+         chunk_fn = jp(run_dir,chunk_fn) 
+
          current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
          current_job.h_vmem = '15.0G'
         current_job.express = 'True'

         self.functionJobs.append(current_job)
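
For illustration only: every task is driven through the same lifecycle by the
pipeline script below.

    task = ApproximationTask(global_settings)
    task.CreateJobs()            # build self.functionJobs
    task.Submit()                # hand the jobs to the cluster
    task.CheckIfTaskFinished()   # check whether all jobs completed successfully
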
diff --git a/scripts/qpalma_pipeline.py b/scripts/qpalma_pipeline.py
index 17d64e0..42e8c38 100644
--- a/scripts/qpalma_pipeline.py
+++ b/scripts/qpalma_pipeline.py
 # This file contains the main interface to the QPalma pipeline.
 #
 
+import os
+import os.path
+import pdb
+import sys
+
 from optparse import OptionParser
 
 from qpalma.gridtools import ApproximationTask,PreprocessingTask
 from qpalma.gridtools import AlignmentTask,PostprocessingTask
 
+
+Errormsg = """Usage is: python qpalma_pipeline.py <config filename>"""
+
+
+"""
 def create_option_parser():
    parser = OptionParser()
 
@@ -31,13 +41,76 @@ def create_option_parser():
    parser.add_option("-xx", "--clear", action="store_false", dest="verbose", help="cleanup directories delete all created data")
 
    return parser
+"""
+
+jp = os.path.join
+
+def parseSettings(filename):
+   """
+   """
+
+   #global_settings = {\
+   #'result_dir':'/fml/ag-raetsch/...',\
+   #'read_ascii_data_fn':'/fml/ag-raetsch/...',\
+   #'num_splits':50
+   #'global_log_fn':'~/qpalma.log'
+   #}
+
+   global_settings = {}
+
+   for line in open(filename):
+      if not line.strip() or line.startswith('#'):
+         continue
+
+      key,val = line.strip().replace(' ','').split('=')
+      global_settings[key] = val
+
+   return global_settings
+
+
+def makeSettings(global_settings):
+   """
+   
+   """
+
+   # first check whether the top-level result directory exists
+   assert os.path.exists(global_settings['result_dir']), 'Error: You have to specify an existing result directory!'
+
+   result_dir = global_settings['result_dir']
+
+   # now create some subdirectories needed for the different steps performed by QPalma 
+   global_settings['approximation_dir'] = jp(result_dir,'approximation')
+   global_settings['preproc_dir']       = jp(result_dir,'preprocessing')
+   global_settings['postproc_dir']      = jp(result_dir,'postprocessing')
+   global_settings['prediction_dir']    = jp(result_dir,'prediction')
+   global_settings['training_dir']      = jp(result_dir,'training')
+
+   for dir_name in ['approximation_dir', 'preproc_dir', 'postproc_dir', 'prediction_dir', 'training_dir']:
+      try:
+         os.mkdir(global_settings[dir_name])
+      except OSError:
+         print 'Error: There was a problem generating the subdirectory: %s' % dir_name
+
+   try:
+      # 'global_log_fn' denotes a file, so create an empty file rather than a directory
+      open(global_settings['global_log_fn'],'w').close()
+   except IOError:
+      print 'Error: There was a problem generating the logfile %s' % global_settings['global_log_fn']
+
+   return global_settings
+
+
+def checkSettings(global_settings):
+   for key,val in global_settings.items():
+      if key.endswith('_fn') or key.endswith('_dir'):
+         assert os.path.exists(val), 'Error: Path/File %s with value %s does not seem to exist!' % (key,val)
+
+   return True
 
-global_settings = {\
-'experiment_dir':'/fml/ag-raetsch/...',\
-'read_ascii_data_fn':'/fml/ag-raetsch/...',\
-'num_splits':50
-'global_log_fn':'~/qpalma.log'
-}
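
Note that parseSettings keeps every value as a string, so numeric settings
need an explicit conversion at the point of use, for example:

    num_splits = int(global_settings['num_splits'])
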
 
 
 
@@ -53,11 +126,22 @@ class System:
 
    """
 
-   def __init__(self):
+   def __init__(self,filename):
       """
+      Initialize the system by loading and parsing the settings file to obtain
+      all parameters.
       """
-      parser = create_option_parser()
-      (options, args) = parser.parse_args()
+
+      #parser = create_option_parser()
+      #(options, args) = parser.parse_args()
+
+      global_settings = parseSettings(filename)
+      global_settings = makeSettings(global_settings)
+      assert checkSettings(global_settings), 'Check your settings: some entries are invalid!'
+
+      self.global_settings = global_settings
+
+      pdb.set_trace()
 
    def training(self):
       """
@@ -66,7 +150,7 @@ class System:
       algorithm.
       """
 
-      pre_task = TrainingPreprocessingTask(global_settings,run_specific_settings)
+      pre_task = TrainingPreprocessingTask(self.global_settings)
       pre_task.CreateJobs()
       pre_task.Submit()
       while pre_task.CheckIfTaskFinished() == False:
@@ -83,7 +167,7 @@ class System:
       # Before creating a candidate spliced read dataset we have to first filter
       # the matches from the first seed finding run.
 
-      approx_task = ApproximationTask(config_obj)
+      approx_task = ApproximationTask(self.global_settings)
       approx_task.CreateJobs()
       approx_task.Submit()
       approx_task.CheckIfTaskFinished()
@@ -91,33 +175,33 @@ class System:
       # After filtering combine the filtered matches from the first run and the
       # found matches from the second run to a full dataset
 
-      pre_task = PreprocessingTask(...)
+      pre_task = PreprocessingTask(self.global_settings)
       pre_task.CreateJobs()
       pre_task.Submit()
-      while pre_task.checkIfTaskFinished() == False:
-         sleep(20)
+      pre_task.CheckIfTaskFinished()
 
       # Now that we have a dataset we can perform the accurate alignments for this
       # data
 
-      align_task = AlignmentTask(...)
+      align_task = AlignmentTask(self.global_settings)
       align_task.CreateJobs()
       align_task.Submit()
-      while align_task.checkIfTaskFinished() == False:
-         sleep(20)
+      align_task.CheckIfTaskFinished()
 
       # The results of the above alignment step can be converted to a data format
       # needed for further postprocessing.
 
-      post_task = PostprocessingTask(...)
+      post_task = PostprocessingTask(self.global_settings)
       post_task.CreateJobs()
       post_task.Submit()
-      while post_task.checkIfTaskFinished() == False:
-         sleep(20)
+      post_task.CheckIfTaskFinished()
 
       print "Success!"
    
 
 if __name__ == '__main__':
-   system_obj = System() 
-   system_obj.run()
+   assert len(sys.argv) == 2, Errormsg
+   filename = sys.argv[1]
+   assert os.path.exists(filename), Errormsg
+   system_obj = System(filename)
+   #system_obj.prediction()
+   #system_obj.training()
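
For illustration only: with a settings file like the one sketched above, the
pipeline is started as

    python qpalma_pipeline.py qpalma.settings

Both system_obj.prediction() and system_obj.training() are still commented out
in this commit, so a run currently stops after parsing and checking the
settings.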