5d29743590f4d38b8b0ac69e605875f6a8eba496
[qpalma.git] / qpalma / gridtools.py
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
5 #
6 # Written (W) 2008 Fabio De Bona
7 # Copyright (C) 2008 Max-Planck-Society
8
9 import cPickle
10 import math
11 import time
12 import os
13 import os.path
14 import pdb
15 import sys
16 import time
17
18 from threading import Thread
19
20 from pythongrid import KybJob, Usage
21 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
22
23 from qpalma.OutputFormat import createAlignmentOutput
24
25 from PipelineHeuristic import *
26
27 import gridtools
28
29 from utils import get_slices,split_file,combine_files
30
31 from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper
32
33 from qpalma_main import QPalma
34
# Short alias used throughout this module for building paths.
jp = os.path.join

def pjoin(*parts):
    """Join an arbitrary number of path components left-to-right."""
    # os.path.join is already variadic and left-associative, so calling it
    # directly is equivalent to the pairwise reduce over the components.
    return jp(*parts)
37
38
class ClusterTask(Thread):
    """
    Base class for QPalma cluster tasks.

    Every task creates a status file.
    All cluster jobs submit then their exit status to this status file.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter
    """

    def __init__(self, settings=None):
        # Bug fix: the original never called Thread.__init__, so any use of
        # Thread methods (start/join) on an instance would fail.
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local jobs objects
        self.functionJobs = []

        # this object stores the configuration; `settings` defaults to None
        # because some subclasses (PreprocessingTask, TrainingTask, DummyTask)
        # invoke ClusterTask.__init__ without a settings argument.
        self.settings = settings

    def CreateJobs(self):
        """
        This method creates an array of jobs called self.functionJobs. It is
        only virtual in this base class and has to be implemented specifically
        for each cluster task.
        """
        pass

    def Submit(self):
        """
        After creation of jobs this function submits them to the cluster.

        Stores the session id and job ids returned by pythongrid for the
        later collect_jobs() call in CheckIfTaskFinished().
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)

    def Restart(self, id):
        # Virtual hook: restart the job with the given id. Not implemented.
        pass

    def collectResults(self):
        # Virtual hook: combine per-job result files. Overridden by subclasses.
        pass

    def CheckIfTaskFinished(self):
        """
        This function is responsible for checking whether the submitted jobs
        were completed successfully.
        """
        # print(...) with a single pre-formatted string produces identical
        # output under Python 2's print statement and Python 3's function.
        print('collecting jobs')
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs, True)
        print("ret fields AFTER execution on cluster")
        for (i, job) in enumerate(retjobs):
            # Format string reproduces the original comma-separated output,
            # including the double space before the ret value.
            print("Job # %s - ret:  %s" % (i, job.ret))

        print('--------------')

        self.collectResults()
104
105
106 class ApproximationTask(ClusterTask):
107 """
108 This task represents the first step towards a valid QPalma dataset.
109 It starts an approximative QPalma model on the putative unspliced reads to
110 identify true spliced reads.
111 """
112
113 def __init__(self,settings):
114 ClusterTask.__init__(self,settings)
115
116
117 def CreateJobs(self):
118 """
119 Create...
120 """
121
122 num_splits = self.settings['num_splits']
123
124 #run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
125 #param_fname = jp(run_dir,'param_526.pickle')
126 param_fname = self.settings['prediction_parameter_fn']
127 #run_fname = jp(run_dir,'run_obj.pickle')
128 run_fname = self.settings['run_fn']
129
130 #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
131 result_dir = self.settings['approximation_dir']
132
133 original_map_fname = self.settings['unspliced_reads_fn']
134 split_file(original_map_fname,result_dir,num_splits)
135
136 self.result_files = []
137
138 for idx in range(0,num_splits):
139 data_fname = jp(result_dir,'map.part_%d'%idx)
140 result_fname = jp(result_dir,'map.vm.part_%d'%idx)
141 self.result_files.append(result_fname)
142
143 current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
144 current_job.h_vmem = '25.0G'
145 #current_job.express = 'True'
146
147 print "job #1: ", current_job.nativeSpecification
148
149 self.functionJobs.append(current_job)
150
151
152 def collectResults(self):
153 result_dir = self.settings['approximation_dir']
154 combined_fn = jp(result_dir,'map.vm.spliced')
155 self.result_files = map(lambda x:x+'.spliced',self.result_files)
156 combine_files(self.result_files,combined_fn)
157 combine_files([combined_fn,self.settings['spliced_reads_fn']],'map.vm')
158
159
def ApproximationTaskStarter(run_fname, data_fname, param_fname, result_fname, settings):
    """
    Entry point executed on a cluster node: filter one map-file part with the
    PipelineHeuristic approximation and report completion.
    """
    heuristic = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname, settings)
    heuristic.filter()
    return 'finished filtering set %s.' % data_fname
165
166
class PreprocessingTask(ClusterTask):
    """
    This class encapsules some...
    """

    def __init__(self, settings=None):
        # Bug fix: the original called ClusterTask.__init__(self) without the
        # required `settings` argument, so instantiating this class raised a
        # TypeError. Accepting an optional settings and forwarding it is
        # backward compatible with existing no-argument callers.
        ClusterTask.__init__(self, settings)
174
175
176 class AlignmentTask(ClusterTask):
177 """
178 This task represents the main part of QPalma.
179 """
180
181 def __init__(self,settings):
182 ClusterTask.__init__(self,settings)
183
184
185 def CreateJobs(self):
186 """
187
188 """
189
190 num_splits = self.settings['num_splits']
191
192 jp = os.path.join
193
194 run = cPickle.load(open(self.settings['run_fn']))
195 run['name'] = 'saved_run'
196
197 param_fn = self.settings['prediction_parameter_fn']
198
199 run['result_dir'] = self.settings['prediction_dir']
200 dataset_fn = self.settings['prediction_dataset_fn']
201 prediction_keys_fn = self.settings['prediction_dataset_keys_fn']
202
203 prediction_keys = cPickle.load(open(prediction_keys_fn))
204
205 print 'Found %d keys for prediction.' % len(prediction_keys)
206
207 slices = get_slices(len(prediction_keys),num_splits)
208 chunks = []
209 for idx,slice in enumerate(slices):
210 #if idx != 0:
211 c_name = 'chunk_%d' % idx
212 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
213
214 for c_name,current_chunk in chunks:
215 current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,run,dataset_fn,current_chunk,param_fn,c_name])
216 current_job.h_vmem = '20.0G'
217 #current_job.express = 'True'
218
219 print "job #1: ", current_job.nativeSpecification
220
221 self.functionJobs.append(current_job)
222
223 sum = 0
224 for size in [len(elem) for name,elem in chunks]:
225 sum += size
226
227 print 'Got %d job(s)' % len(self.functionJobs)
228
229
def AlignmentTaskStarter(settings, run, dataset_fn, prediction_keys, param_fn, set_name):
    """
    Entry point executed on a cluster node: run QPalma prediction for one
    chunk of dataset keys and report completion.
    """
    wrapper = DataAccessWrapper(settings)
    seq_info = SeqSpliceInfo(wrapper, settings['allowed_fragments'])
    predictor = QPalma(run, seq_info)
    predictor.init_prediction(dataset_fn, prediction_keys, param_fn, set_name)
    return 'finished prediction of set %s.' % set_name
239
240
241
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma.
    """

    def __init__(self, settings=None):
        # Bug fix: the original called ClusterTask.__init__(self) without the
        # required `settings` argument, so instantiating this class raised a
        # TypeError. Accepting an optional settings and forwarding it is
        # backward compatible with existing no-argument callers.
        ClusterTask.__init__(self, settings)
249
250
251 class PostprocessingTask(ClusterTask):
252 """
253 After QPalma predicted alignments this task postprocesses the data.
254 """
255
256 def __init__(self,settings):
257 ClusterTask.__init__(self,settings)
258
259
260 def CreateJobs(self):
261 run_dir = self.settings['prediction_dir']
262 self.result_dir = self.settings['alignment_dir']
263
264 chunks_fn = []
265 for fn in os.listdir(run_dir):
266 if fn.startswith('chunk'):
267 chunks_fn.append(fn)
268
269 functionJobs=[]
270
271 self.result_files = []
272 for chunk_fn in chunks_fn:
273 chunk_name = chunk_fn[:chunk_fn.find('.')]
274 result_fn = jp(self.result_dir,'%s.%s'%(chunk_name,self.settings['output_format']))
275 chunk_fn = jp(run_dir,chunk_fn)
276
277 self.result_files.append(result_fn)
278
279 current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
280 current_job.h_vmem = '15.0G'
281 current_job.express = 'True'
282
283 print "job #1: ", current_job.nativeSpecification
284
285 self.functionJobs.append(current_job)
286
287
288 def collectResults(self):
289 combined_fn = jp(self.result_dir,'all_alignments.%s'%self.settings['output_format'])
290 combine_files(self.result_files,combined_fn)
291
292
def PostProcessingTaskStarter(settings, chunk_fn, result_fn):
    """
    Entry point executed on a cluster node: convert one prediction chunk file
    into the configured alignment output format.
    """
    createAlignmentOutput(settings, chunk_fn, result_fn)
295
296
297
298 class DummyTask(ClusterTask):
299 """
300 This class represents a dummy task to make debugging easier.
301 """
302 def __init__(self):
303 ClusterTask.__init__(self)
304
305
    # NOTE(review): this looks like leftover debugging scaffolding — `chunk_fn`,
    # `result_fn` and `create_alignment_file` are not defined anywhere in view,
    # and the `param` argument is unused; calling this raises NameError. Confirm
    # whether it can be deleted or needs to be wired to real inputs.
    def DummyTaskStarter(param):
        create_alignment_file(chunk_fn,result_fn)
308
309
310 def CreateJobs(self):
311 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
312 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
313
314 for chunk_fn in chunks_fn:
315 chunk_name = chunk_fn[:chunk_fn.find('.')]
316 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
317 chunk_fn = jp(run_dir,chunk_fn)
318
319 current_job = KybJob(grid_alignment.DummyTaskStarter,[chunk_fn,result_fn])
320 current_job.h_vmem = '15.0G'
321 current_job.express = 'True'
322
323 print "job #1: ", current_job.nativeSpecification