+ update makefiles to automatically fetch valid Python includes and libs
[qpalma.git] / qpalma / gridtools.py
index be0a7fe..6aeb856 100644
@@ -1,8 +1,14 @@
-#!/usr/bin/env python 
-# -*- coding: utf-8 -*- 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# Written (W) 2008 Fabio De Bona
+# Copyright (C) 2008 Max-Planck-Society
 
 import cPickle
 import math
+import time
 import os
 import os.path
 import pdb
@@ -14,77 +20,126 @@ from threading import Thread
 from pythongrid import KybJob, Usage
 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
 
-from createAlignmentFileFromPrediction import create_alignment_file
+from qpalma.OutputFormat import createAlignmentOutput
+
+from PipelineHeuristic import PipelineHeuristic
 
 import gridtools
 
+from utils import get_slices,split_file,combine_files
+
+from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper
+
+from qpalma_main import QPalma
+
 jp = os.path.join
+pjoin = lambda *args: reduce(lambda x,y: jp(x,y),args)
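+# e.g. pjoin('a','b','c') -> 'a/b/c' (equivalent to os.path.join('a','b','c'))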
 
 
 class ClusterTask(Thread):
    """
-   This class..
+   This class is the base class for all cluster tasks.
+
+   Every task creates a status file.
+   All cluster jobs then report their exit status to this status file.
+
+   Every cluster task subclass should override/implement the methods:
+
+   1. __init__
+   2. CreateJobs
+   3. TaskStarter
+
    """
    
-   def __init__(self):
+   def __init__(self,settings):
       self.sleep_time = 0
 
       # this list stores the cluster/local jobs objects
-      self.functionJobs = []
+      self.functionJobs    = []
 
+      # this object stores the configuration
+      self.settings = settings
 
-   def createJobs(self):
+
+   def CreateJobs(self):
+      """
+      This method creates the array of jobs stored in self.functionJobs. It is
+      virtual in this base class and has to be implemented specifically for
+      each cluster task.
+      """
       pass
 
 
-   def submit(self):
-      for current_job in self.functionJobs:
-         (sid, jobids) = submit_jobs([functionJobs])
-         time.sleep(self.sleep_time)
+   def Submit(self):
+      """
+      After the jobs have been created, this method submits them to the cluster.
+      """
+      self.sid, self.jobids = submit_jobs(self.functionJobs)
+      #self.processedFunctionJobs = process_jobs(self.functionJobs,local=True,maxNumThreads=1)
 
 
-   def checkIfTaskFinished(self):
+   def Restart(self,id):
       pass
-   
 
 
-class ApproximationTask(ClusterTask):
-   """
-   """
+   def collectResults(self):
+      pass
 
-   def __init__(self):
-      ClusterTask.__init__(self)
 
+   def CheckIfTaskFinished(self):
+      """
+      This method collects the results of the submitted jobs and checks
+      whether they completed successfully.
+      """
 
-   def g_heuristic(run_fname,data_fname,param_fname,result_fname):
-      #print run_fname,data_fname,param_fname,result_fname
-      ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
-      ph1.filter()
+      print 'collecting jobs'
+      retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs, True)
+      print "ret fields AFTER execution on cluster"
+      for (i, job) in enumerate(retjobs):
+         print "Job #", i, "- ret: ", job.ret
 
-      return 'finished filtering set %s.' % data_fname
+      print '--------------'
 
-   def createJobs(self):
-      num_splits = 25
+      self.collectResults()
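+
+
+# A typical driver runs a task through its full lifecycle; a sketch, assuming
+# 'settings' is the configuration dictionary used throughout this module:
+#
+#    task = ApproximationTask(settings)
+#    task.CreateJobs()
+#    task.Submit()
+#    task.CheckIfTaskFinished()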
 
-      run_dir  = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
-      #data_dir = '/fml/ag-raetsch/home/fabio/tmp/lyrata_analysis/'
 
-      data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+class ApproximationTask(ClusterTask):
+   """
+   This task represents the first step towards a valid QPalma dataset.
+   It starts an approximative QPalma model on the putative unspliced reads to
+   identify true spliced reads.
+   """
 
-      run_fname      = jp(run_dir,'run_obj.pickle')
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
 
-      #original_map_fname = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main/map.vm'
-      #split_file(original_map_fname,data_dir,num_splits)
 
-      param_fname    = jp(run_dir,'param_526.pickle')
+   def CreateJobs(self):
+      """
+      Split the file of putative unspliced reads and create one filtering job
+      per split.
+      """
 
-      functionJobs=[]
+      num_splits = self.settings['num_splits']
+
+      #run_dir  = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
+      #param_fname    = jp(run_dir,'param_526.pickle')
+      param_fname = self.settings['prediction_param_fn']
+      #run_fname      = jp(run_dir,'run_obj.pickle')
+
+      #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+      result_dir = self.settings['approximation_dir']
+
+      original_map_fname = self.settings['unspliced_reads_fn']
+      split_file(original_map_fname,result_dir,num_splits)
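+      # split_file is assumed to write num_splits part files named
+      # 'map.part_<idx>' into result_dir; those names are used below.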
+   
+      self.result_files = []
 
       for idx in range(0,num_splits):
-         data_fname     = jp(data_dir,'map.part_%d'%idx)
-         result_fname   = jp(data_dir,'map.vm.part_%d.heuristic'%idx)
+         data_fname     = jp(result_dir,'map.part_%d'%idx)
+         result_fname   = jp(result_dir,'map.vm.part_%d'%idx)
+         self.result_files.append(result_fname)
 
-         current_job = KybJob(grid_heuristic.g_heuristic,[run_fname,data_fname,param_fname,result_fname])
+         current_job = KybJob(gridtools.ApproximationTaskStarter,[data_fname,param_fname,result_fname,self.settings])
          current_job.h_vmem = '25.0G'
          #current_job.express = 'True'
 
@@ -93,6 +148,19 @@ class ApproximationTask(ClusterTask):
          self.functionJobs.append(current_job)
 
 
+   def collectResults(self):
+      result_dir  = self.settings['approximation_dir']
+      combined_fn = jp(result_dir,'map.vm.spliced')
+      # The filtering jobs write the reads they classify as spliced to
+      # <result_fname>.spliced, hence the suffix here.
+      self.result_files = map(lambda x:x+'.spliced',self.result_files)
+      combine_files(self.result_files,combined_fn)
+      combine_files([combined_fn,self.settings['spliced_reads_fn']],jp(result_dir,'map.vm'))
+
+
+def ApproximationTaskStarter(data_fname,param_fname,result_fname,settings):
+   ph1 = PipelineHeuristic(data_fname,param_fname,result_fname,settings)
+   ph1.filter()
+
+   return 'finished filtering set %s.' % data_fname
 
 
 class PreprocessingTask(ClusterTask):
@@ -105,141 +173,173 @@ class PreprocessingTask(ClusterTask):
 
 
 class AlignmentTask(ClusterTask):
+   """
+   This task represents the main part of QPalma: the prediction of alignments.
+   """
 
-   def __init__(self):
-      ClusterTask.__init__(self)
-
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
 
-   def get_slices(dataset_size,num_nodes):
-      all_instances = []
 
-      part = dataset_size / num_nodes
-      begin = 0
-      end = 0
-      for idx in range(1,num_nodes+1):
-         
-         if idx == num_nodes:
-            begin = end
-            end   = dataset_size
-         else:
-            begin = end
-            end = begin+part
+   def CreateJobs(self):
+      """
 
-         params = (begin,end)
+      """
 
-         all_instances.append(params)
+      num_splits = self.settings['num_splits']
 
-      return all_instances
+      dataset_fn           = self.settings['prediction_dataset_fn']
+      prediction_keys_fn   = self.settings['prediction_dataset_keys_fn']
 
+      prediction_keys = cPickle.load(open(prediction_keys_fn))
 
-   def makeJobs(run,dataset_fn,chunks,param):
-      """
-      """
+      print 'Found %d keys for prediction.' % len(prediction_keys)
 
-      jobs=[]
+      slices = get_slices(len(prediction_keys),num_splits)
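+      # utils.get_slices partitions the key range into (begin,end) index
+      # pairs; e.g. get_slices(10,3) presumably yields [(0,3),(3,6),(6,10)],
+      # the last slice absorbing the remainder (cf. the removed inline version).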
+      chunks = []
+      for idx,current_slice in enumerate(slices):
+         c_name = 'chunk_%d' % idx
+         chunks.append((c_name,prediction_keys[current_slice[0]:current_slice[1]]))
 
       for c_name,current_chunk in chunks:
-         current_job = KybJob(grid_predict.g_predict,[run,dataset_fn,current_chunk,param,c_name])
-         current_job.h_vmem = '20.0G'
-         #current_job.express = 'True'
+         current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,dataset_fn,current_chunk,c_name])
+         current_job.h_vmem = '2.0G'
+         current_job.express = 'True'
 
          print "job #1: ", current_job.nativeSpecification
 
-         jobs.append(current_job)
+         self.functionJobs.append(current_job)
+
+      total_keys = sum(len(elem) for name,elem in chunks)
+
+      print 'Got %d job(s) covering %d keys.' % (len(self.functionJobs),total_keys)
+
+
+def AlignmentTaskStarter(settings,dataset_fn,prediction_keys,set_name):
+   """
+   Executed on a cluster node: run QPalma prediction on one chunk of the
+   prediction keys.
+   """
+   accessWrapper = DataAccessWrapper(settings)
+   seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+   qp = QPalma(seqInfo)
+   qp.init_prediction(dataset_fn,prediction_keys,settings,set_name)
+   return 'finished prediction of set %s.' % set_name
+
+
+
+class TrainingTask(ClusterTask):
+   """
+   This class represents the cluster task of training QPalma.
+   """
 
-      return jobs
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
 
 
-   def create_and_submit():
+   def CreateJobs(self):
       """
 
       """
 
-      jp = os.path.join
+      dataset_fn     = self.settings['training_dataset_fn']
 
-      run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
+      set_name = 'training_set'
 
-      run   = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
-      run['name'] = 'saved_run'
+      current_job = KybJob(gridtools.TrainingTaskStarter,[dataset_fn,self.settings,set_name])
+      current_job.h_vmem = '2.0G'
+      current_job.express = 'True'
 
-      param = cPickle.load(open(jp(run_dir,'param_526.pickle')))
+      print "job #1: ", current_job.nativeSpecification
 
-      run['result_dir']    = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
-      dataset_fn           = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
-      prediction_keys_fn   = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
+      self.functionJobs.append(current_job)
 
-      prediction_keys = cPickle.load(open(prediction_keys_fn))
+      print 'Got %d job(s)' % len(self.functionJobs)
 
-      print 'Found %d keys for prediction.' % len(prediction_keys)
+   
+   def collectResults(self):
+      pass
 
-      num_splits = 25
-      #num_splits = 1
-      slices = get_slices(len(prediction_keys),num_splits)
-      chunks = []
-      for idx,slice in enumerate(slices):
-         #if idx != 0:
-            c_name = 'chunk_%d' % idx
-            chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
 
-      self.functionJobs = makeJobs(run,dataset_fn,chunks,param)
+def TrainingTaskStarter(dataset_fn,settings,set_name):
+   accessWrapper = DataAccessWrapper(settings)
+   seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
+   qp = QPalma(seqInfo)
+   qp.init_training(dataset_fn,settings,set_name)
+   return 'finished training of set %s.' % set_name
 
-      sum = 0
-      for size in [len(elem) for name,elem in chunks]:
-         sum += size
-      
-      print 'Got %d job(s)' % len(functionJobs)
 
+class PostprocessingTask(ClusterTask):
+   """
+   After QPalma has predicted the alignments, this task converts the raw
+   predictions into the configured output format.
+   """
 
-   def g_predict(run,dataset_fn,prediction_keys,param,set_name):
-      """
-     
-      """
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
+
+
+   def CreateJobs(self):
+      run_dir     = self.settings['prediction_dir']
+      self.result_dir  = self.settings['alignment_dir']
 
-      qp = QPalma()
-      qp.predict(run,dataset_fn,prediction_keys,param,set_name)
+      chunks_fn = []
+      for fn in os.listdir(run_dir):
+         if fn.startswith('chunk'):
+            chunks_fn.append(fn)
 
-      return 'finished prediction of set %s.' % set_name
 
+      self.result_files = []
+      for chunk_fn in chunks_fn:
+         chunk_name  = chunk_fn[:chunk_fn.find('.')]
+         result_fn   = jp(self.result_dir,'%s.%s'%(chunk_name,self.settings['output_format']))
+         chunk_fn = jp(run_dir,chunk_fn) 
 
+         self.result_files.append(result_fn)
 
-class TrainingTask(ClusterTask):
+         current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
+         current_job.h_vmem = '2.0G'
+         current_job.express = 'True'
 
-   def __init__(self):
-      ClusterTask.__init__(self)
+         print "job #1: ", current_job.nativeSpecification
 
+         self.functionJobs.append(current_job)
 
-class PostprocessingTask(ClusterTask):
-   """
 
-   """
+   def collectResults(self):
+      combined_fn = jp(self.result_dir,'all_alignments.%s'%self.settings['output_format'])
+      combine_files(self.result_files,combined_fn)
+
+
+def PostProcessingTaskStarter(settings,chunk_fn,result_fn):
+   createAlignmentOutput(settings,chunk_fn,result_fn)
+
 
+
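+def DummyTaskStarter(chunk_fn,result_fn):
+   """
+   A minimal stand-in payload (an assumption, mirroring the other
+   *TaskStarter functions): it performs no real work and merely echoes its
+   arguments, which is useful when debugging job submission.
+   """
+   return 'processed %s -> %s' % (chunk_fn,result_fn)
+
+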
+class DummyTask(ClusterTask):
+   """
+   This class represents a dummy task to make debugging easier.
+   """
-   def __init__(self):
-      ClusterTask.__init__(self)
+   def __init__(self,settings):
+      ClusterTask.__init__(self,settings)
 
 
-   def g_alignment(chunk_fn,result_fn):
-      create_alignment_file(chunk_fn,result_fn)
 
 
-   def createJobs(self):
+   def CreateJobs(self):
       run_dir     = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
       result_dir  = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
 
-      chunks_fn = []
-      for fn in os.listdir(run_dir):
-         if fn.startswith('chunk'):
-            chunks_fn.append(fn)
-
-      print chunks_fn
-
-      functionJobs=[]
+      chunks_fn = []
+      for fn in os.listdir(run_dir):
+         if fn.startswith('chunk'):
+            chunks_fn.append(fn)
+
-
       for chunk_fn in chunks_fn:
          chunk_name  = chunk_fn[:chunk_fn.find('.')]
          result_fn   = jp(result_dir,'%s.align_remap'%chunk_name)
          chunk_fn = jp(run_dir,chunk_fn) 
 
-         current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
+         current_job = KybJob(gridtools.DummyTaskStarter,[chunk_fn,result_fn])
          current_job.h_vmem = '15.0G'
          current_job.express = 'True'