# higher level cluster tasks interface for qpalma
# [qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import cPickle
5 import math
6 import os
7 import os.path
8 import pdb
9 import sys
10 import time
11
12 from threading import Thread
13
14 from pythongrid import KybJob, Usage
15 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
16
17 from createAlignmentFileFromPrediction import create_alignment_file
18
19 import gridtools
20
21 jp = os.path.join
22
23
class ClusterTask(Thread):
    """
    Base class for one stage of the qpalma cluster pipeline.

    A subclass implements createJobs() to fill self.functionJobs with
    pythongrid KybJob objects; submit() then sends them to the grid one
    by one, sleeping self.sleep_time seconds between submissions.
    """

    def __init__(self):
        # BUG FIX: a Thread subclass must initialize the Thread base;
        # the original skipped this call.
        Thread.__init__(self)

        # seconds to pause between consecutive job submissions
        self.sleep_time = 0

        # this list stores the cluster/local jobs objects
        self.functionJobs = []


    def createJobs(self):
        """Populate self.functionJobs; overridden by subclasses."""
        pass


    def submit(self):
        """Submit each job in self.functionJobs separately."""
        for current_job in self.functionJobs:
            # BUG FIX: the original passed the undefined name
            # 'functionJobs' to submit_jobs; submit the current job.
            (sid, jobids) = submit_jobs([current_job])
            time.sleep(self.sleep_time)


    def checkIfTaskFinished(self):
        """Poll whether all submitted jobs completed; overridden by subclasses."""
        pass
48
49
50
51 class ApproximationTask(ClusterTask):
52 """
53 """
54
55 def __init__(self):
56 ClusterTask.__init__(self)
57
58
59 def g_heuristic(run_fname,data_fname,param_fname,result_fname):
60 #print run_fname,data_fname,param_fname,result_fname
61 ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
62 ph1.filter()
63
64 return 'finished filtering set %s.' % data_fname
65
66 def createJobs(self):
67 num_splits = 25
68
69 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
70 #data_dir = '/fml/ag-raetsch/home/fabio/tmp/lyrata_analysis/'
71
72 data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
73
74 run_fname = jp(run_dir,'run_obj.pickle')
75
76 #original_map_fname = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main/map.vm'
77 #split_file(original_map_fname,data_dir,num_splits)
78
79 param_fname = jp(run_dir,'param_526.pickle')
80
81 functionJobs=[]
82
83 for idx in range(0,num_splits):
84 data_fname = jp(data_dir,'map.part_%d'%idx)
85 result_fname = jp(data_dir,'map.vm.part_%d.heuristic'%idx)
86
87 current_job = KybJob(grid_heuristic.g_heuristic,[run_fname,data_fname,param_fname,result_fname])
88 current_job.h_vmem = '25.0G'
89 #current_job.express = 'True'
90
91 print "job #1: ", current_job.nativeSpecification
92
93 self.functionJobs.append(current_job)
94
95
96
97
class PreprocessingTask(ClusterTask):
    """
    Cluster task wrapper for the preprocessing stage.  No jobs are
    defined yet; createJobs() is inherited as a no-op.
    """

    def __init__(self):
        super(PreprocessingTask, self).__init__()
105
106
class AlignmentTask(ClusterTask):
    """Cluster task wrapper for the alignment / prediction stage."""

    def __init__(self):
        super(AlignmentTask, self).__init__()
111
112
def get_slices(dataset_size, num_nodes):
    """
    Partition the index range [0, dataset_size) into num_nodes contiguous
    (begin, end) pairs.  The first num_nodes-1 slices each cover
    dataset_size // num_nodes indices; the last slice absorbs the
    remainder so the whole range is covered.

    Returns a list of num_nodes (begin, end) tuples.
    """
    all_instances = []

    # BUG FIX: use explicit floor division so the result is an integer
    # regardless of Python version ('/' on ints floors only in Python 2).
    part = dataset_size // num_nodes
    end = 0
    for idx in range(1, num_nodes + 1):
        begin = end
        # last slice takes everything that is left (the remainder)
        end = dataset_size if idx == num_nodes else begin + part
        all_instances.append((begin, end))

    return all_instances
133
134
135 def makeJobs(run,dataset_fn,chunks,param):
136 """
137 """
138
139 jobs=[]
140
141 for c_name,current_chunk in chunks:
142 current_job = KybJob(grid_predict.g_predict,[run,dataset_fn,current_chunk,param,c_name])
143 current_job.h_vmem = '20.0G'
144 #current_job.express = 'True'
145
146 print "job #1: ", current_job.nativeSpecification
147
148 jobs.append(current_job)
149
150 return jobs
151
152
153 def create_and_submit():
154 """
155
156 """
157
158 jp = os.path.join
159
160 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
161
162 run = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
163 run['name'] = 'saved_run'
164
165 param = cPickle.load(open(jp(run_dir,'param_526.pickle')))
166
167 run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
168 dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
169 prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
170
171 prediction_keys = cPickle.load(open(prediction_keys_fn))
172
173 print 'Found %d keys for prediction.' % len(prediction_keys)
174
175 num_splits = 25
176 #num_splits = 1
177 slices = get_slices(len(prediction_keys),num_splits)
178 chunks = []
179 for idx,slice in enumerate(slices):
180 #if idx != 0:
181 c_name = 'chunk_%d' % idx
182 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
183
184 self.functionJobs = makeJobs(run,dataset_fn,chunks,param)
185
186 sum = 0
187 for size in [len(elem) for name,elem in chunks]:
188 sum += size
189
190 print 'Got %d job(s)' % len(functionJobs)
191
192
def g_predict(run, dataset_fn, prediction_keys, param, set_name):
    """
    Grid job entry point: run QPalma prediction on one chunk of keys.

    NOTE(review): QPalma is not imported in this file -- presumably
    supplied by the grid worker's environment; verify.
    """
    predictor = QPalma()
    predictor.predict(run, dataset_fn, prediction_keys, param, set_name)
    return 'finished prediction of set %s.' % set_name
202
203
204
class TrainingTask(ClusterTask):
    """Cluster task wrapper for the training stage; no jobs defined yet."""

    def __init__(self):
        super(TrainingTask, self).__init__()
209
210
class PostprocessingTask(ClusterTask):
    """
    Cluster task wrapper for the postprocessing (alignment file) stage.
    """

    def __init__(self):
        super(PostprocessingTask, self).__init__()
218
219
# Grid job entry point: write the alignment file for one prediction chunk
# by delegating to create_alignment_file (imported at the top of the file).
# NOTE(review): defined at method level but takes no 'self' -- confirm how
# pythongrid invokes it.
def g_alignment(chunk_fn,result_fn):
    create_alignment_file(chunk_fn,result_fn)
222
223
224 def createJobs(self):
225 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
226 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
227
228 chunks_fn = []
229 for fn in os.listdir(run_dir):
230 if fn.startswith('chunk'):
231 chunks_fn.append(fn)
232
233 print chunks_fn
234
235 functionJobs=[]
236
237 for chunk_fn in chunks_fn:
238 chunk_name = chunk_fn[:chunk_fn.find('.')]
239 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
240 chunk_fn = jp(run_dir,chunk_fn)
241
242 current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
243 current_job.h_vmem = '15.0G'
244 current_job.express = 'True'
245
246 print "job #1: ", current_job.nativeSpecification