83a5da8f56e6101eba95d27b24c3ccbffb688c2d
[qpalma.git] / qpalma / gridtools.py
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
5 #
6 # Written (W) 2008 Fabio De Bona
7 # Copyright (C) 2008 Max-Planck-Society
8
9 import cPickle
10 import math
11 import time
12 import os
13 import os.path
14 import pdb
15 import sys
16 import time
17
18 from threading import Thread
19
20 from pythongrid import KybJob, Usage
21 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
22
23 from qpalma.OutputFormat import createAlignmentOutput
24
25 from PipelineHeuristic import *
26
27 import gridtools
28
29 from utils import get_slices,split_file,combine_files
30
31 from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper
32
33 from qpalma_main import QPalma
34
jp = os.path.join

def pjoin(*parts):
    """Join an arbitrary number of path components.

    os.path.join already accepts a variable number of arguments, so the
    original left fold via reduce() was redundant.
    """
    return jp(*parts)
37
38
39 class ClusterTask(Thread):
40 """
41 This class..
42
43 Every task creates a status file.
44 All cluster jobs submit then their exit status to this status file.
45
46 Every cluster task subclass should override/implement the methods:
47
48 1. __init__
49 2. CreateJobs
50 3. TaskStarter
51
52 """
53
54 def __init__(self,settings):
55 self.sleep_time = 0
56
57 # this list stores the cluster/local jobs objects
58 self.functionJobs = []
59
60 # this object stores the configuration
61 self.settings = settings
62
63
64 def CreateJobs(self):
65 """
66 This method create an array of jobs called self.functionJobs. It is only
67 virtual in this base class and has to be implemented specifically for
68 each cluster task.
69 """
70 pass
71
72
73 def Submit(self):
74 """
75 After creation of jobs this function submits them to the cluster.
76 """
77 self.sid, self.jobids = submit_jobs(self.functionJobs)
78 #self.processedFunctionJobs = process_jobs(self.functionJobs,local=True,maxNumThreads=1)
79
80
81 def Restart(self,id):
82 pass
83
84
85 def collectResults(self):
86 pass
87
88
89 def CheckIfTaskFinished(self):
90 """
91 This function is responsible for checking whether the submitted jobs were
92 completed successfully.
93 """
94
95 print 'collecting jobs'
96 retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs, True)
97 print "ret fields AFTER execution on cluster"
98 for (i, job) in enumerate(retjobs):
99 print "Job #", i, "- ret: ", job.ret
100
101 print '--------------'
102
103 self.collectResults()
104
105
106 class ApproximationTask(ClusterTask):
107 """
108 This task represents the first step towards a valid QPalma dataset.
109 It starts an approximative QPalma model on the putative unspliced reads to
110 identify true spliced reads.
111 """
112
113 def __init__(self,settings):
114 ClusterTask.__init__(self,settings)
115
116
117 def CreateJobs(self):
118 """
119 Create...
120 """
121
122 num_splits = self.settings['num_splits']
123
124 #run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
125 #param_fname = jp(run_dir,'param_526.pickle')
126 param_fname = self.settings['prediction_parameter_fn']
127 #run_fname = jp(run_dir,'run_obj.pickle')
128 run_fname = self.settings['run_fn']
129
130 #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
131 result_dir = self.settings['approximation_dir']
132
133 original_map_fname = self.settings['unspliced_reads_fn']
134 split_file(original_map_fname,result_dir,num_splits)
135
136 self.result_files = []
137
138 for idx in range(0,num_splits):
139 data_fname = jp(result_dir,'map.part_%d'%idx)
140 result_fname = jp(result_dir,'map.vm.part_%d.heuristic.spliced'%idx)
141 self.result_files.append(result_fname)
142
143 current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
144 current_job.h_vmem = '25.0G'
145 #current_job.express = 'True'
146
147 print "job #1: ", current_job.nativeSpecification
148
149 self.functionJobs.append(current_job)
150
151
152 def collectResults(self):
153 result_dir = self.settings['approximation_dir']
154 combined_fn = jp(result_dir,'map.vm.spliced')
155 combine_files(self.result_files,combined_fn)
156 combine_files([combined_fn,self.settings['spliced_reads_fn']],'map.vm')
157
158
def ApproximationTaskStarter(run_fname, data_fname, param_fname, result_fname, settings):
    """
    Entry point executed on a cluster node: run the filter heuristic on
    one chunk of the read data and report a status string.
    """
    heuristic = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname, settings)
    heuristic.filter()

    return 'finished filtering set %s.' % data_fname
164
165
class PreprocessingTask(ClusterTask):
    """
    Placeholder for the preprocessing step of the pipeline.
    """

    def __init__(self, settings=None):
        # Bug fix: ClusterTask.__init__ requires a settings argument; the
        # original call ClusterTask.__init__(self) raised a TypeError.  The
        # default of None keeps the old no-argument construction working.
        ClusterTask.__init__(self, settings)
173
174
175 class AlignmentTask(ClusterTask):
176 """
177 This task represents the main part of QPalma.
178 """
179
180 def __init__(self,settings):
181 ClusterTask.__init__(self,settings)
182
183
184 def CreateJobs(self):
185 """
186
187 """
188
189 num_splits = self.settings['num_splits']
190
191 jp = os.path.join
192
193 run = cPickle.load(open(self.settings['run_fn']))
194 run['name'] = 'saved_run'
195
196 param_fn = self.settings['prediction_parameter_fn']
197
198 run['result_dir'] = self.settings['prediction_dir']
199 dataset_fn = self.settings['prediction_dataset_fn']
200 prediction_keys_fn = self.settings['prediction_dataset_keys_fn']
201
202 prediction_keys = cPickle.load(open(prediction_keys_fn))
203
204 print 'Found %d keys for prediction.' % len(prediction_keys)
205
206 slices = get_slices(len(prediction_keys),num_splits)
207 chunks = []
208 for idx,slice in enumerate(slices):
209 #if idx != 0:
210 c_name = 'chunk_%d' % idx
211 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
212
213 for c_name,current_chunk in chunks:
214 current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,run,dataset_fn,current_chunk,param_fn,c_name])
215 current_job.h_vmem = '20.0G'
216 #current_job.express = 'True'
217
218 print "job #1: ", current_job.nativeSpecification
219
220 self.functionJobs.append(current_job)
221
222 sum = 0
223 for size in [len(elem) for name,elem in chunks]:
224 sum += size
225
226 print 'Got %d job(s)' % len(self.functionJobs)
227
228
def AlignmentTaskStarter(settings, run, dataset_fn, prediction_keys, param_fn, set_name):
    """
    Entry point executed on a cluster node: predict alignments for one
    chunk of prediction keys and report a status string.
    """
    accessWrapper = DataAccessWrapper(settings)
    seqInfo = SeqSpliceInfo(accessWrapper, settings['allowed_fragments'])
    predictor = QPalma(run, seqInfo)
    predictor.init_prediction(dataset_fn, prediction_keys, param_fn, set_name)
    return 'finished prediction of set %s.' % set_name
238
239
240
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma.
    """

    def __init__(self, settings=None):
        # Bug fix: ClusterTask.__init__ requires a settings argument; the
        # original call ClusterTask.__init__(self) raised a TypeError.  The
        # default of None keeps the old no-argument construction working.
        ClusterTask.__init__(self, settings)
248
249
250 class PostprocessingTask(ClusterTask):
251 """
252 After QPalma predicted alignments this task postprocesses the data.
253 """
254
255 def __init__(self,settings):
256 ClusterTask.__init__(self,settings)
257
258
259 def CreateJobs(self):
260 run_dir = self.settings['prediction_dir']
261 self.result_dir = self.settings['alignment_dir']
262
263 chunks_fn = []
264 for fn in os.listdir(run_dir):
265 if fn.startswith('chunk'):
266 chunks_fn.append(fn)
267
268 functionJobs=[]
269
270 self.result_files = []
271 for chunk_fn in chunks_fn:
272 chunk_name = chunk_fn[:chunk_fn.find('.')]
273 result_fn = jp(self.result_dir,'%s.%s'%(chunk_name,self.settings['output_format']))
274 chunk_fn = jp(run_dir,chunk_fn)
275
276 self.result_files.append(result_fn)
277
278 current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
279 current_job.h_vmem = '15.0G'
280 current_job.express = 'True'
281
282 print "job #1: ", current_job.nativeSpecification
283
284 self.functionJobs.append(current_job)
285
286
287 def collectResults(self):
288 combined_fn = jp(self.result_dir,'all_alignments.%s'%self.settings['output_format'])
289 combine_files(self.result_files,combined_fn)
290
291
def PostProcessingTaskStarter(settings, chunk_fn, result_fn):
    """
    Entry point executed on a cluster node: convert one prediction chunk
    into the configured alignment output format.
    """
    createAlignmentOutput(settings, chunk_fn, result_fn)
    # Consistency fix: the other *TaskStarter functions return a status
    # string which CheckIfTaskFinished prints; previously this one
    # implicitly returned None.
    return 'finished postprocessing of %s.' % chunk_fn
294
295
296
297 class DummyTask(ClusterTask):
298 """
299 This class represents a dummy task to make debugging easier.
300 """
301 def __init__(self):
302 ClusterTask.__init__(self)
303
304
    # NOTE(review): debugging stub -- 'chunk_fn', 'result_fn' and
    # 'create_alignment_file' are not defined in this scope, so calling this
    # raises a NameError; presumably copied from an older module and never
    # updated.  'param' is ignored.
    def DummyTaskStarter(param):
        create_alignment_file(chunk_fn,result_fn)
307
308
309 def CreateJobs(self):
310 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
311 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
312
313 for chunk_fn in chunks_fn:
314 chunk_name = chunk_fn[:chunk_fn.find('.')]
315 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
316 chunk_fn = jp(run_dir,chunk_fn)
317
318 current_job = KybJob(grid_alignment.DummyTaskStarter,[chunk_fn,result_fn])
319 current_job.h_vmem = '15.0G'
320 current_job.express = 'True'
321
322 print "job #1: ", current_job.nativeSpecification