+ Added some text and references to the documentation.
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 from PipelineHeuristic import *
28
29 import gridtools
30
31 from utils import get_slices,split_file
32
# Module-wide shorthand for os.path.join, used below when building result paths.
jp = os.path.join
34
35
class ClusterTask(Thread):
    """
    Base class for a QPalma cluster task.

    Every task creates a status file.
    All cluster jobs submit then their exit status to this status file.

    Every cluster task subclass should override/implement the methods:

    1. __init__
    2. CreateJobs
    3. TaskStarter
    """

    def __init__(self, settings):
        # Initialize the Thread machinery; without this call a start() on any
        # task would fail with "thread.__init__() not called".
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local job objects
        self.functionJobs = []

        # this object stores the configuration
        self.settings = settings

    def CreateJobs(self):
        """
        This method creates an array of jobs called self.functionJobs. It is
        only virtual in this base class and has to be implemented specifically
        for each cluster task.
        """
        pass

    def Submit(self):
        """
        After creation of jobs this function submits them to the cluster.

        All jobs are submitted in one call so self.sid/self.jobids describe the
        complete job list. (BUGFIX: the previous per-job loop overwrote
        sid/jobids on every iteration, so CheckIfTaskFinished only tracked the
        last job while handing the full self.functionJobs list to collect_jobs.)
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)

    def Restart(self, id):
        # Virtual hook: restart the job with the given cluster id.
        pass

    def collectResults(self):
        # Virtual hook: merge per-job result files once all jobs finished.
        pass

    def CheckIfTaskFinished(self):
        """
        This function is responsible for checking whether the submitted jobs
        were completed successfully. Polls the cluster every 7 seconds, then
        collects the jobs and delegates merging to collectResults().
        """
        print('checking whether finished')
        while not get_status(self.sid, self.jobids):
            time.sleep(7)

        print('collecting jobs')
        collect_jobs(self.sid, self.jobids, self.functionJobs)

        self.collectResults()
103
104
105
106 class ApproximationTask(ClusterTask):
107 """
108 This task represents the first step towards a valid QPalma dataset.
109 It starts an approximative QPalma model on the putative unspliced reads to
110 identify true spliced reads.
111 """
112
113 def __init__(self,settings):
114 ClusterTask.__init__(self,settings)
115
116
117 def CreateJobs(self):
118 """
119 Create...
120 """
121
122 num_splits = self.settings['num_splits']
123
124 #run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
125 #param_fname = jp(run_dir,'param_526.pickle')
126 param_fname = self.settings['prediction_parameter_fn']
127 #run_fname = jp(run_dir,'run_obj.pickle')
128 run_fname = self.settings['run_fn']
129
130 #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
131 result_dir = self.settings['approximation_dir']
132
133 original_map_fname = self.settings['unspliced_reads_fn']
134 split_file(original_map_fname,result_dir,num_splits)
135
136 self.result_files = []
137
138 for idx in range(0,num_splits):
139 data_fname = jp(result_dir,'map.part_%d'%idx)
140 result_fname = jp(result_dir,'map.vm.part_%d.heuristic'%idx)
141 self.result_files.append(result_fname)
142
143 current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname,self.settings])
144 current_job.h_vmem = '25.0G'
145 #current_job.express = 'True'
146
147 print "job #1: ", current_job.nativeSpecification
148
149 self.functionJobs.append(current_job)
150
151
152 def collectResults(self):
153 result_dir = self.settings['approximation_dir']
154 combined_fn = jp(result_dir,'map.vm.spliced')
155 combine_files(self.result_files,combined_fn)
156 combine_files([combined_fn,settings['spliced_reads_fn']],'map.vm')
157
158
def ApproximationTaskStarter(run_fname, data_fname, param_fname, result_fname, settings):
    """
    Cluster-side entry point: run the approximative filter on one data split.

    Returns a short status message naming the processed split.
    """
    heuristic = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname, settings)
    heuristic.filter()

    return 'finished filtering set %s.' % data_fname
164
165
class PreprocessingTask(ClusterTask):
    """
    This class encapsules some...
    """

    def __init__(self, settings=None):
        # BUGFIX: ClusterTask.__init__ requires a settings object, so calling
        # it without one always raised a TypeError. The default of None keeps
        # existing no-argument call sites importable while matching the
        # settings-passing pattern of the other task classes.
        ClusterTask.__init__(self, settings)
173
174
175 class AlignmentTask(ClusterTask):
176 """
177 This task represents the main part of QPalma.
178 """
179
180 def __init__(self,settings):
181 ClusterTask.__init__(self,settings)
182
183
184 def CreateJobs(self):
185 """
186
187 """
188
189 num_splits = self.settings['num_splits']
190
191 jp = os.path.join
192
193 run = cPickle.load(open(self.settings['run_fn']))
194 run['name'] = 'saved_run'
195
196 param = self.settings['prediction_parameter_fn']
197
198 run['result_dir'] = self.settings['prediction_dir']
199 dataset_fn = self.settings['prediction_dataset_fn']
200 prediction_keys_fn = self.settings['prediction_dataset_keys_fn']
201
202 prediction_keys = cPickle.load(open(prediction_keys_fn))
203
204 print 'Found %d keys for prediction.' % len(prediction_keys)
205
206 slices = get_slices(len(prediction_keys),num_splits)
207 chunks = []
208 for idx,slice in enumerate(slices):
209 #if idx != 0:
210 c_name = 'chunk_%d' % idx
211 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
212
213 for c_name,current_chunk in chunks:
214 current_job = KybJob(gridtools.TaskStarter,[run,dataset_fn,current_chunk,param,c_name])
215 current_job.h_vmem = '20.0G'
216 #current_job.express = 'True'
217
218 print "job #1: ", current_job.nativeSpecification
219
220 self.functionJobs.append(current_job)
221
222 sum = 0
223 for size in [len(elem) for name,elem in chunks]:
224 sum += size
225
226 print 'Got %d job(s)' % len(self.functionJobs)
227
228
def TaskStarter(run, dataset_fn, prediction_keys, param, set_name):
    """
    Cluster-side entry point: predict alignments for one chunk of dataset keys.

    Returns a short status message naming the finished chunk.
    """
    predictor = QPalma()
    predictor.predict(run, dataset_fn, prediction_keys, param, set_name)

    return 'finished prediction of set %s.' % set_name
238
239
240
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma.
    """

    def __init__(self, settings=None):
        # BUGFIX: ClusterTask.__init__ requires a settings object, so calling
        # it without one always raised a TypeError. The default of None keeps
        # existing no-argument call sites importable while matching the
        # settings-passing pattern of the other task classes.
        ClusterTask.__init__(self, settings)
248
249
250 class PostprocessingTask(ClusterTask):
251 """
252 After QPalma predicted alignments this task postprocesses the data.
253 """
254
255 def __init__(self):
256 ClusterTask.__init__(self)
257
258
259 def CreateJobs(self):
260 run_dir = self.settings['prediction_dir']
261 self.result_dir = self.settings['alignment_dir']
262
263 chunks_fn = []
264 for fn in os.listdir(run_dir):
265 if fn.startswith('chunk'):
266 chunks_fn.append(fn)
267
268 functionJobs=[]
269
270 self.result_files = []
271 for chunk_fn in chunks_fn:
272 chunk_name = chunk_fn[:chunk_fn.find('.')]
273 result_fn = jp(self.result_dir,'%s.align'%chunk_name)
274 chunk_fn = jp(run_dir,chunk_fn)
275
276 self.result_files.append(result_fn)
277
278 current_job = KybJob(grid_alignment.TaskStarter,[chunk_fn,result_fn])
279 current_job.h_vmem = '15.0G'
280 current_job.express = 'True'
281
282 print "job #1: ", current_job.nativeSpecification
283
284
285 def collectResults(self):
286 combined_fn = jp(self.result_dir,'all_alignments.align')
287 combine_files(self.result_files,combined_fn)
288
289
def TaskStarter(chunk_fn, result_fn):
    """
    Cluster-side entry point: write the alignment file for one prediction chunk.

    NOTE(review): this module defines TaskStarter twice (a five-argument
    prediction variant exists earlier in the file); this definition shadows the
    earlier one at import time -- the collision should be resolved by renaming
    one of them.
    """
    create_alignment_file(chunk_fn, result_fn)
292
293
294
class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier.
    """
    def __init__(self):
        # NOTE(review): ClusterTask.__init__ requires a settings argument, so
        # this call raises a TypeError as written -- confirm intended usage.
        ClusterTask.__init__(self)


    def TaskStarter(param):
        # NOTE(review): 'chunk_fn' and 'result_fn' are not defined in this
        # scope and 'param' is unused -- invoking this raises a NameError.
        # Presumably copied from the module-level TaskStarter; verify.
        create_alignment_file(chunk_fn,result_fn)


    def CreateJobs(self):
        # Hard-coded debugging paths for a specific evaluation run.
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        # NOTE(review): 'chunks_fn' is never defined in this method and
        # 'grid_alignment' is not imported, so this loop cannot run as
        # written; the created jobs are also never appended to
        # self.functionJobs.
        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
            chunk_fn = jp(run_dir,chunk_fn)

            current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print "job #1: ", current_job.nativeSpecification