# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Written (W) 2008 Fabio De Bona
# Copyright (C) 2008 Max-Planck-Society

import cPickle
import math
import os
import os.path
import pdb
import sys
import time

from threading import Thread

from pythongrid import KybJob, Usage
from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status

from qpalma.OutputFormat import createAlignmentOutput

from PipelineHeuristic import *

# self-import: the KybJob objects reference the *TaskStarter functions as
# module attributes (gridtools.<name>) so that they can be resolved again on
# the cluster nodes
import gridtools

from utils import get_slices,split_file,combine_files

from qpalma.sequence_utils import SeqSpliceInfo,DataAccessWrapper

from qpalma_main import QPalma

# shorthand used throughout this module
jp = os.path.join

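# The tasks below read the following keys from the settings dictionary
# (collected here for reference; their values are set by the QPalma
# configuration):
#
#   num_splits                  - number of chunks/cluster jobs per task
#   run_fn                      - pickled run object
#   prediction_parameter_fn     - pickled trained prediction parameters
#   approximation_dir           - working directory of the ApproximationTask
#   unspliced_reads_fn          - map file of the putative unspliced reads
#   spliced_reads_fn            - map file of the already known spliced reads
#   prediction_dir              - working directory of the AlignmentTask
#   prediction_dataset_fn       - pickled prediction dataset
#   prediction_dataset_keys_fn  - pickled keys of the prediction dataset
#   alignment_dir               - output directory of the PostprocessingTask
#   allowed_fragments           - fragments passed to SeqSpliceInfo
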
class ClusterTask(Thread):
    """
    Base class for all cluster tasks of the QPalma pipeline.

    Every task creates a status file to which all of its cluster jobs then
    submit their exit status.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter

    """

    def __init__(self,settings):
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local job objects
        self.functionJobs = []

        # this object stores the configuration
        self.settings = settings

    def CreateJobs(self):
        """
        This method creates the list of jobs stored in self.functionJobs. It
        is only virtual in this base class and has to be implemented
        specifically for each cluster task.
        """
        pass

    def Submit(self):
        """
        After creation of jobs this function submits them to the cluster.
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)
        # alternative for debugging: process the jobs via process_jobs
        # instead of submitting them
        #self.processedFunctionJobs = process_jobs(self.functionJobs)

    def Restart(self,job_id):
        """
        Restart the job with the given id; not implemented yet.
        """
        pass


    def collectResults(self):
        """
        Combine the results of the individual jobs; overridden by the
        subclasses.
        """
        pass

    def CheckIfTaskFinished(self):
        """
        This function is responsible for checking whether the submitted jobs
        were completed successfully, collecting them and combining their
        results.
        """

        print 'checking whether jobs finished...'
        while not get_status(self.sid, self.jobids):
            time.sleep(10)

        print 'collecting jobs'
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
        print "ret fields AFTER execution on cluster"
        for (i, job) in enumerate(retjobs):
            print "Job #", i, "- ret: ", job.ret

        print '--------------'

        self.collectResults()


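# A minimal sketch of how a ClusterTask subclass is driven (assuming a
# settings dictionary from the QPalma configuration):
#
#   task = ApproximationTask(settings)
#   task.CreateJobs()            # fill task.functionJobs
#   task.Submit()                # hand the jobs over to pythongrid
#   task.CheckIfTaskFinished()   # poll, then collect and combine the results
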
class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
    It starts an approximative QPalma model on the putative unspliced reads to
    identify true spliced reads.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

    def CreateJobs(self):
        """
        Split the unspliced reads file into num_splits parts and create one
        cluster job per part, each running the approximative filter.
        """

        num_splits = self.settings['num_splits']

        param_fname = self.settings['prediction_parameter_fn']
        run_fname = self.settings['run_fn']

        result_dir = self.settings['approximation_dir']

        original_map_fname = self.settings['unspliced_reads_fn']
        # the resulting parts are expected as 'map.part_<idx>' in result_dir
        split_file(original_map_fname,result_dir,num_splits)

        self.result_files = []

        for idx in range(0,num_splits):
            data_fname = jp(result_dir,'map.part_%d'%idx)
            result_fname = jp(result_dir,'map.vm.part_%d.heuristic.spliced'%idx)
            self.result_files.append(result_fname)

            current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
            current_job.h_vmem = '25.0G'

            print "job #%d: " % idx, current_job.nativeSpecification

            self.functionJobs.append(current_job)


    def collectResults(self):
        result_dir = self.settings['approximation_dir']
        combined_fn = jp(result_dir,'map.vm.spliced')
        combine_files(self.result_files,combined_fn)
        # merge the newly found spliced reads with the already known ones
        combine_files([combined_fn,self.settings['spliced_reads_fn']],jp(result_dir,'map.vm'))


def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname,settings):
    """
    Run the PipelineHeuristic filter on one part of the unspliced reads.
    """
    ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname,settings)
    ph1.filter()

    return 'finished filtering set %s.' % data_fname


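# File flow of the ApproximationTask (names as used above):
#
#   unspliced_reads_fn  --split_file-->           map.part_<i>
#   map.part_<i>        --PipelineHeuristic-->    map.vm.part_<i>.heuristic.spliced
#   all parts + spliced_reads_fn --combine_files--> map.vm
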
class PreprocessingTask(ClusterTask):
    """
    This class encapsulates the preprocessing of the reads; it is only a
    placeholder so far.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

class AlignmentTask(ClusterTask):
    """
    This task represents the main part of QPalma.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

    def CreateJobs(self):
        """
        Load the prediction dataset keys, partition them into num_splits
        chunks and create one alignment job per chunk.
        """

        num_splits = self.settings['num_splits']

        run = cPickle.load(open(self.settings['run_fn']))
        run['name'] = 'saved_run'

        param_fn = self.settings['prediction_parameter_fn']

        run['result_dir'] = self.settings['prediction_dir']
        dataset_fn = self.settings['prediction_dataset_fn']
        prediction_keys_fn = self.settings['prediction_dataset_keys_fn']

        prediction_keys = cPickle.load(open(prediction_keys_fn))

        print 'Found %d keys for prediction.' % len(prediction_keys)

        # get_slices partitions range(len(prediction_keys)) into num_splits
        # (start,stop) index pairs
        slices = get_slices(len(prediction_keys),num_splits)
        chunks = []
        for idx,current_slice in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name,prediction_keys[current_slice[0]:current_slice[1]]))

        for c_name,current_chunk in chunks:
            current_job = KybJob(gridtools.AlignmentTaskStarter,[self.settings,run,dataset_fn,current_chunk,param_fn,c_name])
            current_job.h_vmem = '20.0G'

            print "job %s: " % c_name, current_job.nativeSpecification

            self.functionJobs.append(current_job)

        total_keys = sum([len(elem) for name,elem in chunks])
        print 'Got %d job(s) covering %d keys' % (len(self.functionJobs),total_keys)


def AlignmentTaskStarter(settings,run,dataset_fn,prediction_keys,param_fn,set_name):
    """
    Run the QPalma prediction on the chunk of the dataset given by
    prediction_keys.
    """
    accessWrapper = DataAccessWrapper(settings)
    seqInfo = SeqSpliceInfo(accessWrapper,settings['allowed_fragments'])
    qp = QPalma(run,seqInfo)
    qp.init_prediction(dataset_fn,prediction_keys,param_fn,set_name)
    return 'finished prediction of set %s.' % set_name


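# Note: KybJob stores a reference to a module-level function together with its
# argument list; pythongrid presumably pickles the job, re-imports this module
# on the cluster node and calls the function there. This is why every
# *TaskStarter function has to live at module level.
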
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma; it is only a
    placeholder so far.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

class PostprocessingTask(ClusterTask):
    """
    After QPalma has predicted the alignments this task converts the raw
    predictions into the final output format.
    """

    def __init__(self,settings):
        ClusterTask.__init__(self,settings)

    def CreateJobs(self):
        run_dir = self.settings['prediction_dir']
        self.result_dir = self.settings['alignment_dir']

        # collect the prediction chunk files written by the AlignmentTask
        chunks_fn = []
        for fn in os.listdir(run_dir):
            if fn.startswith('chunk'):
                chunks_fn.append(fn)

        self.result_files = []
        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(self.result_dir,'%s.align'%chunk_name)
            chunk_fn = jp(run_dir,chunk_fn)

            self.result_files.append(result_fn)

            current_job = KybJob(gridtools.PostProcessingTaskStarter,[self.settings,chunk_fn,result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print "job %s: " % chunk_name, current_job.nativeSpecification

            self.functionJobs.append(current_job)


    def collectResults(self):
        combined_fn = jp(self.result_dir,'all_alignments.align')
        combine_files(self.result_files,combined_fn)

def PostProcessingTaskStarter(settings,chunk_fn,result_fn):
    """
    Convert the raw predictions of one chunk into the alignment output
    format.
    """
    createAlignmentOutput(settings,chunk_fn,result_fn)


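# The full pipeline presumably chains the tasks above in this order
# (a sketch, assuming a valid settings dictionary):
#
#   for TaskClass in [ApproximationTask, AlignmentTask, PostprocessingTask]:
#       task = TaskClass(settings)
#       task.CreateJobs()
#       task.Submit()
#       task.CheckIfTaskFinished()
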
class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier.
    """
    def __init__(self,settings):
        ClusterTask.__init__(self,settings)


    def CreateJobs(self):
        # hard-coded directories of a debugging run
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        chunks_fn = [fn for fn in os.listdir(run_dir) if fn.startswith('chunk')]

        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
            chunk_fn = jp(run_dir,chunk_fn)

            current_job = KybJob(gridtools.DummyTaskStarter,[chunk_fn,result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print "job %s: " % chunk_name, current_job.nativeSpecification

            self.functionJobs.append(current_job)


def DummyTaskStarter(chunk_fn,result_fn):
    # debugging helper; create_alignment_file is assumed to be provided
    # elsewhere
    create_alignment_file(chunk_fn,result_fn)