Every task creates a status file.
All cluster jobs then submit their exit status to this status file.
- Job / status
+ Every cluster task subclass should override/implement the methods:
+
+ 1. __init__
+ 2. CreateJobs
+ 3. TaskStarter
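+
+ (A minimal subclass sketch is given right after this class definition.)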
- 1 success
- 2 failed
- 3 waiting
-
- means then job with id 1 terminated with success.
"""
def __init__(self,global_settings):
self.sleep_time = 0
# this list stores the cluster/local jobs objects
- self.functionJobs = []
+ self.functionJobs = []
+ # this object stores the configuration
self.global_settings = global_settings
- #self.allJobIds = {}
-
- def createJobs(self):
+ def CreateJobs(self):
+ """
+ This method creates the list of jobs stored in self.functionJobs. It is
+ only a placeholder in this base class and has to be implemented
+ specifically for each cluster task.
+ """
pass
- def submit(self):
+ def Submit(self):
+ """
+ After the jobs have been created, this method submits them to the cluster.
+ """
for current_job in self.functionJobs:
self.sid, self.jobids = submit_jobs([current_job])
- def restart(self,id):
+
+ def Restart(self,id):
pass
- def checkIfTaskFinished(self):
+ def CheckIfTaskFinished(self):
"""
- currentJobStatus = {}
- for line in open(self.statusFile):
- id,status = line.split()
- currentJobStatus[id] = status
-
- for key in self.allJobIds.keys():
- try:
- self.currentJobStatus[key]
- except:
- self.currentJobStatus[key] = 'no response'
+ This function is responsible for checking whether the submitted jobs were
+ completed successfully.
"""
print 'checking whether finished'
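+
+# Editorial illustration (not part of QPalma): a hypothetical, minimal
+# subclass showing the three methods every cluster task is expected to
+# implement; the names MinimalTask and some_module are made up.
+#
+#    class MinimalTask(ClusterTask):
+#
+#        def __init__(self,global_settings):
+#            ClusterTask.__init__(self,global_settings)
+#
+#        def CreateJobs(self):
+#            # fill self.functionJobs with one KybJob per piece of work
+#            for idx in range(int(self.global_settings['num_splits'])):
+#                current_job = KybJob(some_module.TaskStarter,[idx])
+#                self.functionJobs.append(current_job)
+#
+#        def TaskStarter(idx):
+#            # executed on the cluster node, returns a short status message
+#            return 'finished part %d.' % idx
+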
class ApproximationTask(ClusterTask):
"""
+ This task represents the first step towards a valid QPalma dataset.
+ It runs an approximate QPalma model on the putative unspliced reads in
+ order to identify true spliced reads.
"""
def __init__(self,global_settings):
ClusterTask.__init__(self,global_settings)
- def g_heuristic(run_fname,data_fname,param_fname,result_fname):
- ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
- ph1.filter()
-
- return 'finished filtering set %s.' % data_fname
-
- def createJobs(self):
+ def CreateJobs(self):
+ """
+ Split the original reads file into num_splits parts and create one
+ filtering job (PipelineHeuristic) per part.
+ """
num_splits = int(self.global_settings['num_splits'])
run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
param_fname = jp(run_dir,'param_526.pickle')
+ # param_fname = self.global_settings['prediction_parameter_fn']
run_fname = jp(run_dir,'run_obj.pickle')
+ # run_fname = self.global_settings['run_fn']
- data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+ #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
+ result_dir = self.global_settings['approximation_dir']
original_map_fname = self.global_settings['read_ascii_data_fn']
- split_file(original_map_fname,data_dir,num_splits)
-
- functionJobs=[]
+ split_file(original_map_fname,result_dir,num_splits)
for idx in range(0,num_splits):
- data_fname = jp(data_dir,'map.part_%d'%idx)
- result_fname = jp(data_dir,'map.vm.part_%d.heuristic'%idx)
+ data_fname = jp(result_dir,'map.part_%d'%idx)
+ result_fname = jp(result_dir,'map.vm.part_%d.heuristic'%idx)
- current_job = KybJob(grid_heuristic.g_heuristic,[run_fname,data_fname,param_fname,result_fname])
+ current_job = KybJob(grid_heuristic.TaskStarter,[run_fname,data_fname,param_fname,result_fname])
current_job.h_vmem = '25.0G'
#current_job.express = 'True'
self.functionJobs.append(current_job)
+ def TaskStarter(run_fname,data_fname,param_fname,result_fname):
+ ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
+ ph1.filter()
+
+ return 'finished filtering set %s.' % data_fname
class PreprocessingTask(ClusterTask):

def __init__(self,global_settings):
ClusterTask.__init__(self,global_settings)
-
-
- def makeJobs(run,dataset_fn,chunks,param):
- """
- """
-
- jobs=[]
-
- for c_name,current_chunk in chunks:
- current_job = KybJob(grid_predict.g_predict,[run,dataset_fn,current_chunk,param,c_name])
- current_job.h_vmem = '20.0G'
- #current_job.express = 'True'
-
- print "job #1: ", current_job.nativeSpecification
-
- jobs.append(current_job)
-
- return jobs
-
-
- def create_and_submit():
+ def CreateJobs(self):
"""
+ Create one prediction job (grid_predict.TaskStarter) per chunk of
+ prediction keys.
"""
c_name = 'chunk_%d' % idx
chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
- self.functionJobs = makeJobs(run,dataset_fn,chunks,param)
+ for c_name,current_chunk in chunks:
+ current_job = KybJob(grid_predict.TaskStarter,[run,dataset_fn,current_chunk,param,c_name])
+ current_job.h_vmem = '20.0G'
+ #current_job.express = 'True'
+
+ print "job #1: ", current_job.nativeSpecification
+
+ self.functionJobs.append(current_job)
sum = 0
for size in [len(elem) for name,elem in chunks]:
print 'Got %d job(s)' % len(self.functionJobs)
- def g_predict(run,dataset_fn,prediction_keys,param,set_name):
+ def TaskStarter(run,dataset_fn,prediction_keys,param,set_name):
"""
+ Run the QPalma prediction for the given chunk of prediction keys.
"""
class TrainingTask(ClusterTask):
+ """
+ This class represents the cluster task of training QPalma.
+ """
def __init__(self,global_settings):
ClusterTask.__init__(self,global_settings)
class PostprocessingTask(ClusterTask):
"""
-
+ After QPalma has predicted the alignments, this task postprocesses the data.
"""
def __init__(self,global_settings):
ClusterTask.__init__(self,global_settings)
- def g_alignment(chunk_fn,result_fn):
+ def TaskStarter(chunk_fn,result_fn):
create_alignment_file(chunk_fn,result_fn)
- def createJobs(self):
+ def CreateJobs(self):
run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
- functionJobs=[]
+ for chunk_fn in chunks_fn:
+ chunk_name = chunk_fn[:chunk_fn.find('.')]
+ result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
+ chunk_fn = jp(run_dir,chunk_fn)
+
+ current_job = KybJob(grid_alignment.TaskStarter,[chunk_fn,result_fn])
+ current_job.h_vmem = '15.0G'
+ current_job.express = 'True'
+
+ print "job #1: ", current_job.nativeSpecification
+
+ self.functionJobs.append(current_job)
+
+
+class DummyTask(ClusterTask):
+ """
+ This class represents a dummy task to make debugging easier.
+ """
+ def __init__(self,global_settings):
+ ClusterTask.__init__(self,global_settings)
+
+
+ def TaskStarter(param):
+ return 'finished dummy job with parameter %s.' % param
+
+
+ def CreateJobs(self):
+ run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
+ result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
+
for chunk_fn in chunks_fn:
chunk_name = chunk_fn[:chunk_fn.find('.')]
result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
# This file contains the main interface to the QPalma pipeline.
#
+import os
+import os.path
+import pdb
+import sys
+
from optparse import OptionParser
from qpalma.gridtools import ApproximationTask,PreprocessingTask
from qpalma.gridtools import AlignmentTask,PostprocessingTask
+
+Errormsg = """Usage is: python qpalma_pipeline.py <config filename>"""
+
+
+"""
def create_option_parser():
parser = OptionParser()
parser.add_option("-xx", "--clear", action="store_false", dest="verbose", help="cleanup directories delete all created data")
return parser
+"""
+
+jp = os.path.join
+
+def parseSettings(filename):
+ """
+ Parse the plain text settings file: one 'key = value' entry per line;
+ blank lines and lines starting with '#' are ignored.
+ """
+
+ #global_settings = {\
+ #'result_dir':'/fml/ag-raetsch/...',\
+ #'read_ascii_data_fn':'/fml/ag-raetsch/...',\
+ #'num_splits':50
+ #'global_log_fn':'~/qpalma.log'
+ #}
+
+ global_settings = {}
+
+ for line in open(filename):
+ if not line.strip() or line.startswith('#'):
+ continue
+
+ key,val = line.strip().replace(' ','').split('=')
+ global_settings[key] = val
+
+ return global_settings
+
+
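+# Editorial illustration: an example of the settings file that parseSettings
+# expects; keys mirror the commented defaults above, values are placeholders.
+#
+#    # QPalma pipeline settings
+#    result_dir         = /fml/ag-raetsch/...
+#    read_ascii_data_fn  = /fml/ag-raetsch/...
+#    num_splits          = 50
+#    global_log_fn       = ~/qpalma.log
+
+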
+def makeSettings(global_settings):
+ """
+ Create the subdirectory layout below result_dir that is needed by the
+ different pipeline steps and store the paths in global_settings.
+ """
+
+ # first check whether the top level result directory exists
+ assert os.path.exists(global_settings['result_dir']), 'Error: You have to specify an existing result directory!'
+
+ result_dir = global_settings['result_dir']
+
+ # now create some subdirectories needed for the different steps performed by QPalma
+ global_settings['approximation_dir'] = jp(result_dir,'approximation')
+ global_settings['preproc_dir'] = jp(result_dir,'preprocessing')
+ global_settings['postproc_dir'] = jp(result_dir,'postprocessing')
+ global_settings['prediction_dir'] = jp(result_dir,'prediction')
+ global_settings['training_dir'] = jp(result_dir,'training')
+
+ for dir_name in ['approximation_dir', 'preproc_dir', 'postproc_dir', 'prediction_dir', 'training_dir']:
+ try:
+ os.mkdir(global_settings[dir_name])
+ except:
+ print 'Error: There was a problem generating the subdirectory: %s' % dir_name
+
+ try:
+ # the global log is a file, so create it if it does not exist yet
+ open(global_settings['global_log_fn'],'a').close()
+ except:
+ print 'Error: There was a problem generating the logfile %s' % global_settings['global_log_fn']
+
+ return global_settings
+
+
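+# Editorial illustration: assuming result_dir were the hypothetical path
+# /tmp/qpalma_run, makeSettings would create and record this layout:
+#
+#    /tmp/qpalma_run/approximation   -> global_settings['approximation_dir']
+#    /tmp/qpalma_run/preprocessing   -> global_settings['preproc_dir']
+#    /tmp/qpalma_run/postprocessing  -> global_settings['postproc_dir']
+#    /tmp/qpalma_run/prediction      -> global_settings['prediction_dir']
+#    /tmp/qpalma_run/training        -> global_settings['training_dir']
+
+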
+def checkSettings(global_settings):
+ for key,val in global_settings.items():
+ if key.endswith('_fn') or key.endswith('_dir'):
+ assert os.path.exists(val), 'Error: Path/File %s with value %s does not seem to exist!' % (key,val)
+
+ return True
-global_settings = {\
-'experiment_dir':'/fml/ag-raetsch/...',\
-'read_ascii_data_fn':'/fml/ag-raetsch/...',\
-'num_splits':50
-'global_log_fn':'~/qpalma.log'
-}
"""
- def __init__(self):
+ def __init__(self,filename):
"""
+ Initialize the system by loading and parsing the settings file to obtain
+ all parameters.
"""
- parser = create_option_parser()
- (options, args) = parser.parse_args()
+
+ #parser = create_option_parser()
+ #(options, args) = parser.parse_args()
+
+ global_settings = parseSettings(filename)
+ global_settings = makeSettings(global_settings)
+ assert checkSettings(global_settings), 'Check your settings: some entries were invalid!'
+
+ self.global_settings = global_settings
+
+ pdb.set_trace()
def training(self):
"""
algorithm.
"""
- pre_task = TrainingPreprocessingTask(global_settings,run_specific_settings)
+ pre_task = TrainingPreprocessingTask(self.global_settings)
pre_task.CreateJobs()
pre_task.Submit()
while pre_task.CheckIfTaskFinished() == False:
# Before creating a candidate spliced read dataset we have to first filter
# the matches from the first seed finding run.
- approx_task = ApproximationTask(config_obj)
+ approx_task = ApproximationTask(self.global_settings)
approx_task.CreateJobs()
approx_task.Submit()
approx_task.CheckIfTaskFinished()
# After filtering combine the filtered matches from the first run and the
# found matches from the second run to a full dataset
- pre_task = PreprocessingTask(...)
+ pre_task = PreprocessingTask(self.global_settings)
pre_task.CreateJobs()
pre_task.Submit()
- while pre_task.checkIfTaskFinished() == False:
- sleep(20)
+ pre_task.CheckIfTaskFinished()
# Now that we have a dataset we can perform the accurate alignments for this
# data
- align_task = AlignmentTask(...)
+ align_task = AlignmentTask(self.global_settings)
align_task.CreateJobs()
align_task.Submit()
- while align_task.checkIfTaskFinished() == False:
- sleep(20)
+ align_task.CheckIfTaskFinished()
# The results of the above alignment step can be converted to a data format
# needed for further postprocessing.
- post_task = PostprocessingTask(...)
+ post_task = PostprocessingTask(self.global_settings)
post_task.CreateJobs()
post_task.Submit()
- while post_task.checkIfTaskFinished() == False:
- sleep(20)
+ post_task.CheckIfTaskFinished()
print "Success!"
if __name__ == '__main__':
- system_obj = System()
- system_obj.run()
+ assert len(sys.argv) == 2, Errormsg
+ filename = sys.argv[1]
+ assert os.path.exists(filename), Errormsg
+ system_obj = System(filename)
+ #system_obj.prediction()
+ #system_obj.training()