+ added configuration file parsing and checking functions
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 import gridtools
28
29 from utils import get_slices
30
# Short alias for os.path.join; the tasks below use it to build file paths.
from os.path import join as jp
32
33
class ClusterTask(Thread):
    """
    Base class for a unit of work that is split into several cluster jobs.

    A task owns the job objects it creates (self.functionJobs), submits them
    in one batch (Submit) and afterwards waits for and collects their results
    (CheckIfTaskFinished). Because it derives from threading.Thread, a task
    object can also be run in its own thread.

    Design note (not implemented in this class yet): every task is meant to
    create a status file, and all of its cluster jobs are meant to report
    their exit status to that file.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter
    """

    def __init__(self, global_settings):
        """
        :param global_settings: configuration object shared by the tasks;
            subclasses index it with keys such as 'num_splits'.
        """
        # The Thread base must be initialised before any thread API
        # (start(), is_alive(), name, ...) is used on this object.
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local jobs objects
        self.functionJobs = []

        # this object stores the configuration
        self.global_settings = global_settings

        # session id and job ids returned by submit_jobs (set in Submit)
        self.sid = None
        self.jobids = None

        # job objects returned by collect_jobs (set in CheckIfTaskFinished)
        self.collectedJobs = None

    def CreateJobs(self):
        """
        Fill self.functionJobs with the jobs of this task.

        This is only a placeholder in the base class and does nothing; every
        cluster task has to implement it specifically.
        """
        pass

    def Submit(self):
        """
        Submit all jobs in self.functionJobs to the cluster in one batch.

        The session id and job ids returned by submit_jobs are stored in
        self.sid and self.jobids for CheckIfTaskFinished.
        """
        # Hand over the whole list at once; submitting per job would overwrite
        # self.sid/self.jobids with every call.
        self.sid, self.jobids = submit_jobs(self.functionJobs)

    def Restart(self, id):
        """
        Placeholder for restarting a single job; not implemented yet.
        """
        pass

    def CheckIfTaskFinished(self, poll_interval=7):
        """
        Block until the submitted jobs are finished, then collect them and
        print each job's return value.

        :param poll_interval: seconds to sleep between two get_status queries.
        :returns: the list of job objects returned by collect_jobs (also
            stored in self.collectedJobs).
        :raises RuntimeError: if Submit() has not been called before.
        """
        if self.sid is None:
            raise RuntimeError('Submit() has to be called before CheckIfTaskFinished().')

        print('checking whether finished')
        while not get_status(self.sid, self.jobids):
            time.sleep(poll_interval)

        print('collecting jobs')
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
        print('ret fields AFTER execution on cluster')
        for i, job in enumerate(retjobs):
            print('Job # %d - ret:  %s' % (i, job.ret))

        print('--------------')

        self.collectedJobs = retjobs
        return retjobs
96
97
98
class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
    It starts an approximative QPalma model on the putative unspliced reads to
    identify true spliced reads.
    """

    def __init__(self, global_settings=None):
        # ClusterTask.__init__ requires the configuration, and CreateJobs
        # reads it via self.global_settings.
        ClusterTask.__init__(self, global_settings)

    def CreateJobs(self):
        """
        Split the read file into num_splits parts and create one job per part.

        Reads 'num_splits', 'approximation_dir' and 'read_ascii_data_fn' from
        self.global_settings. Job idx reads <approximation_dir>/map.part_<idx>
        and writes <approximation_dir>/map.vm.part_<idx>.heuristic.
        """
        num_splits = self.global_settings['num_splits']

        # NOTE(review): run object and parameters still come from a hard-coded
        # run directory; the intended settings appear to be
        # 'prediction_parameter_fn' and 'run_fn' -- confirm before switching.
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
        param_fname = jp(run_dir, 'param_526.pickle')
        run_fname = jp(run_dir, 'run_obj.pickle')

        result_dir = self.global_settings['approximation_dir']

        original_map_fname = self.global_settings['read_ascii_data_fn']
        # NOTE(review): split_file is neither defined nor imported in this
        # module; it is presumably expected to write map.part_<idx> files
        # into result_dir -- confirm.
        split_file(original_map_fname, result_dir, num_splits)

        for idx in range(num_splits):
            data_fname = jp(result_dir, 'map.part_%d' % idx)
            result_fname = jp(result_dir, 'map.vm.part_%d.heuristic' % idx)

            # NOTE(review): grid_heuristic is not imported in this module.
            current_job = KybJob(grid_heuristic.TaskStarter, [run_fname, data_fname, param_fname, result_fname])
            current_job.h_vmem = '25.0G'

            print('job #%d: %s' % (idx, current_job.nativeSpecification))

            self.functionJobs.append(current_job)

    @staticmethod
    def TaskStarter(run_fname, data_fname, param_fname, result_fname):
        """
        Job entry point: run the heuristic filter on one part of the reads.

        Static because it takes no instance; without @staticmethod it could
        not be called through the class or an instance.

        :returns: a status message naming the processed data file.
        """
        # NOTE(review): PipelineHeuristic is not imported in this module.
        ph1 = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname)
        ph1.filter()

        return 'finished filtering set %s.' % data_fname
145
146
class PreprocessingTask(ClusterTask):
    """
    Cluster task for the preprocessing step.

    It does not define its own CreateJobs yet, so it uses the base-class
    placeholder and creates no jobs.
    """

    def __init__(self, global_settings=None):
        # Forward the configuration; ClusterTask.__init__ requires it.
        ClusterTask.__init__(self, global_settings)
154
155
class AlignmentTask(ClusterTask):
    """
    Cluster task that predicts alignments.

    The prediction keys are split into num_splits chunks, and each chunk is
    predicted by a separate cluster job.
    """

    def __init__(self, global_settings=None):
        # Forward the configuration; CreateJobs reads 'num_splits' from it.
        ClusterTask.__init__(self, global_settings)

    def CreateJobs(self):
        """
        Create one prediction job per chunk of prediction keys.

        Reads 'num_splits' from self.global_settings. The run object, the
        parameters, the dataset and the prediction keys are loaded from
        hard-coded paths. Chunks are named chunk_<idx>.
        """
        num_splits = self.global_settings['num_splits']

        run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'

        # Load with with-blocks so every pickle file is closed right away.
        with open(jp(run_dir, 'run_obj.pickle'), 'rb') as run_file:
            run = cPickle.load(run_file)
        run['name'] = 'saved_run'

        with open(jp(run_dir, 'param_526.pickle'), 'rb') as param_file:
            param = cPickle.load(param_file)

        run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
        prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'

        with open(prediction_keys_fn, 'rb') as keys_file:
            prediction_keys = cPickle.load(keys_file)

        print('Found %d keys for prediction.' % len(prediction_keys))

        # Each element of get_slices' result is used as a (start, stop) pair;
        # presumably the pairs together cover all keys -- confirm in utils.
        slices = get_slices(len(prediction_keys), num_splits)
        chunks = []
        for idx, bounds in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name, prediction_keys[bounds[0]:bounds[1]]))

        for c_name, current_chunk in chunks:
            # NOTE(review): grid_predict is not imported in this module.
            current_job = KybJob(grid_predict.TaskStarter, [run, dataset_fn, current_chunk, param, c_name])
            current_job.h_vmem = '20.0G'

            print('job %s: %s' % (c_name, current_job.nativeSpecification))

            self.functionJobs.append(current_job)

        print('Got %d job(s)' % len(self.functionJobs))

    @staticmethod
    def TaskStarter(run, dataset_fn, prediction_keys, param, set_name):
        """
        Job entry point: predict alignments for one chunk of prediction keys.

        :returns: a status message naming the processed chunk.
        """
        # NOTE(review): QPalma is not imported in this module.
        qp = QPalma()
        qp.predict(run, dataset_fn, prediction_keys, param, set_name)

        return 'finished prediction of set %s.' % set_name
218
219
220
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma.
    """

    def __init__(self, global_settings=None):
        # Forward the configuration; ClusterTask.__init__ requires it.
        ClusterTask.__init__(self, global_settings)
228
229
class PostprocessingTask(ClusterTask):
    """
    After QPalma predicted alignments this task postprocesses the data.

    Each prediction chunk file becomes one job, and each job writes one
    <chunk name>.align_remap file.
    """

    def __init__(self, global_settings=None):
        # Forward the configuration; ClusterTask.__init__ requires it.
        ClusterTask.__init__(self, global_settings)

    @staticmethod
    def TaskStarter(chunk_fn, result_fn):
        """
        Job entry point: create the alignment file result_fn from the
        prediction chunk file chunk_fn.
        """
        create_alignment_file(chunk_fn, result_fn)

    def CreateJobs(self):
        """
        Create one job per file in the prediction directory whose name starts
        with 'chunk'.
        """
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        # sorted() makes the job order independent of os.listdir's ordering.
        chunks_fn = sorted(fn for fn in os.listdir(run_dir) if fn.startswith('chunk'))

        print(chunks_fn)

        for chunk_fn in chunks_fn:
            # Keep everything before the first '.'; partition() returns the
            # whole name when there is no '.' (find() would return -1 and
            # drop the last character).
            chunk_name = chunk_fn.partition('.')[0]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_path = jp(run_dir, chunk_fn)

            # NOTE(review): grid_alignment is not imported in this module.
            current_job = KybJob(grid_alignment.TaskStarter, [chunk_path, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print('job %s: %s' % (chunk_name, current_job.nativeSpecification))

            # Register the job so that Submit() picks it up.
            self.functionJobs.append(current_job)
266
267
class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier.
    """

    def __init__(self, global_settings=None):
        # Forward the configuration; ClusterTask.__init__ requires it.
        ClusterTask.__init__(self, global_settings)

    @staticmethod
    def TaskStarter(param):
        """
        Dummy job entry point: does no real work and only reports the
        parameter it received.

        :returns: a status message containing param.
        """
        # NOTE(review): the body used chunk_fn/result_fn, which do not exist in
        # this scope; confirm that a plain echo is the intended debug behaviour.
        return 'finished dummy task with param %s.' % (param,)

    def CreateJobs(self):
        """
        Create one job per file in the prediction directory whose name starts
        with 'chunk'.
        """
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        # Collect the chunk files from run_dir in a deterministic order.
        chunks_fn = sorted(fn for fn in os.listdir(run_dir) if fn.startswith('chunk'))

        for chunk_fn in chunks_fn:
            # Keep everything before the first '.'; works for names without '.'.
            chunk_name = chunk_fn.partition('.')[0]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_path = jp(run_dir, chunk_fn)

            # NOTE(review): grid_alignment is not imported in this module.
            current_job = KybJob(grid_alignment.g_alignment, [chunk_path, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print('job %s: %s' % (chunk_name, current_job.nativeSpecification))

            # Register the job so that Submit() picks it up.
            self.functionJobs.append(current_job)