+ added settings in the form of a global and a run-specific part
[qpalma.git] / qpalma / gridtools.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Written (W) 2008 Fabio De Bona
# Copyright (C) 2008 Max-Planck-Society

import cPickle
import math
import os
import os.path
import pdb
import sys
import time

from threading import Thread

from pythongrid import KybJob, Usage
from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status

from createAlignmentFileFromPrediction import create_alignment_file

# PipelineHeuristic and QPalma are used by g_heuristic and g_predict below
# but were not imported; the module paths here are assumptions and may have
# to be adjusted to the actual package layout.
from PipelineHeuristic import PipelineHeuristic
from qpalma_main import QPalma

import gridtools

from utils import get_slices

jp = os.path.join

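
# split_file() is called by ApproximationTask.createJobs() below but is not
# defined or imported in this module. The helper here is a minimal sketch of
# the assumed behavior -- split a map file into num_splits roughly equal
# parts named 'map.part_<idx>' -- and should be replaced by the project's
# real implementation if one exists elsewhere.
def split_file(fname, out_dir, num_splits):
    lines = open(fname).readlines()
    part_size = int(math.ceil(len(lines) / float(num_splits)))

    for idx in range(num_splits):
        part = open(jp(out_dir, 'map.part_%d' % idx), 'w')
        part.writelines(lines[idx * part_size:(idx + 1) * part_size])
        part.close()
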
class ClusterTask(Thread):
    """
    Base class for all cluster tasks.

    Every task creates a status file.
    All cluster jobs then report their exit status to this status file,
    one '<job id> <status>' line per job:

    Job / status

    1   success
    2   failed
    3   waiting

    means that the job with id 1 terminated successfully.
    """

    def __init__(self, global_settings):
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local job objects
        self.functionJobs = []

        self.global_settings = global_settings

        #self.allJobIds = {}


    def createJobs(self):
        pass


    def submit(self):
        self.sid, self.jobids = submit_jobs(self.functionJobs)


    def restart(self, id):
        pass

    def checkIfTaskFinished(self):
        # Draft of explicit per-job status bookkeeping, currently disabled:
        #
        # currentJobStatus = {}
        # for line in open(self.statusFile):
        #     id, status = line.split()
        #     currentJobStatus[id] = status
        #
        # for key in self.allJobIds.keys():
        #     try:
        #         self.currentJobStatus[key]
        #     except:
        #         self.currentJobStatus[key] = 'no response'

        print 'checking whether finished'
        while not get_status(self.sid, self.jobids):
            time.sleep(7)
        print 'collecting jobs'
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
        print "ret fields AFTER execution on cluster"
        for (i, job) in enumerate(retjobs):
            print "Job #", i, "- ret: ", job.ret

        print '--------------'

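
# Usage sketch (illustrative): subclasses override createJobs() to fill
# self.functionJobs with KybJob objects; submit() then hands them over to
# pythongrid, and checkIfTaskFinished() polls the cluster until all jobs
# have reported back.
#
#   task = ApproximationTask(global_settings)
#   task.createJobs()
#   task.submit()
#   task.checkIfTaskFinished()
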
class ApproximationTask(ClusterTask):
    """
    Run the vmatch filtering heuristic (PipelineHeuristic) on the split
    read map files in parallel.
    """

    def __init__(self, global_settings):
        ClusterTask.__init__(self, global_settings)


    def createJobs(self):
        num_splits = self.global_settings['num_splits']

        run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
        param_fname = jp(run_dir, 'param_526.pickle')
        run_fname = jp(run_dir, 'run_obj.pickle')

        data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'

        original_map_fname = self.global_settings['read_ascii_data_fn']
        split_file(original_map_fname, data_dir, num_splits)

        for idx in range(0, num_splits):
            data_fname = jp(data_dir, 'map.part_%d' % idx)
            result_fname = jp(data_dir, 'map.vm.part_%d.heuristic' % idx)

            current_job = KybJob(gridtools.g_heuristic, [run_fname, data_fname, param_fname, result_fname])
            current_job.h_vmem = '25.0G'
            #current_job.express = 'True'

            print "job #%d: " % idx, current_job.nativeSpecification

            self.functionJobs.append(current_job)


def g_heuristic(run_fname, data_fname, param_fname, result_fname):
    ph1 = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname)
    ph1.filter()

    return 'finished filtering set %s.' % data_fname

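
# Note on the KybJob pattern used above and below: pythongrid pickles the
# function reference together with its argument list and executes it on a
# cluster node. This is why the g_* entry points are plain module-level
# functions addressed through the module itself (gridtools.g_heuristic,
# gridtools.g_predict, gridtools.g_alignment) instead of bound methods.
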
class PreprocessingTask(ClusterTask):
    """
    This class encapsulates the preprocessing work that has to be done
    before the actual prediction can start.
    """

    def __init__(self, global_settings):
        ClusterTask.__init__(self, global_settings)

class AlignmentTask(ClusterTask):
    """
    Run the QPalma prediction on chunks of the prediction keys in parallel.
    """

    def __init__(self, global_settings):
        ClusterTask.__init__(self, global_settings)


    def makeJobs(self, run, dataset_fn, chunks, param):
        """
        Create one KybJob per chunk of prediction keys.
        """

        jobs = []

        for c_name, current_chunk in chunks:
            current_job = KybJob(gridtools.g_predict, [run, dataset_fn, current_chunk, param, c_name])
            current_job.h_vmem = '20.0G'
            #current_job.express = 'True'

            print "job %s: " % c_name, current_job.nativeSpecification

            jobs.append(current_job)

        return jobs


    def createJobs(self):
        num_splits = self.global_settings['num_splits']

        run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'

        run = cPickle.load(open(jp(run_dir, 'run_obj.pickle')))
        run['name'] = 'saved_run'

        param = cPickle.load(open(jp(run_dir, 'param_526.pickle')))

        run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
        prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'

        prediction_keys = cPickle.load(open(prediction_keys_fn))

        print 'Found %d keys for prediction.' % len(prediction_keys)

        slices = get_slices(len(prediction_keys), num_splits)
        chunks = []
        for idx, current_slice in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name, prediction_keys[current_slice[0]:current_slice[1]]))

        self.functionJobs = self.makeJobs(run, dataset_fn, chunks, param)

        # sanity check: the chunks have to cover all prediction keys
        total_size = 0
        for size in [len(elem) for name, elem in chunks]:
            total_size += size
        assert total_size == len(prediction_keys)

        print 'Got %d job(s)' % len(self.functionJobs)

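
# Illustrative example of the chunking in createJobs() above, assuming
# get_slices() returns (start, stop) index pairs: with 10 prediction keys
# and num_splits = 3 one would get chunks like
#
#   [('chunk_0', keys[0:4]), ('chunk_1', keys[4:7]), ('chunk_2', keys[7:10])]
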
def g_predict(run, dataset_fn, prediction_keys, param, set_name):
    """
    Run the QPalma prediction for one chunk of prediction keys.
    """

    qp = QPalma()
    qp.predict(run, dataset_fn, prediction_keys, param, set_name)

    return 'finished prediction of set %s.' % set_name

class TrainingTask(ClusterTask):

    def __init__(self, global_settings):
        ClusterTask.__init__(self, global_settings)

class PostprocessingTask(ClusterTask):
    """
    Convert the prediction chunks into alignment files.
    """

    def __init__(self, global_settings):
        ClusterTask.__init__(self, global_settings)


    def createJobs(self):
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        chunks_fn = []
        for fn in os.listdir(run_dir):
            if fn.startswith('chunk'):
                chunks_fn.append(fn)

        print chunks_fn

        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_fn = jp(run_dir, chunk_fn)

            current_job = KybJob(gridtools.g_alignment, [chunk_fn, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print "job %s: " % chunk_name, current_job.nativeSpecification

            self.functionJobs.append(current_job)


def g_alignment(chunk_fn, result_fn):
    create_alignment_file(chunk_fn, result_fn)
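

# Minimal driver sketch (illustrative only): how the tasks above are assumed
# to be chained together. The global_settings keys shown are the ones
# actually read in this module; a real run would load them from the global
# and run-specific configuration mentioned in the commit message.
if __name__ == '__main__':
    global_settings = {
        'num_splits': 10,                         # number of parallel jobs
        'read_ascii_data_fn': '/path/to/map.vm',  # hypothetical input map file
    }

    for task_class in [ApproximationTask, AlignmentTask, PostprocessingTask]:
        task = task_class(global_settings)
        task.createJobs()
        task.submit()
        task.checkIfTaskFinished()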