#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Written (W) 2008 Fabio De Bona
# Copyright (C) 2008 Max-Planck-Society

import cPickle
import math
import os
import os.path
import pdb
import sys
import time

from threading import Thread

from pythongrid import KybJob, Usage
from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status

from createAlignmentFileFromPrediction import create_alignment_file

from PipelineHeuristic import *

# self-import: job functions are passed to KybJob as gridtools.<name> so that
# pythongrid can resolve them on the compute nodes
import gridtools

from utils import get_slices,split_file,combine_files

from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper

from qpalma_main import QPalma

jp = os.path.join

class ClusterTask(Thread):
    """
    Base class for all cluster tasks of the QPalma pipeline.

    Every task creates a status file; all cluster jobs then report their exit
    status to this file.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter

    """

    def __init__(self,settings):
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local job objects
        self.functionJobs = []

        # this object stores the configuration
        self.settings = settings

    def CreateJobs(self):
        """
        This method creates the list of jobs (self.functionJobs). It is only
        virtual in this base class and has to be implemented specifically by
        each cluster task.
        """
        pass


    def Submit(self):
        """
        After the jobs have been created this method submits them to the
        cluster.
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)


    def Restart(self,job_id):
        pass


    def collectResults(self):
        pass

    def CheckIfTaskFinished(self):
        """
        This method polls the cluster until all submitted jobs have completed,
        then fetches their return values and collects the results.
        """

        print 'checking whether jobs finished...'
        while not get_status(self.sid, self.jobids):
            time.sleep(10)

        print 'collecting jobs'
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)

        print 'return values after execution on the cluster:'
        for (i, job) in enumerate(retjobs):
            print 'Job #%d - ret: %s' % (i, job.ret)

        print '--------------'

        self.collectResults()

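
# A minimal subclass sketch following the contract above (MyTask and
# MyTaskStarter are hypothetical names, not part of QPalma). Note that all
# starter functions in this module are module-level and passed to KybJob as
# gridtools.<name>, presumably so that pythongrid can locate them when the
# job runs on a compute node:
#
#   class MyTask(ClusterTask):
#       def __init__(self,settings):
#           ClusterTask.__init__(self,settings)
#
#       def CreateJobs(self):
#           self.functionJobs.append(KybJob(gridtools.MyTaskStarter,[self.settings]))
#
#   def MyTaskStarter(settings):
#       return 'finished'
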
class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
    It runs an approximate QPalma model on the putative unspliced reads to
    identify true spliced reads.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


    def CreateJobs(self):
        """
        Split the map file of putative unspliced reads into num_splits parts
        and create one filtering job per part.
        """

        num_splits = self.settings['num_splits']

        param_fname = self.settings['prediction_parameter_fn']
        run_fname = self.settings['run_fn']
        result_dir = self.settings['approximation_dir']

        original_map_fname = self.settings['unspliced_reads_fn']
        split_file(original_map_fname,result_dir,num_splits)

        self.result_files = []

        for idx in range(0,num_splits):
            data_fname = jp(result_dir,'map.part_%d'%idx)
            result_fname = jp(result_dir,'map.vm.part_%d.heuristic.spliced'%idx)
            self.result_files.append(result_fname)

            current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
            current_job.h_vmem = '25.0G'

            print 'job #%d: %s' % (idx,current_job.nativeSpecification)

            self.functionJobs.append(current_job)


    def collectResults(self):
        result_dir = self.settings['approximation_dir']
        combined_fn = jp(result_dir,'map.vm.spliced')
        combine_files(self.result_files,combined_fn)
        # write the final combined map next to its parts instead of into the
        # current working directory
        combine_files([combined_fn,self.settings['spliced_reads_fn']],jp(result_dir,'map.vm'))

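# File layout produced in approximation_dir (derived from the naming scheme
# above): split_file() writes map.part_0 .. map.part_<num_splits-1>, each
# cluster job writes map.vm.part_<idx>.heuristic.spliced, and collectResults()
# merges these into map.vm.spliced before combining that file with the known
# spliced reads into map.vm.
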
def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname,settings):
    """
    Run the PipelineHeuristic filter on one part of the map file; executed as
    a cluster job.
    """
    ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname,settings)
    ph1.filter()

    return 'finished filtering set %s.' % data_fname

class PreprocessingTask(ClusterTask):
    """
    Placeholder for the preprocessing step of the pipeline; not implemented
    yet.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

class AlignmentTask(ClusterTask):
    """
    This task represents the main part of QPalma: predicting alignments for
    the spliced reads.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


    def CreateJobs(self):
        """
        Load the run object and the prediction keys, split the keys into
        num_splits chunks and create one prediction job per chunk.
        """

        num_splits = self.settings['num_splits']

        run = cPickle.load(open(self.settings['run_fn']))
        run['name'] = 'saved_run'

        param_fn = self.settings['prediction_parameter_fn']

        run['result_dir'] = self.settings['prediction_dir']
        dataset_fn = self.settings['prediction_dataset_fn']
        prediction_keys_fn = self.settings['prediction_dataset_keys_fn']

        prediction_keys = cPickle.load(open(prediction_keys_fn))

        print 'Found %d keys for prediction.' % len(prediction_keys)

        slices = get_slices(len(prediction_keys),num_splits)
        chunks = []
        for idx,(start,stop) in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name,prediction_keys[start:stop]))

        for idx,(c_name,current_chunk) in enumerate(chunks):
            current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,run,dataset_fn,current_chunk,param_fn,c_name])
            current_job.h_vmem = '20.0G'

            print 'job #%d: %s' % (idx,current_job.nativeSpecification)

            self.functionJobs.append(current_job)

        # sanity check: the chunks should cover all prediction keys
        total_keys = sum([len(elem) for name,elem in chunks])
        print 'Got %d job(s) covering %d keys.' % (len(self.functionJobs),total_keys)


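# Note on chunking: get_slices(n,num_splits) presumably yields half-open
# (start,stop) index pairs that tile range(n) -- the loop above relies on the
# prediction_keys[start:stop] chunks covering the whole key list. As an
# assumed (not verified) example: get_slices(10,3) -> [(0,4),(4,7),(7,10)].
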
def AlignmentTaskStarter(settings,run,dataset_fn,prediction_keys,param_fn,set_name):
    """
    Set up the sequence access objects and run the QPalma prediction on one
    chunk of keys; executed as a cluster job.
    """
    accessWrapper = DataAccessWrapper(settings)
    seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
    qp = QPalma(run,seqInfo)
    qp.init_prediction(dataset_fn,prediction_keys,param_fn,set_name)
    return 'finished prediction of set %s.' % set_name


class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma; not implemented
    yet.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


class PostprocessingTask(ClusterTask):
    """
    After QPalma has predicted the alignments this task postprocesses the
    data.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


    def CreateJobs(self):
        run_dir = self.settings['prediction_dir']
        self.result_dir = self.settings['alignment_dir']

        chunks_fn = []
        for fn in os.listdir(run_dir):
            if fn.startswith('chunk'):
                chunks_fn.append(fn)

        self.result_files = []
        for idx,chunk_fn in enumerate(chunks_fn):
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(self.result_dir,'%s.align'%chunk_name)
            chunk_fn = jp(run_dir,chunk_fn)

            self.result_files.append(result_fn)

            current_job = KybJob(gridtools.PostProcessingTaskStarter,[chunk_fn,result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print 'job #%d: %s' % (idx,current_job.nativeSpecification)

            self.functionJobs.append(current_job)


    def collectResults(self):
        combined_fn = jp(self.result_dir,'all_alignments.align')
        combine_files(self.result_files,combined_fn)

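# Naming convention (derived from the code above): a prediction file
# chunk_<i>.<suffix> in prediction_dir yields chunk_<i>.align in
# alignment_dir, and collectResults() concatenates all of them into
# all_alignments.align.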

def PostProcessingTaskStarter(chunk_fn,result_fn):
    """
    Convert one chunk of predictions into an alignment file; executed as a
    cluster job.
    """
    create_alignment_file(chunk_fn,result_fn)


class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier.
    """
    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


    def CreateJobs(self):
        # debugging setup with hardcoded directories
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        # gather the prediction chunks, by analogy with PostprocessingTask
        # (the original code left chunks_fn undefined)
        chunks_fn = [fn for fn in os.listdir(run_dir) if fn.startswith('chunk')]

        for idx,chunk_fn in enumerate(chunks_fn):
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
            chunk_fn = jp(run_dir,chunk_fn)

            current_job = KybJob(gridtools.DummyTaskStarter,[chunk_fn,result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print 'job #%d: %s' % (idx,current_job.nativeSpecification)

            self.functionJobs.append(current_job)


def DummyTaskStarter(chunk_fn,result_fn):
    create_alignment_file(chunk_fn,result_fn)
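
# A hedged sketch of how the pipeline stages are meant to be chained, in the
# order suggested by the class docstrings (approximation -> alignment ->
# postprocessing); the actual driver lives outside this module, so this is
# for illustration only:
#
#   for TaskClass in [ApproximationTask, AlignmentTask, PostprocessingTask]:
#       task = TaskClass(settings)
#       task.CreateJobs()
#       task.Submit()
#       task.CheckIfTaskFinished()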