2 # -*- coding: utf-8 -*-
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
import os
import time
import cPickle

from threading import Thread

from pythongrid import KybJob, Usage
from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status

from createAlignmentFileFromPrediction import create_alignment_file
class ClusterTask(Thread):
    """
    Base class for the grid tasks below.  A task owns a list of
    cluster/local job objects (``self.functionJobs``) and submits them to
    the grid one by one via ``submit_jobs``.

    NOTE(review): several original lines were elided in this excerpt
    (among them the ``def`` headers of ``__init__`` and the submit
    method); the method names used here are reconstructed and should be
    confirmed against the full file.
    """

    def __init__(self):
        Thread.__init__(self)
        # this list stores the cluster/local jobs objects
        self.functionJobs = []

    def submit(self):
        """Submit every stored job to the grid, one at a time."""
        for current_job in self.functionJobs:
            # BUG in original: ``submit_jobs([functionJobs])`` referenced
            # the undefined name ``functionJobs``; each iteration must
            # submit the current job.
            (sid, jobids) = submit_jobs([current_job])
            # Throttle submissions.  ``self.sleep_time`` is presumably set
            # in an elided part of the class -- TODO confirm.
            time.sleep(self.sleep_time)

    def checkIfTaskFinished(self):
        """Poll whether all jobs of this task are done (body elided in
        this excerpt)."""
        pass
class ApproximationTask(ClusterTask):
    """
    Grid task that runs the filtering/approximation heuristic
    (``grid_heuristic.g_heuristic``) over the split map files.
    """

    def __init__(self):
        ClusterTask.__init__(self)
def g_heuristic(run_fname, data_fname, param_fname, result_fname):
    """
    Run the PipelineHeuristic filter on one map-file split.

    Executed on a cluster node via a KybJob; the four arguments are the
    pickled run object, the map-file split, the parameter pickle and the
    output file name.  Returns a human-readable status string.
    """
    #print run_fname,data_fname,param_fname,result_fname
    ph1 = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname)
    # NOTE(review): the original lines that actually execute the filter
    # were elided in this excerpt; presumably ``ph1.filter()`` -- TODO
    # confirm against the full file.
    ph1.filter()

    return 'finished filtering set %s.' % data_fname
77 run_dir
= '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
78 #data_dir = '/fml/ag-raetsch/home/fabio/tmp/lyrata_analysis/'
80 data_dir
= '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
82 run_fname
= jp(run_dir
,'run_obj.pickle')
84 #original_map_fname = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main/map.vm'
85 #split_file(original_map_fname,data_dir,num_splits)
87 param_fname
= jp(run_dir
,'param_526.pickle')
91 for idx
in range(0,num_splits
):
92 data_fname
= jp(data_dir
,'map.part_%d'%idx)
93 result_fname
= jp(data_dir
,'map.vm.part_%d.heuristic'%idx)
95 current_job
= KybJob(grid_heuristic
.g_heuristic
,[run_fname
,data_fname
,param_fname
,result_fname
])
96 current_job
.h_vmem
= '25.0G'
97 #current_job.express = 'True'
99 print "job #1: ", current_job
.nativeSpecification
101 self
.functionJobs
.append(current_job
)
class PreprocessingTask(ClusterTask):
    """
    This class encapsules some preprocessing steps (the original
    docstring and body were elided in this excerpt -- TODO confirm).
    """

    def __init__(self):
        ClusterTask.__init__(self)
class AlignmentTask(ClusterTask):
    """
    Grid task that runs the actual alignment/prediction jobs
    (``grid_predict.g_predict``) over chunks of prediction keys.
    """

    def __init__(self):
        ClusterTask.__init__(self)
def get_slices(dataset_size, num_nodes):
    """
    Partition ``dataset_size`` items into ``num_nodes`` contiguous
    ``(begin, end)`` half-open index slices.

    Each slice has ``dataset_size // num_nodes`` items; the last slice
    absorbs the remainder so the whole range [0, dataset_size) is
    covered.

    NOTE(review): the interior of the original loop was elided in this
    excerpt; the boundary computation below is reconstructed -- TODO
    confirm against the full file.
    """
    all_instances = []

    # Integer part size; ``//`` preserves the original Python 2 integer
    # division semantics under Python 3 as well.
    part = dataset_size // num_nodes

    begin, end = 0, 0
    for idx in range(1, num_nodes + 1):
        begin = end
        # Last node takes everything that is left (the remainder).
        end = dataset_size if idx == num_nodes else begin + part
        all_instances.append((begin, end))

    return all_instances
143 def makeJobs(run
,dataset_fn
,chunks
,param
):
149 for c_name
,current_chunk
in chunks
:
150 current_job
= KybJob(grid_predict
.g_predict
,[run
,dataset_fn
,current_chunk
,param
,c_name
])
151 current_job
.h_vmem
= '20.0G'
152 #current_job.express = 'True'
154 print "job #1: ", current_job
.nativeSpecification
156 jobs
.append(current_job
)
161 def create_and_submit():
168 run_dir
= '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
170 run
= cPickle
.load(open(jp(run_dir
,'run_obj.pickle')))
171 run
['name'] = 'saved_run'
173 param
= cPickle
.load(open(jp(run_dir
,'param_526.pickle')))
175 run
['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
176 dataset_fn
= '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
177 prediction_keys_fn
= '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
179 prediction_keys
= cPickle
.load(open(prediction_keys_fn
))
181 print 'Found %d keys for prediction.' % len(prediction_keys
)
185 slices
= get_slices(len(prediction_keys
),num_splits
)
187 for idx
,slice in enumerate(slices
):
189 c_name
= 'chunk_%d' % idx
190 chunks
.append((c_name
,prediction_keys
[slice[0]:slice[1]]))
192 self
.functionJobs
= makeJobs(run
,dataset_fn
,chunks
,param
)
195 for size
in [len(elem
) for name
,elem
in chunks
]:
198 print 'Got %d job(s)' % len(functionJobs
)
def g_predict(run, dataset_fn, prediction_keys, param, set_name):
    """
    Predict alignments for one chunk of prediction keys.

    Executed on a cluster node via a KybJob.  Returns a human-readable
    status string.

    NOTE(review): the lines constructing ``qp`` (presumably a QPalma
    instance) were elided in this excerpt -- TODO confirm.
    """
    qp.predict(run, dataset_fn, prediction_keys, param, set_name)

    return 'finished prediction of set %s.' % set_name
class TrainingTask(ClusterTask):
    """
    Grid task for the training phase (body elided in this excerpt).
    """

    def __init__(self):
        ClusterTask.__init__(self)
class PostprocessingTask(ClusterTask):
    """
    Grid task that converts the prediction chunks into alignment files
    (``grid_alignment.g_alignment``).
    """

    def __init__(self):
        ClusterTask.__init__(self)
def g_alignment(chunk_fn, result_fn):
    """
    Create the alignment file for one prediction chunk.

    Executed on a cluster node via a KybJob; simply delegates to
    ``create_alignment_file``.
    """
    create_alignment_file(chunk_fn, result_fn)
232 def createJobs(self
):
233 run_dir
= '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
234 result_dir
= '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
237 for fn
in os
.listdir(run_dir
):
238 if fn
.startswith('chunk'):
245 for chunk_fn
in chunks_fn
:
246 chunk_name
= chunk_fn
[:chunk_fn
.find('.')]
247 result_fn
= jp(result_dir
,'%s.align_remap'%chunk
_name
)
248 chunk_fn
= jp(run_dir
,chunk_fn
)
250 current_job
= KybJob(grid_alignment
.g_alignment
,[chunk_fn
,result_fn
])
251 current_job
.h_vmem
= '15.0G'
252 current_job
.express
= 'True'
254 print "job #1: ", current_job
.nativeSpecification