da809c34444a49286deedd498af05b8cb27a01c4
[qpalma.git] / qpalma / gridtools.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 2 of the License, or
7 # (at your option) any later version.
8 #
9 # Written (W) 2008 Fabio De Bona
10 # Copyright (C) 2008 Max-Planck-Society
11
12 import cPickle
13 import math
14 import os
15 import os.path
16 import pdb
17 import sys
18 import time
19
20 from threading import Thread
21
22 from pythongrid import KybJob, Usage
23 from pythongrid import process_jobs, submit_jobs, collect_jobs, get_status
24
25 from createAlignmentFileFromPrediction import create_alignment_file
26
27 import gridtools
28
29 jp = os.path.join
30
31
class ClusterTask(Thread):
    """
    Base class for a QPalma pipeline stage that is run as a set of
    cluster jobs.

    Subclasses implement createJobs() to fill self.functionJobs with
    KybJob objects; submit() then hands them to the grid engine via
    pythongrid.submit_jobs().
    """

    def __init__(self):
        # Thread.__init__ must run for this object to be usable as a
        # Thread (the original omitted it, so start() would fail).
        Thread.__init__(self)

        # seconds to pause between two consecutive job submissions
        self.sleep_time = 0

        # this list stores the cluster/local job objects (KybJob instances)
        self.functionJobs = []

    def createJobs(self):
        """Hook for subclasses: populate self.functionJobs."""
        pass

    def submit(self):
        """
        Submit every collected job, pausing self.sleep_time seconds
        between submissions (throttling).
        """
        for current_job in self.functionJobs:
            # BUGFIX: the original submitted the undefined name
            # `functionJobs` instead of the current job.
            (sid, jobids) = submit_jobs([current_job])
            time.sleep(self.sleep_time)

    def checkIfTaskFinished(self):
        """Hook for subclasses: poll whether all submitted jobs completed."""
        pass
57
58
59 class ApproximationTask(ClusterTask):
60 """
61 """
62
63 def __init__(self):
64 ClusterTask.__init__(self)
65
66
67 def g_heuristic(run_fname,data_fname,param_fname,result_fname):
68 #print run_fname,data_fname,param_fname,result_fname
69 ph1 = PipelineHeuristic(run_fname,data_fname,param_fname,result_fname)
70 ph1.filter()
71
72 return 'finished filtering set %s.' % data_fname
73
74 def createJobs(self):
75 num_splits = 25
76
77 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/run_enable_quality_scores_+_enable_splice_signals_+_enable_intron_length_+'
78 #data_dir = '/fml/ag-raetsch/home/fabio/tmp/lyrata_analysis/'
79
80 data_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main'
81
82 run_fname = jp(run_dir,'run_obj.pickle')
83
84 #original_map_fname = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/main/map.vm'
85 #split_file(original_map_fname,data_dir,num_splits)
86
87 param_fname = jp(run_dir,'param_526.pickle')
88
89 functionJobs=[]
90
91 for idx in range(0,num_splits):
92 data_fname = jp(data_dir,'map.part_%d'%idx)
93 result_fname = jp(data_dir,'map.vm.part_%d.heuristic'%idx)
94
95 current_job = KybJob(grid_heuristic.g_heuristic,[run_fname,data_fname,param_fname,result_fname])
96 current_job.h_vmem = '25.0G'
97 #current_job.express = 'True'
98
99 print "job #1: ", current_job.nativeSpecification
100
101 self.functionJobs.append(current_job)
102
103
104
105
class PreprocessingTask(ClusterTask):
    """
    Placeholder cluster task for the preprocessing stage; all behaviour
    is inherited unchanged from ClusterTask.
    """

    def __init__(self):
        super(PreprocessingTask, self).__init__()
113
114
115 class AlignmentTask(ClusterTask):
116
117 def __init__(self):
118 ClusterTask.__init__(self)
119
120
121 def get_slices(dataset_size,num_nodes):
122 all_instances = []
123
124 part = dataset_size / num_nodes
125 begin = 0
126 end = 0
127 for idx in range(1,num_nodes+1):
128
129 if idx == num_nodes:
130 begin = end
131 end = dataset_size
132 else:
133 begin = end
134 end = begin+part
135
136 params = (begin,end)
137
138 all_instances.append(params)
139
140 return all_instances
141
142
143 def makeJobs(run,dataset_fn,chunks,param):
144 """
145 """
146
147 jobs=[]
148
149 for c_name,current_chunk in chunks:
150 current_job = KybJob(grid_predict.g_predict,[run,dataset_fn,current_chunk,param,c_name])
151 current_job.h_vmem = '20.0G'
152 #current_job.express = 'True'
153
154 print "job #1: ", current_job.nativeSpecification
155
156 jobs.append(current_job)
157
158 return jobs
159
160
161 def create_and_submit():
162 """
163
164 """
165
166 jp = os.path.join
167
168 run_dir = '/fml/ag-raetsch/home/fabio/tmp/newest_run/alignment/saved_run'
169
170 run = cPickle.load(open(jp(run_dir,'run_obj.pickle')))
171 run['name'] = 'saved_run'
172
173 param = cPickle.load(open(jp(run_dir,'param_526.pickle')))
174
175 run['result_dir'] = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
176 dataset_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.pickle'
177 prediction_keys_fn = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/dataset/dataset_run_1.pickle.keys.pickle'
178
179 prediction_keys = cPickle.load(open(prediction_keys_fn))
180
181 print 'Found %d keys for prediction.' % len(prediction_keys)
182
183 num_splits = 25
184 #num_splits = 1
185 slices = get_slices(len(prediction_keys),num_splits)
186 chunks = []
187 for idx,slice in enumerate(slices):
188 #if idx != 0:
189 c_name = 'chunk_%d' % idx
190 chunks.append((c_name,prediction_keys[slice[0]:slice[1]]))
191
192 self.functionJobs = makeJobs(run,dataset_fn,chunks,param)
193
194 sum = 0
195 for size in [len(elem) for name,elem in chunks]:
196 sum += size
197
198 print 'Got %d job(s)' % len(functionJobs)
199
200
201 def g_predict(run,dataset_fn,prediction_keys,param,set_name):
202 """
203
204 """
205
206 qp = QPalma()
207 qp.predict(run,dataset_fn,prediction_keys,param,set_name)
208
209 return 'finished prediction of set %s.' % set_name
210
211
212
class TrainingTask(ClusterTask):
    """
    Placeholder cluster task for the training stage; all behaviour is
    inherited unchanged from ClusterTask.
    """

    def __init__(self):
        super(TrainingTask, self).__init__()
217
218
219 class PostprocessingTask(ClusterTask):
220 """
221
222 """
223
    def __init__(self):
        # All setup (sleep_time, functionJobs list) is delegated to the
        # ClusterTask base class.
        ClusterTask.__init__(self)
226
227
    def g_alignment(chunk_fn,result_fn):
        # Executed on the cluster node: turn one prediction chunk file into
        # an alignment file via create_alignment_file().
        # NOTE(review): defined without `self` -- it is referenced elsewhere
        # as `grid_alignment.g_alignment`, i.e. treated as a plain function;
        # confirm it is never called through an instance of this class.
        create_alignment_file(chunk_fn,result_fn)
230
231
232 def createJobs(self):
233 run_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/prediction'
234 result_dir = '/fml/ag-raetsch/home/fabio/tmp/vmatch_evaluation/spliced_1/alignment'
235
236 chunks_fn = []
237 for fn in os.listdir(run_dir):
238 if fn.startswith('chunk'):
239 chunks_fn.append(fn)
240
241 print chunks_fn
242
243 functionJobs=[]
244
245 for chunk_fn in chunks_fn:
246 chunk_name = chunk_fn[:chunk_fn.find('.')]
247 result_fn = jp(result_dir,'%s.align_remap'%chunk_name)
248 chunk_fn = jp(run_dir,chunk_fn)
249
250 current_job = KybJob(grid_alignment.g_alignment,[chunk_fn,result_fn])
251 current_job.h_vmem = '15.0G'
252 current_job.express = 'True'
253
254 print "job #1: ", current_job.nativeSpecification