+ update makefiles to automatically fetch valid Python includes and libs
diff --git a/qpalma/gridtools.py b/qpalma/gridtools.py
index d7fd5f0..6aeb856 100644
--- a/qpalma/gridtools.py
+++ b/qpalma/gridtools.py
@@ -8,6 +8,7 @@
 
 import cPickle
 import math
+import time
 import os
 import os.path
 import pdb
@@ -32,6 +33,7 @@ from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper
 from qpalma_main import QPalma
 
 jp = os.path.join
+pjoin = lambda *args: reduce(lambda x,y: jp(x,y),args)
 
 
 class ClusterTask(Thread):
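
The new pjoin helper just folds os.path.join over its argument list; since os.path.join itself already accepts multiple components, this is mainly a convenience alias alongside jp. A quick illustration (hypothetical paths, not from the repository):

   # pjoin('results','run1','map.vm') builds the same path as
   # os.path.join('results','run1','map.vm'), i.e. 'results/run1/map.vm' on POSIX
   print pjoin('results', 'run1', 'map.vm')
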
@@ -73,7 +75,7 @@ class ClusterTask(Thread):
       After creation of jobs this function submits them to the cluster.
       """
       self.sid, self.jobids = submit_jobs(self.functionJobs)
-      #self.processedFunctionJobs = process_jobs(self.functionJobs)
+      #self.processedFunctionJobs = process_jobs(self.functionJobs,local=True,maxNumThreads=1)
 
 
    def Restart(self,id):
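
The commented-out process_jobs call is the local fallback: it executes the same job list in-process instead of on the grid. A minimal sketch of both paths, using only the pythongrid calls already visible in this file (submit_jobs, collect_jobs, process_jobs):

   # cluster path: submit now, collect later with wait=True
   sid, jobids = submit_jobs(jobs)
   finished = collect_jobs(sid, jobids, jobs, True)

   # local debugging path: run single-threaded in the current process
   finished = process_jobs(jobs, local=True, maxNumThreads=1)

   for (i, job) in enumerate(finished):
      print 'Job #', i, '- ret:', job.ret
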
@@ -90,32 +92,17 @@ class ClusterTask(Thread):
       completed successfully.
       """
 
-      #print 'checking whether jobs finished...'
-      #while not get_status(self.sid, self.jobids):
-      #   time.sleep(7)
-      #print 'collecting jobs'
-      #retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
-
-      print 'checking whether finished'
-      while not get_status(self.sid, self.jobids):
-         time.sleep(10)
-
       print 'collecting jobs'
-      retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
+      retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs, True)
       print "ret fields AFTER execution on cluster"
       for (i, job) in enumerate(retjobs):
          print "Job #", i, "- ret: ", job.ret
 
       print '--------------'
 
-      #print "ret fields AFTER execution on cluster"
-      #for (i, job) in enumerate(self.processedFunctionJobs):
-      #   print "Job #", i, "- ret: ", job.ret
-
       self.collectResults()
 
 
-
 class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
@@ -136,9 +123,8 @@ class ApproximationTask(ClusterTask):
 
       #run_dir  = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
       #param_fname    = jp(run_dir,'param_526.pickle')
-      param_fname = self.settings['prediction_parameter_fn']
+      param_fname = self.settings['prediction_param_fn']
       #run_fname      = jp(run_dir,'run_obj.pickle')
-      run_fname = self.settings['run_fn']
 
       #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
       result_dir = self.settings['approximation_dir']
@@ -150,10 +136,10 @@ class ApproximationTask(ClusterTask):
 
       for idx in range(0,num_splits):
          data_fname     = jp(result_dir,'map.part_%d'%idx)
-         result_fname   = jp(result_dir,'map.vm.part_%d.heuristic.spliced'%idx)
+         result_fname   = jp(result_dir,'map.vm.part_%d'%idx)
          self.result_files.append(result_fname)
 
-         current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
+         current_job = KybJob(gridtools.ApproximationTaskStarter,[data_fname,param_fname,result_fname,self.settings])
          current_job.h_vmem = '25.0G'
          #current_job.express = 'True'
 
@@ -165,12 +151,13 @@ class ApproximationTask(ClusterTask):
    def collectResults(self):
       result_dir  = self.settings['approximation_dir']
       combined_fn = jp(result_dir,'map.vm.spliced')
+      self.result_files = map(lambda x:x+'.spliced',self.result_files)
       combine_files(self.result_files,combined_fn)
       combine_files([combined_fn,self.settings['spliced_reads_fn']],'map.vm')
 
 
-def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname,settings):
-   ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname,settings)
+def ApproximationTaskStarter(data_fname,param_fname,result_fname,settings):
+   ph1 = PipelineHeuristic(data_fname,param_fname,result_fname,settings)
    ph1.filter()
 
    return 'finished filtering set %s.' % data_fname
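
Note the bookkeeping change: each job now receives the bare part name (map.vm.part_N), and the '.spliced' suffix is assumed to be appended by the heuristic when it writes its output, so collectResults re-appends it before combining. Illustration with two splits (hypothetical file names):

   result_files = ['map.vm.part_0', 'map.vm.part_1']
   result_files = map(lambda x: x+'.spliced', result_files)
   # -> ['map.vm.part_0.spliced', 'map.vm.part_1.spliced']
   combine_files(result_files, 'map.vm.spliced')
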
@@ -201,14 +188,6 @@ class AlignmentTask(ClusterTask):
 
       num_splits = self.settings['num_splits']
 
-      jp = os.path.join
-
-      run   = cPickle.load(open(self.settings['run_fn']))
-      run['name'] = 'saved_run'
-
-      param_fn = self.settings['prediction_parameter_fn']
-
-      run['result_dir']    = self.settings['prediction_dir']
       dataset_fn           = self.settings['prediction_dataset_fn']
       prediction_keys_fn   = self.settings['prediction_dataset_keys_fn']
 
@@ -224,9 +203,9 @@ class AlignmentTask(ClusterTask):
             chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
 
       for c_name,current_chunk in chunks:
-         current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,run,dataset_fn,current_chunk,param_fn,c_name])
-         current_job.h_vmem = '20.0G'
-         #current_job.express = 'True'
+         current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,dataset_fn,current_chunk,c_name])
+         current_job.h_vmem = '2.0G'
+         current_job.express = 'True'
 
          print "job #1: ", current_job.nativeSpecification
 
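
The chunk list iterated above is built by slicing the loaded prediction keys into num_splits pieces; the slicing itself happens just before this hunk, so the sketch below is illustrative rather than the exact code:

   num_splits = 3
   prediction_keys = range(10)            # stand-in for the real key list
   size = int(math.ceil(len(prediction_keys) / float(num_splits)))
   slices = [(i*size, min((i+1)*size, len(prediction_keys))) for i in range(num_splits)]
   chunks = [('chunk_%d' % i, prediction_keys[b:e]) for (i, (b, e)) in enumerate(slices)]
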
@@ -239,14 +218,14 @@ class AlignmentTask(ClusterTask):
       print 'Got %d job(s)' % len(self.functionJobs)
 
 
-def AlignmentTaskStarter(settings,run,dataset_fn,prediction_keys,param_fn,set_name):
+def AlignmentTaskStarter(settings,dataset_fn,prediction_keys,set_name):
    """
   
    """
    accessWrapper = DataAccessWrapper(settings)
    seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
-   qp = QPalma(run,seqInfo)
-   qp.init_prediction(dataset_fn,prediction_keys,param_fn,set_name)
+   qp = QPalma(seqInfo)
+   qp.init_prediction(dataset_fn,prediction_keys,settings,set_name)
    return 'finished prediction of set %s.' % set_name
 
 
@@ -256,8 +235,40 @@ class TrainingTask(ClusterTask):
    This class represents the cluster task of training QPalma.
    """
 
-   def __init__(self):
-      ClusterTask.__init__(self)
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
+
+
+   def CreateJobs(self):
+      """
+
+      """
+
+      dataset_fn     = self.settings['training_dataset_fn']
+
+      set_name = 'training_set'
+
+      current_job = KybJob(gridtools.TrainingTaskStarter,[dataset_fn,self.settings,set_name])
+      current_job.h_vmem = '2.0G'
+      current_job.express = 'True'
+
+      print "job #1: ", current_job.nativeSpecification
+
+      self.functionJobs.append(current_job)
+
+      print 'Got %d job(s)' % len(self.functionJobs)
+
+   
+   def collectResults(self):
+      pass
+
+
+def TrainingTaskStarter(dataset_fn,settings,set_name):
+   accessWrapper = DataAccessWrapper(settings)
+   seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+   qp = QPalma(seqInfo)
+   qp.init_training(dataset_fn,settings,set_name)
+   return 'finished training of set %s.' % set_name
 
 
 class PostprocessingTask(ClusterTask):
@@ -283,13 +294,13 @@ class PostprocessingTask(ClusterTask):
       self.result_files = []
       for chunk_fn in chunks_fn:
          chunk_name  = chunk_fn[:chunk_fn.find('.')]
-         result_fn   = jp(self.result_dir,'%s.align'%chunk_name)
+         result_fn   = jp(self.result_dir,'%s.%s'%(chunk_name,self.settings['output_format']))
          chunk_fn = jp(run_dir,chunk_fn) 
 
          self.result_files.append(result_fn)
 
          current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
-         current_job.h_vmem = '15.0G'
+         current_job.h_vmem = '2.0G'
          current_job.express = 'True'
 
          print "job #1: ", current_job.nativeSpecification
@@ -298,7 +309,7 @@ class PostprocessingTask(ClusterTask):
 
 
    def collectResults(self):
-      combined_fn = jp(self.result_dir,'all_alignments.align')
+      combined_fn = jp(self.result_dir,'all_alignments.%s'%self.settings['output_format'])
       combine_files(self.result_files,combined_fn)
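
Taken together, the task classes share the lifecycle inherited from ClusterTask: create jobs, submit them, wait and collect. A hypothetical driver, under the assumption that the submit and wait methods are named Submit and CheckIfTaskFinished (only CreateJobs is visible by name in the hunks above):

   from qpalma.gridtools import ApproximationTask, AlignmentTask, PostprocessingTask

   def run_pipeline(settings):
      # settings is the parsed configuration dict used throughout gridtools.py
      for TaskClass in [ApproximationTask, AlignmentTask, PostprocessingTask]:
         task = TaskClass(settings)
         task.CreateJobs()            # build the per-chunk KybJob list
         task.Submit()                # assumed name: wraps submit_jobs(...)
         task.CheckIfTaskFinished()   # assumed name: collect_jobs(..., True) + collectResults()
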