e3dddcd51ac9eb00dfd3a91d4c937bcd78e7fdc7
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 from PipelineHeuristic import *
28
29 import gridtools
30
31 from utils import get_slices,split_file
32
# Shorthand for building file paths, used throughout this module.
jp = os.path.join
34
35
class ClusterTask(Thread):
    """
    Base class for all QPalma cluster tasks.

    A task builds a list of cluster/local job objects (self.functionJobs),
    submits them via pythongrid and waits for their completion.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter
    """

    def __init__(self, settings):
        # Bug fix: initialize the Thread machinery -- without this call,
        # Thread methods such as start() fail on instances.
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local jobs objects
        self.functionJobs = []

        # this object stores the configuration
        self.settings = settings

    def CreateJobs(self):
        """
        Create an array of jobs called self.functionJobs. It is only
        virtual in this base class and has to be implemented specifically
        for each cluster task.
        """
        pass

    def Submit(self):
        """
        After creation of jobs this function submits them to the cluster.

        Bug fix: all jobs are submitted in a single call so that self.sid /
        self.jobids describe the complete set. The previous per-job loop
        overwrote both attributes on every iteration, so only the last job
        was ever tracked by CheckIfTaskFinished.
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)

    def Restart(self, id):
        # Not implemented yet.
        pass

    def collectResults(self):
        # Hook for subclasses to merge per-job result files.
        pass

    def CheckIfTaskFinished(self):
        """
        Poll the cluster until the submitted jobs are completed, then fetch
        the jobs and hand control to collectResults().
        """
        print('checking whether finished')
        while not get_status(self.sid, self.jobids):
            time.sleep(7)

        print('collecting jobs')
        collect_jobs(self.sid, self.jobids, self.functionJobs)

        self.collectResults()
104
105
class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
    It starts an approximative QPalma model on the putative unspliced reads
    to identify true spliced reads.
    """

    def __init__(self, settings):
        ClusterTask.__init__(self, settings)

    def CreateJobs(self):
        """
        Split the unspliced reads file into num_splits parts and create one
        cluster job (running gridtools.ApproximationTaskStarter) per part.
        """
        num_splits = self.settings['num_splits']

        param_fname = self.settings['prediction_parameter_fn']
        run_fname = self.settings['run_fn']
        result_dir = self.settings['approximation_dir']

        original_map_fname = self.settings['unspliced_reads_fn']
        split_file(original_map_fname, result_dir, num_splits)

        # Remember the per-part result files so collectResults can merge them.
        self.result_files = []

        for idx in range(0, num_splits):
            data_fname = jp(result_dir, 'map.part_%d' % idx)
            result_fname = jp(result_dir, 'map.vm.part_%d.heuristic' % idx)
            self.result_files.append(result_fname)

            current_job = KybJob(gridtools.ApproximationTaskStarter,
                                 [run_fname, data_fname, param_fname, result_fname, self.settings])
            current_job.h_vmem = '25.0G'

            print('job #1: %s' % current_job.nativeSpecification)

            self.functionJobs.append(current_job)

    def collectResults(self):
        """
        Combine the per-part heuristic results into one spliced-reads file
        and merge it with the already known spliced reads.
        """
        result_dir = self.settings['approximation_dir']
        combined_fn = jp(result_dir, 'map.vm.spliced')
        combine_files(self.result_files, combined_fn)
        # Bug fix: 'settings' was referenced without 'self.' here (NameError).
        # NOTE(review): the merged file is written to the current working
        # directory as 'map.vm'; presumably it should live in result_dir --
        # kept as-is to preserve existing behaviour. TODO confirm.
        combine_files([combined_fn, self.settings['spliced_reads_fn']], 'map.vm')
158
def ApproximationTaskStarter(run_fname, data_fname, param_fname, result_fname, settings):
    """
    Entry point executed on a cluster node: run the PipelineHeuristic
    filter on one part of the reads file and report completion.
    """
    heuristic = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname, settings)
    heuristic.filter()

    return 'finished filtering set %s.' % data_fname
164
165
class PreprocessingTask(ClusterTask):
    """
    Cluster task for the preprocessing step (not implemented yet).
    """

    def __init__(self, settings):
        # Bug fix: the base class requires a settings object; calling
        # ClusterTask.__init__(self) without it raised a TypeError.
        ClusterTask.__init__(self, settings)
173
174
class AlignmentTask(ClusterTask):
    """
    This task represents the main part of QPalma: predicting alignments
    for a dataset with a previously trained parameter set.
    """

    def __init__(self, settings):
        ClusterTask.__init__(self, settings)

    def CreateJobs(self):
        """
        Split the prediction keys into num_splits chunks and create one
        cluster job (running gridtools.TaskStarter) per chunk.
        """
        num_splits = self.settings['num_splits']

        run = cPickle.load(open(self.settings['run_fn']))
        run['name'] = 'saved_run'

        param = self.settings['prediction_parameter_fn']

        run['result_dir'] = self.settings['prediction_dir']
        dataset_fn = self.settings['prediction_dataset_fn']
        prediction_keys_fn = self.settings['prediction_dataset_keys_fn']

        prediction_keys = cPickle.load(open(prediction_keys_fn))

        print('Found %d keys for prediction.' % len(prediction_keys))

        # Partition the keys into (name, key-sublist) chunks.
        # Renamed the loop variable: the original shadowed builtin 'slice'.
        slices = get_slices(len(prediction_keys), num_splits)
        chunks = []
        for idx, sl in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name, prediction_keys[sl[0]:sl[1]]))

        for c_name, current_chunk in chunks:
            current_job = KybJob(gridtools.TaskStarter,
                                 [run, dataset_fn, current_chunk, param, c_name])
            current_job.h_vmem = '20.0G'

            print('job #1: %s' % current_job.nativeSpecification)

            self.functionJobs.append(current_job)

        # Removed a dead loop that summed the chunk sizes into a variable
        # named 'sum' (shadowing the builtin) without ever using the result.
        print('Got %d job(s)' % len(self.functionJobs))
227
228
def TaskStarter(run, dataset_fn, prediction_keys, param, set_name):
    """
    Entry point executed on a cluster node: run QPalma prediction for one
    chunk of the dataset and report completion.
    """
    predictor = QPalma()
    predictor.predict(run, dataset_fn, prediction_keys, param, set_name)

    return 'finished prediction of set %s.' % set_name
238
239
240
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma
    (not implemented yet).
    """

    def __init__(self, settings):
        # Bug fix: the base class requires a settings object; calling
        # ClusterTask.__init__(self) without it raised a TypeError.
        ClusterTask.__init__(self, settings)
249
class PostprocessingTask(ClusterTask):
    """
    After QPalma predicted alignments this task postprocesses the data,
    converting every prediction chunk into an alignment file.
    """

    def __init__(self, settings):
        # Bug fix: pass the settings object through to the base class
        # (ClusterTask.__init__ requires it).
        ClusterTask.__init__(self, settings)

    @staticmethod
    def TaskStarter(chunk_fn, result_fn):
        # Executed on a cluster node: convert one prediction chunk into an
        # alignment file.  Made a staticmethod because it is handed to
        # KybJob and invoked without an instance.
        create_alignment_file(chunk_fn, result_fn)

    def CreateJobs(self):
        """
        Create one alignment-conversion job per prediction chunk file.

        NOTE(review): run/result directories are hard-coded; they should
        probably come from self.settings. TODO confirm.
        """
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        chunks_fn = [fn for fn in os.listdir(run_dir) if fn.startswith('chunk')]

        print(chunks_fn)

        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_fn = jp(run_dir, chunk_fn)

            # Bug fix: the original referenced the undefined module
            # 'grid_alignment'; use the starter defined on this class.
            current_job = KybJob(PostprocessingTask.TaskStarter, [chunk_fn, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print('job #1: %s' % current_job.nativeSpecification)

            # Bug fix: jobs were created but never stored, so Submit()
            # had nothing to submit.
            self.functionJobs.append(current_job)
286
287
class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier: it creates
    a single trivial job that simply echoes its parameter.
    """

    def __init__(self, settings):
        # Bug fix: pass the settings object through to the base class.
        ClusterTask.__init__(self, settings)

    @staticmethod
    def TaskStarter(param):
        # Bug fix: the old body called create_alignment_file with undefined
        # names (chunk_fn, result_fn); a debugging dummy only echoes input.
        return 'finished dummy task: %s' % param

    def CreateJobs(self):
        # Bug fix: the old body iterated over an undefined 'chunks_fn'
        # (NameError) and never stored the created jobs.  Create one
        # trivial, always-runnable job instead.
        current_job = KybJob(DummyTask.TaskStarter, ['dummy'])
        current_job.h_vmem = '15.0G'
        current_job.express = 'True'

        print('job #1: %s' % current_job.nativeSpecification)

        self.functionJobs.append(current_job)