##// END OF EJS Templates
Rewrite controller, remove MPDecorator to units (keep for plots an writers) use of queues for interproc comm instead of zmq, self operations are no longer supported
Juan C. Espinoza -
r1287:af11e4aac00c
parent child
Show More
@@ -1,10 +1,9
1 import click
1 import click
2 import schainpy
3 import subprocess
2 import subprocess
4 import os
3 import os
5 import sys
4 import sys
6 import glob
5 import glob
7 from multiprocessing import cpu_count
6 import schainpy
8 from schainpy.controller import Project
7 from schainpy.controller import Project
9 from schainpy.model import Operation, ProcessingUnit
8 from schainpy.model import Operation, ProcessingUnit
10 from schainpy.utils import log
9 from schainpy.utils import log
@@ -43,7 +42,7 def getOperations():
43 def getArgs(op):
42 def getArgs(op):
44 module = locate('schainpy.model.{}'.format(op))
43 module = locate('schainpy.model.{}'.format(op))
45 try:
44 try:
46 obj = module(1,2,3,Queue(),5,6)
45 obj = module(1, 2, 3, Queue())
47 except:
46 except:
48 obj = module()
47 obj = module()
49
48
@@ -68,7 +67,7 def getArgs(op):
68 def getDoc(obj):
67 def getDoc(obj):
69 module = locate('schainpy.model.{}'.format(obj))
68 module = locate('schainpy.model.{}'.format(obj))
70 try:
69 try:
71 obj = module(1,2,3,Queue(),5,6)
70 obj = module(1, 2, 3, Queue())
72 except:
71 except:
73 obj = module()
72 obj = module()
74 return obj.__doc__
73 return obj.__doc__
@@ -94,9 +93,9 PREFIX = 'experiment'
94 @click.argument('nextcommand', default=None, required=False, type=str)
93 @click.argument('nextcommand', default=None, required=False, type=str)
95 def main(command, nextcommand, version):
94 def main(command, nextcommand, version):
96 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
95 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
97 Available commands.\n
96 Available commands:\n
98 xml: runs a schain XML generated file\n
97 xml: runs a schain XML generated file\n
99 run: runs any python script starting 'experiment_'\n
98 run: runs any python script'\n
100 generate: generates a template schain script\n
99 generate: generates a template schain script\n
101 list: return a list of available procs and operations\n
100 list: return a list of available procs and operations\n
102 search: return avilable operations, procs or arguments of the given
101 search: return avilable operations, procs or arguments of the given
@@ -156,11 +155,9 def search(nextcommand):
156 try:
155 try:
157 args = getArgs(nextcommand)
156 args = getArgs(nextcommand)
158 doc = getDoc(nextcommand)
157 doc = getDoc(nextcommand)
159 if len(args) == 0:
158 log.success('{}\n{}\n\narguments:\n {}'.format(
160 log.success('\n{} has no arguments'.format(nextcommand), '')
159 nextcommand, doc, ', '.join(args)), ''
161 else:
160 )
162 log.success('{}\n{}\n\narguments:\n {}'.format(
163 nextcommand, doc, ', '.join(args)), '')
164 except Exception as e:
161 except Exception as e:
165 log.error('Module `{}` does not exists'.format(nextcommand), '')
162 log.error('Module `{}` does not exists'.format(nextcommand), '')
166 allModules = getAll()
163 allModules = getAll()
This diff has been collapsed as it changes many lines, (1196 lines changed) Show them Hide them
@@ -1,568 +1,180
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
2 Main routines to create a Signal Chain project
3 Author: Sergio Cortez
4 Created on September , 2012
5 '''
3 '''
6 from platform import python_version
4
5 import re
7 import sys
6 import sys
8 import ast
7 import ast
9 import datetime
8 import datetime
10 import traceback
9 import traceback
11 import math
12 import time
10 import time
13 import zmq
11 from multiprocessing import Process, Queue
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 from threading import Thread
12 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
13 from xml.etree.ElementTree import ElementTree, Element, SubElement
17 from xml.dom import minidom
18
19
14
20 from schainpy.admin import Alarm, SchainWarning
15 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.model import *
16 from schainpy.model import *
22 from schainpy.utils import log
17 from schainpy.utils import log
23
18
24
19
25 DTYPES = {
20 class ConfBase():
26 'Voltage': '.r',
27 'Spectra': '.pdata'
28 }
29
30
31 def MPProject(project, n=cpu_count()):
32 '''
33 Project wrapper to run schain in n processes
34 '''
35
36 rconf = project.getReadUnitObj()
37 op = rconf.getOperationObj('run')
38 dt1 = op.getParameterValue('startDate')
39 dt2 = op.getParameterValue('endDate')
40 tm1 = op.getParameterValue('startTime')
41 tm2 = op.getParameterValue('endTime')
42 days = (dt2 - dt1).days
43
44 for day in range(days + 1):
45 skip = 0
46 cursor = 0
47 processes = []
48 dt = dt1 + datetime.timedelta(day)
49 dt_str = dt.strftime('%Y/%m/%d')
50 reader = JRODataReader()
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 startDate=dt,
53 endDate=dt,
54 startTime=tm1,
55 endTime=tm2,
56 ext=DTYPES[rconf.datatype])
57 nFiles = len(files)
58 if nFiles == 0:
59 continue
60 skip = int(math.ceil(nFiles / n))
61 while nFiles > cursor * skip:
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 skip=skip)
64 p = project.clone()
65 p.start()
66 processes.append(p)
67 cursor += 1
68
69 def beforeExit(exctype, value, trace):
70 for process in processes:
71 process.terminate()
72 process.join()
73 print(traceback.print_tb(trace))
74
75 sys.excepthook = beforeExit
76
77 for process in processes:
78 process.join()
79 process.terminate()
80
81 time.sleep(3)
82
83 def wait(context):
84
85 time.sleep(1)
86 c = zmq.Context()
87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 msg = receiver.recv_multipart()[1]
91 context.terminate()
92
93 class ParameterConf():
94
95 id = None
96 name = None
97 value = None
98 format = None
99
100 __formated_value = None
101
102 ELEMENTNAME = 'Parameter'
103
104 def __init__(self):
105
106 self.format = 'str'
107
108 def getElementName(self):
109
110 return self.ELEMENTNAME
111
112 def getValue(self):
113
114 value = self.value
115 format = self.format
116
117 if self.__formated_value != None:
118
119 return self.__formated_value
120
121 if format == 'obj':
122 return value
123
124 if format == 'str':
125 self.__formated_value = str(value)
126 return self.__formated_value
127
128 if value == '':
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
131 if format == 'list':
132 strList = [s.strip() for s in value.split(',')]
133 self.__formated_value = strList
134
135 return self.__formated_value
136
137 if format == 'intlist':
138 '''
139 Example:
140 value = (0,1,2)
141 '''
142
143 new_value = ast.literal_eval(value)
144
145 if type(new_value) not in (tuple, list):
146 new_value = [int(new_value)]
147
148 self.__formated_value = new_value
149
150 return self.__formated_value
151
152 if format == 'floatlist':
153 '''
154 Example:
155 value = (0.5, 1.4, 2.7)
156 '''
157
158 new_value = ast.literal_eval(value)
159
160 if type(new_value) not in (tuple, list):
161 new_value = [float(new_value)]
162
163 self.__formated_value = new_value
164
165 return self.__formated_value
166
167 if format == 'date':
168 strList = value.split('/')
169 intList = [int(x) for x in strList]
170 date = datetime.date(intList[0], intList[1], intList[2])
171
172 self.__formated_value = date
173
174 return self.__formated_value
175
176 if format == 'time':
177 strList = value.split(':')
178 intList = [int(x) for x in strList]
179 time = datetime.time(intList[0], intList[1], intList[2])
180
181 self.__formated_value = time
182
183 return self.__formated_value
184
185 if format == 'pairslist':
186 '''
187 Example:
188 value = (0,1),(1,2)
189 '''
190
191 new_value = ast.literal_eval(value)
192
193 if type(new_value) not in (tuple, list):
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195
196 if type(new_value[0]) not in (tuple, list):
197 if len(new_value) != 2:
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 new_value = [new_value]
200
201 for thisPair in new_value:
202 if len(thisPair) != 2:
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204
205 self.__formated_value = new_value
206
207 return self.__formated_value
208
209 if format == 'multilist':
210 '''
211 Example:
212 value = (0,1,2),(3,4,5)
213 '''
214 multiList = ast.literal_eval(value)
215
216 if type(multiList[0]) == int:
217 multiList = ast.literal_eval('(' + value + ')')
218
219 self.__formated_value = multiList
220
221 return self.__formated_value
222
223 if format == 'bool':
224 value = int(value)
225
226 if format == 'int':
227 value = float(value)
228
229 format_func = eval(format)
230
231 self.__formated_value = format_func(value)
232
233 return self.__formated_value
234
235 def updateId(self, new_id):
236
237 self.id = str(new_id)
238
239 def setup(self, id, name, value, format='str'):
240 self.id = str(id)
241 self.name = name
242 if format == 'obj':
243 self.value = value
244 else:
245 self.value = str(value)
246 self.format = str.lower(format)
247
248 self.getValue()
249
250 return 1
251
252 def update(self, name, value, format='str'):
253
254 self.name = name
255 self.value = str(value)
256 self.format = format
257
258 def makeXml(self, opElement):
259 if self.name not in ('queue',):
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 parmElement.set('id', str(self.id))
262 parmElement.set('name', self.name)
263 parmElement.set('value', self.value)
264 parmElement.set('format', self.format)
265
266 def readXml(self, parmElement):
267
268 self.id = parmElement.get('id')
269 self.name = parmElement.get('name')
270 self.value = parmElement.get('value')
271 self.format = str.lower(parmElement.get('format'))
272
273 # Compatible with old signal chain version
274 if self.format == 'int' and self.name == 'idfigure':
275 self.name = 'id'
276
277 def printattr(self):
278
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280
281 class OperationConf():
282
283 ELEMENTNAME = 'Operation'
284
21
285 def __init__(self):
22 def __init__(self):
286
23
287 self.id = '0'
24 self.id = '0'
288 self.name = None
25 self.name = None
289 self.priority = None
26 self.priority = None
290 self.topic = None
27 self.parameters = {}
291
28 self.object = None
292 def __getNewId(self):
29 self.operations = []
293
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295
30
296 def getId(self):
31 def getId(self):
32
297 return self.id
33 return self.id
34
35 def getNewId(self):
36
37 return int(self.id) * 10 + len(self.operations) + 1
298
38
299 def updateId(self, new_id):
39 def updateId(self, new_id):
300
40
301 self.id = str(new_id)
41 self.id = str(new_id)
302
42
303 n = 1
43 n = 1
304 for parmObj in self.parmConfObjList:
44 for conf in self.operations:
305
45 conf_id = str(int(new_id) * 10 + n)
306 idParm = str(int(new_id) * 10 + n)
46 conf.updateId(conf_id)
307 parmObj.updateId(idParm)
308
309 n += 1
47 n += 1
310
48
311 def getElementName(self):
49 def getKwargs(self):
312
313 return self.ELEMENTNAME
314
315 def getParameterObjList(self):
316
317 return self.parmConfObjList
318
319 def getParameterObj(self, parameterName):
320
321 for parmConfObj in self.parmConfObjList:
322
323 if parmConfObj.name != parameterName:
324 continue
325
326 return parmConfObj
327
328 return None
329
330 def getParameterObjfromValue(self, parameterValue):
331
332 for parmConfObj in self.parmConfObjList:
333
50
334 if parmConfObj.getValue() != parameterValue:
51 params = {}
335 continue
336
52
337 return parmConfObj.getValue()
53 for key, value in self.parameters.items():
54 if value not in (None, '', ' '):
55 params[key] = value
56
57 return params
338
58
339 return None
59 def update(self, **kwargs):
340
60
341 def getParameterValue(self, parameterName):
61 for key, value in kwargs.items():
62 self.addParameter(name=key, value=value)
342
63
343 parameterObj = self.getParameterObj(parameterName)
64 def addParameter(self, name, value, format=None):
65 '''
66 '''
344
67
345 # if not parameterObj:
68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
346 # return None
69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
72 else:
73 try:
74 self.parameters[name] = ast.literal_eval(value)
75 except:
76 if isinstance(value, str) and ',' in value:
77 self.parameters[name] = value.split(',')
78 else:
79 self.parameters[name] = value
80
81 def getParameters(self):
82
83 params = {}
84 for key, value in self.parameters.items():
85 s = type(value).__name__
86 if s == 'date':
87 params[key] = value.strftime('%Y/%m/%d')
88 elif s == 'time':
89 params[key] = value.strftime('%H:%M:%S')
90 else:
91 params[key] = str(value)
347
92
348 value = parameterObj.getValue()
93 return params
94
95 def makeXml(self, element):
349
96
350 return value
97 xml = SubElement(element, self.ELEMENTNAME)
98 for label in self.xml_labels:
99 xml.set(label, str(getattr(self, label)))
100
101 for key, value in self.getParameters().items():
102 xml_param = SubElement(xml, 'Parameter')
103 xml_param.set('name', key)
104 xml_param.set('value', value)
105
106 for conf in self.operations:
107 conf.makeXml(xml)
108
109 def __str__(self):
351
110
352 def getKwargs(self):
111 if self.ELEMENTNAME == 'Operation':
112 s = ' {}[id={}]\n'.format(self.name, self.id)
113 else:
114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
353
115
354 kwargs = {}
116 for key, value in self.parameters.items():
117 if self.ELEMENTNAME == 'Operation':
118 s += ' {}: {}\n'.format(key, value)
119 else:
120 s += ' {}: {}\n'.format(key, value)
121
122 for conf in self.operations:
123 s += str(conf)
355
124
356 for parmConfObj in self.parmConfObjList:
125 return s
357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 continue
359
126
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
127 class OperationConf(ConfBase):
361
128
362 return kwargs
129 ELEMENTNAME = 'Operation'
130 xml_labels = ['id', 'name']
363
131
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
132 def setup(self, id, name, priority, project_id, err_queue):
365
133
366 self.id = str(id)
134 self.id = str(id)
367 self.project_id = project_id
135 self.project_id = project_id
368 self.name = name
136 self.name = name
369 self.type = type
137 self.type = 'other'
370 self.priority = priority
371 self.err_queue = err_queue
138 self.err_queue = err_queue
372 self.lock = lock
373 self.parmConfObjList = []
374
375 def removeParameters(self):
376
377 for obj in self.parmConfObjList:
378 del obj
379
380 self.parmConfObjList = []
381
382 def addParameter(self, name, value, format='str'):
383
384 if value is None:
385 return None
386 id = self.__getNewId()
387
388 parmConfObj = ParameterConf()
389 if not parmConfObj.setup(id, name, value, format):
390 return None
391
392 self.parmConfObjList.append(parmConfObj)
393
394 return parmConfObj
395
396 def changeParameter(self, name, value, format='str'):
397
398 parmConfObj = self.getParameterObj(name)
399 parmConfObj.update(name, value, format)
400
139
401 return parmConfObj
140 def readXml(self, element, project_id, err_queue):
402
141
403 def makeXml(self, procUnitElement):
142 self.id = element.get('id')
404
143 self.name = element.get('name')
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
144 self.type = 'other'
406 opElement.set('id', str(self.id))
145 self.project_id = str(project_id)
407 opElement.set('name', self.name)
146 self.err_queue = err_queue
408 opElement.set('type', self.type)
409 opElement.set('priority', str(self.priority))
410
411 for parmConfObj in self.parmConfObjList:
412 parmConfObj.makeXml(opElement)
413
414 def readXml(self, opElement, project_id):
415
416 self.id = opElement.get('id')
417 self.name = opElement.get('name')
418 self.type = opElement.get('type')
419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
421
422 # Compatible with old signal chain version
423 # Use of 'run' method instead 'init'
424 if self.type == 'self' and self.name == 'init':
425 self.name = 'run'
426
427 self.parmConfObjList = []
428
429 parmElementList = opElement.iter(ParameterConf().getElementName())
430
431 for parmElement in parmElementList:
432 parmConfObj = ParameterConf()
433 parmConfObj.readXml(parmElement)
434
435 # Compatible with old signal chain version
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
437 if self.type != 'self' and self.name == 'Plot':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 self.name = parmConfObj.value
440 continue
441
442 self.parmConfObjList.append(parmConfObj)
443
444 def printattr(self):
445
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 self.id,
448 self.name,
449 self.type,
450 self.priority,
451 self.project_id))
452
147
453 for parmConfObj in self.parmConfObjList:
148 for elm in element.iter('Parameter'):
454 parmConfObj.printattr()
149 self.addParameter(elm.get('name'), elm.get('value'))
455
150
456 def createObject(self):
151 def createObject(self):
457
152
458 className = eval(self.name)
153 className = eval(self.name)
459
154
460 if self.type == 'other':
155 if 'Plot' in self.name or 'Writer' in self.name:
461 opObj = className()
462 elif self.type == 'external':
463 kwargs = self.getKwargs()
156 kwargs = self.getKwargs()
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
465 opObj.start()
158 opObj.start()
466 self.opObj = opObj
159 self.type = 'external'
160 else:
161 opObj = className()
467
162
163 self.object = opObj
468 return opObj
164 return opObj
469
165
470 class ProcUnitConf():
166 class ProcUnitConf(ConfBase):
471
167
472 ELEMENTNAME = 'ProcUnit'
168 ELEMENTNAME = 'ProcUnit'
169 xml_labels = ['id', 'inputId', 'name']
473
170
474 def __init__(self):
171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
475
476 self.id = None
477 self.datatype = None
478 self.name = None
479 self.inputId = None
480 self.opConfObjList = []
481 self.procUnitObj = None
482 self.opObjDict = {}
483
484 def __getPriority(self):
485
486 return len(self.opConfObjList) + 1
487
488 def __getNewId(self):
489
490 return int(self.id) * 10 + len(self.opConfObjList) + 1
491
492 def getElementName(self):
493
494 return self.ELEMENTNAME
495
496 def getId(self):
497
498 return self.id
499
500 def updateId(self, new_id):
501 '''
172 '''
502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504
505 # If this proc unit has not inputs
506 #if self.inputId == '0':
507 #new_inputId = 0
508
509 n = 1
510 for opConfObj in self.opConfObjList:
511
512 idOp = str(int(new_id) * 10 + n)
513 opConfObj.updateId(idOp)
514
515 n += 1
516
517 self.parentId = str(parentId)
518 self.id = str(new_id)
519 #self.inputId = str(new_inputId)
520 '''
521 n = 1
522
523 def getInputId(self):
524
525 return self.inputId
526
527 def getOperationObjList(self):
528
529 return self.opConfObjList
530
531 def getOperationObj(self, name=None):
532
533 for opConfObj in self.opConfObjList:
534
535 if opConfObj.name != name:
536 continue
537
538 return opConfObj
539
540 return None
541
542 def getOpObjfromParamValue(self, value=None):
543
544 for opConfObj in self.opConfObjList:
545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 continue
547 return opConfObj
548 return None
549
550 def getProcUnitObj(self):
551
552 return self.procUnitObj
553
554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
555 '''
556 id sera el topico a publicar
557 inputId sera el topico a subscribirse
558 '''
173 '''
559
174
560 # Compatible with old signal chain version
561 if datatype == None and name == None:
175 if datatype == None and name == None:
562 raise ValueError('datatype or name should be defined')
176 raise ValueError('datatype or name should be defined')
563
177
564 #Definir una condicion para inputId cuando sea 0
565
566 if name == None:
178 if name == None:
567 if 'Proc' in datatype:
179 if 'Proc' in datatype:
568 name = datatype
180 name = datatype
@@ -578,100 +190,49 class ProcUnitConf():
578 self.datatype = datatype
190 self.datatype = datatype
579 self.inputId = inputId
191 self.inputId = inputId
580 self.err_queue = err_queue
192 self.err_queue = err_queue
581 self.lock = lock
193 self.operations = []
582 self.opConfObjList = []
194 self.parameters = {}
583
584 self.addOperation(name='run', optype='self')
585
586 def removeOperations(self):
587
588 for obj in self.opConfObjList:
589 del obj
590
195
591 self.opConfObjList = []
196 def removeOperation(self, id):
592 self.addOperation(name='run')
593
197
594 def addParameter(self, **kwargs):
198 i = [1 if x.id==id else 0 for x in self.operations]
595 '''
199 self.operations.pop(i.index(1))
596 Add parameters to 'run' operation
200
597 '''
201 def getOperation(self, id):
598 opObj = self.opConfObjList[0]
599
600 opObj.addParameter(**kwargs)
601
202
602 return opObj
203 for conf in self.operations:
204 if conf.id == id:
205 return conf
603
206
604 def addOperation(self, name, optype='self'):
207 def addOperation(self, name, optype='self'):
605 '''
208 '''
606 Actualizacion - > proceso comunicacion
607 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
608 definir el tipoc de socket o comunicacion ipc++
609
610 '''
209 '''
611
210
612 id = self.__getNewId()
211 id = self.getNewId()
613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
212 conf = OperationConf()
614 opConfObj = OperationConf()
213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
214 self.operations.append(conf)
616 self.opConfObjList.append(opConfObj)
617
618 return opConfObj
619
620 def makeXml(self, projectElement):
621
215
622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
216 return conf
623 procUnitElement.set('id', str(self.id))
624 procUnitElement.set('name', self.name)
625 procUnitElement.set('datatype', self.datatype)
626 procUnitElement.set('inputId', str(self.inputId))
627
217
628 for opConfObj in self.opConfObjList:
218 def readXml(self, element, project_id, err_queue):
629 opConfObj.makeXml(procUnitElement)
630
219
631 def readXml(self, upElement, project_id):
220 self.id = element.get('id')
632
221 self.name = element.get('name')
633 self.id = upElement.get('id')
222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
634 self.name = upElement.get('name')
223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
635 self.datatype = upElement.get('datatype')
636 self.inputId = upElement.get('inputId')
637 self.project_id = str(project_id)
224 self.project_id = str(project_id)
638
225 self.err_queue = err_queue
639 if self.ELEMENTNAME == 'ReadUnit':
226 self.operations = []
640 self.datatype = self.datatype.replace('Reader', '')
227 self.parameters = {}
641
228
642 if self.ELEMENTNAME == 'ProcUnit':
229 for elm in element:
643 self.datatype = self.datatype.replace('Proc', '')
230 if elm.tag == 'Parameter':
644
231 self.addParameter(elm.get('name'), elm.get('value'))
645 if self.inputId == 'None':
232 elif elm.tag == 'Operation':
646 self.inputId = '0'
233 conf = OperationConf()
647
234 conf.readXml(elm, project_id, err_queue)
648 self.opConfObjList = []
235 self.operations.append(conf)
649
650 opElementList = upElement.iter(OperationConf().getElementName())
651
652 for opElement in opElementList:
653 opConfObj = OperationConf()
654 opConfObj.readXml(opElement, project_id)
655 self.opConfObjList.append(opConfObj)
656
657 def printattr(self):
658
659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 self.id,
661 self.name,
662 self.datatype,
663 self.inputId,
664 self.project_id))
665
666 for opConfObj in self.opConfObjList:
667 opConfObj.printattr()
668
669 def getKwargs(self):
670
671 opObj = self.opConfObjList[0]
672 kwargs = opObj.getKwargs()
673
674 return kwargs
675
236
676 def createObjects(self):
237 def createObjects(self):
677 '''
238 '''
@@ -680,39 +241,27 class ProcUnitConf():
680
241
681 className = eval(self.name)
242 className = eval(self.name)
682 kwargs = self.getKwargs()
243 kwargs = self.getKwargs()
683 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
244 procUnitObj = className()
245 procUnitObj.name = self.name
684 log.success('creating process...', self.name)
246 log.success('creating process...', self.name)
685
247
686 for opConfObj in self.opConfObjList:
248 for conf in self.operations:
687
249
688 if opConfObj.type == 'self' and opConfObj.name == 'run':
250 opObj = conf.createObject()
689 continue
690 elif opConfObj.type == 'self':
691 opObj = getattr(procUnitObj, opConfObj.name)
692 else:
693 opObj = opConfObj.createObject()
694
251
695 log.success('adding operation: {}, type:{}'.format(
252 log.success('adding operation: {}, type:{}'.format(
696 opConfObj.name,
253 conf.name,
697 opConfObj.type), self.name)
254 conf.type), self.name)
698
255
699 procUnitObj.addOperation(opConfObj, opObj)
256 procUnitObj.addOperation(conf, opObj)
700
257
701 procUnitObj.start()
258 self.object = procUnitObj
702 self.procUnitObj = procUnitObj
703
704 def close(self):
705
706 for opConfObj in self.opConfObjList:
707 if opConfObj.type == 'self':
708 continue
709
710 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 opObj.close()
712
713 self.procUnitObj.close()
714
259
715 return
260 def run(self):
261 '''
262 '''
263
264 return self.object.call(**self.getKwargs())
716
265
717
266
718 class ReadUnitConf(ProcUnitConf):
267 class ReadUnitConf(ProcUnitConf):
@@ -725,28 +274,12 class ReadUnitConf(ProcUnitConf):
725 self.datatype = None
274 self.datatype = None
726 self.name = None
275 self.name = None
727 self.inputId = None
276 self.inputId = None
728 self.opConfObjList = []
277 self.operations = []
729 self.lock = Event()
278 self.parameters = {}
730 self.lock.set()
731 self.lock.n = Value('d', 0)
732
733 def getElementName(self):
734
735 return self.ELEMENTNAME
736
279
737 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 startTime='', endTime='', server=None, **kwargs):
281 startTime='', endTime='', server=None, **kwargs):
739
740
741 '''
742 *****el id del proceso sera el Topico
743
744 Adicion de {topic}, si no esta presente -> error
745 kwargs deben ser trasmitidos en la instanciacion
746
747 '''
748
282
749 # Compatible with old signal chain version
750 if datatype == None and name == None:
283 if datatype == None and name == None:
751 raise ValueError('datatype or name should be defined')
284 raise ValueError('datatype or name should be defined')
752 if name == None:
285 if name == None:
@@ -766,112 +299,16 class ReadUnitConf(ProcUnitConf):
766 self.project_id = project_id
299 self.project_id = project_id
767 self.name = name
300 self.name = name
768 self.datatype = datatype
301 self.datatype = datatype
769 if path != '':
770 self.path = os.path.abspath(path)
771 self.startDate = startDate
772 self.endDate = endDate
773 self.startTime = startTime
774 self.endTime = endTime
775 self.server = server
776 self.err_queue = err_queue
302 self.err_queue = err_queue
777 self.addRunOperation(**kwargs)
303
778
304 self.addParameter(name='path', value=path)
779 def update(self, **kwargs):
305 self.addParameter(name='startDate', value=startDate)
780
306 self.addParameter(name='endDate', value=endDate)
781 if 'datatype' in kwargs:
307 self.addParameter(name='startTime', value=startTime)
782 datatype = kwargs.pop('datatype')
308 self.addParameter(name='endTime', value=endTime)
783 if 'Reader' in datatype:
784 self.name = datatype
785 else:
786 self.name = '%sReader' % (datatype)
787 self.datatype = self.name.replace('Reader', '')
788
789 attrs = ('path', 'startDate', 'endDate',
790 'startTime', 'endTime')
791
792 for attr in attrs:
793 if attr in kwargs:
794 setattr(self, attr, kwargs.pop(attr))
795
796 self.updateRunOperation(**kwargs)
797
798 def removeOperations(self):
799
800 for obj in self.opConfObjList:
801 del obj
802
803 self.opConfObjList = []
804
805 def addRunOperation(self, **kwargs):
806
807 opObj = self.addOperation(name='run', optype='self')
808
809 if self.server is None:
810 opObj.addParameter(
811 name='datatype', value=self.datatype, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
813 opObj.addParameter(
814 name='startDate', value=self.startDate, format='date')
815 opObj.addParameter(
816 name='endDate', value=self.endDate, format='date')
817 opObj.addParameter(
818 name='startTime', value=self.startTime, format='time')
819 opObj.addParameter(
820 name='endTime', value=self.endTime, format='time')
821
822 for key, value in list(kwargs.items()):
823 opObj.addParameter(name=key, value=value,
824 format=type(value).__name__)
825 else:
826 opObj.addParameter(name='server', value=self.server, format='str')
827
828 return opObj
829
830 def updateRunOperation(self, **kwargs):
831
832 opObj = self.getOperationObj(name='run')
833 opObj.removeParameters()
834
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
837 opObj.addParameter(
838 name='startDate', value=self.startDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 opObj.addParameter(
841 name='startTime', value=self.startTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843
844 for key, value in list(kwargs.items()):
845 opObj.addParameter(name=key, value=value,
846 format=type(value).__name__)
847
848 return opObj
849
850 def readXml(self, upElement, project_id):
851
852 self.id = upElement.get('id')
853 self.name = upElement.get('name')
854 self.datatype = upElement.get('datatype')
855 self.project_id = str(project_id) #yong
856
857 if self.ELEMENTNAME == 'ReadUnit':
858 self.datatype = self.datatype.replace('Reader', '')
859
860 self.opConfObjList = []
861
862 opElementList = upElement.iter(OperationConf().getElementName())
863
864 for opElement in opElementList:
865 opConfObj = OperationConf()
866 opConfObj.readXml(opElement, project_id)
867 self.opConfObjList.append(opConfObj)
868
309
869 if opConfObj.name == 'run':
310 for key, value in kwargs.items():
870 self.path = opConfObj.getParameterValue('path')
311 self.addParameter(name=key, value=value)
871 self.startDate = opConfObj.getParameterValue('startDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
873 self.startTime = opConfObj.getParameterValue('startTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
875
312
876
313
877 class Project(Process):
314 class Project(Process):
@@ -885,13 +322,15 class Project(Process):
885 self.filename = None
322 self.filename = None
886 self.description = None
323 self.description = None
887 self.email = None
324 self.email = None
888 self.alarm = None
325 self.alarm = []
889 self.procUnitConfObjDict = {}
326 self.configurations = {}
890 self.err_queue = Queue()
327 # self.err_queue = Queue()
328 self.err_queue = None
329 self.started = False
891
330
892 def __getNewId(self):
331 def getNewId(self):
893
332
894 idList = list(self.procUnitConfObjDict.keys())
333 idList = list(self.configurations.keys())
895 id = int(self.id) * 10
334 id = int(self.id) * 10
896
335
897 while True:
336 while True:
@@ -904,43 +343,28 class Project(Process):
904
343
905 return str(id)
344 return str(id)
906
345
907 def getElementName(self):
908
909 return self.ELEMENTNAME
910
911 def getId(self):
912
913 return self.id
914
915 def updateId(self, new_id):
346 def updateId(self, new_id):
916
347
917 self.id = str(new_id)
348 self.id = str(new_id)
918
349
919 keyList = list(self.procUnitConfObjDict.keys())
350 keyList = list(self.configurations.keys())
920 keyList.sort()
351 keyList.sort()
921
352
922 n = 1
353 n = 1
923 newProcUnitConfObjDict = {}
354 new_confs = {}
924
355
925 for procKey in keyList:
356 for procKey in keyList:
926
357
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
358 conf = self.configurations[procKey]
928 idProcUnit = str(int(self.id) * 10 + n)
359 idProcUnit = str(int(self.id) * 10 + n)
929 procUnitConfObj.updateId(idProcUnit)
360 conf.updateId(idProcUnit)
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
361 new_confs[idProcUnit] = conf
931 n += 1
362 n += 1
932
363
933 self.procUnitConfObjDict = newProcUnitConfObjDict
364 self.configurations = new_confs
934
365
935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936
367
937 print(' ')
938 print('*' * 60)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 print('*' * 60)
941 print("* Python " + python_version() + " *")
942 print('*' * 19)
943 print(' ')
944 self.id = str(id)
368 self.id = str(id)
945 self.description = description
369 self.description = description
946 self.email = email
370 self.email = email
@@ -950,108 +374,91 class Project(Process):
950
374
951 def update(self, **kwargs):
375 def update(self, **kwargs):
952
376
953 for key, value in list(kwargs.items()):
377 for key, value in kwargs.items():
954 setattr(self, key, value)
378 setattr(self, key, value)
955
379
956 def clone(self):
380 def clone(self):
957
381
958 p = Project()
382 p = Project()
959 p.procUnitConfObjDict = self.procUnitConfObjDict
383 p.id = self.id
384 p.name = self.name
385 p.description = self.description
386 p.configurations = self.configurations.copy()
387
960 return p
388 return p
961
389
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963
391
964 '''
392 '''
965 Actualizacion:
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
967
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
969
970 '''
393 '''
971
394
972 if id is None:
395 if id is None:
973 idReadUnit = self.__getNewId()
396 idReadUnit = self.getNewId()
974 else:
397 else:
975 idReadUnit = str(id)
398 idReadUnit = str(id)
976
399
977 readUnitConfObj = ReadUnitConf()
400 conf = ReadUnitConf()
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
402 self.configurations[conf.id] = conf
980
403
981 return readUnitConfObj
404 return conf
982
405
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
984
407
985 '''
408 '''
986 Actualizacion:
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
988 Deberia reemplazar a "inputId"
989
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
992
993 '''
409 '''
994
410
995 idProcUnit = self.__getNewId()
411 if id is None:
996 procUnitConfObj = ProcUnitConf()
412 idProcUnit = self.getNewId()
997 input_proc = self.procUnitConfObjDict[inputId]
413 else:
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
414 idProcUnit = id
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
415
416 conf = ProcUnitConf()
417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
418 self.configurations[conf.id] = conf
1000
419
1001 return procUnitConfObj
420 return conf
1002
421
1003 def removeProcUnit(self, id):
422 def removeProcUnit(self, id):
1004
423
1005 if id in list(self.procUnitConfObjDict.keys()):
424 if id in self.configurations:
1006 self.procUnitConfObjDict.pop(id)
425 self.configurations.pop(id)
1007
1008 def getReadUnitId(self):
1009
1010 readUnitConfObj = self.getReadUnitObj()
1011
426
1012 return readUnitConfObj.id
427 def getReadUnit(self):
1013
428
1014 def getReadUnitObj(self):
429 for obj in list(self.configurations.values()):
1015
430 if obj.ELEMENTNAME == 'ReadUnit':
1016 for obj in list(self.procUnitConfObjDict.values()):
1017 if obj.getElementName() == 'ReadUnit':
1018 return obj
431 return obj
1019
432
1020 return None
433 return None
1021
434
1022 def getProcUnitObj(self, id=None, name=None):
435 def getProcUnit(self, id):
1023
1024 if id != None:
1025 return self.procUnitConfObjDict[id]
1026
436
1027 if name != None:
437 return self.configurations[id]
1028 return self.getProcUnitObjByName(name)
1029
438
1030 return None
439 def getUnits(self):
1031
1032 def getProcUnitObjByName(self, name):
1033
440
1034 for obj in list(self.procUnitConfObjDict.values()):
441 keys = list(self.configurations)
1035 if obj.name == name:
442 keys.sort()
1036 return obj
1037
1038 return None
1039
443
1040 def procUnitItems(self):
444 for key in keys:
445 yield self.configurations[key]
1041
446
1042 return list(self.procUnitConfObjDict.items())
447 def updateUnit(self, id, **kwargs):
1043
448
449 conf = self.configurations[id].update(**kwargs)
450
1044 def makeXml(self):
451 def makeXml(self):
1045
452
1046 projectElement = Element('Project')
453 xml = Element('Project')
1047 projectElement.set('id', str(self.id))
454 xml.set('id', str(self.id))
1048 projectElement.set('name', self.name)
455 xml.set('name', self.name)
1049 projectElement.set('description', self.description)
456 xml.set('description', self.description)
1050
457
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
458 for conf in self.configurations.values():
1052 procUnitConfObj.makeXml(projectElement)
459 conf.makeXml(xml)
1053
460
1054 self.projectElement = projectElement
461 self.xml = xml
1055
462
1056 def writeXml(self, filename=None):
463 def writeXml(self, filename=None):
1057
464
@@ -1077,83 +484,72 class Project(Process):
1077
484
1078 self.makeXml()
485 self.makeXml()
1079
486
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
487 ElementTree(self.xml).write(abs_file, method='xml')
1081
488
1082 self.filename = abs_file
489 self.filename = abs_file
1083
490
1084 return 1
491 return 1
1085
492
1086 def readXml(self, filename=None):
493 def readXml(self, filename):
1087
1088 if not filename:
1089 print('filename is not defined')
1090 return 0
1091
494
1092 abs_file = os.path.abspath(filename)
495 abs_file = os.path.abspath(filename)
1093
496
1094 if not os.path.isfile(abs_file):
497 self.configurations = {}
1095 print('%s file does not exist' % abs_file)
1096 return 0
1097
1098 self.projectElement = None
1099 self.procUnitConfObjDict = {}
1100
498
1101 try:
499 try:
1102 self.projectElement = ElementTree().parse(abs_file)
500 self.xml = ElementTree().parse(abs_file)
1103 except:
501 except:
1104 print('Error reading %s, verify file format' % filename)
502 log.error('Error reading %s, verify file format' % filename)
1105 return 0
503 return 0
1106
504
1107 self.project = self.projectElement.tag
505 self.id = self.xml.get('id')
1108
506 self.name = self.xml.get('name')
1109 self.id = self.projectElement.get('id')
507 self.description = self.xml.get('description')
1110 self.name = self.projectElement.get('name')
508
1111 self.description = self.projectElement.get('description')
509 for element in self.xml:
1112
510 if element.tag == 'ReadUnit':
1113 readUnitElementList = self.projectElement.iter(
511 conf = ReadUnitConf()
1114 ReadUnitConf().getElementName())
512 conf.readXml(element, self.id, self.err_queue)
1115
513 self.configurations[conf.id] = conf
1116 for readUnitElement in readUnitElementList:
514 elif element.tag == 'ProcUnit':
1117 readUnitConfObj = ReadUnitConf()
515 conf = ProcUnitConf()
1118 readUnitConfObj.readXml(readUnitElement, self.id)
516 input_proc = self.configurations[element.get('inputId')]
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
517 conf.readXml(element, self.id, self.err_queue)
1120
518 self.configurations[conf.id] = conf
1121 procUnitElementList = self.projectElement.iter(
1122 ProcUnitConf().getElementName())
1123
1124 for procUnitElement in procUnitElementList:
1125 procUnitConfObj = ProcUnitConf()
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1128
519
1129 self.filename = abs_file
520 self.filename = abs_file
1130
521
1131 return 1
522 return 1
1132
523
1133 def __str__(self):
524 def __str__(self):
1134
525
1135 print('Project: name = %s, description = %s, id = %s' % (
526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
1136 self.name,
527 self.id,
1137 self.description,
528 self.name,
1138 self.id))
529 self.description,
530 )
531
532 for conf in self.configurations.values():
533 text += '{}'.format(conf)
1139
534
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
535 return text
1141 print(procUnitConfObj)
1142
536
1143 def createObjects(self):
537 def createObjects(self):
1144
538
1145
539 keys = list(self.configurations.keys())
1146 keys = list(self.procUnitConfObjDict.keys())
1147 keys.sort()
540 keys.sort()
1148 for key in keys:
541 for key in keys:
1149 self.procUnitConfObjDict[key].createObjects()
542 conf = self.configurations[key]
543 conf.createObjects()
544 if conf.inputId is not None:
545 conf.object.setInput(self.configurations[conf.inputId].object)
1150
546
1151 def monitor(self):
547 def monitor(self):
1152
548
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
1154 t.start()
550 t.start()
1155
551
1156 def __monitor(self, queue, ctx):
552 def _monitor(self, queue, ctx):
1157
553
1158 import socket
554 import socket
1159
555
@@ -1184,13 +580,7 class Project(Process):
1184 else:
580 else:
1185 name, err = self.name, err_msg
581 name, err = self.name, err_msg
1186
582
1187 time.sleep(2)
583 time.sleep(1)
1188
1189 for conf in self.procUnitConfObjDict.values():
1190 for confop in conf.opConfObjList:
1191 if confop.type == 'external':
1192 confop.opObj.terminate()
1193 conf.procUnitObj.terminate()
1194
584
1195 ctx.term()
585 ctx.term()
1196
586
@@ -1206,15 +596,14 class Project(Process):
1206 subtitle += 'Configuration file: %s\n' % self.filename
596 subtitle += 'Configuration file: %s\n' % self.filename
1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208
598
1209 readUnitConfObj = self.getReadUnitObj()
599 readUnitConfObj = self.getReadUnit()
1210 if readUnitConfObj:
600 if readUnitConfObj:
1211 subtitle += '\nInput parameters:\n'
601 subtitle += '\nInput parameters:\n'
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218
607
1219 a = Alarm(
608 a = Alarm(
1220 modes=self.alarm,
609 modes=self.alarm,
@@ -1227,64 +616,33 class Project(Process):
1227
616
1228 a.start()
617 a.start()
1229
618
1230 def isPaused(self):
1231 return 0
1232
1233 def isStopped(self):
1234 return 0
1235
1236 def runController(self):
1237 '''
1238 returns 0 when this process has been stopped, 1 otherwise
1239 '''
1240
1241 if self.isPaused():
1242 print('Process suspended')
1243
1244 while True:
1245 time.sleep(0.1)
1246
1247 if not self.isPaused():
1248 break
1249
1250 if self.isStopped():
1251 break
1252
1253 print('Process reinitialized')
1254
1255 if self.isStopped():
1256 print('Process stopped')
1257 return 0
1258
1259 return 1
1260
1261 def setFilename(self, filename):
619 def setFilename(self, filename):
1262
620
1263 self.filename = filename
621 self.filename = filename
1264
622
1265 def setProxy(self):
623 def runProcs(self):
1266
624
1267 if not os.path.exists('/tmp/schain'):
625 err = False
1268 os.mkdir('/tmp/schain')
626 n = len(self.configurations)
627
628 while not err:
629 for conf in self.getUnits():
630 ok = conf.run()
631 if ok is 'Error':
632 n -= 1
633 continue
634 elif not ok:
635 break
636 if n == 0:
637 err = True
1269
638
1270 self.ctx = zmq.Context()
1271 xpub = self.ctx.socket(zmq.XPUB)
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1273 xsub = self.ctx.socket(zmq.XSUB)
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1275 self.monitor()
1276 try:
1277 zmq.proxy(xpub, xsub)
1278 except zmq.ContextTerminated:
1279 xpub.close()
1280 xsub.close()
1281
1282 def run(self):
639 def run(self):
1283
640
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
642 self.started = True
1285 self.start_time = time.time()
643 self.start_time = time.time()
1286 self.createObjects()
644 self.createObjects()
1287 self.setProxy()
645 self.runProcs()
1288 log.success('{} Done (Time: {}s)'.format(
646 log.success('{} Done (Time: {:4.2f}s)'.format(
1289 self.name,
647 self.name,
1290 time.time()-self.start_time), '')
648 time.time()-self.start_time), '')
@@ -84,7 +84,7 DATA_STRUCTURE = numpy.dtype([
84 ('sea_algorithm', '<u4')
84 ('sea_algorithm', '<u4')
85 ])
85 ])
86
86
87 @MPDecorator
87
88 class BLTRParamReader(JRODataReader, ProcessingUnit):
88 class BLTRParamReader(JRODataReader, ProcessingUnit):
89 '''
89 '''
90 Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR
90 Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR
@@ -247,7 +247,7 class RecordHeaderBLTR():
247
247
248 return 1
248 return 1
249
249
250 @MPDecorator
250
251 class BLTRSpectraReader (ProcessingUnit):
251 class BLTRSpectraReader (ProcessingUnit):
252
252
253 def __init__(self):
253 def __init__(self):
@@ -14,6 +14,7 import time
14 import datetime
14 import datetime
15 import zmq
15 import zmq
16
16
17 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 from schainpy.utils import log
20 from schainpy.utils import log
@@ -724,9 +725,9 class JRODataReader(Reader):
724 firstHeaderSize = 0
725 firstHeaderSize = 0
725 basicHeaderSize = 24
726 basicHeaderSize = 24
726 __isFirstTimeOnline = 1
727 __isFirstTimeOnline = 1
727 __printInfo = True
728 filefmt = "*%Y%j***"
728 filefmt = "*%Y%j***"
729 folderfmt = "*%Y%j"
729 folderfmt = "*%Y%j"
730 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
730
731
731 def getDtypeWidth(self):
732 def getDtypeWidth(self):
732
733
@@ -1214,26 +1215,6 class JRODataReader(Reader):
1214
1215
1215 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216
1217
1217 def printNumberOfBlock(self):
1218 'SPAM!'
1219
1220 # if self.flagIsNewBlock:
1221 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1222 # self.processingHeaderObj.dataBlocksPerFile,
1223 # self.dataOut.datatime.ctime())
1224
1225 def printInfo(self):
1226
1227 if self.__printInfo == False:
1228 return
1229
1230 self.basicHeaderObj.printInfo()
1231 self.systemHeaderObj.printInfo()
1232 self.radarControllerHeaderObj.printInfo()
1233 self.processingHeaderObj.printInfo()
1234
1235 self.__printInfo = False
1236
1237 def run(self, **kwargs):
1218 def run(self, **kwargs):
1238 """
1219 """
1239
1220
@@ -1573,3 +1554,27 class JRODataWriter(Reader):
1573 self.dataOut = dataOut
1554 self.dataOut = dataOut
1574 self.putData()
1555 self.putData()
1575 return self.dataOut
1556 return self.dataOut
1557
1558 class printInfo(Operation):
1559
1560 def __init__(self):
1561
1562 Operation.__init__(self)
1563 self.__printInfo = True
1564
1565 def run(self, dataOut, headers = ['systemHeaderObj', 'radarControllerHeaderObj', 'processingHeaderObj']):
1566 if self.__printInfo == False:
1567 return dataOut
1568
1569 for header in headers:
1570 if hasattr(dataOut, header):
1571 obj = getattr(dataOut, header)
1572 if hasattr(obj, 'printInfo'):
1573 obj.printInfo()
1574 else:
1575 print(obj)
1576 else:
1577 log.warning('Header {} Not found in object'.format(header))
1578
1579 self.__printInfo = False
1580 return dataOut No newline at end of file
@@ -31,7 +31,7 try:
31 except:
31 except:
32 pass
32 pass
33
33
34 @MPDecorator
34
35 class DigitalRFReader(ProcessingUnit):
35 class DigitalRFReader(ProcessingUnit):
36 '''
36 '''
37 classdocs
37 classdocs
@@ -633,7 +633,7 class DigitalRFReader(ProcessingUnit):
633
633
634 return
634 return
635
635
636
636 @MPDecorator
637 class DigitalRFWriter(Operation):
637 class DigitalRFWriter(Operation):
638 '''
638 '''
639 classdocs
639 classdocs
@@ -120,6 +120,7 class Metadata(object):
120 parmConfObj.readXml(parmElement)
120 parmConfObj.readXml(parmElement)
121 self.parmConfObjList.append(parmConfObj)
121 self.parmConfObjList.append(parmConfObj)
122
122
123 @MPDecorator
123 class FitsWriter(Operation):
124 class FitsWriter(Operation):
124 def __init__(self, **kwargs):
125 def __init__(self, **kwargs):
125 Operation.__init__(self, **kwargs)
126 Operation.__init__(self, **kwargs)
@@ -283,7 +284,7 class FitsWriter(Operation):
283 self.isConfig = True
284 self.isConfig = True
284 self.putData()
285 self.putData()
285
286
286 @MPDecorator
287
287 class FitsReader(ProcessingUnit):
288 class FitsReader(ProcessingUnit):
288
289
289 # __TIMEZONE = time.timezone
290 # __TIMEZONE = time.timezone
@@ -78,7 +78,7 def load_json(obj):
78
78
79 return iterable
79 return iterable
80
80
81 @MPDecorator
81
82 class MADReader(Reader, ProcessingUnit):
82 class MADReader(Reader, ProcessingUnit):
83
83
84 def __init__(self):
84 def __init__(self):
@@ -11,7 +11,7 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecora
11 from schainpy.model.io.jroIO_base import *
11 from schainpy.model.io.jroIO_base import *
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14 @MPDecorator
14
15 class ParamReader(JRODataReader,ProcessingUnit):
15 class ParamReader(JRODataReader,ProcessingUnit):
16 '''
16 '''
17 Reads HDF5 format files
17 Reads HDF5 format files
@@ -965,7 +965,7 class ParamWriter(Operation):
965 return
965 return
966
966
967
967
968 @MPDecorator
968
969 class ParameterReader(Reader, ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 '''
970 '''
971 Reads HDF5 format files
971 Reads HDF5 format files
@@ -11,7 +11,7 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader,
11 from schainpy.model.data.jrodata import Spectra
11 from schainpy.model.data.jrodata import Spectra
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14 @MPDecorator
14
15 class SpectraReader(JRODataReader, ProcessingUnit):
15 class SpectraReader(JRODataReader, ProcessingUnit):
16 """
16 """
17 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
17 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
@@ -14,7 +14,7 except:
14
14
15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 from schainpy.model.data.jrodata import Voltage
16 from schainpy.model.data.jrodata import Voltage
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
18
18
19 try:
19 try:
20 import digital_rf_hdf5
20 import digital_rf_hdf5
@@ -546,6 +546,8 class USRPReader(ProcessingUnit):
546
546
547 return
547 return
548
548
549
550 @MPDecorator
549 class USRPWriter(Operation):
551 class USRPWriter(Operation):
550 '''
552 '''
551 classdocs
553 classdocs
@@ -10,12 +10,8 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 from schainpy.model.data.jrodata import Voltage
12 from schainpy.model.data.jrodata import Voltage
13 import zmq
14 import tempfile
15 from io import StringIO
16 # from _sha import blocksize
17
13
18 @MPDecorator
14
19 class VoltageReader(JRODataReader, ProcessingUnit):
15 class VoltageReader(JRODataReader, ProcessingUnit):
20 """
16 """
21 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
17 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
@@ -15,7 +15,7 from numpy import transpose
15 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
15 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
16 from schainpy.model.data.jrodata import Parameters
16 from schainpy.model.data.jrodata import Parameters
17
17
18 @MPDecorator
18
19 class BLTRParametersProc(ProcessingUnit):
19 class BLTRParametersProc(ProcessingUnit):
20 '''
20 '''
21 Processing unit for BLTR parameters data (winds)
21 Processing unit for BLTR parameters data (winds)
@@ -76,7 +76,7 class BLTRParametersProc(ProcessingUnit):
76 self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan
76 self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan
77
77
78 # TODO
78 # TODO
79 @MPDecorator
79
80 class OutliersFilter(Operation):
80 class OutliersFilter(Operation):
81
81
82 def __init__(self):
82 def __init__(self):
@@ -16,7 +16,7 class AMISRProc(ProcessingUnit):
16 self.dataOut.copy(self.dataIn)
16 self.dataOut.copy(self.dataIn)
17
17
18
18
19 class PrintInfo(Operation):
19 class PrintInfoAMISR(Operation):
20 def __init__(self, **kwargs):
20 def __init__(self, **kwargs):
21 Operation.__init__(self, **kwargs)
21 Operation.__init__(self, **kwargs)
22 self.__isPrinted = False
22 self.__isPrinted = False
@@ -1,19 +1,9
1 '''
1 '''
2 Updated for multiprocessing
2 Base clases to create Processing units and operations, the MPDecorator
3 Author : Sergio Cortez
3 must be used in plotting and writing operations to allow to run as an
4 Jan 2018
4 external process.
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
10 Based on:
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
5 '''
14
6
15 import os
16 import sys
17 import inspect
7 import inspect
18 import zmq
8 import zmq
19 import time
9 import time
@@ -24,27 +14,16 try:
24 except:
14 except:
25 from Queue import Queue
15 from Queue import Queue
26 from threading import Thread
16 from threading import Thread
27 from multiprocessing import Process
17 from multiprocessing import Process, Queue
28
29 from schainpy.utils import log
18 from schainpy.utils import log
30
19
31
20
32 class ProcessingUnit(object):
21 class ProcessingUnit(object):
22 '''
23 Base class to create Signal Chain Units
24 '''
33
25
34 """
35 Update - Jan 2018 - MULTIPROCESSING
36 All the "call" methods present in the previous base were removed.
37 The majority of operations are independant processes, thus
38 the decorator is in charge of communicate the operation processes
39 with the proccessing unit via IPC.
40
41 The constructor does not receive any argument. The remaining methods
42 are related with the operations to execute.
43
44
45 """
46 proc_type = 'processing'
26 proc_type = 'processing'
47 __attrs__ = []
48
27
49 def __init__(self):
28 def __init__(self):
50
29
@@ -52,8 +31,11 class ProcessingUnit(object):
52 self.dataOut = None
31 self.dataOut = None
53 self.isConfig = False
32 self.isConfig = False
54 self.operations = []
33 self.operations = []
55 self.plots = []
34
35 def setInput(self, unit):
56
36
37 self.dataIn = unit.dataOut
38
57 def getAllowedArgs(self):
39 def getAllowedArgs(self):
58 if hasattr(self, '__attrs__'):
40 if hasattr(self, '__attrs__'):
59 return self.__attrs__
41 return self.__attrs__
@@ -61,27 +43,10 class ProcessingUnit(object):
61 return inspect.getargspec(self.run).args
43 return inspect.getargspec(self.run).args
62
44
63 def addOperation(self, conf, operation):
45 def addOperation(self, conf, operation):
64 """
46 '''
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
47 '''
66 posses the id of the operation process (IPC purposes)
67
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
69 identificador asociado a este objeto.
70
71 Input:
72
73 object : objeto de la clase "Operation"
74
75 Return:
76
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
78 """
79
80 self.operations.append(
81 (operation, conf.type, conf.id, conf.getKwargs()))
82
48
83 if 'plot' in self.name.lower():
49 self.operations.append((operation, conf.type, conf.getKwargs()))
84 self.plots.append(operation.CODE)
85
50
86 def getOperationObj(self, objId):
51 def getOperationObj(self, objId):
87
52
@@ -90,17 +55,37 class ProcessingUnit(object):
90
55
91 return self.operations[objId]
56 return self.operations[objId]
92
57
93 def operation(self, **kwargs):
58 def call(self, **kwargs):
94 """
59 '''
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
60 '''
96 atributos del objeto dataOut
61
97
62 try:
98 Input:
63 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
99
64 return self.dataIn.isReady()
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
65 elif self.dataIn is None or not self.dataIn.error:
101 """
66 self.run(**kwargs)
67 elif self.dataIn.error:
68 self.dataOut.error = self.dataIn.error
69 self.dataOut.flagNoData = True
70 except:
71 err = traceback.format_exc()
72 if 'SchainWarning' in err:
73 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
74 elif 'SchainError' in err:
75 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
76 else:
77 log.error(err, self.name)
78 self.dataOut.error = True
79
80 for op, optype, opkwargs in self.operations:
81 if optype == 'other' and not self.dataOut.flagNoData:
82 self.dataOut = op.run(self.dataOut, **opkwargs)
83 elif optype == 'external' and not self.dataOut.flagNoData:
84 op.queue.put(self.dataOut)
85 elif optype == 'external' and self.dataOut.error:
86 op.queue.put(self.dataOut)
102
87
103 raise NotImplementedError
88 return 'Error' if self.dataOut.error else self.dataOut.isReady()
104
89
105 def setup(self):
90 def setup(self):
106
91
@@ -117,22 +102,10 class ProcessingUnit(object):
117
102
118 class Operation(object):
103 class Operation(object):
119
104
120 """
105 '''
121 Update - Jan 2018 - MULTIPROCESSING
106 '''
122
107
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 The constructor doe snot receive any argument, neither the baseclass.
125
126
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 acumulacion dentro de esta clase
130
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132
133 """
134 proc_type = 'operation'
108 proc_type = 'operation'
135 __attrs__ = []
136
109
137 def __init__(self):
110 def __init__(self):
138
111
@@ -180,58 +153,12 class Operation(object):
180
153
181 return
154 return
182
155
183 class InputQueue(Thread):
184
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
188
189 def __init__(self, project_id, inputId, lock=None):
190
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.islocked = False
197 self.size = 0
198
199 def run(self):
200
201 c = zmq.Context()
202 self.receiver = c.socket(zmq.SUB)
203 self.receiver.connect(
204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206
207 while True:
208 obj = self.receiver.recv_multipart()[1]
209 self.size += sys.getsizeof(obj)
210 self.queue.put(obj)
211
212 def get(self):
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
217 self.lock.clear()
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
222 self.lock.set()
223
224 obj = self.queue.get()
225 self.size -= sys.getsizeof(obj)
226 return pickle.loads(obj)
227
228
156
229 def MPDecorator(BaseClass):
157 def MPDecorator(BaseClass):
230 """
158 """
231 Multiprocessing class decorator
159 Multiprocessing class decorator
232
160
233 This function add multiprocessing features to a BaseClass. Also, it handle
161 This function add multiprocessing features to a BaseClass.
234 the communication beetween processes (readers, procUnits and operations).
235 """
162 """
236
163
237 class MPClass(BaseClass, Process):
164 class MPClass(BaseClass, Process):
@@ -239,191 +166,42 def MPDecorator(BaseClass):
239 def __init__(self, *args, **kwargs):
166 def __init__(self, *args, **kwargs):
240 super(MPClass, self).__init__()
167 super(MPClass, self).__init__()
241 Process.__init__(self)
168 Process.__init__(self)
242 self.operationKwargs = {}
169
243 self.args = args
170 self.args = args
244 self.kwargs = kwargs
171 self.kwargs = kwargs
245 self.sender = None
246 self.receiver = None
247 self.i = 0
248 self.t = time.time()
172 self.t = time.time()
173 self.op_type = 'external'
249 self.name = BaseClass.__name__
174 self.name = BaseClass.__name__
250 self.__doc__ = BaseClass.__doc__
175 self.__doc__ = BaseClass.__doc__
251
176
252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
177 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
178 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254
179
255 self.start_time = time.time()
180 self.start_time = time.time()
256 self.id = args[0]
257 self.inputId = args[1]
258 self.project_id = args[2]
259 self.err_queue = args[3]
181 self.err_queue = args[3]
260 self.lock = args[4]
182 self.queue = Queue(maxsize=1)
261 self.typeProc = args[5]
183 self.myrun = BaseClass.run
262 self.err_queue.put('#_start_#')
263 if self.inputId is not None:
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265
266 def subscribe(self):
267 '''
268 Start the zmq socket receiver and subcribe to input ID.
269 '''
270
271 self.queue.start()
272
273 def listen(self):
274 '''
275 This function waits for objects
276 '''
277
278 return self.queue.get()
279
280 def set_publisher(self):
281 '''
282 This function create a zmq socket for publishing objects.
283 '''
284
285 time.sleep(0.5)
286
287 c = zmq.Context()
288 self.sender = c.socket(zmq.PUB)
289 self.sender.connect(
290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291
292 def publish(self, data, id):
293 '''
294 This function publish an object, to an specific topic.
295 It blocks publishing when receiver queue is full to avoid data loss
296 '''
297
298 if self.inputId is None:
299 self.lock.wait()
300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301
302 def runReader(self):
303 '''
304 Run fuction for read units
305 '''
306 while True:
307
308 try:
309 BaseClass.run(self, **self.kwargs)
310 except:
311 err = traceback.format_exc()
312 if 'No more files' in err:
313 log.warning('No more files to read', self.name)
314 else:
315 self.err_queue.put('{}|{}'.format(self.name, err))
316 self.dataOut.error = True
317
318 for op, optype, opId, kwargs in self.operations:
319 if optype == 'self' and not self.dataOut.flagNoData:
320 op(**kwargs)
321 elif optype == 'other' and not self.dataOut.flagNoData:
322 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 elif optype == 'external':
324 self.publish(self.dataOut, opId)
325
326 if self.dataOut.flagNoData and not self.dataOut.error:
327 continue
328
329 self.publish(self.dataOut, self.id)
330
331 if self.dataOut.error:
332 break
333
334 time.sleep(0.5)
335
336 def runProc(self):
337 '''
338 Run function for proccessing units
339 '''
340
341 while True:
342 self.dataIn = self.listen()
343
344 if self.dataIn.flagNoData and self.dataIn.error is None:
345 continue
346 elif not self.dataIn.error:
347 try:
348 BaseClass.run(self, **self.kwargs)
349 except:
350 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 self.dataOut.error = True
352 elif self.dataIn.error:
353 self.dataOut.error = self.dataIn.error
354 self.dataOut.flagNoData = True
355
356 for op, optype, opId, kwargs in self.operations:
357 if optype == 'self' and not self.dataOut.flagNoData:
358 op(**kwargs)
359 elif optype == 'other' and not self.dataOut.flagNoData:
360 self.dataOut = op.run(self.dataOut, **kwargs)
361 elif optype == 'external' and not self.dataOut.flagNoData:
362 self.publish(self.dataOut, opId)
363
364 self.publish(self.dataOut, self.id)
365 for op, optype, opId, kwargs in self.operations:
366 if optype == 'external' and self.dataOut.error:
367 self.publish(self.dataOut, opId)
368
369 if self.dataOut.error:
370 break
371
372 time.sleep(0.5)
373
184
374 def runOp(self):
185 def run(self):
375 '''
376 Run function for external operations (this operations just receive data
377 ex: plots, writers, publishers)
378 '''
379
186
380 while True:
187 while True:
381
188
382 dataOut = self.listen()
189 dataOut = self.queue.get()
383
190
384 if not dataOut.error:
191 if not dataOut.error:
385 try:
192 try:
386 BaseClass.run(self, dataOut, **self.kwargs)
193 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
194 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
195 err = traceback.format_exc()
389 dataOut.error = True
196 log.error(err.split('\n')[-2], self.name)
390 else:
391 break
392
393 def run(self):
394 if self.typeProc is "ProcUnit":
395
396 if self.inputId is not None:
397 self.subscribe()
398
399 self.set_publisher()
400
401 if 'Reader' not in BaseClass.__name__:
402 self.runProc()
403 else:
197 else:
404 self.runReader()
198 break
405
406 elif self.typeProc is "Operation":
407
408 self.subscribe()
409 self.runOp()
410
411 else:
412 raise ValueError("Unknown type")
413
199
414 self.close()
200 self.close()
415
201
416 def close(self):
202 def close(self):
417
203
418 BaseClass.close(self)
204 BaseClass.close(self)
419 self.err_queue.put('#_end_#')
420
421 if self.sender:
422 self.sender.close()
423
424 if self.receiver:
425 self.receiver.close()
426
427 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
205 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428
206
429 return MPClass
207 return MPClass
@@ -1,7 +1,7
1 import numpy
1 import numpy
2
2
3 from .jroproc_base import ProcessingUnit, Operation
3 from .jroproc_base import ProcessingUnit, Operation
4 from schainpy.model.data.jrodata import Correlation, hildebrand_sekhon
4 from schainpy.model.data.jrodata import Correlation
5
5
6 class CorrelationProc(ProcessingUnit):
6 class CorrelationProc(ProcessingUnit):
7
7
@@ -5,7 +5,7 from schainpy.model.data.jrodata import SpectraHeis
5 from schainpy.utils import log
5 from schainpy.utils import log
6
6
7
7
8 @MPDecorator
8
9 class SpectraHeisProc(ProcessingUnit):
9 class SpectraHeisProc(ProcessingUnit):
10
10
11 def __init__(self):#, **kwargs):
11 def __init__(self):#, **kwargs):
@@ -46,7 +46,7 def _unpickle_method(func_name, obj, cls):
46 break
46 break
47 return func.__get__(obj, cls)
47 return func.__get__(obj, cls)
48
48
49 @MPDecorator
49
50 class ParametersProc(ProcessingUnit):
50 class ParametersProc(ProcessingUnit):
51
51
52 METHODS = {}
52 METHODS = {}
@@ -1329,13 +1329,12 class SpectralMoments(Operation):
1329
1329
1330 def run(self, dataOut):
1330 def run(self, dataOut):
1331
1331
1332 #dataOut.data_pre = dataOut.data_pre[0]
1333 data = dataOut.data_pre[0]
1332 data = dataOut.data_pre[0]
1334 absc = dataOut.abscissaList[:-1]
1333 absc = dataOut.abscissaList[:-1]
1335 noise = dataOut.noise
1334 noise = dataOut.noise
1336 nChannel = data.shape[0]
1335 nChannel = data.shape[0]
1337 data_param = numpy.zeros((nChannel, 4, data.shape[2]))
1336 data_param = numpy.zeros((nChannel, 4, data.shape[2]))
1338
1337
1339 for ind in range(nChannel):
1338 for ind in range(nChannel):
1340 data_param[ind,:,:] = self.__calculateMoments( data[ind,:,:] , absc , noise[ind] )
1339 data_param[ind,:,:] = self.__calculateMoments( data[ind,:,:] , absc , noise[ind] )
1341
1340
@@ -1344,6 +1343,7 class SpectralMoments(Operation):
1344 dataOut.data_POW = data_param[:,1]
1343 dataOut.data_POW = data_param[:,1]
1345 dataOut.data_DOP = data_param[:,2]
1344 dataOut.data_DOP = data_param[:,2]
1346 dataOut.data_WIDTH = data_param[:,3]
1345 dataOut.data_WIDTH = data_param[:,3]
1346
1347 return dataOut
1347 return dataOut
1348
1348
1349 def __calculateMoments(self, oldspec, oldfreq, n0,
1349 def __calculateMoments(self, oldspec, oldfreq, n0,
@@ -1370,25 +1370,27 class SpectralMoments(Operation):
1370 vec_w = numpy.zeros(oldspec.shape[1])
1370 vec_w = numpy.zeros(oldspec.shape[1])
1371 vec_snr = numpy.zeros(oldspec.shape[1])
1371 vec_snr = numpy.zeros(oldspec.shape[1])
1372
1372
1373 oldspec = numpy.ma.masked_invalid(oldspec)
1373 # oldspec = numpy.ma.masked_invalid(oldspec)
1374
1374
1375 for ind in range(oldspec.shape[1]):
1375 for ind in range(oldspec.shape[1]):
1376
1376
1377 spec = oldspec[:,ind]
1377 spec = oldspec[:,ind]
1378 aux = spec*fwindow
1378 aux = spec*fwindow
1379 max_spec = aux.max()
1379 max_spec = aux.max()
1380 m = list(aux).index(max_spec)
1380 m = aux.tolist().index(max_spec)
1381
1381
1382 #Smooth
1382 #Smooth
1383 if (smooth == 0): spec2 = spec
1383 if (smooth == 0):
1384 else: spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1384 spec2 = spec
1385
1385 else:
1386 spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1387
1386 # Calculo de Momentos
1388 # Calculo de Momentos
1387 bb = spec2[list(range(m,spec2.size))]
1389 bb = spec2[numpy.arange(m,spec2.size)]
1388 bb = (bb<n0).nonzero()
1390 bb = (bb<n0).nonzero()
1389 bb = bb[0]
1391 bb = bb[0]
1390
1392
1391 ss = spec2[list(range(0,m + 1))]
1393 ss = spec2[numpy.arange(0,m + 1)]
1392 ss = (ss<n0).nonzero()
1394 ss = (ss<n0).nonzero()
1393 ss = ss[0]
1395 ss = ss[0]
1394
1396
@@ -1399,17 +1401,20 class SpectralMoments(Operation):
1399 if (bb0 < 0):
1401 if (bb0 < 0):
1400 bb0 = 0
1402 bb0 = 0
1401
1403
1402 if (ss.size == 0): ss1 = 1
1404 if (ss.size == 0):
1403 else: ss1 = max(ss) + 1
1405 ss1 = 1
1406 else:
1407 ss1 = max(ss) + 1
1404
1408
1405 if (ss1 > m): ss1 = m
1409 if (ss1 > m):
1410 ss1 = m
1406
1411
1407 valid = numpy.asarray(list(range(int(m + bb0 - ss1 + 1)))) + ss1
1412 valid = numpy.arange(int(m + bb0 - ss1 + 1)) + ss1
1408 power = ((spec2[valid] - n0)*fwindow[valid]).sum()
1409 fd = ((spec2[valid]- n0)*freq[valid]*fwindow[valid]).sum()/power
1410 w = math.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum()/power)
1411 snr = (spec2.mean()-n0)/n0
1412
1413
1414 power = ((spec2[valid] - n0) * fwindow[valid]).sum()
1415 fd = ((spec2[valid]- n0)*freq[valid] * fwindow[valid]).sum() / power
1416 w = numpy.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum() / power)
1417 snr = (spec2.mean()-n0)/n0
1413 if (snr < 1.e-20) :
1418 if (snr < 1.e-20) :
1414 snr = 1.e-20
1419 snr = 1.e-20
1415
1420
@@ -1417,9 +1422,8 class SpectralMoments(Operation):
1417 vec_fd[ind] = fd
1422 vec_fd[ind] = fd
1418 vec_w[ind] = w
1423 vec_w[ind] = w
1419 vec_snr[ind] = snr
1424 vec_snr[ind] = snr
1420
1425
1421 moments = numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1426 return numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1422 return moments
1423
1427
1424 #------------------ Get SA Parameters --------------------------
1428 #------------------ Get SA Parameters --------------------------
1425
1429
@@ -1,3 +1,4
1 import time
1 import itertools
2 import itertools
2
3
3 import numpy
4 import numpy
@@ -7,7 +8,7 from schainpy.model.data.jrodata import Spectra
7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.utils import log
9 from schainpy.utils import log
9
10
10 @MPDecorator
11
11 class SpectraProc(ProcessingUnit):
12 class SpectraProc(ProcessingUnit):
12
13
13
14
@@ -120,7 +121,7 class SpectraProc(ProcessingUnit):
120 self.dataOut.flagShiftFFT = False
121 self.dataOut.flagShiftFFT = False
121
122
122 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
123 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
123
124
124 if self.dataIn.type == "Spectra":
125 if self.dataIn.type == "Spectra":
125 self.dataOut.copy(self.dataIn)
126 self.dataOut.copy(self.dataIn)
126 if shift_fft:
127 if shift_fft:
@@ -219,81 +220,6 class SpectraProc(ProcessingUnit):
219 self.dataOut.pairsList = pairs
220 self.dataOut.pairsList = pairs
220
221
221 return
222 return
222
223 def __selectPairsByChannel(self, channelList=None):
224
225 if channelList == None:
226 return
227
228 pairsIndexListSelected = []
229 for pairIndex in self.dataOut.pairsIndexList:
230 # First pair
231 if self.dataOut.pairsList[pairIndex][0] not in channelList:
232 continue
233 # Second pair
234 if self.dataOut.pairsList[pairIndex][1] not in channelList:
235 continue
236
237 pairsIndexListSelected.append(pairIndex)
238
239 if not pairsIndexListSelected:
240 self.dataOut.data_cspc = None
241 self.dataOut.pairsList = []
242 return
243
244 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
245 self.dataOut.pairsList = [self.dataOut.pairsList[i]
246 for i in pairsIndexListSelected]
247
248 return
249
250 def selectChannels(self, channelList):
251
252 channelIndexList = []
253
254 for channel in channelList:
255 if channel not in self.dataOut.channelList:
256 raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % (
257 channel, str(self.dataOut.channelList)))
258
259 index = self.dataOut.channelList.index(channel)
260 channelIndexList.append(index)
261
262 self.selectChannelsByIndex(channelIndexList)
263
264 def selectChannelsByIndex(self, channelIndexList):
265 """
266 Selecciona un bloque de datos en base a canales segun el channelIndexList
267
268 Input:
269 channelIndexList : lista sencilla de canales a seleccionar por ej. [2,3,7]
270
271 Affected:
272 self.dataOut.data_spc
273 self.dataOut.channelIndexList
274 self.dataOut.nChannels
275
276 Return:
277 None
278 """
279
280 for channelIndex in channelIndexList:
281 if channelIndex not in self.dataOut.channelIndexList:
282 raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = " % (
283 channelIndex, self.dataOut.channelIndexList))
284
285 data_spc = self.dataOut.data_spc[channelIndexList, :]
286 data_dc = self.dataOut.data_dc[channelIndexList, :]
287
288 self.dataOut.data_spc = data_spc
289 self.dataOut.data_dc = data_dc
290
291 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
292 self.dataOut.channelList = range(len(channelIndexList))
293 self.__selectPairsByChannel(channelIndexList)
294
295 return 1
296
297
223
298 def selectFFTs(self, minFFT, maxFFT ):
224 def selectFFTs(self, minFFT, maxFFT ):
299 """
225 """
@@ -331,67 +257,6 class SpectraProc(ProcessingUnit):
331
257
332 return 1
258 return 1
333
259
334
335 def setH0(self, h0, deltaHeight = None):
336
337 if not deltaHeight:
338 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
339
340 nHeights = self.dataOut.nHeights
341
342 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
343
344 self.dataOut.heightList = newHeiRange
345
346
347 def selectHeights(self, minHei, maxHei):
348 """
349 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
350 minHei <= height <= maxHei
351
352 Input:
353 minHei : valor minimo de altura a considerar
354 maxHei : valor maximo de altura a considerar
355
356 Affected:
357 Indirectamente son cambiados varios valores a travez del metodo selectHeightsByIndex
358
359 Return:
360 1 si el metodo se ejecuto con exito caso contrario devuelve 0
361 """
362
363
364 if (minHei > maxHei):
365 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei))
366
367 if (minHei < self.dataOut.heightList[0]):
368 minHei = self.dataOut.heightList[0]
369
370 if (maxHei > self.dataOut.heightList[-1]):
371 maxHei = self.dataOut.heightList[-1]
372
373 minIndex = 0
374 maxIndex = 0
375 heights = self.dataOut.heightList
376
377 inda = numpy.where(heights >= minHei)
378 indb = numpy.where(heights <= maxHei)
379
380 try:
381 minIndex = inda[0][0]
382 except:
383 minIndex = 0
384
385 try:
386 maxIndex = indb[0][-1]
387 except:
388 maxIndex = len(heights)
389
390 self.selectHeightsByIndex(minIndex, maxIndex)
391
392
393 return 1
394
395 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
260 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
396 newheis = numpy.where(
261 newheis = numpy.where(
397 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
262 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
@@ -466,54 +331,100 class SpectraProc(ProcessingUnit):
466
331
467 return 1
332 return 1
468
333
334 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
335 # validacion de rango
336 if minHei == None:
337 minHei = self.dataOut.heightList[0]
469
338
339 if maxHei == None:
340 maxHei = self.dataOut.heightList[-1]
470
341
471 def selectHeightsByIndex(self, minIndex, maxIndex):
342 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
472 """
343 print('minHei: %.2f is out of the heights range' % (minHei))
473 Selecciona un bloque de datos en base a un grupo indices de alturas segun el rango
344 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
474 minIndex <= index <= maxIndex
345 minHei = self.dataOut.heightList[0]
475
346
476 Input:
347 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
477 minIndex : valor de indice minimo de altura a considerar
348 print('maxHei: %.2f is out of the heights range' % (maxHei))
478 maxIndex : valor de indice maximo de altura a considerar
349 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
350 maxHei = self.dataOut.heightList[-1]
479
351
480 Affected:
352 # validacion de velocidades
481 self.dataOut.data_spc
353 velrange = self.dataOut.getVelRange(1)
482 self.dataOut.data_cspc
483 self.dataOut.data_dc
484 self.dataOut.heightList
485
354
486 Return:
355 if minVel == None:
487 1 si el metodo se ejecuto con exito caso contrario devuelve 0
356 minVel = velrange[0]
488 """
357
358 if maxVel == None:
359 maxVel = velrange[-1]
360
361 if (minVel < velrange[0]) or (minVel > maxVel):
362 print('minVel: %.2f is out of the velocity range' % (minVel))
363 print('minVel is setting to %.2f' % (velrange[0]))
364 minVel = velrange[0]
365
366 if (maxVel > velrange[-1]) or (maxVel < minVel):
367 print('maxVel: %.2f is out of the velocity range' % (maxVel))
368 print('maxVel is setting to %.2f' % (velrange[-1]))
369 maxVel = velrange[-1]
370
371 # seleccion de indices para rango
372 minIndex = 0
373 maxIndex = 0
374 heights = self.dataOut.heightList
375
376 inda = numpy.where(heights >= minHei)
377 indb = numpy.where(heights <= maxHei)
378
379 try:
380 minIndex = inda[0][0]
381 except:
382 minIndex = 0
383
384 try:
385 maxIndex = indb[0][-1]
386 except:
387 maxIndex = len(heights)
489
388
490 if (minIndex < 0) or (minIndex > maxIndex):
389 if (minIndex < 0) or (minIndex > maxIndex):
491 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
390 raise ValueError("some value in (%d,%d) is not valid" % (
492 minIndex, maxIndex))
391 minIndex, maxIndex))
493
392
494 if (maxIndex >= self.dataOut.nHeights):
393 if (maxIndex >= self.dataOut.nHeights):
495 maxIndex = self.dataOut.nHeights - 1
394 maxIndex = self.dataOut.nHeights - 1
496
395
497 # Spectra
396 # seleccion de indices para velocidades
498 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
397 indminvel = numpy.where(velrange >= minVel)
398 indmaxvel = numpy.where(velrange <= maxVel)
399 try:
400 minIndexVel = indminvel[0][0]
401 except:
402 minIndexVel = 0
499
403
500 data_cspc = None
404 try:
501 if self.dataOut.data_cspc is not None:
405 maxIndexVel = indmaxvel[0][-1]
502 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
406 except:
407 maxIndexVel = len(velrange)
503
408
504 data_dc = None
409 # seleccion del espectro
505 if self.dataOut.data_dc is not None:
410 data_spc = self.dataOut.data_spc[:,
506 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
411 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
412 # estimacion de ruido
413 noise = numpy.zeros(self.dataOut.nChannels)
507
414
508 self.dataOut.data_spc = data_spc
415 for channel in range(self.dataOut.nChannels):
509 self.dataOut.data_cspc = data_cspc
416 daux = data_spc[channel, :, :]
510 self.dataOut.data_dc = data_dc
417 sortdata = numpy.sort(daux, axis=None)
418 noise[channel] = hildebrand_sekhon(sortdata, self.dataOut.nIncohInt)
511
419
512 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
420 self.dataOut.noise_estimation = noise.copy()
513
421
514 return 1
422 return 1
515
423
516 def removeDC(self, mode=2):
424 class removeDC(Operation):
425
426 def run(self, dataOut, mode=2):
427 self.dataOut = dataOut
517 jspectra = self.dataOut.data_spc
428 jspectra = self.dataOut.data_spc
518 jcspectra = self.dataOut.data_cspc
429 jcspectra = self.dataOut.data_cspc
519
430
@@ -571,7 +482,9 class SpectraProc(ProcessingUnit):
571 self.dataOut.data_spc = jspectra
482 self.dataOut.data_spc = jspectra
572 self.dataOut.data_cspc = jcspectra
483 self.dataOut.data_cspc = jcspectra
573
484
574 return 1
485 return self.dataOut
486
487 class removeInterference(Operation):
575
488
576 def removeInterference2(self):
489 def removeInterference2(self):
577
490
@@ -594,11 +507,9 class SpectraProc(ProcessingUnit):
594 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
507 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
595 cspc[i,InterferenceRange,:] = numpy.NaN
508 cspc[i,InterferenceRange,:] = numpy.NaN
596
509
597
598
599 self.dataOut.data_cspc = cspc
510 self.dataOut.data_cspc = cspc
600
511
601 def removeInterference(self, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None):
512 def removeInterference(self):
602
513
603 jspectra = self.dataOut.data_spc
514 jspectra = self.dataOut.data_spc
604 jcspectra = self.dataOut.data_cspc
515 jcspectra = self.dataOut.data_cspc
@@ -781,101 +692,16 class SpectraProc(ProcessingUnit):
781
692
782 return 1
693 return 1
783
694
784 def setRadarFrequency(self, frequency=None):
695 def run(self, dataOut, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None, mode=1):
785
786 if frequency != None:
787 self.dataOut.frequency = frequency
788
789 return 1
790
791 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
792 # validacion de rango
793 if minHei == None:
794 minHei = self.dataOut.heightList[0]
795
796 if maxHei == None:
797 maxHei = self.dataOut.heightList[-1]
798
799 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
800 print('minHei: %.2f is out of the heights range' % (minHei))
801 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
802 minHei = self.dataOut.heightList[0]
803
804 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
805 print('maxHei: %.2f is out of the heights range' % (maxHei))
806 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
807 maxHei = self.dataOut.heightList[-1]
808
809 # validacion de velocidades
810 velrange = self.dataOut.getVelRange(1)
811
696
812 if minVel == None:
697 self.dataOut = dataOut
813 minVel = velrange[0]
814
698
815 if maxVel == None:
699 if mode == 1:
816 maxVel = velrange[-1]
700 self.removeInterference(interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None)
817
701 elif mode == 2:
818 if (minVel < velrange[0]) or (minVel > maxVel):
702 self.removeInterference2()
819 print('minVel: %.2f is out of the velocity range' % (minVel))
820 print('minVel is setting to %.2f' % (velrange[0]))
821 minVel = velrange[0]
822
823 if (maxVel > velrange[-1]) or (maxVel < minVel):
824 print('maxVel: %.2f is out of the velocity range' % (maxVel))
825 print('maxVel is setting to %.2f' % (velrange[-1]))
826 maxVel = velrange[-1]
827
828 # seleccion de indices para rango
829 minIndex = 0
830 maxIndex = 0
831 heights = self.dataOut.heightList
832
833 inda = numpy.where(heights >= minHei)
834 indb = numpy.where(heights <= maxHei)
835
836 try:
837 minIndex = inda[0][0]
838 except:
839 minIndex = 0
840
841 try:
842 maxIndex = indb[0][-1]
843 except:
844 maxIndex = len(heights)
845
846 if (minIndex < 0) or (minIndex > maxIndex):
847 raise ValueError("some value in (%d,%d) is not valid" % (
848 minIndex, maxIndex))
849
850 if (maxIndex >= self.dataOut.nHeights):
851 maxIndex = self.dataOut.nHeights - 1
852
853 # seleccion de indices para velocidades
854 indminvel = numpy.where(velrange >= minVel)
855 indmaxvel = numpy.where(velrange <= maxVel)
856 try:
857 minIndexVel = indminvel[0][0]
858 except:
859 minIndexVel = 0
860
861 try:
862 maxIndexVel = indmaxvel[0][-1]
863 except:
864 maxIndexVel = len(velrange)
865
866 # seleccion del espectro
867 data_spc = self.dataOut.data_spc[:,
868 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
869 # estimacion de ruido
870 noise = numpy.zeros(self.dataOut.nChannels)
871
872 for channel in range(self.dataOut.nChannels):
873 daux = data_spc[channel, :, :]
874 noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt)
875
876 self.dataOut.noise_estimation = noise.copy()
877
703
878 return 1
704 return self.dataOut
879
705
880
706
881 class IncohInt(Operation):
707 class IncohInt(Operation):
@@ -1031,7 +857,7 class IncohInt(Operation):
1031
857
1032 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
858 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
1033 if n == 1:
859 if n == 1:
1034 return
860 return dataOut
1035
861
1036 dataOut.flagNoData = True
862 dataOut.flagNoData = True
1037
863
@@ -7,7 +7,7 from schainpy.utils import log
7 from time import time
7 from time import time
8
8
9
9
10 @MPDecorator
10
11 class VoltageProc(ProcessingUnit):
11 class VoltageProc(ProcessingUnit):
12
12
13 def __init__(self):
13 def __init__(self):
@@ -26,8 +26,6 class VoltageProc(ProcessingUnit):
26 if self.dataIn.type == 'Voltage':
26 if self.dataIn.type == 'Voltage':
27 self.dataOut.copy(self.dataIn)
27 self.dataOut.copy(self.dataIn)
28
28
29 # self.dataOut.copy(self.dataIn)
30
31 def __updateObjFromAmisrInput(self):
29 def __updateObjFromAmisrInput(self):
32
30
33 self.dataOut.timeZone = self.dataIn.timeZone
31 self.dataOut.timeZone = self.dataIn.timeZone
@@ -53,23 +51,14 class VoltageProc(ProcessingUnit):
53 self.dataOut.beam.codeList = self.dataIn.beam.codeList
51 self.dataOut.beam.codeList = self.dataIn.beam.codeList
54 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
52 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
55 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
53 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
56 #
54
57 # pass#
55
58 #
56 class selectChannels(Operation):
59 # def init(self):
57
60 #
58 def run(self, dataOut, channelList):
61 #
62 # if self.dataIn.type == 'AMISR':
63 # self.__updateObjFromAmisrInput()
64 #
65 # if self.dataIn.type == 'Voltage':
66 # self.dataOut.copy(self.dataIn)
67 # # No necesita copiar en cada init() los atributos de dataIn
68 # # la copia deberia hacerse por cada nuevo bloque de datos
69
70 def selectChannels(self, channelList):
71
59
72 channelIndexList = []
60 channelIndexList = []
61 self.dataOut = dataOut
73
62
74 for channel in channelList:
63 for channel in channelList:
75 if channel not in self.dataOut.channelList:
64 if channel not in self.dataOut.channelList:
@@ -79,6 +68,7 class VoltageProc(ProcessingUnit):
79 channelIndexList.append(index)
68 channelIndexList.append(index)
80
69
81 self.selectChannelsByIndex(channelIndexList)
70 self.selectChannelsByIndex(channelIndexList)
71 return self.dataOut
82
72
83 def selectChannelsByIndex(self, channelIndexList):
73 def selectChannelsByIndex(self, channelIndexList):
84 """
74 """
@@ -101,24 +91,63 class VoltageProc(ProcessingUnit):
101
91
102 for channelIndex in channelIndexList:
92 for channelIndex in channelIndexList:
103 if channelIndex not in self.dataOut.channelIndexList:
93 if channelIndex not in self.dataOut.channelIndexList:
104 print(channelIndexList)
105 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
94 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
106
95
107 if self.dataOut.flagDataAsBlock:
96 if self.dataOut.type == 'Voltage':
108 """
97 if self.dataOut.flagDataAsBlock:
109 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
98 """
110 """
99 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
111 data = self.dataOut.data[channelIndexList,:,:]
100 """
112 else:
101 data = self.dataOut.data[channelIndexList,:,:]
113 data = self.dataOut.data[channelIndexList,:]
102 else:
103 data = self.dataOut.data[channelIndexList,:]
104
105 self.dataOut.data = data
106 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
107 self.dataOut.channelList = range(len(channelIndexList))
108 elif self.dataOut.type == 'Spectra':
109 data_spc = self.dataOut.data_spc[channelIndexList, :]
110 data_dc = self.dataOut.data_dc[channelIndexList, :]
111
112 self.dataOut.data_spc = data_spc
113 self.dataOut.data_dc = data_dc
114
115 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
116 self.dataOut.channelList = range(len(channelIndexList))
117 self.__selectPairsByChannel(channelIndexList)
114
118
115 self.dataOut.data = data
116 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
117 self.dataOut.channelList = range(len(channelIndexList))
118
119 return 1
119 return 1
120
120
121 def selectHeights(self, minHei=None, maxHei=None):
121 def __selectPairsByChannel(self, channelList=None):
122
123 if channelList == None:
124 return
125
126 pairsIndexListSelected = []
127 for pairIndex in self.dataOut.pairsIndexList:
128 # First pair
129 if self.dataOut.pairsList[pairIndex][0] not in channelList:
130 continue
131 # Second pair
132 if self.dataOut.pairsList[pairIndex][1] not in channelList:
133 continue
134
135 pairsIndexListSelected.append(pairIndex)
136
137 if not pairsIndexListSelected:
138 self.dataOut.data_cspc = None
139 self.dataOut.pairsList = []
140 return
141
142 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
143 self.dataOut.pairsList = [self.dataOut.pairsList[i]
144 for i in pairsIndexListSelected]
145
146 return
147
148 class selectHeights(Operation):
149
150 def run(self, dataOut, minHei=None, maxHei=None):
122 """
151 """
123 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
152 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
124 minHei <= height <= maxHei
153 minHei <= height <= maxHei
@@ -134,6 +163,8 class VoltageProc(ProcessingUnit):
134 1 si el metodo se ejecuto con exito caso contrario devuelve 0
163 1 si el metodo se ejecuto con exito caso contrario devuelve 0
135 """
164 """
136
165
166 self.dataOut = dataOut
167
137 if minHei == None:
168 if minHei == None:
138 minHei = self.dataOut.heightList[0]
169 minHei = self.dataOut.heightList[0]
139
170
@@ -165,8 +196,7 class VoltageProc(ProcessingUnit):
165
196
166 self.selectHeightsByIndex(minIndex, maxIndex)
197 self.selectHeightsByIndex(minIndex, maxIndex)
167
198
168 return 1
199 return self.dataOut
169
170
200
171 def selectHeightsByIndex(self, minIndex, maxIndex):
201 def selectHeightsByIndex(self, minIndex, maxIndex):
172 """
202 """
@@ -185,81 +215,118 class VoltageProc(ProcessingUnit):
185 1 si el metodo se ejecuto con exito caso contrario devuelve 0
215 1 si el metodo se ejecuto con exito caso contrario devuelve 0
186 """
216 """
187
217
188 if (minIndex < 0) or (minIndex > maxIndex):
218 if self.dataOut.type == 'Voltage':
189 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
219 if (minIndex < 0) or (minIndex > maxIndex):
220 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
190
221
191 if (maxIndex >= self.dataOut.nHeights):
222 if (maxIndex >= self.dataOut.nHeights):
192 maxIndex = self.dataOut.nHeights
223 maxIndex = self.dataOut.nHeights
193
224
194 #voltage
225 #voltage
195 if self.dataOut.flagDataAsBlock:
226 if self.dataOut.flagDataAsBlock:
196 """
227 """
197 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
228 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
198 """
229 """
199 data = self.dataOut.data[:,:, minIndex:maxIndex]
230 data = self.dataOut.data[:,:, minIndex:maxIndex]
200 else:
231 else:
201 data = self.dataOut.data[:, minIndex:maxIndex]
232 data = self.dataOut.data[:, minIndex:maxIndex]
233
234 # firstHeight = self.dataOut.heightList[minIndex]
235
236 self.dataOut.data = data
237 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
238
239 if self.dataOut.nHeights <= 1:
240 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
241 elif self.dataOut.type == 'Spectra':
242 if (minIndex < 0) or (minIndex > maxIndex):
243 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
244 minIndex, maxIndex))
202
245
203 # firstHeight = self.dataOut.heightList[minIndex]
246 if (maxIndex >= self.dataOut.nHeights):
247 maxIndex = self.dataOut.nHeights - 1
204
248
205 self.dataOut.data = data
249 # Spectra
206 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
250 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
207
251
208 if self.dataOut.nHeights <= 1:
252 data_cspc = None
209 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
253 if self.dataOut.data_cspc is not None:
254 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
210
255
256 data_dc = None
257 if self.dataOut.data_dc is not None:
258 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
259
260 self.dataOut.data_spc = data_spc
261 self.dataOut.data_cspc = data_cspc
262 self.dataOut.data_dc = data_dc
263
264 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
265
211 return 1
266 return 1
212
267
213
268
214 def filterByHeights(self, window):
269 class filterByHeights(Operation):
270
271 def run(self, dataOut, window):
215
272
216 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
273 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
217
274
218 if window == None:
275 if window == None:
219 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
276 window = (dataOut.radarControllerHeaderObj.txA/dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
220
277
221 newdelta = deltaHeight * window
278 newdelta = deltaHeight * window
222 r = self.dataOut.nHeights % window
279 r = dataOut.nHeights % window
223 newheights = (self.dataOut.nHeights-r)/window
280 newheights = (dataOut.nHeights-r)/window
224
281
225 if newheights <= 1:
282 if newheights <= 1:
226 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
283 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(dataOut.nHeights, window))
227
284
228 if self.dataOut.flagDataAsBlock:
285 if dataOut.flagDataAsBlock:
229 """
286 """
230 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
287 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
231 """
288 """
232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
289 buffer = dataOut.data[:, :, 0:int(dataOut.nHeights-r)]
233 buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window)
290 buffer = buffer.reshape(dataOut.nChannels, dataOut.nProfiles, int(dataOut.nHeights/window), window)
234 buffer = numpy.sum(buffer,3)
291 buffer = numpy.sum(buffer,3)
235
292
236 else:
293 else:
237 buffer = self.dataOut.data[:,0:int(self.dataOut.nHeights-r)]
294 buffer = dataOut.data[:,0:int(dataOut.nHeights-r)]
238 buffer = buffer.reshape(self.dataOut.nChannels,int(self.dataOut.nHeights/window),int(window))
295 buffer = buffer.reshape(dataOut.nChannels,int(dataOut.nHeights/window),int(window))
239 buffer = numpy.sum(buffer,2)
296 buffer = numpy.sum(buffer,2)
240
297
241 self.dataOut.data = buffer
298 dataOut.data = buffer
242 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
299 dataOut.heightList = dataOut.heightList[0] + numpy.arange( newheights )*newdelta
243 self.dataOut.windowOfFilter = window
300 dataOut.windowOfFilter = window
301
302 return dataOut
303
304
305 class setH0(Operation):
244
306
245 def setH0(self, h0, deltaHeight = None):
307 def run(self, dataOut, h0, deltaHeight = None):
246
308
247 if not deltaHeight:
309 if not deltaHeight:
248 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
310 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
249
311
250 nHeights = self.dataOut.nHeights
312 nHeights = dataOut.nHeights
251
313
252 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
314 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
253
315
254 self.dataOut.heightList = newHeiRange
316 dataOut.heightList = newHeiRange
317
318 return dataOut
319
320
321 class deFlip(Operation):
255
322
256 def deFlip(self, channelList = []):
323 def run(self, dataOut, channelList = []):
257
324
258 data = self.dataOut.data.copy()
325 data = dataOut.data.copy()
259
326
260 if self.dataOut.flagDataAsBlock:
327 if dataOut.flagDataAsBlock:
261 flip = self.flip
328 flip = self.flip
262 profileList = list(range(self.dataOut.nProfiles))
329 profileList = list(range(dataOut.nProfiles))
263
330
264 if not channelList:
331 if not channelList:
265 for thisProfile in profileList:
332 for thisProfile in profileList:
@@ -267,7 +334,7 class VoltageProc(ProcessingUnit):
267 flip *= -1.0
334 flip *= -1.0
268 else:
335 else:
269 for thisChannel in channelList:
336 for thisChannel in channelList:
270 if thisChannel not in self.dataOut.channelList:
337 if thisChannel not in dataOut.channelList:
271 continue
338 continue
272
339
273 for thisProfile in profileList:
340 for thisProfile in profileList:
@@ -281,41 +348,57 class VoltageProc(ProcessingUnit):
281 data[:,:] = data[:,:]*self.flip
348 data[:,:] = data[:,:]*self.flip
282 else:
349 else:
283 for thisChannel in channelList:
350 for thisChannel in channelList:
284 if thisChannel not in self.dataOut.channelList:
351 if thisChannel not in dataOut.channelList:
285 continue
352 continue
286
353
287 data[thisChannel,:] = data[thisChannel,:]*self.flip
354 data[thisChannel,:] = data[thisChannel,:]*self.flip
288
355
289 self.flip *= -1.
356 self.flip *= -1.
290
357
291 self.dataOut.data = data
358 dataOut.data = data
359
360 return dataOut
292
361
293 def setRadarFrequency(self, frequency=None):
294
362
295 if frequency != None:
363 class setAttribute(Operation):
296 self.dataOut.frequency = frequency
364 '''
365 Set an arbitrary attribute to dataOut
366 '''
297
367
298 return 1
368 def __init__(self):
369
370 Operation.__init__(self)
371 self._ready = False
299
372
300 def interpolateHeights(self, topLim, botLim):
373 def run(self, dataOut, **kwargs):
374
375 for key, value in kwargs.items():
376 setattr(dataOut, key, value)
377
378 return dataOut
379
380
381 class interpolateHeights(Operation):
382
383 def run(self, dataOut, topLim, botLim):
301 #69 al 72 para julia
384 #69 al 72 para julia
302 #82-84 para meteoros
385 #82-84 para meteoros
303 if len(numpy.shape(self.dataOut.data))==2:
386 if len(numpy.shape(dataOut.data))==2:
304 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
387 sampInterp = (dataOut.data[:,botLim-1] + dataOut.data[:,topLim+1])/2
305 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
388 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
306 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
389 #dataOut.data[:,botLim:limSup+1] = sampInterp
307 self.dataOut.data[:,botLim:topLim+1] = sampInterp
390 dataOut.data[:,botLim:topLim+1] = sampInterp
308 else:
391 else:
309 nHeights = self.dataOut.data.shape[2]
392 nHeights = dataOut.data.shape[2]
310 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
393 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
311 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
394 y = dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
312 f = interpolate.interp1d(x, y, axis = 2)
395 f = interpolate.interp1d(x, y, axis = 2)
313 xnew = numpy.arange(botLim,topLim+1)
396 xnew = numpy.arange(botLim,topLim+1)
314 ynew = f(xnew)
397 ynew = f(xnew)
398 dataOut.data[:,:,botLim:topLim+1] = ynew
315
399
316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
400 return dataOut
317
401
318 # import collections
319
402
320 class CohInt(Operation):
403 class CohInt(Operation):
321
404
@@ -979,8 +1062,6 class ProfileSelector(Operation):
979
1062
980 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
1063 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981
1064
982 #return False
983 return dataOut
984
1065
985 class Reshaper(Operation):
1066 class Reshaper(Operation):
986
1067
General Comments 0
You need to be logged in to leave comments. Login now