# + extended docu
# [qpalma.git] / qpalma / gridtools.py
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
5 #
6 # Written (W) 2008 Fabio De Bona
7 # Copyright (C) 2008 Max-Planck-Society
8
9 import cPickle
10 import math
11 import time
12 import os
13 import os.path
14 import pdb
15 import sys
16 import time
17
18 from threading import Thread
19
20 from pythongrid import KybJob, Usage
21 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
22
23 from qpalma.OutputFormat import createAlignmentOutput
24
25 from PipelineHeuristic import *
26
27 import gridtools
28
29 from utils import get_slices,split_file,combine_files
30
31 from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper
32
33 from qpalma_main import QPalma
34
35 jp = os.path.join
36 pjoin = lambda *args: reduce(lambda x,y: jp(x,y),args)
37
38
39 class ClusterTask(Thread):
40 """
41 This class..
42
43 Every task creates a status file.
44 All cluster jobs submit then their exit status to this status file.
45
46 Every cluster task subclass should override/implement the methods:
47
48 1. __init__
49 2. CreateJobs
50 3. TaskStarter
51
52 """
53
54 def __init__(self,settings):
55 self.sleep_time = 0
56
57 # this list stores the cluster/local jobs objects
58 self.functionJobs = []
59
60 # this object stores the configuration
61 self.settings = settings
62
63
64 def CreateJobs(self):
65 """
66 This method create an array of jobs called self.functionJobs. It is only
67 virtual in this base class and has to be implemented specifically for
68 each cluster task.
69 """
70 pass
71
72
73 def Submit(self):
74 """
75 After creation of jobs this function submits them to the cluster.
76 """
77 self.sid, self.jobids = submit_jobs(self.functionJobs)
78 #self.processedFunctionJobs = process_jobs(self.functionJobs,local=True,maxNumThreads=1)
79
80
81 def Restart(self,id):
82 pass
83
84
85 def collectResults(self):
86 pass
87
88
89 def CheckIfTaskFinished(self):
90 """
91 This function is responsible for checking whether the submitted jobs were
92 completed successfully.
93 """
94
95 print 'collecting jobs'
96 retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs, True)
97 print "ret fields AFTER execution on cluster"
98 for (i, job) in enumerate(retjobs):
99 print "Job #", i, "- ret: ", job.ret
100
101 print '--------------'
102
103 self.collectResults()
104
105
106 class ApproximationTask(ClusterTask):
107 """
108 This task represents the first step towards a valid QPalma dataset.
109 It starts an approximative QPalma model on the putative unspliced reads to
110 identify true spliced reads.
111 """
112
113 def __init__(self,settings):
114 ClusterTask.__init__(self,settings)
115
116
117 def CreateJobs(self):
118 """
119 Create...
120 """
121
122 num_splits = self.settings['num_splits']
123
124 #run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
125 #param_fname = jp(run_dir,'param_526.pickle')
126 param_fname = self.settings['prediction_parameter_fn']
127 #run_fname = jp(run_dir,'run_obj.pickle')
128 run_fname = self.settings['run_fn']
129
130 #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
131 result_dir = self.settings['approximation_dir']
132
133 original_map_fname = self.settings['unspliced_reads_fn']
134 split_file(original_map_fname,result_dir,num_splits)
135
136 self.result_files = []
137
138 for idx in range(0,num_splits):
139 data_fname = jp(result_dir,'map.part_%d'%idx)
140 result_fname = jp(result_dir,'map.vm.part_%d'%idx)
141 self.result_files.append(result_fname)
142
143 current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
144 current_job.h_vmem = '25.0G'
145 #current_job.express = 'True'
146
147 print "job #1: ", current_job.nativeSpecification
148
149 self.functionJobs.append(current_job)
150
151
152 def collectResults(self):
153 result_dir = self.settings['approximation_dir']
154 combined_fn = jp(result_dir,'map.vm.spliced')
155 self.result_files = map(lambda x:x+'.spliced',self.result_files)
156 combine_files(self.result_files,combined_fn)
157 combine_files([combined_fn,self.settings['spliced_reads_fn']],'map.vm')
158
159
def ApproximationTaskStarter(run_fname, data_fname, param_fname, result_fname, settings):
    """
    Entry point executed on a cluster node: run the PipelineHeuristic
    filter on one split of the read map file.
    """
    heuristic = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname, settings)
    heuristic.filter()

    return 'finished filtering set %s.' % data_fname
165
166
class PreprocessingTask(ClusterTask):
    """
    Placeholder task for preprocessing steps (not implemented yet).
    """

    def __init__(self, settings):
        # Bug fix: ClusterTask.__init__ requires a settings object; the
        # original called it without one, which always raised a TypeError.
        ClusterTask.__init__(self, settings)
174
175
176 class AlignmentTask(ClusterTask):
177 """
178 This task represents the main part of QPalma.
179 """
180
181 def __init__(self,settings):
182 ClusterTask.__init__(self,settings)
183
184
185 def CreateJobs(self):
186 """
187
188 """
189
190 num_splits = self.settings['num_splits']
191
192 jp = os.path.join
193
194 dataset_fn = self.settings['prediction_dataset_fn']
195 prediction_keys_fn = self.settings['prediction_dataset_keys_fn']
196
197 prediction_keys = cPickle.load(open(prediction_keys_fn))
198
199 print 'Found %d keys for prediction.' % len(prediction_keys)
200
201 slices = get_slices(len(prediction_keys),num_splits)
202 chunks = []
203 for idx,slice in enumerate(slices):
204 #if idx != 0:
205 c_name = 'chunk_%d' % idx
206 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
207
208 for c_name,current_chunk in chunks:
209 current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,dataset_fn,current_chunk,c_name])
210 current_job.h_vmem = '2.0G'
211 current_job.express = 'True'
212
213 print "job #1: ", current_job.nativeSpecification
214
215 self.functionJobs.append(current_job)
216
217 sum = 0
218 for size in [len(elem) for name,elem in chunks]:
219 sum += size
220
221 print 'Got %d job(s)' % len(self.functionJobs)
222
223
def AlignmentTaskStarter(settings, dataset_fn, prediction_keys, set_name):
    """
    Entry point executed on a cluster node: run the QPalma prediction for
    one chunk of prediction keys.
    """
    wrapper = DataAccessWrapper(settings)
    seq_info = SeqSpliceInfo(wrapper, settings['allowed_fragments'])
    QPalma(seq_info).init_prediction(dataset_fn, prediction_keys, settings, set_name)
    return 'finished prediction of set %s.' % set_name
233
234
235
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma.
    """

    def __init__(self, settings):
        # Bug fix: ClusterTask.__init__ requires a settings object; the
        # original called it without one, which always raised a TypeError.
        ClusterTask.__init__(self, settings)

    def CreateJobs(self):
        # training job creation is not implemented yet
        # (a syntactically broken commented-out cPickle.dump call removed)
        pass
247
248
249 class PostprocessingTask(ClusterTask):
250 """
251 After QPalma predicted alignments this task postprocesses the data.
252 """
253
254 def __init__(self,settings):
255 ClusterTask.__init__(self,settings)
256
257
258 def CreateJobs(self):
259 run_dir = self.settings['prediction_dir']
260 self.result_dir = self.settings['alignment_dir']
261
262 chunks_fn = []
263 for fn in os.listdir(run_dir):
264 if fn.startswith('chunk'):
265 chunks_fn.append(fn)
266
267 functionJobs=[]
268
269 self.result_files = []
270 for chunk_fn in chunks_fn:
271 chunk_name = chunk_fn[:chunk_fn.find('.')]
272 result_fn = jp(self.result_dir,'%s.%s'%(chunk_name,self.settings['output_format']))
273 chunk_fn = jp(run_dir,chunk_fn)
274
275 self.result_files.append(result_fn)
276
277 current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
278 current_job.h_vmem = '15.0G'
279 current_job.express = 'True'
280
281 print "job #1: ", current_job.nativeSpecification
282
283 self.functionJobs.append(current_job)
284
285
286 def collectResults(self):
287 combined_fn = jp(self.result_dir,'all_alignments.%s'%self.settings['output_format'])
288 combine_files(self.result_files,combined_fn)
289
290
def PostProcessingTaskStarter(settings,chunk_fn,result_fn):
    # Entry point executed on a cluster node: convert one prediction chunk
    # file into the configured alignment output format.
    createAlignmentOutput(settings,chunk_fn,result_fn)
293
294
295
296 class DummyTask(ClusterTask):
297 """
298 This class represents a dummy task to make debugging easier.
299 """
300 def __init__(self):
301 ClusterTask.__init__(self)
302
303
    def DummyTaskStarter(param):
        # NOTE(review): 'chunk_fn', 'result_fn' and 'create_alignment_file'
        # are not defined in this scope -- this debug helper will raise a
        # NameError if ever called; presumably leftover copy-paste from an
        # older grid_alignment module. Confirm before use.
        create_alignment_file(chunk_fn,result_fn)
306
307
308 def CreateJobs(self):
309 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
310 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
311
312 for chunk_fn in chunks_fn:
313 chunk_name = chunk_fn[:chunk_fn.find('.')]
314 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
315 chunk_fn = jp(run_dir,chunk_fn)
316
317 current_job = KybJob(grid_alignment.DummyTaskStarter,[chunk_fn,result_fn])
318 current_job.h_vmem = '15.0G'
319 current_job.express = 'True'
320
321 print "job #1: ", current_job.nativeSpecification