+ added settings parser module
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 import gridtools
28
29 from utils import get_slices
30
31 jp = os.path.join
32
33
class ClusterTask(Thread):
    """
    Base class for a QPalma cluster task.

    A task creates a list of cluster/local jobs (self.functionJobs), submits
    them via pythongrid and polls until all of them report an exit status.

    Every subclass should override/implement:

      1. __init__
      2. CreateJobs
      3. TaskStarter
    """

    def __init__(self, global_settings=None):
        # The original never called Thread.__init__, which makes
        # Thread.start() fail with an AssertionError.
        Thread.__init__(self)

        self.sleep_time = 0

        # this list stores the cluster/local jobs objects
        self.functionJobs = []

        # this object stores the configuration
        self.global_settings = global_settings

    def CreateJobs(self):
        """
        Create an array of jobs called self.functionJobs. It is only
        virtual in this base class and has to be implemented specifically for
        each cluster task.
        """
        pass

    def Submit(self):
        """
        After creation of jobs this function submits them to the cluster.

        The original looped over self.functionJobs but submitted the
        undefined bare name ``functionJobs`` each iteration (a NameError);
        submit the whole job list in a single call instead.
        """
        self.sid, self.jobids = submit_jobs(self.functionJobs)

    def Restart(self, id):
        # virtual: restart the job with the given id
        pass

    def CheckIfTaskFinished(self):
        """
        Poll the cluster until the submitted jobs are finished, then collect
        them and print each job's return value.
        """
        print('checking whether finished')
        while not get_status(self.sid, self.jobids):
            time.sleep(7)

        print('collecting jobs')
        # the original referenced the undefined bare names sid/jobids/myjobs
        retjobs = collect_jobs(self.sid, self.jobids, self.functionJobs)
        print('ret fields AFTER execution on cluster')
        for (i, job) in enumerate(retjobs):
            print('Job #%d - ret: %s' % (i, job.ret))

        print('--------------')
98
class ApproximationTask(ClusterTask):
    """
    This task represents the first step towards a valid QPalma dataset.
    It starts an approximative QPalma model on the putative unspliced reads to
    identify true spliced reads.
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)

    def CreateJobs(self):
        """
        Split the input map file into num_splits parts and create one
        heuristic-filter job per part.
        """
        num_splits = self.global_settings['num_splits']

        param_fname = self.global_settings['prediction_parameter_fn']
        run_fname = self.global_settings['run_fn']
        result_dir = self.global_settings['approximation_dir']

        original_map_fname = self.global_settings['read_ascii_data_fn']
        # NOTE(review): split_file is not defined in this module -- confirm it
        # is provided by an importing/sibling module
        split_file(original_map_fname, result_dir, num_splits)

        for idx in range(0, num_splits):
            data_fname = jp(result_dir, 'map.part_%d' % idx)
            result_fname = jp(result_dir, 'map.vm.part_%d.heuristic' % idx)

            # NOTE(review): grid_heuristic is undefined in this module --
            # presumably this class' own TaskStarter is meant; verify
            current_job = KybJob(grid_heuristic.TaskStarter, [run_fname, data_fname, param_fname, result_fname])
            current_job.h_vmem = '25.0G'
            #current_job.express = 'True'

            print('job #1: %s' % current_job.nativeSpecification)

            self.functionJobs.append(current_job)

    def TaskStarter(run_fname, data_fname, param_fname, result_fname):
        """
        Run the heuristic filter over one data split; executed on a cluster
        node via pythongrid.
        """
        # NOTE(review): PipelineHeuristic is not imported here -- confirm it
        # is available in the module that unpickles this job
        ph1 = PipelineHeuristic(run_fname, data_fname, param_fname, result_fname)
        ph1.filter()

        return 'finished filtering set %s.' % data_fname
147
148
class PreprocessingTask(ClusterTask):
    """
    Preprocessing step of the QPalma pipeline (placeholder -- no jobs are
    created yet).
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)
157
class AlignmentTask(ClusterTask):
    """
    Run QPalma predictions (alignments) for all prediction keys, split into
    num_splits chunks that are processed as separate cluster jobs.
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)

    def CreateJobs(self):
        """
        Load run/parameter/dataset pickles, slice the prediction keys into
        chunks and create one prediction job per chunk.

        The original signature was ``def CreateJobs():`` (no self), which
        raised a TypeError when invoked as a method.
        """
        num_splits = self.global_settings['num_splits']

        # NOTE(review): hard-coded paths below should probably come from
        # global_settings as in ApproximationTask
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'

        run = cPickle.load(open(jp(run_dir, 'run_obj.pickle')))
        run['name'] = 'saved_run'

        param = cPickle.load(open(jp(run_dir, 'param_526.pickle')))

        run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
        prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'

        prediction_keys = cPickle.load(open(prediction_keys_fn))

        print('Found %d keys for prediction.' % len(prediction_keys))

        # each slice is a (begin, end) pair produced by utils.get_slices
        slices = get_slices(len(prediction_keys), num_splits)
        chunks = []
        for idx, key_slice in enumerate(slices):
            c_name = 'chunk_%d' % idx
            chunks.append((c_name, prediction_keys[key_slice[0]:key_slice[1]]))

        for c_name, current_chunk in chunks:
            # NOTE(review): grid_predict is undefined in this module --
            # presumably this class' own TaskStarter is meant; verify
            current_job = KybJob(grid_predict.TaskStarter, [run, dataset_fn, current_chunk, param, c_name])
            current_job.h_vmem = '20.0G'
            #current_job.express = 'True'

            print('job #1: %s' % current_job.nativeSpecification)

            self.functionJobs.append(current_job)

        # the original printed len(functionJobs) -- an undefined bare name
        print('Got %d job(s)' % len(self.functionJobs))

    def TaskStarter(run, dataset_fn, prediction_keys, param, set_name):
        """
        Predict alignments for one chunk of keys; executed on a cluster node
        via pythongrid.
        """
        qp = QPalma()
        qp.predict(run, dataset_fn, prediction_keys, param, set_name)

        return 'finished prediction of set %s.' % set_name
220
221
222
class TrainingTask(ClusterTask):
    """
    This class represents the cluster task of training QPalma
    (placeholder -- no jobs are created yet).
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)
230
231
class PostprocessingTask(ClusterTask):
    """
    After QPalma predicted alignments this task postprocesses the data:
    one alignment file is written per prediction chunk.
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)

    def TaskStarter(chunk_fn, result_fn):
        # convert one prediction chunk into an alignment file; executed on a
        # cluster node via pythongrid
        create_alignment_file(chunk_fn, result_fn)

    def CreateJobs(self):
        """
        Create one alignment-file job per 'chunk*' file found in run_dir.
        """
        # NOTE(review): hard-coded paths should probably come from
        # global_settings as in ApproximationTask
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        chunks_fn = [fn for fn in os.listdir(run_dir) if fn.startswith('chunk')]

        print(chunks_fn)

        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_fn = jp(run_dir, chunk_fn)

            # NOTE(review): grid_alignment is undefined in this module --
            # presumably this class' own TaskStarter is meant; verify
            current_job = KybJob(grid_alignment.TaskStarter, [chunk_fn, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print('job #1: %s' % current_job.nativeSpecification)

            # the original built jobs but never stored them, so Submit()
            # had nothing to submit
            self.functionJobs.append(current_job)
268
269
class DummyTask(ClusterTask):
    """
    This class represents a dummy task to make debugging easier.
    """

    def __init__(self, global_settings=None):
        # forward the settings object -- the original passed nothing, so the
        # base constructor raised a TypeError
        ClusterTask.__init__(self, global_settings)

    def TaskStarter(param):
        # NOTE(review): chunk_fn/result_fn are undefined here -- this body
        # was apparently copied from PostprocessingTask; verify before use
        create_alignment_file(chunk_fn, result_fn)

    def CreateJobs(self):
        """
        Create one dummy alignment job per 'chunk*' file found in run_dir.
        """
        run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
        result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'

        # the original iterated an undefined name chunks_fn (a NameError);
        # derive the chunk list from run_dir as PostprocessingTask does
        chunks_fn = [fn for fn in os.listdir(run_dir) if fn.startswith('chunk')]

        for chunk_fn in chunks_fn:
            chunk_name = chunk_fn[:chunk_fn.find('.')]
            result_fn = jp(result_dir, '%s.align_remap' % chunk_name)
            chunk_fn = jp(run_dir, chunk_fn)

            # NOTE(review): grid_alignment is undefined in this module;
            # g_alignment presumably names the dummy starter -- verify
            current_job = KybJob(grid_alignment.g_alignment, [chunk_fn, result_fn])
            current_job.h_vmem = '15.0G'
            current_job.express = 'True'

            print('job #1: %s' % current_job.nativeSpecification)

            # store the job so Submit() actually has something to submit
            self.functionJobs.append(current_job)