+ higher level cluster tasks interface for qpalma
author fabio <fabio@e1793c9e-67f9-0310-80fc-b846ff1f7b36>
Tue, 5 Aug 2008 10:25:32 +0000 (10:25 +0000)
committer fabio <fabio@e1793c9e-67f9-0310-80fc-b846ff1f7b36>
Tue, 5 Aug 2008 10:25:32 +0000 (10:25 +0000)
git-svn-id: http://svn.tuebingen.mpg.de/ag-raetsch/projects/QPalma@10328 e1793c9e-67f9-0310-80fc-b846ff1f7b36

qpalma/gridtools.py [new file with mode: 0644]
scripts/qpalma_pipeline.py [new file with mode: 0644]

diff --git a/qpalma/gridtools.py b/qpalma/gridtools.py
new file mode 100644 (file)
index 0000000..be0a7fe
--- /dev/null
@@ -0,0 +1,246 @@
+#!/usr/bin/env python 
+# -*- coding: utf-8 -*- 
+
+import cPickle
+import os
+import os.path
+import time
+
+from threading import Thread
+
+from pythongrid import KybJob, Usage
+from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
+
+from createAlignmentFileFromPrediction import create_alignment_file
+
+# Needed by the grid job payloads below; assumed to be importable from the
+# QPalma scripts directory, as in the existing grid_* scripts.
+from PipelineHeuristic import PipelineHeuristic
+from qpalma_main import QPalma
+
+jp = os.path.join
+
+
+class ClusterTask(Thread):
+   """
+   Abstract base class for a task that runs on the cluster. A concrete task
+   creates its jobs via createJobs(), hands them to the cluster via submit()
+   and waits for their completion via checkIfTaskFinished().
+   """
+
+   def __init__(self):
+      Thread.__init__(self)
+
+      self.sleep_time = 0
+
+      # this list stores the cluster/local job objects
+      self.functionJobs = []
+
+
+   def createJobs(self):
+      """Create the KybJob objects for this task (implemented by subclasses)."""
+      pass
+
+
+   def submit(self):
+      for current_job in self.functionJobs:
+         (sid, jobids) = submit_jobs([current_job])
+         time.sleep(self.sleep_time)
+
+
+   def checkIfTaskFinished(self):
+      """Block until all jobs of this task have finished (not implemented yet)."""
+      pass
+
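+   # A possible polling sketch for checkIfTaskFinished (assumption: the
+   # return value of pythongrid's get_status(sid, jobids), imported above,
+   # indicates whether the submitted jobs are done; self.sid and self.jobids
+   # are hypothetical attributes that submit() would have to store first):
+   #
+   #   while not get_status(self.sid, self.jobids):
+   #      time.sleep(10)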
+
+
+def g_heuristic(run_fname, data_fname, param_fname, result_fname):
+   """
+   Grid job payload: filter one split of the match file with the
+   PipelineHeuristic. Kept at module level so that pythongrid can pickle a
+   reference to it when the job is submitted.
+   """
+   ph1 = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname)
+   ph1.filter()
+
+   return 'finished filtering set %s.' % data_fname
+
+
+class ApproximationTask(ClusterTask):
+   """
+   This task filters the matches of the first seed finding run before the
+   candidate spliced read dataset is created.
+   """
+
+   def __init__(self):
+      ClusterTask.__init__(self)
+
+   def createJobs(self):
+      """Create one filtering job per split of the vmatch map file."""
+      num_splits = 25
+
+      run_dir  = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
+      #data_dir = '/fml/ag-raetsch/home/fabio/tmp/lyrata_analysis/'
+
+      data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+
+      run_fname      = jp(run_dir,'run_obj.pickle')
+
+      #original_map_fname = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main/map.vm'
+      #split_file(original_map_fname,data_dir,num_splits)
+
+      param_fname    = jp(run_dir,'param_526.pickle')
+
+      for idx in range(0, num_splits):
+         data_fname     = jp(data_dir, 'map.part_%d' % idx)
+         result_fname   = jp(data_dir, 'map.vm.part_%d.heuristic' % idx)
+
+         current_job = KybJob(g_heuristic, [run_fname, data_fname, param_fname, result_fname])
+         current_job.h_vmem = '25.0G'
+         #current_job.express = 'True'
+
+         print "job #%d: " % idx, current_job.nativeSpecification
+
+         self.functionJobs.append(current_job)
+
+
+
+
+class PreprocessingTask(ClusterTask):
+   """
+   This task combines the filtered matches of the first seed finding run with
+   the matches of the second run into a full dataset (not implemented yet).
+   """
+
+   def __init__(self):
+      ClusterTask.__init__(self)
+
+
+def g_predict(run, dataset_fn, prediction_keys, param, set_name):
+   """
+   Grid job payload: predict the alignments for one chunk of prediction keys
+   with QPalma. Kept at module level so that pythongrid can pickle a
+   reference to it when the job is submitted.
+   """
+   qp = QPalma()
+   qp.predict(run, dataset_fn, prediction_keys, param, set_name)
+
+   return 'finished prediction of set %s.' % set_name
+
+
+class AlignmentTask(ClusterTask):
+   """
+   This task performs the accurate QPalma alignments on the candidate spliced
+   read dataset.
+   """
+
+   def __init__(self):
+      ClusterTask.__init__(self)
+
+
+   def get_slices(self, dataset_size, num_nodes):
+      """
+      Split the index range [0, dataset_size) into num_nodes contiguous
+      (begin, end) slices; the last slice absorbs the remainder.
+      """
+      all_instances = []
+
+      part = dataset_size / num_nodes
+      begin = 0
+      end = 0
+      for idx in range(1, num_nodes+1):
+         if idx == num_nodes:
+            begin = end
+            end   = dataset_size
+         else:
+            begin = end
+            end   = begin+part
+
+         all_instances.append((begin, end))
+
+      return all_instances
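+
+   # For example, get_slices(10, 3) returns [(0, 3), (3, 6), (6, 10)]:
+   # integer division yields two slices of size 3 and the last slice absorbs
+   # the remaining 4 elements.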
+
+
+   def makeJobs(self, run, dataset_fn, chunks, param):
+      """
+      Create one prediction job (KybJob) per chunk of prediction keys.
+      """
+
+      jobs = []
+
+      for c_name, current_chunk in chunks:
+         current_job = KybJob(g_predict, [run, dataset_fn, current_chunk, param, c_name])
+         current_job.h_vmem = '20.0G'
+         #current_job.express = 'True'
+
+         print "job %s: " % c_name, current_job.nativeSpecification
+
+         jobs.append(current_job)
+
+      return jobs
+
+
+   def createJobs(self):
+      """
+      Load the saved run and its parameters, split the prediction keys into
+      chunks and create one prediction job per chunk.
+      """
+
+      run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
+
+      run   = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
+      run['name'] = 'saved_run'
+
+      param = cPickle.load(open(jp(run_dir,'param_526.pickle')))
+
+      run['result_dir']    = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
+      dataset_fn           = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
+      prediction_keys_fn   = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
+
+      prediction_keys = cPickle.load(open(prediction_keys_fn))
+
+      print 'Found %d keys for prediction.' % len(prediction_keys)
+
+      num_splits = 25
+      slices = self.get_slices(len(prediction_keys), num_splits)
+      chunks = []
+      for idx, (begin, end) in enumerate(slices):
+         c_name = 'chunk_%d' % idx
+         chunks.append((c_name, prediction_keys[begin:end]))
+
+      self.functionJobs = self.makeJobs(run, dataset_fn, chunks, param)
+
+      total = 0
+      for size in [len(elem) for name, elem in chunks]:
+         total += size
+
+      print 'Got %d job(s) for %d keys.' % (len(self.functionJobs), total)
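+
+   # Typical use of this task, mirroring scripts/qpalma_pipeline.py:
+   #
+   #   align_task = AlignmentTask()
+   #   align_task.createJobs()
+   #   align_task.submit()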
+
+
+
+class TrainingTask(ClusterTask):
+   """
+   This task trains the QPalma alignment parameters (not implemented yet).
+   """
+
+   def __init__(self):
+      ClusterTask.__init__(self)
+
+
+def g_alignment(chunk_fn, result_fn):
+   """
+   Grid job payload: convert one chunk of predictions into an alignment file.
+   Kept at module level so that pythongrid can pickle a reference to it when
+   the job is submitted.
+   """
+   create_alignment_file(chunk_fn, result_fn)
+
+
+class PostprocessingTask(ClusterTask):
+   """
+   This task converts the prediction results into the data format needed for
+   further postprocessing.
+   """
+
+   def __init__(self):
+      ClusterTask.__init__(self)
+
+
+   def createJobs(self):
+      """Create one alignment-file job per prediction chunk."""
+      run_dir     = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
+      result_dir  = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
+
+      chunks_fn = []
+      for fn in os.listdir(run_dir):
+         if fn.startswith('chunk'):
+            chunks_fn.append(fn)
+
+      print chunks_fn
+
+      for chunk_fn in chunks_fn:
+         chunk_name  = chunk_fn[:chunk_fn.find('.')]
+         result_fn   = jp(result_dir, '%s.align_remap' % chunk_name)
+         chunk_fn    = jp(run_dir, chunk_fn)
+
+         current_job = KybJob(g_alignment, [chunk_fn, result_fn])
+         current_job.h_vmem = '15.0G'
+         current_job.express = 'True'
+
+         print "job %s: " % chunk_name, current_job.nativeSpecification
+
+         self.functionJobs.append(current_job)
diff --git a/scripts/qpalma_pipeline.py b/scripts/qpalma_pipeline.py
new file mode 100644 (file)
index 0000000..3670dd7
--- /dev/null
@@ -0,0 +1,87 @@
+#!/usr/bin/env python 
+# -*- coding: utf-8 -*-
+
+#
+# This file contains the main interface to the QPalma pipeline.
+#
+
+
+from optparse import OptionParser
+
+
+from qpalma.gridtools import ApproximationTask, PreprocessingTask, AlignmentTask, PostprocessingTask
+
+
+def create_option_parser():
+   parser = OptionParser()
+
+   # check the configuration and initialize the directories
+   parser.add_option("-c", "--check_and_init", action="store_true", dest="check_and_init", help="check configuration and initialize directories")
+
+   # run the pipeline and write the report
+   parser.add_option("-r", "--run", dest="run", help="write report to FILE", metavar="FILE")
+
+   # clean up
+   parser.add_option("-x", "--clear", action="store_true", dest="clear", help="cleanup directories and delete all created data")
+
+   return parser
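+
+# Example invocations (assuming the options defined above):
+#
+#   python qpalma_pipeline.py --check_and_init
+#   python qpalma_pipeline.py --run report.txt
+#   python qpalma_pipeline.py --clear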
+
+
+
+
+class System:
+   """
+   This class wraps the outer loop of the qpalma project.
+
+   It is responsible for:
+
+   - loading and checking the config file(s)
+   - setting up the different pipeline modules
+   - running the experiment and reporting the results
+   """
+
+   def run(self):
+      # Before creating a candidate spliced read dataset we have to first
+      # filter the matches from the first seed finding run.
+
+      approx_task = ApproximationTask()
+      approx_task.createJobs()
+      approx_task.submit()
+      approx_task.checkIfTaskFinished()
+
+      # After filtering combine the filtered matches from the first run and
+      # the found matches from the second run to a full dataset.
+
+      pre_task = PreprocessingTask()
+      pre_task.createJobs()
+      pre_task.submit()
+      pre_task.checkIfTaskFinished()
+
+      # Now that we have a dataset we can perform the accurate alignments
+      # for this data.
+
+      align_task = AlignmentTask()
+      align_task.createJobs()
+      align_task.submit()
+      align_task.checkIfTaskFinished()
+
+      # The results of the above alignment step can be converted to a data
+      # format needed for further postprocessing.
+
+      post_task = PostprocessingTask()
+      post_task.createJobs()
+      post_task.submit()
+      post_task.checkIfTaskFinished()
+
+
+if __name__ == '__main__':
+   parser = create_option_parser()
+   (options, args) = parser.parse_args()
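+
+   # Dispatching on the parsed options is not implemented yet. A minimal
+   # sketch (assumption: the whole pipeline is run unconditionally):
+   #
+   #   system = System()
+   #   system.run()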