+ got rid of minor bugs
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 from PipelineHeuristic import *
28
29 import gridtools
30
31 from utils import get_slices,split_file
32
# Module-wide shorthand for building file system paths.
jp = os.path.join
34
35
36 class ClusterTask(Thread):
37 """
38 This class..
39
40 Every task creates a status file.
41 All cluster jobs submit then their exit status to this status file.
42
43 Every cluster task subclass should override/implement the methods:
44
45 1. __init__
46 2. CreateJobs
47 3. TaskStarter
48
49 """
50
51 def __init__(self,global_settings):
52 self.sleep_time = 0
53
54 # this list stores the cluster/local jobs objects
55 self.functionJobs = []
56
57 # this object stores the configuration
58 self.global_settings = global_settings
59
60
61 def CreateJobs(self):
62 """
63 This method create an array of jobs called self.functionJobs. It is only
64 virtual in this base class and has to be implemented specifically for
65 each cluster task.
66 """
67 pass
68
69
70 def Submit(self):
71 """
72 After creation of jobs this function submits them to the cluster.
73 """
74 for current_job in self.functionJobs:
75 self.sid, self.jobids = submit_jobs([current_job])
76
77
78 def Restart(self,id):
79 pass
80
81
82 def CheckIfTaskFinished(self):
83 """
84 This function is responsible for checking whether the submitted jobs were
85 completed successfully.
86 """
87
88 print 'checking whether finished'
89 while not get_status(self.sid, self.jobids):
90 time.sleep(7)
91 print 'collecting jobs'
92 retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
93
94 #print "ret fields AFTER execution on cluster"
95 #for (i, job) in enumerate(retjobs):
96 # print "Job #", i, "- ret: ", job.ret
97 #print '--------------'
98
99
100
101 class ApproximationTask(ClusterTask):
102 """
103 This task represents the first step towards a valid QPalma dataset.
104 It starts an approximative QPalma model on the putative unspliced reads to
105 identify true spliced reads.
106 """
107
108 def __init__(self,settings):
109 ClusterTask.__init__(self,settings)
110
111
112 def CreateJobs(self):
113 """
114 Create...
115 """
116
117 num_splits = self.global_settings['num_splits']
118
119 #run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
120 #param_fname = jp(run_dir,'param_526.pickle')
121 param_fname = self.global_settings['prediction_parameter_fn']
122 #run_fname = jp(run_dir,'run_obj.pickle')
123 run_fname = self.global_settings['run_fn']
124
125 #result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
126 result_dir = self.global_settings['approximation_dir']
127
128 original_map_fname = self.global_settings['read_ascii_data_fn']
129 split_file(original_map_fname,result_dir,num_splits)
130
131 for idx in range(0,num_splits):
132 data_fname = jp(result_dir,'map.part_%d'%idx)
133 result_fname = jp(result_dir,'map.vm.part_%d.heuristic'%idx)
134
135 current_job = KybJob(gridtools.ApproximationTaskStarter,[run_fname,data_fname,param_fname,result_fname])
136 current_job.h_vmem = '25.0G'
137 #current_job.express = 'True'
138
139 print "job #1: ", current_job.nativeSpecification
140
141 self.functionJobs.append(current_job)
142
143
def ApproximationTaskStarter(run_fname,data_fname,param_fname,result_fname):
   """
   Entry point executed on a cluster node: run the approximative QPalma
   filter (PipelineHeuristic) on one split of the read data.
   """
   heuristic = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
   heuristic.filter()

   return 'finished filtering set %s.' % data_fname
149
150
class PreprocessingTask(ClusterTask):
   """
   This class encapsulates the preprocessing step of the pipeline.
   """

   def __init__(self,global_settings=None):
      # BUGFIX: ClusterTask.__init__ requires a settings object; calling it
      # without one raised a TypeError. The parameter defaults to None so
      # existing no-argument callers keep working.
      ClusterTask.__init__(self,global_settings)
158
159
160 class AlignmentTask(ClusterTask):
161
162 def __init__(self):
163 ClusterTask.__init__(self)
164
165
166 def CreateJobs():
167 """
168
169 """
170
171 num_splits = self.global_settings['num_splits']
172
173 jp = os.path.join
174
175 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
176
177 run = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
178 run['name'] = 'saved_run'
179
180 param = cPickle.load(open(jp(run_dir,'param_526.pickle')))
181
182 run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
183 dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
184 prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
185
186 prediction_keys = cPickle.load(open(prediction_keys_fn))
187
188 print 'Found %d keys for prediction.' % len(prediction_keys)
189
190 slices = get_slices(len(prediction_keys),num_splits)
191 chunks = []
192 for idx,slice in enumerate(slices):
193 #if idx != 0:
194 c_name = 'chunk_%d' % idx
195 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
196
197 for c_name,current_chunk in chunks:
198 current_job = KybJob(grid_predict.TaskStarter,[run,dataset_fn,current_chunk,param,c_name])
199 current_job.h_vmem = '20.0G'
200 #current_job.express = 'True'
201
202 print "job #1: ", current_job.nativeSpecification
203
204 self.functionJobs.append(current_job)
205
206 sum = 0
207 for size in [len(elem) for name,elem in chunks]:
208 sum += size
209
210 print 'Got %d job(s)' % len(functionJobs)
211
212
213 def TaskStarter(run,dataset_fn,prediction_keys,param,set_name):
214 """
215
216 """
217
218 qp = QPalma()
219 qp.predict(run,dataset_fn,prediction_keys,param,set_name)
220
221 return 'finished prediction of set %s.' % set_name
222
223
224
class TrainingTask(ClusterTask):
   """
   This class represents the cluster task of training QPalma.
   """

   def __init__(self,global_settings=None):
      # BUGFIX: ClusterTask.__init__ requires a settings object; calling it
      # without one raised a TypeError. The parameter defaults to None so
      # existing no-argument callers keep working.
      ClusterTask.__init__(self,global_settings)
232
233
234 class PostprocessingTask(ClusterTask):
235 """
236 After QPalma predicted alignments this task postprocesses the data.
237 """
238
239 def __init__(self):
240 ClusterTask.__init__(self)
241
242
243 def TaskStarter(chunk_fn,result_fn):
244 create_alignment_file(chunk_fn,result_fn)
245
246
247 def CreateJobs(self):
248 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
249 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
250
251 chunks_fn = []
252 for fn in os.listdir(run_dir):
253 if fn.startswith('chunk'):
254 chunks_fn.append(fn)
255
256 print chunks_fn
257
258 functionJobs=[]
259
260 for chunk_fn in chunks_fn:
261 chunk_name = chunk_fn[:chunk_fn.find('.')]
262 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
263 chunk_fn = jp(run_dir,chunk_fn)
264
265 current_job = KybJob(grid_alignment.TaskStarter,[chunk_fn,result_fn])
266 current_job.h_vmem = '15.0G'
267 current_job.express = 'True'
268
269 print "job #1: ", current_job.nativeSpecification
270
271
class DummyTask(ClusterTask):
   """
   This class represents a dummy task to make debugging easier.
   """
   def __init__(self):
      # NOTE(review): ClusterTask.__init__ takes a global_settings argument;
      # this call without one raises a TypeError -- confirm intended usage.
      ClusterTask.__init__(self)


   def TaskStarter(param):
      # NOTE(review): `chunk_fn` and `result_fn` are undefined in this scope
      # and `param` is unused -- executing this raises a NameError.
      create_alignment_file(chunk_fn,result_fn)


   def CreateJobs(self):
      # Directory containing the prediction chunk files.
      run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
      # Directory the remapped alignment files would be written to.
      result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

      # NOTE(review): `chunks_fn` is never defined here (unlike in
      # PostprocessingTask.CreateJobs) and `grid_alignment` is not a module
      # imported by this file -- both raise NameError. Also the created jobs
      # are never appended to self.functionJobs.
      for chunk_fn in chunks_fn:
         chunk_name = chunk_fn[:chunk_fn.find('.')]
         result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
         chunk_fn = jp(run_dir,chunk_fn)

         current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
         current_job.h_vmem = '15.0G'
         current_job.express = 'True'

         print "job #1: ", current_job.nativeSpecification