##// END OF EJS Templates
Block publishing when input queues are full to avoid data loss
jespinoza -
r1256:7ed2e4983ab9
parent child
Show More
@@ -1,237 +1,236
1 1 import click
2 2 import schainpy
3 3 import subprocess
4 4 import os
5 5 import sys
6 6 import glob
7 7 save_stdout = sys.stdout
8 8 sys.stdout = open('/dev/null', 'w')
9 9 from multiprocessing import cpu_count
10 10 from schainpy.controller import Project
11 11 from schainpy.model import Operation, ProcessingUnit
12 12 from schainpy.utils import log
13 13 from importlib import import_module
14 14 from pydoc import locate
15 15 from fuzzywuzzy import process
16 16 from schainpy.cli import templates
17 17 import inspect
18 18 try:
19 19 from queue import Queue
20 20 except:
21 21 from Queue import Queue
22 22 sys.stdout = save_stdout
23 23
24 24
25 25 def getProcs():
26 26 modules = dir(schainpy.model)
27 27 procs = check_module(modules, 'processing')
28 28 try:
29 29 procs.remove('ProcessingUnit')
30 30 except Exception as e:
31 31 pass
32 32 return procs
33 33
34 34 def getOperations():
35 35 module = dir(schainpy.model)
36 36 noProcs = [x for x in module if not x.endswith('Proc')]
37 37 operations = check_module(noProcs, 'operation')
38 38 try:
39 39 operations.remove('Operation')
40 40 operations.remove('Figure')
41 41 operations.remove('Plot')
42 42 except Exception as e:
43 43 pass
44 44 return operations
45 45
46 46 def getArgs(op):
47 47 module = locate('schainpy.model.{}'.format(op))
48 48
49 49 if hasattr(module, '__attrs__'):
50 50 args = module.__attrs__
51 51 else:
52 52 args = inspect.getargspec(module.run).args
53 53 try:
54 54 args.remove('self')
55 55 except Exception as e:
56 56 pass
57 57 try:
58 58 args.remove('dataOut')
59 59 except Exception as e:
60 60 pass
61 61 return args
62 62
63 63 def getDoc(obj):
64 64 module = locate('schainpy.model.{}'.format(obj))
65 65 try:
66 obj = module(1,2,3,Queue(),5)
66 obj = module(1,2,3,Queue(),5,6)
67 67 except:
68 68 obj = module()
69 69 return obj.__doc__
70 70
71 71 def getAll():
72 72 modules = getOperations()
73 73 modules.extend(getProcs())
74 74 return modules
75 75
76 76
77 77 def print_version(ctx, param, value):
78 78 if not value or ctx.resilient_parsing:
79 79 return
80 80 click.echo(schainpy.__version__)
81 81 ctx.exit()
82 82
83 83
84 84 PREFIX = 'experiment'
85 85
86 86 @click.command()
87 87 @click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str)
88 88 @click.argument('command', default='run', required=True)
89 89 @click.argument('nextcommand', default=None, required=False, type=str)
90 90 def main(command, nextcommand, version):
91 91 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
92 92 Available commands.\n
93 93 xml: runs a schain XML generated file\n
94 94 run: runs any python script starting 'experiment_'\n
95 95 generate: generates a template schain script\n
96 96 list: return a list of available procs and operations\n
97 97 search: return avilable operations, procs or arguments of the given
98 98 operation/proc\n"""
99 99 if command == 'xml':
100 100 runFromXML(nextcommand)
101 101 elif command == 'generate':
102 102 generate()
103 103 elif command == 'test':
104 104 test()
105 105 elif command == 'run':
106 106 runschain(nextcommand)
107 107 elif command == 'search':
108 108 search(nextcommand)
109 109 elif command == 'list':
110 110 cmdlist(nextcommand)
111 111 else:
112 112 log.error('Command {} is not defined'.format(command))
113 113
114 114
115 115 def check_module(possible, instance):
116 116 def check(x):
117 117 try:
118 118 instancia = locate('schainpy.model.{}'.format(x))
119 119 ret = instancia.proc_type == instance
120 120 return ret
121 121 except Exception as e:
122 122 return False
123 123 clean = clean_modules(possible)
124 124 return [x for x in clean if check(x)]
125 125
126 126
127 127 def clean_modules(module):
128 128 noEndsUnder = [x for x in module if not x.endswith('__')]
129 129 noStartUnder = [x for x in noEndsUnder if not x.startswith('__')]
130 130 noFullUpper = [x for x in noStartUnder if not x.isupper()]
131 131 return noFullUpper
132 132
133 133 def cmdlist(nextcommand):
134 134 if nextcommand is None:
135 135 log.error('Missing argument, available arguments: procs, operations', '')
136 136 elif nextcommand == 'procs':
137 137 procs = getProcs()
138 138 log.success(
139 139 'Current ProcessingUnits are:\n {}'.format('\n '.join(procs)), '')
140 140 elif nextcommand == 'operations':
141 141 operations = getOperations()
142 142 log.success('Current Operations are:\n {}'.format(
143 143 '\n '.join(operations)), '')
144 144 else:
145 145 log.error('Wrong argument', '')
146 146
147 147 def search(nextcommand):
148 148 if nextcommand is None:
149 149 log.error('There is no Operation/ProcessingUnit to search', '')
150 150 else:
151 #try:
152 if True:
151 try:
153 152 args = getArgs(nextcommand)
154 153 doc = getDoc(nextcommand)
155 154 if len(args) == 0:
156 155 log.success('\n{} has no arguments'.format(nextcommand), '')
157 156 else:
158 157 log.success('{}\n{}\n\narguments:\n {}'.format(
159 158 nextcommand, doc, ', '.join(args)), '')
160 # except Exception as e:
161 # log.error('Module `{}` does not exists'.format(nextcommand), '')
162 # allModules = getAll()
163 # similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
164 # log.success('Possible modules are: {}'.format(', '.join(similar)), '')
159 except Exception as e:
160 log.error('Module `{}` does not exists'.format(nextcommand), '')
161 allModules = getAll()
162 similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
163 log.success('Possible modules are: {}'.format(', '.join(similar)), '')
165 164
166 165 def runschain(nextcommand):
167 166 if nextcommand is None:
168 167 currentfiles = glob.glob('./{}_*.py'.format(PREFIX))
169 168 numberfiles = len(currentfiles)
170 169 if numberfiles > 1:
171 170 log.error('There is more than one file to run')
172 171 elif numberfiles == 1:
173 172 subprocess.call(['python ' + currentfiles[0]], shell=True)
174 173 else:
175 174 log.error('There is no file to run')
176 175 else:
177 176 try:
178 177 subprocess.call(['python ' + nextcommand], shell=True)
179 178 except Exception as e:
180 179 log.error("I cannot run the file. Does it exists?")
181 180
182 181
183 182 def basicInputs():
184 183 inputs = {}
185 184 inputs['name'] = click.prompt(
186 185 'Name of the project', default="project", type=str)
187 186 inputs['desc'] = click.prompt(
188 187 'Enter a description', default="A schain project", type=str)
189 188 inputs['multiprocess'] = click.prompt(
190 189 '''Select data type:
191 190
192 191 - Voltage (*.r): [1]
193 192 - Spectra (*.pdata): [2]
194 193 - Voltage and Spectra (*.r): [3]
195 194
196 195 -->''', type=int)
197 196 inputs['path'] = click.prompt('Data path', default=os.getcwd(
198 197 ), type=click.Path(exists=True, resolve_path=True))
199 198 inputs['startDate'] = click.prompt(
200 199 'Start date', default='1970/01/01', type=str)
201 200 inputs['endDate'] = click.prompt(
202 201 'End date', default='2018/12/31', type=str)
203 202 inputs['startHour'] = click.prompt(
204 203 'Start hour', default='00:00:00', type=str)
205 204 inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str)
206 205 inputs['figpath'] = inputs['path'] + '/figs'
207 206 return inputs
208 207
209 208
210 209 def generate():
211 210 inputs = basicInputs()
212 211
213 212 if inputs['multiprocess'] == 1:
214 213 current = templates.voltage.format(**inputs)
215 214 elif inputs['multiprocess'] == 2:
216 215 current = templates.spectra.format(**inputs)
217 216 elif inputs['multiprocess'] == 3:
218 217 current = templates.voltagespectra.format(**inputs)
219 218 scriptname = '{}_{}.py'.format(PREFIX, inputs['name'])
220 219 script = open(scriptname, 'w')
221 220 try:
222 221 script.write(current)
223 222 log.success('Script {} generated'.format(scriptname))
224 223 except Exception as e:
225 224 log.error('I cannot create the file. Do you have writing permissions?')
226 225
227 226
228 227 def test():
229 228 log.warning('testing')
230 229
231 230
232 231 def runFromXML(filename):
233 232 controller = Project()
234 233 if not controller.readXml(filename):
235 234 return
236 235 controller.start()
237 236 return
@@ -1,1284 +1,1290
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 from multiprocessing import Process, Queue, cpu_count
14 from multiprocessing import Process, Queue, Event, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
25 25 DTYPES = {
26 26 'Voltage': '.r',
27 27 'Spectra': '.pdata'
28 28 }
29 29
30 30
31 31 def MPProject(project, n=cpu_count()):
32 32 '''
33 33 Project wrapper to run schain in n processes
34 34 '''
35 35
36 36 rconf = project.getReadUnitObj()
37 37 op = rconf.getOperationObj('run')
38 38 dt1 = op.getParameterValue('startDate')
39 39 dt2 = op.getParameterValue('endDate')
40 40 tm1 = op.getParameterValue('startTime')
41 41 tm2 = op.getParameterValue('endTime')
42 42 days = (dt2 - dt1).days
43 43
44 44 for day in range(days + 1):
45 45 skip = 0
46 46 cursor = 0
47 47 processes = []
48 48 dt = dt1 + datetime.timedelta(day)
49 49 dt_str = dt.strftime('%Y/%m/%d')
50 50 reader = JRODataReader()
51 51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 52 startDate=dt,
53 53 endDate=dt,
54 54 startTime=tm1,
55 55 endTime=tm2,
56 56 ext=DTYPES[rconf.datatype])
57 57 nFiles = len(files)
58 58 if nFiles == 0:
59 59 continue
60 60 skip = int(math.ceil(nFiles / n))
61 61 while nFiles > cursor * skip:
62 62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 63 skip=skip)
64 64 p = project.clone()
65 65 p.start()
66 66 processes.append(p)
67 67 cursor += 1
68 68
69 69 def beforeExit(exctype, value, trace):
70 70 for process in processes:
71 71 process.terminate()
72 72 process.join()
73 73 print(traceback.print_tb(trace))
74 74
75 75 sys.excepthook = beforeExit
76 76
77 77 for process in processes:
78 78 process.join()
79 79 process.terminate()
80 80
81 81 time.sleep(3)
82 82
83 83 def wait(context):
84 84
85 85 time.sleep(1)
86 86 c = zmq.Context()
87 87 receiver = c.socket(zmq.SUB)
88 88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 90 msg = receiver.recv_multipart()[1]
91 91 context.terminate()
92 92
93 93 class ParameterConf():
94 94
95 95 id = None
96 96 name = None
97 97 value = None
98 98 format = None
99 99
100 100 __formated_value = None
101 101
102 102 ELEMENTNAME = 'Parameter'
103 103
104 104 def __init__(self):
105 105
106 106 self.format = 'str'
107 107
108 108 def getElementName(self):
109 109
110 110 return self.ELEMENTNAME
111 111
112 112 def getValue(self):
113 113
114 114 value = self.value
115 115 format = self.format
116 116
117 117 if self.__formated_value != None:
118 118
119 119 return self.__formated_value
120 120
121 121 if format == 'obj':
122 122 return value
123 123
124 124 if format == 'str':
125 125 self.__formated_value = str(value)
126 126 return self.__formated_value
127 127
128 128 if value == '':
129 129 raise ValueError('%s: This parameter value is empty' % self.name)
130 130
131 131 if format == 'list':
132 132 strList = [s.strip() for s in value.split(',')]
133 133 self.__formated_value = strList
134 134
135 135 return self.__formated_value
136 136
137 137 if format == 'intlist':
138 138 '''
139 139 Example:
140 140 value = (0,1,2)
141 141 '''
142 142
143 143 new_value = ast.literal_eval(value)
144 144
145 145 if type(new_value) not in (tuple, list):
146 146 new_value = [int(new_value)]
147 147
148 148 self.__formated_value = new_value
149 149
150 150 return self.__formated_value
151 151
152 152 if format == 'floatlist':
153 153 '''
154 154 Example:
155 155 value = (0.5, 1.4, 2.7)
156 156 '''
157 157
158 158 new_value = ast.literal_eval(value)
159 159
160 160 if type(new_value) not in (tuple, list):
161 161 new_value = [float(new_value)]
162 162
163 163 self.__formated_value = new_value
164 164
165 165 return self.__formated_value
166 166
167 167 if format == 'date':
168 168 strList = value.split('/')
169 169 intList = [int(x) for x in strList]
170 170 date = datetime.date(intList[0], intList[1], intList[2])
171 171
172 172 self.__formated_value = date
173 173
174 174 return self.__formated_value
175 175
176 176 if format == 'time':
177 177 strList = value.split(':')
178 178 intList = [int(x) for x in strList]
179 179 time = datetime.time(intList[0], intList[1], intList[2])
180 180
181 181 self.__formated_value = time
182 182
183 183 return self.__formated_value
184 184
185 185 if format == 'pairslist':
186 186 '''
187 187 Example:
188 188 value = (0,1),(1,2)
189 189 '''
190 190
191 191 new_value = ast.literal_eval(value)
192 192
193 193 if type(new_value) not in (tuple, list):
194 194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195 195
196 196 if type(new_value[0]) not in (tuple, list):
197 197 if len(new_value) != 2:
198 198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 199 new_value = [new_value]
200 200
201 201 for thisPair in new_value:
202 202 if len(thisPair) != 2:
203 203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204 204
205 205 self.__formated_value = new_value
206 206
207 207 return self.__formated_value
208 208
209 209 if format == 'multilist':
210 210 '''
211 211 Example:
212 212 value = (0,1,2),(3,4,5)
213 213 '''
214 214 multiList = ast.literal_eval(value)
215 215
216 216 if type(multiList[0]) == int:
217 217 multiList = ast.literal_eval('(' + value + ')')
218 218
219 219 self.__formated_value = multiList
220 220
221 221 return self.__formated_value
222 222
223 223 if format == 'bool':
224 224 value = int(value)
225 225
226 226 if format == 'int':
227 227 value = float(value)
228 228
229 229 format_func = eval(format)
230 230
231 231 self.__formated_value = format_func(value)
232 232
233 233 return self.__formated_value
234 234
235 235 def updateId(self, new_id):
236 236
237 237 self.id = str(new_id)
238 238
239 239 def setup(self, id, name, value, format='str'):
240 240 self.id = str(id)
241 241 self.name = name
242 242 if format == 'obj':
243 243 self.value = value
244 244 else:
245 245 self.value = str(value)
246 246 self.format = str.lower(format)
247 247
248 248 self.getValue()
249 249
250 250 return 1
251 251
252 252 def update(self, name, value, format='str'):
253 253
254 254 self.name = name
255 255 self.value = str(value)
256 256 self.format = format
257 257
258 258 def makeXml(self, opElement):
259 259 if self.name not in ('queue',):
260 260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 261 parmElement.set('id', str(self.id))
262 262 parmElement.set('name', self.name)
263 263 parmElement.set('value', self.value)
264 264 parmElement.set('format', self.format)
265 265
266 266 def readXml(self, parmElement):
267 267
268 268 self.id = parmElement.get('id')
269 269 self.name = parmElement.get('name')
270 270 self.value = parmElement.get('value')
271 271 self.format = str.lower(parmElement.get('format'))
272 272
273 273 # Compatible with old signal chain version
274 274 if self.format == 'int' and self.name == 'idfigure':
275 275 self.name = 'id'
276 276
277 277 def printattr(self):
278 278
279 279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280 280
281 281 class OperationConf():
282 282
283 283 ELEMENTNAME = 'Operation'
284 284
285 285 def __init__(self):
286 286
287 287 self.id = '0'
288 288 self.name = None
289 289 self.priority = None
290 290 self.topic = None
291 291
292 292 def __getNewId(self):
293 293
294 294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295 295
296 296 def getId(self):
297 297 return self.id
298 298
299 299 def updateId(self, new_id):
300 300
301 301 self.id = str(new_id)
302 302
303 303 n = 1
304 304 for parmObj in self.parmConfObjList:
305 305
306 306 idParm = str(int(new_id) * 10 + n)
307 307 parmObj.updateId(idParm)
308 308
309 309 n += 1
310 310
311 311 def getElementName(self):
312 312
313 313 return self.ELEMENTNAME
314 314
315 315 def getParameterObjList(self):
316 316
317 317 return self.parmConfObjList
318 318
319 319 def getParameterObj(self, parameterName):
320 320
321 321 for parmConfObj in self.parmConfObjList:
322 322
323 323 if parmConfObj.name != parameterName:
324 324 continue
325 325
326 326 return parmConfObj
327 327
328 328 return None
329 329
330 330 def getParameterObjfromValue(self, parameterValue):
331 331
332 332 for parmConfObj in self.parmConfObjList:
333 333
334 334 if parmConfObj.getValue() != parameterValue:
335 335 continue
336 336
337 337 return parmConfObj.getValue()
338 338
339 339 return None
340 340
341 341 def getParameterValue(self, parameterName):
342 342
343 343 parameterObj = self.getParameterObj(parameterName)
344 344
345 345 # if not parameterObj:
346 346 # return None
347 347
348 348 value = parameterObj.getValue()
349 349
350 350 return value
351 351
352 352 def getKwargs(self):
353 353
354 354 kwargs = {}
355 355
356 356 for parmConfObj in self.parmConfObjList:
357 357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 358 continue
359 359
360 360 kwargs[parmConfObj.name] = parmConfObj.getValue()
361 361
362 362 return kwargs
363 363
364 def setup(self, id, name, priority, type, project_id, err_queue):
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365 365
366 366 self.id = str(id)
367 367 self.project_id = project_id
368 368 self.name = name
369 369 self.type = type
370 370 self.priority = priority
371 371 self.err_queue = err_queue
372 self.lock = lock
372 373 self.parmConfObjList = []
373 374
374 375 def removeParameters(self):
375 376
376 377 for obj in self.parmConfObjList:
377 378 del obj
378 379
379 380 self.parmConfObjList = []
380 381
381 382 def addParameter(self, name, value, format='str'):
382 383
383 384 if value is None:
384 385 return None
385 386 id = self.__getNewId()
386 387
387 388 parmConfObj = ParameterConf()
388 389 if not parmConfObj.setup(id, name, value, format):
389 390 return None
390 391
391 392 self.parmConfObjList.append(parmConfObj)
392 393
393 394 return parmConfObj
394 395
395 396 def changeParameter(self, name, value, format='str'):
396 397
397 398 parmConfObj = self.getParameterObj(name)
398 399 parmConfObj.update(name, value, format)
399 400
400 401 return parmConfObj
401 402
402 403 def makeXml(self, procUnitElement):
403 404
404 405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
405 406 opElement.set('id', str(self.id))
406 407 opElement.set('name', self.name)
407 408 opElement.set('type', self.type)
408 409 opElement.set('priority', str(self.priority))
409 410
410 411 for parmConfObj in self.parmConfObjList:
411 412 parmConfObj.makeXml(opElement)
412 413
413 414 def readXml(self, opElement, project_id):
414 415
415 416 self.id = opElement.get('id')
416 417 self.name = opElement.get('name')
417 418 self.type = opElement.get('type')
418 419 self.priority = opElement.get('priority')
419 420 self.project_id = str(project_id)
420 421
421 422 # Compatible with old signal chain version
422 423 # Use of 'run' method instead 'init'
423 424 if self.type == 'self' and self.name == 'init':
424 425 self.name = 'run'
425 426
426 427 self.parmConfObjList = []
427 428
428 429 parmElementList = opElement.iter(ParameterConf().getElementName())
429 430
430 431 for parmElement in parmElementList:
431 432 parmConfObj = ParameterConf()
432 433 parmConfObj.readXml(parmElement)
433 434
434 435 # Compatible with old signal chain version
435 436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
436 437 if self.type != 'self' and self.name == 'Plot':
437 438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
438 439 self.name = parmConfObj.value
439 440 continue
440 441
441 442 self.parmConfObjList.append(parmConfObj)
442 443
443 444 def printattr(self):
444 445
445 446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
446 447 self.id,
447 448 self.name,
448 449 self.type,
449 450 self.priority,
450 451 self.project_id))
451 452
452 453 for parmConfObj in self.parmConfObjList:
453 454 parmConfObj.printattr()
454 455
455 456 def createObject(self):
456 457
457 458 className = eval(self.name)
458 459
459 460 if self.type == 'other':
460 461 opObj = className()
461 462 elif self.type == 'external':
462 463 kwargs = self.getKwargs()
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
464 465 opObj.start()
465 466 self.opObj = opObj
466 467
467 468 return opObj
468 469
469 470 class ProcUnitConf():
470 471
471 472 ELEMENTNAME = 'ProcUnit'
472 473
473 474 def __init__(self):
474 475
475 476 self.id = None
476 477 self.datatype = None
477 478 self.name = None
478 479 self.inputId = None
479 480 self.opConfObjList = []
480 481 self.procUnitObj = None
481 482 self.opObjDict = {}
483 self.mylock = Event()
482 484
483 485 def __getPriority(self):
484 486
485 487 return len(self.opConfObjList) + 1
486 488
487 489 def __getNewId(self):
488 490
489 491 return int(self.id) * 10 + len(self.opConfObjList) + 1
490 492
491 493 def getElementName(self):
492 494
493 495 return self.ELEMENTNAME
494 496
495 497 def getId(self):
496 498
497 499 return self.id
498 500
499 501 def updateId(self, new_id):
500 502 '''
501 503 new_id = int(parentId) * 10 + (int(self.id) % 10)
502 504 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
503 505
504 506 # If this proc unit has not inputs
505 507 #if self.inputId == '0':
506 508 #new_inputId = 0
507 509
508 510 n = 1
509 511 for opConfObj in self.opConfObjList:
510 512
511 513 idOp = str(int(new_id) * 10 + n)
512 514 opConfObj.updateId(idOp)
513 515
514 516 n += 1
515 517
516 518 self.parentId = str(parentId)
517 519 self.id = str(new_id)
518 520 #self.inputId = str(new_inputId)
519 521 '''
520 522 n = 1
521 523
522 524 def getInputId(self):
523 525
524 526 return self.inputId
525 527
526 528 def getOperationObjList(self):
527 529
528 530 return self.opConfObjList
529 531
530 532 def getOperationObj(self, name=None):
531 533
532 534 for opConfObj in self.opConfObjList:
533 535
534 536 if opConfObj.name != name:
535 537 continue
536 538
537 539 return opConfObj
538 540
539 541 return None
540 542
541 543 def getOpObjfromParamValue(self, value=None):
542 544
543 545 for opConfObj in self.opConfObjList:
544 546 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
545 547 continue
546 548 return opConfObj
547 549 return None
548 550
549 551 def getProcUnitObj(self):
550 552
551 553 return self.procUnitObj
552 554
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
555 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
554 556 '''
555 557 id sera el topico a publicar
556 558 inputId sera el topico a subscribirse
557 559 '''
558 560
559 561 # Compatible with old signal chain version
560 562 if datatype == None and name == None:
561 563 raise ValueError('datatype or name should be defined')
562 564
563 565 #Definir una condicion para inputId cuando sea 0
564 566
565 567 if name == None:
566 568 if 'Proc' in datatype:
567 569 name = datatype
568 570 else:
569 571 name = '%sProc' % (datatype)
570 572
571 573 if datatype == None:
572 574 datatype = name.replace('Proc', '')
573 575
574 576 self.id = str(id)
575 577 self.project_id = project_id
576 578 self.name = name
577 579 self.datatype = datatype
578 580 self.inputId = inputId
579 581 self.err_queue = err_queue
582 self.lock = lock
580 583 self.opConfObjList = []
581 584
582 585 self.addOperation(name='run', optype='self')
583 586
584 587 def removeOperations(self):
585 588
586 589 for obj in self.opConfObjList:
587 590 del obj
588 591
589 592 self.opConfObjList = []
590 593 self.addOperation(name='run')
591 594
592 595 def addParameter(self, **kwargs):
593 596 '''
594 597 Add parameters to 'run' operation
595 598 '''
596 599 opObj = self.opConfObjList[0]
597 600
598 601 opObj.addParameter(**kwargs)
599 602
600 603 return opObj
601 604
602 605 def addOperation(self, name, optype='self'):
603 606 '''
604 607 Actualizacion - > proceso comunicacion
605 608 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
606 609 definir el tipoc de socket o comunicacion ipc++
607 610
608 611 '''
609 612
610 613 id = self.__getNewId()
611 614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
612 615 opConfObj = OperationConf()
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
614 617 self.opConfObjList.append(opConfObj)
615 618
616 619 return opConfObj
617 620
618 621 def makeXml(self, projectElement):
619 622
620 623 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
621 624 procUnitElement.set('id', str(self.id))
622 625 procUnitElement.set('name', self.name)
623 626 procUnitElement.set('datatype', self.datatype)
624 627 procUnitElement.set('inputId', str(self.inputId))
625 628
626 629 for opConfObj in self.opConfObjList:
627 630 opConfObj.makeXml(procUnitElement)
628 631
629 632 def readXml(self, upElement, project_id):
630 633
631 634 self.id = upElement.get('id')
632 635 self.name = upElement.get('name')
633 636 self.datatype = upElement.get('datatype')
634 637 self.inputId = upElement.get('inputId')
635 638 self.project_id = str(project_id)
636 639
637 640 if self.ELEMENTNAME == 'ReadUnit':
638 641 self.datatype = self.datatype.replace('Reader', '')
639 642
640 643 if self.ELEMENTNAME == 'ProcUnit':
641 644 self.datatype = self.datatype.replace('Proc', '')
642 645
643 646 if self.inputId == 'None':
644 647 self.inputId = '0'
645 648
646 649 self.opConfObjList = []
647 650
648 651 opElementList = upElement.iter(OperationConf().getElementName())
649 652
650 653 for opElement in opElementList:
651 654 opConfObj = OperationConf()
652 655 opConfObj.readXml(opElement, project_id)
653 656 self.opConfObjList.append(opConfObj)
654 657
655 658 def printattr(self):
656 659
657 660 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
658 661 self.id,
659 662 self.name,
660 663 self.datatype,
661 664 self.inputId,
662 665 self.project_id))
663 666
664 667 for opConfObj in self.opConfObjList:
665 668 opConfObj.printattr()
666 669
667 670 def getKwargs(self):
668 671
669 672 opObj = self.opConfObjList[0]
670 673 kwargs = opObj.getKwargs()
671 674
672 675 return kwargs
673 676
674 677 def createObjects(self):
675 678 '''
676 679 Instancia de unidades de procesamiento.
677 680 '''
678 681
679 682 className = eval(self.name)
680 683 kwargs = self.getKwargs()
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
684 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
682 685 log.success('creating process...', self.name)
683 686
684 687 for opConfObj in self.opConfObjList:
685 688
686 689 if opConfObj.type == 'self' and opConfObj.name == 'run':
687 690 continue
688 691 elif opConfObj.type == 'self':
689 692 opObj = getattr(procUnitObj, opConfObj.name)
690 693 else:
691 694 opObj = opConfObj.createObject()
692 695
693 696 log.success('adding operation: {}, type:{}'.format(
694 697 opConfObj.name,
695 698 opConfObj.type), self.name)
696 699
697 700 procUnitObj.addOperation(opConfObj, opObj)
698 701
699 702 procUnitObj.start()
700 703 self.procUnitObj = procUnitObj
701 704
702 705 def close(self):
703 706
704 707 for opConfObj in self.opConfObjList:
705 708 if opConfObj.type == 'self':
706 709 continue
707 710
708 711 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
709 712 opObj.close()
710 713
711 714 self.procUnitObj.close()
712 715
713 716 return
714 717
715 718
716 719 class ReadUnitConf(ProcUnitConf):
717 720
718 721 ELEMENTNAME = 'ReadUnit'
719 722
720 723 def __init__(self):
721 724
722 725 self.id = None
723 726 self.datatype = None
724 727 self.name = None
725 728 self.inputId = None
726 729 self.opConfObjList = []
730 self.mylock = Event()
727 731
728 732 def getElementName(self):
729 733
730 734 return self.ELEMENTNAME
731 735
732 736 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
733 737 startTime='', endTime='', server=None, **kwargs):
734 738
735 739
736 740 '''
737 741 *****el id del proceso sera el Topico
738 742
739 743 Adicion de {topic}, si no esta presente -> error
740 744 kwargs deben ser trasmitidos en la instanciacion
741 745
742 746 '''
743 747
744 748 # Compatible with old signal chain version
745 749 if datatype == None and name == None:
746 750 raise ValueError('datatype or name should be defined')
747 751 if name == None:
748 752 if 'Reader' in datatype:
749 753 name = datatype
750 754 datatype = name.replace('Reader','')
751 755 else:
752 756 name = '{}Reader'.format(datatype)
753 757 if datatype == None:
754 758 if 'Reader' in name:
755 759 datatype = name.replace('Reader','')
756 760 else:
757 761 datatype = name
758 762 name = '{}Reader'.format(name)
759 763
760 764 self.id = id
761 765 self.project_id = project_id
762 766 self.name = name
763 767 self.datatype = datatype
764 768 if path != '':
765 769 self.path = os.path.abspath(path)
766 770 self.startDate = startDate
767 771 self.endDate = endDate
768 772 self.startTime = startTime
769 773 self.endTime = endTime
770 774 self.server = server
771 775 self.err_queue = err_queue
776 self.lock = self.mylock
772 777 self.addRunOperation(**kwargs)
773 778
774 779 def update(self, **kwargs):
775 780
776 781 if 'datatype' in kwargs:
777 782 datatype = kwargs.pop('datatype')
778 783 if 'Reader' in datatype:
779 784 self.name = datatype
780 785 else:
781 786 self.name = '%sReader' % (datatype)
782 787 self.datatype = self.name.replace('Reader', '')
783 788
784 789 attrs = ('path', 'startDate', 'endDate',
785 790 'startTime', 'endTime')
786 791
787 792 for attr in attrs:
788 793 if attr in kwargs:
789 794 setattr(self, attr, kwargs.pop(attr))
790 795
791 796 self.updateRunOperation(**kwargs)
792 797
793 798 def removeOperations(self):
794 799
795 800 for obj in self.opConfObjList:
796 801 del obj
797 802
798 803 self.opConfObjList = []
799 804
800 805 def addRunOperation(self, **kwargs):
801 806
802 807 opObj = self.addOperation(name='run', optype='self')
803 808
804 809 if self.server is None:
805 810 opObj.addParameter(
806 811 name='datatype', value=self.datatype, format='str')
807 812 opObj.addParameter(name='path', value=self.path, format='str')
808 813 opObj.addParameter(
809 814 name='startDate', value=self.startDate, format='date')
810 815 opObj.addParameter(
811 816 name='endDate', value=self.endDate, format='date')
812 817 opObj.addParameter(
813 818 name='startTime', value=self.startTime, format='time')
814 819 opObj.addParameter(
815 820 name='endTime', value=self.endTime, format='time')
816 821
817 822 for key, value in list(kwargs.items()):
818 823 opObj.addParameter(name=key, value=value,
819 824 format=type(value).__name__)
820 825 else:
821 826 opObj.addParameter(name='server', value=self.server, format='str')
822 827
823 828 return opObj
824 829
825 830 def updateRunOperation(self, **kwargs):
826 831
827 832 opObj = self.getOperationObj(name='run')
828 833 opObj.removeParameters()
829 834
830 835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
831 836 opObj.addParameter(name='path', value=self.path, format='str')
832 837 opObj.addParameter(
833 838 name='startDate', value=self.startDate, format='date')
834 839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
835 840 opObj.addParameter(
836 841 name='startTime', value=self.startTime, format='time')
837 842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
838 843
839 844 for key, value in list(kwargs.items()):
840 845 opObj.addParameter(name=key, value=value,
841 846 format=type(value).__name__)
842 847
843 848 return opObj
844 849
845 850 def readXml(self, upElement, project_id):
846 851
847 852 self.id = upElement.get('id')
848 853 self.name = upElement.get('name')
849 854 self.datatype = upElement.get('datatype')
850 855 self.project_id = str(project_id) #yong
851 856
852 857 if self.ELEMENTNAME == 'ReadUnit':
853 858 self.datatype = self.datatype.replace('Reader', '')
854 859
855 860 self.opConfObjList = []
856 861
857 862 opElementList = upElement.iter(OperationConf().getElementName())
858 863
859 864 for opElement in opElementList:
860 865 opConfObj = OperationConf()
861 866 opConfObj.readXml(opElement, project_id)
862 867 self.opConfObjList.append(opConfObj)
863 868
864 869 if opConfObj.name == 'run':
865 870 self.path = opConfObj.getParameterValue('path')
866 871 self.startDate = opConfObj.getParameterValue('startDate')
867 872 self.endDate = opConfObj.getParameterValue('endDate')
868 873 self.startTime = opConfObj.getParameterValue('startTime')
869 874 self.endTime = opConfObj.getParameterValue('endTime')
870 875
871 876
872 877 class Project(Process):
873 878
874 879 ELEMENTNAME = 'Project'
875 880
876 881 def __init__(self):
877 882
878 883 Process.__init__(self)
879 884 self.id = None
880 885 self.filename = None
881 886 self.description = None
882 887 self.email = None
883 888 self.alarm = None
884 889 self.procUnitConfObjDict = {}
885 890 self.err_queue = Queue()
886 891
887 892 def __getNewId(self):
888 893
889 894 idList = list(self.procUnitConfObjDict.keys())
890 895 id = int(self.id) * 10
891 896
892 897 while True:
893 898 id += 1
894 899
895 900 if str(id) in idList:
896 901 continue
897 902
898 903 break
899 904
900 905 return str(id)
901 906
902 907 def getElementName(self):
903 908
904 909 return self.ELEMENTNAME
905 910
906 911 def getId(self):
907 912
908 913 return self.id
909 914
910 915 def updateId(self, new_id):
911 916
912 917 self.id = str(new_id)
913 918
914 919 keyList = list(self.procUnitConfObjDict.keys())
915 920 keyList.sort()
916 921
917 922 n = 1
918 923 newProcUnitConfObjDict = {}
919 924
920 925 for procKey in keyList:
921 926
922 927 procUnitConfObj = self.procUnitConfObjDict[procKey]
923 928 idProcUnit = str(int(self.id) * 10 + n)
924 929 procUnitConfObj.updateId(idProcUnit)
925 930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
926 931 n += 1
927 932
928 933 self.procUnitConfObjDict = newProcUnitConfObjDict
929 934
930 935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
931 936
932 937 print(' ')
933 938 print('*' * 60)
934 939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
935 940 print('*' * 60)
936 941 print("* Python " + python_version() + " *")
937 942 print('*' * 19)
938 943 print(' ')
939 944 self.id = str(id)
940 945 self.description = description
941 946 self.email = email
942 947 self.alarm = alarm
943 948 if name:
944 949 self.name = '{} ({})'.format(Process.__name__, name)
945 950
946 951 def update(self, **kwargs):
947 952
948 953 for key, value in list(kwargs.items()):
949 954 setattr(self, key, value)
950 955
951 956 def clone(self):
952 957
953 958 p = Project()
954 959 p.procUnitConfObjDict = self.procUnitConfObjDict
955 960 return p
956 961
957 962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
958 963
959 964 '''
960 965 Actualizacion:
961 966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
962 967
963 968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
964 969
965 970 '''
966 971
967 972 if id is None:
968 973 idReadUnit = self.__getNewId()
969 974 else:
970 975 idReadUnit = str(id)
971 976
972 977 readUnitConfObj = ReadUnitConf()
973 978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
974 979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
975 980
976 981 return readUnitConfObj
977 982
978 983 def addProcUnit(self, inputId='0', datatype=None, name=None):
979 984
980 985 '''
981 986 Actualizacion:
982 987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
983 988 Deberia reemplazar a "inputId"
984 989
985 990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
986 991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
987 992
988 993 '''
989 994
990 995 idProcUnit = self.__getNewId()
991 996 procUnitConfObj = ProcUnitConf()
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
993 999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994 1000
995 1001 return procUnitConfObj
996 1002
997 1003 def removeProcUnit(self, id):
998 1004
999 1005 if id in list(self.procUnitConfObjDict.keys()):
1000 1006 self.procUnitConfObjDict.pop(id)
1001 1007
1002 1008 def getReadUnitId(self):
1003 1009
1004 1010 readUnitConfObj = self.getReadUnitObj()
1005 1011
1006 1012 return readUnitConfObj.id
1007 1013
1008 1014 def getReadUnitObj(self):
1009 1015
1010 1016 for obj in list(self.procUnitConfObjDict.values()):
1011 1017 if obj.getElementName() == 'ReadUnit':
1012 1018 return obj
1013 1019
1014 1020 return None
1015 1021
1016 1022 def getProcUnitObj(self, id=None, name=None):
1017 1023
1018 1024 if id != None:
1019 1025 return self.procUnitConfObjDict[id]
1020 1026
1021 1027 if name != None:
1022 1028 return self.getProcUnitObjByName(name)
1023 1029
1024 1030 return None
1025 1031
1026 1032 def getProcUnitObjByName(self, name):
1027 1033
1028 1034 for obj in list(self.procUnitConfObjDict.values()):
1029 1035 if obj.name == name:
1030 1036 return obj
1031 1037
1032 1038 return None
1033 1039
1034 1040 def procUnitItems(self):
1035 1041
1036 1042 return list(self.procUnitConfObjDict.items())
1037 1043
1038 1044 def makeXml(self):
1039 1045
1040 1046 projectElement = Element('Project')
1041 1047 projectElement.set('id', str(self.id))
1042 1048 projectElement.set('name', self.name)
1043 1049 projectElement.set('description', self.description)
1044 1050
1045 1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1046 1052 procUnitConfObj.makeXml(projectElement)
1047 1053
1048 1054 self.projectElement = projectElement
1049 1055
1050 1056 def writeXml(self, filename=None):
1051 1057
1052 1058 if filename == None:
1053 1059 if self.filename:
1054 1060 filename = self.filename
1055 1061 else:
1056 1062 filename = 'schain.xml'
1057 1063
1058 1064 if not filename:
1059 1065 print('filename has not been defined. Use setFilename(filename) for do it.')
1060 1066 return 0
1061 1067
1062 1068 abs_file = os.path.abspath(filename)
1063 1069
1064 1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1065 1071 print('No write permission on %s' % os.path.dirname(abs_file))
1066 1072 return 0
1067 1073
1068 1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1069 1075 print('File %s already exists and it could not be overwriten' % abs_file)
1070 1076 return 0
1071 1077
1072 1078 self.makeXml()
1073 1079
1074 1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1075 1081
1076 1082 self.filename = abs_file
1077 1083
1078 1084 return 1
1079 1085
1080 1086 def readXml(self, filename=None):
1081 1087
1082 1088 if not filename:
1083 1089 print('filename is not defined')
1084 1090 return 0
1085 1091
1086 1092 abs_file = os.path.abspath(filename)
1087 1093
1088 1094 if not os.path.isfile(abs_file):
1089 1095 print('%s file does not exist' % abs_file)
1090 1096 return 0
1091 1097
1092 1098 self.projectElement = None
1093 1099 self.procUnitConfObjDict = {}
1094 1100
1095 1101 try:
1096 1102 self.projectElement = ElementTree().parse(abs_file)
1097 1103 except:
1098 1104 print('Error reading %s, verify file format' % filename)
1099 1105 return 0
1100 1106
1101 1107 self.project = self.projectElement.tag
1102 1108
1103 1109 self.id = self.projectElement.get('id')
1104 1110 self.name = self.projectElement.get('name')
1105 1111 self.description = self.projectElement.get('description')
1106 1112
1107 1113 readUnitElementList = self.projectElement.iter(
1108 1114 ReadUnitConf().getElementName())
1109 1115
1110 1116 for readUnitElement in readUnitElementList:
1111 1117 readUnitConfObj = ReadUnitConf()
1112 1118 readUnitConfObj.readXml(readUnitElement, self.id)
1113 1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1114 1120
1115 1121 procUnitElementList = self.projectElement.iter(
1116 1122 ProcUnitConf().getElementName())
1117 1123
1118 1124 for procUnitElement in procUnitElementList:
1119 1125 procUnitConfObj = ProcUnitConf()
1120 1126 procUnitConfObj.readXml(procUnitElement, self.id)
1121 1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1122 1128
1123 1129 self.filename = abs_file
1124 1130
1125 1131 return 1
1126 1132
1127 1133 def __str__(self):
1128 1134
1129 1135 print('Project: name = %s, description = %s, id = %s' % (
1130 1136 self.name,
1131 1137 self.description,
1132 1138 self.id))
1133 1139
1134 1140 for procUnitConfObj in self.procUnitConfObjDict.values():
1135 1141 print(procUnitConfObj)
1136 1142
1137 1143 def createObjects(self):
1138 1144
1139 1145
1140 1146 keys = list(self.procUnitConfObjDict.keys())
1141 1147 keys.sort()
1142 1148 for key in keys:
1143 1149 self.procUnitConfObjDict[key].createObjects()
1144 1150
1145 1151 def monitor(self):
1146 1152
1147 1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1148 1154 t.start()
1149 1155
1150 1156 def __monitor(self, queue, ctx):
1151 1157
1152 1158 import socket
1153 1159
1154 1160 procs = 0
1155 1161 err_msg = ''
1156 1162
1157 1163 while True:
1158 1164 msg = queue.get()
1159 1165 if '#_start_#' in msg:
1160 1166 procs += 1
1161 1167 elif '#_end_#' in msg:
1162 1168 procs -=1
1163 1169 else:
1164 1170 err_msg = msg
1165 1171
1166 1172 if procs == 0 or 'Traceback' in err_msg:
1167 1173 break
1168 1174 time.sleep(0.1)
1169 1175
1170 1176 if '|' in err_msg:
1171 1177 name, err = err_msg.split('|')
1172 1178 if 'SchainWarning' in err:
1173 1179 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1174 1180 elif 'SchainError' in err:
1175 1181 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1176 1182 else:
1177 1183 log.error(err, name)
1178 1184 else:
1179 1185 name, err = self.name, err_msg
1180 1186
1181 1187 time.sleep(2)
1182 1188
1183 1189 for conf in self.procUnitConfObjDict.values():
1184 1190 for confop in conf.opConfObjList:
1185 1191 if confop.type == 'external':
1186 1192 confop.opObj.terminate()
1187 1193 conf.procUnitObj.terminate()
1188 1194
1189 1195 ctx.term()
1190 1196
1191 1197 message = ''.join(err)
1192 1198
1193 1199 if err_msg:
1194 1200 subject = 'SChain v%s: Error running %s\n' % (
1195 1201 schainpy.__version__, self.name)
1196 1202
1197 1203 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1198 1204 socket.gethostname())
1199 1205 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1200 1206 subtitle += 'Configuration file: %s\n' % self.filename
1201 1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1202 1208
1203 1209 readUnitConfObj = self.getReadUnitObj()
1204 1210 if readUnitConfObj:
1205 1211 subtitle += '\nInput parameters:\n'
1206 1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1207 1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1208 1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1209 1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1210 1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1211 1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1212 1218
1213 1219 a = Alarm(
1214 1220 modes=self.alarm,
1215 1221 email=self.email,
1216 1222 message=message,
1217 1223 subject=subject,
1218 1224 subtitle=subtitle,
1219 1225 filename=self.filename
1220 1226 )
1221 1227
1222 1228 a.start()
1223 1229
1224 1230 def isPaused(self):
1225 1231 return 0
1226 1232
1227 1233 def isStopped(self):
1228 1234 return 0
1229 1235
1230 1236 def runController(self):
1231 1237 '''
1232 1238 returns 0 when this process has been stopped, 1 otherwise
1233 1239 '''
1234 1240
1235 1241 if self.isPaused():
1236 1242 print('Process suspended')
1237 1243
1238 1244 while True:
1239 1245 time.sleep(0.1)
1240 1246
1241 1247 if not self.isPaused():
1242 1248 break
1243 1249
1244 1250 if self.isStopped():
1245 1251 break
1246 1252
1247 1253 print('Process reinitialized')
1248 1254
1249 1255 if self.isStopped():
1250 1256 print('Process stopped')
1251 1257 return 0
1252 1258
1253 1259 return 1
1254 1260
1255 1261 def setFilename(self, filename):
1256 1262
1257 1263 self.filename = filename
1258 1264
1259 1265 def setProxy(self):
1260 1266
1261 1267 if not os.path.exists('/tmp/schain'):
1262 1268 os.mkdir('/tmp/schain')
1263 1269
1264 1270 self.ctx = zmq.Context()
1265 1271 xpub = self.ctx.socket(zmq.XPUB)
1266 1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1267 1273 xsub = self.ctx.socket(zmq.XSUB)
1268 1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1269 1275 self.monitor()
1270 1276 try:
1271 1277 zmq.proxy(xpub, xsub)
1272 1278 except zmq.ContextTerminated:
1273 1279 xpub.close()
1274 1280 xsub.close()
1275 1281
1276 1282 def run(self):
1277 1283
1278 1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1279 1285 self.start_time = time.time()
1280 1286 self.createObjects()
1281 1287 self.setProxy()
1282 1288 log.success('{} Done (Time: {}s)'.format(
1283 1289 self.name,
1284 1290 time.time()-self.start_time), '')
@@ -1,415 +1,416
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 import sys
16 17 import inspect
17 18 import zmq
18 19 import time
19 20 import pickle
20 21 import traceback
21 22 try:
22 23 from queue import Queue
23 24 except:
24 25 from Queue import Queue
25 26 from threading import Thread
26 27 from multiprocessing import Process
27 28
28 29 from schainpy.utils import log
29 30
30 31
31 32 class ProcessingUnit(object):
32 33
33 34 """
34 35 Update - Jan 2018 - MULTIPROCESSING
35 36 All the "call" methods present in the previous base were removed.
36 37 The majority of operations are independant processes, thus
37 38 the decorator is in charge of communicate the operation processes
38 39 with the proccessing unit via IPC.
39 40
40 41 The constructor does not receive any argument. The remaining methods
41 42 are related with the operations to execute.
42 43
43 44
44 45 """
45 46 proc_type = 'processing'
46 47 __attrs__ = []
47 48
48 49 def __init__(self):
49 50
50 51 self.dataIn = None
51 52 self.dataOut = None
52 53 self.isConfig = False
53 54 self.operations = []
54 55 self.plots = []
55 56
56 57 def getAllowedArgs(self):
57 58 if hasattr(self, '__attrs__'):
58 59 return self.__attrs__
59 60 else:
60 61 return inspect.getargspec(self.run).args
61 62
62 63 def addOperation(self, conf, operation):
63 64 """
64 65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
65 66 posses the id of the operation process (IPC purposes)
66 67
67 68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
68 69 identificador asociado a este objeto.
69 70
70 71 Input:
71 72
72 73 object : objeto de la clase "Operation"
73 74
74 75 Return:
75 76
76 77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
77 78 """
78 79
79 80 self.operations.append(
80 81 (operation, conf.type, conf.id, conf.getKwargs()))
81 82
82 83 if 'plot' in self.name.lower():
83 84 self.plots.append(operation.CODE)
84 85
85 86 def getOperationObj(self, objId):
86 87
87 88 if objId not in list(self.operations.keys()):
88 89 return None
89 90
90 91 return self.operations[objId]
91 92
92 93 def operation(self, **kwargs):
93 94 """
94 95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
95 96 atributos del objeto dataOut
96 97
97 98 Input:
98 99
99 100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
100 101 """
101 102
102 103 raise NotImplementedError
103 104
104 105 def setup(self):
105 106
106 107 raise NotImplementedError
107 108
108 109 def run(self):
109 110
110 111 raise NotImplementedError
111 112
112 113 def close(self):
113 114
114 115 return
115 116
116 117
117 118 class Operation(object):
118 119
119 120 """
120 121 Update - Jan 2018 - MULTIPROCESSING
121 122
122 123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
123 124 The constructor doe snot receive any argument, neither the baseclass.
124 125
125 126
126 127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
127 128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
128 129 acumulacion dentro de esta clase
129 130
130 131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
131 132
132 133 """
133 134 proc_type = 'operation'
134 135 __attrs__ = []
135 136
136 137 def __init__(self):
137 138
138 139 self.id = None
139 140 self.isConfig = False
140 141
141 142 if not hasattr(self, 'name'):
142 143 self.name = self.__class__.__name__
143 144
144 145 def getAllowedArgs(self):
145 146 if hasattr(self, '__attrs__'):
146 147 return self.__attrs__
147 148 else:
148 149 return inspect.getargspec(self.run).args
149 150
150 151 def setup(self):
151 152
152 153 self.isConfig = True
153 154
154 155 raise NotImplementedError
155 156
156 157 def run(self, dataIn, **kwargs):
157 158 """
158 159 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
159 160 atributos del objeto dataIn.
160 161
161 162 Input:
162 163
163 164 dataIn : objeto del tipo JROData
164 165
165 166 Return:
166 167
167 168 None
168 169
169 170 Affected:
170 171 __buffer : buffer de recepcion de datos.
171 172
172 173 """
173 174 if not self.isConfig:
174 175 self.setup(**kwargs)
175 176
176 177 raise NotImplementedError
177 178
178 179 def close(self):
179 180
180 181 return
181 182
182 183 class InputQueue(Thread):
183 184
184 '''
185 Class to hold input data for Proccessing Units and external Operations,
186 '''
187
188 def __init__(self, project_id, inputId):
189
190 Thread.__init__(self)
191 self.queue = Queue()
192 self.project_id = project_id
193 self.inputId = inputId
194
195 def run(self):
196
197 c = zmq.Context()
198 self.receiver = c.socket(zmq.SUB)
199 self.receiver.connect(
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
202
203 while True:
204 self.queue.put(self.receiver.recv_multipart()[1])
205
206 def get(self):
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
188
189 def __init__(self, project_id, inputId, lock=None):
207 190
208 return pickle.loads(self.queue.get())
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.size = 0
197
198 def run(self):
199
200 c = zmq.Context()
201 self.receiver = c.socket(zmq.SUB)
202 self.receiver.connect(
203 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
204 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
205
206 while True:
207 obj = self.receiver.recv_multipart()[1]
208 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
210
211 def get(self):
212 if self.size/1000000 > 2048:
213 self.lock.clear()
214 else:
215 self.lock.set()
216 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
209 219
210 220
211 221 def MPDecorator(BaseClass):
212 222 """
213 223 Multiprocessing class decorator
214 224
215 225 This function add multiprocessing features to a BaseClass. Also, it handle
216 226 the communication beetween processes (readers, procUnits and operations).
217 227 """
218 228
219 229 class MPClass(BaseClass, Process):
220 230
221 231 def __init__(self, *args, **kwargs):
222 232 super(MPClass, self).__init__()
223 233 Process.__init__(self)
224 234 self.operationKwargs = {}
225 235 self.args = args
226 236 self.kwargs = kwargs
227 237 self.sender = None
228 238 self.receiver = None
229 239 self.i = 0
230 240 self.t = time.time()
231 241 self.name = BaseClass.__name__
232 242 self.__doc__ = BaseClass.__doc__
233 243
234 244 if 'plot' in self.name.lower() and not self.name.endswith('_'):
235 245 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
236 246
237 247 self.start_time = time.time()
238 248 self.id = args[0]
239 249 self.inputId = args[1]
240 250 self.project_id = args[2]
241 251 self.err_queue = args[3]
242 self.typeProc = args[4]
252 self.lock = args[4]
253 self.typeProc = args[5]
243 254 self.err_queue.put('#_start_#')
244 self.queue = InputQueue(self.project_id, self.inputId)
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
245 256
246 257 def subscribe(self):
247 258 '''
248 259 Start the zmq socket receiver and subcribe to input ID.
249 260 '''
250 261
251 262 self.queue.start()
252 263
253 264 def listen(self):
254 265 '''
255 266 This function waits for objects
256 267 '''
257 268
258 269 return self.queue.get()
259 270
260 271 def set_publisher(self):
261 272 '''
262 273 This function create a zmq socket for publishing objects.
263 274 '''
264 275
265 276 time.sleep(0.5)
266 277
267 278 c = zmq.Context()
268 279 self.sender = c.socket(zmq.PUB)
269 280 self.sender.connect(
270 281 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
271 282
272 283 def publish(self, data, id):
273 284 '''
274 285 This function publish an object, to an specific topic.
275 For Read Units (inputId == None) adds a little delay
276 to avoid data loss
286 It blocks publishing when receiver queue is full to avoid data loss
277 287 '''
278 288
279 289 if self.inputId is None:
280 self.i += 1
281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
282 self.i = 0
283 self.t = time.time()
284 time.sleep(0.05)
285 elif self.i % 40 == 0:
286 self.i = 0
287 self.t = time.time()
288 time.sleep(0.01)
289
290 self.lock.wait()
290 291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291 292
292 293 def runReader(self):
293 294 '''
294 295 Run fuction for read units
295 296 '''
296 297 while True:
297 298
298 299 try:
299 300 BaseClass.run(self, **self.kwargs)
300 301 except:
301 302 err = traceback.format_exc()
302 303 if 'No more files' in err:
303 304 log.warning('No more files to read', self.name)
304 305 else:
305 306 self.err_queue.put('{}|{}'.format(self.name, err))
306 307 self.dataOut.error = True
307 308
308 309 for op, optype, opId, kwargs in self.operations:
309 310 if optype == 'self' and not self.dataOut.flagNoData:
310 311 op(**kwargs)
311 312 elif optype == 'other' and not self.dataOut.flagNoData:
312 313 self.dataOut = op.run(self.dataOut, **self.kwargs)
313 314 elif optype == 'external':
314 315 self.publish(self.dataOut, opId)
315 316
316 317 if self.dataOut.flagNoData and not self.dataOut.error:
317 318 continue
318 319
319 320 self.publish(self.dataOut, self.id)
320 321
321 322 if self.dataOut.error:
322 323 break
323 324
324 325 time.sleep(0.5)
325 326
326 327 def runProc(self):
327 328 '''
328 329 Run function for proccessing units
329 330 '''
330 331
331 332 while True:
332 333 self.dataIn = self.listen()
333 334
334 335 if self.dataIn.flagNoData and self.dataIn.error is None:
335 336 continue
336 337 elif not self.dataIn.error:
337 338 try:
338 339 BaseClass.run(self, **self.kwargs)
339 340 except:
340 341 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
341 342 self.dataOut.error = True
342 343 elif self.dataIn.error:
343 344 self.dataOut.error = self.dataIn.error
344 345 self.dataOut.flagNoData = True
345 346
346 347 for op, optype, opId, kwargs in self.operations:
347 348 if optype == 'self' and not self.dataOut.flagNoData:
348 349 op(**kwargs)
349 350 elif optype == 'other' and not self.dataOut.flagNoData:
350 351 self.dataOut = op.run(self.dataOut, **kwargs)
351 352 elif optype == 'external' and not self.dataOut.flagNoData:
352 353 self.publish(self.dataOut, opId)
353 354
354 355 self.publish(self.dataOut, self.id)
355 356 for op, optype, opId, kwargs in self.operations:
356 357 if optype == 'external' and self.dataOut.error:
357 358 self.publish(self.dataOut, opId)
358 359
359 360 if self.dataOut.error:
360 361 break
361 362
362 363 time.sleep(0.5)
363 364
364 365 def runOp(self):
365 366 '''
366 367 Run function for external operations (this operations just receive data
367 368 ex: plots, writers, publishers)
368 369 '''
369 370
370 371 while True:
371 372
372 373 dataOut = self.listen()
373 374
374 375 if not dataOut.error:
375 376 BaseClass.run(self, dataOut, **self.kwargs)
376 377 else:
377 378 break
378 379
379 380 def run(self):
380 381 if self.typeProc is "ProcUnit":
381 382
382 383 if self.inputId is not None:
383 384 self.subscribe()
384 385
385 386 self.set_publisher()
386 387
387 388 if 'Reader' not in BaseClass.__name__:
388 389 self.runProc()
389 390 else:
390 391 self.runReader()
391 392
392 393 elif self.typeProc is "Operation":
393 394
394 395 self.subscribe()
395 396 self.runOp()
396 397
397 398 else:
398 399 raise ValueError("Unknown type")
399 400
400 401 self.close()
401 402
402 403 def close(self):
403 404
404 405 BaseClass.close(self)
405 406 self.err_queue.put('#_end_#')
406 407
407 408 if self.sender:
408 409 self.sender.close()
409 410
410 411 if self.receiver:
411 412 self.receiver.close()
412 413
413 414 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
414 415
415 416 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now