##// END OF EJS Templates
Rewrite controller, remove MPDecorator to units (keep for plots an writers) use of queues for interproc comm instead of zmq, self operations are no longer supported
Juan C. Espinoza -
r1287:af11e4aac00c
parent child
Show More
@@ -1,10 +1,9
1 import click
1 import click
2 import schainpy
3 import subprocess
2 import subprocess
4 import os
3 import os
5 import sys
4 import sys
6 import glob
5 import glob
7 from multiprocessing import cpu_count
6 import schainpy
8 from schainpy.controller import Project
7 from schainpy.controller import Project
9 from schainpy.model import Operation, ProcessingUnit
8 from schainpy.model import Operation, ProcessingUnit
10 from schainpy.utils import log
9 from schainpy.utils import log
@@ -43,7 +42,7 def getOperations():
43 def getArgs(op):
42 def getArgs(op):
44 module = locate('schainpy.model.{}'.format(op))
43 module = locate('schainpy.model.{}'.format(op))
45 try:
44 try:
46 obj = module(1,2,3,Queue(),5,6)
45 obj = module(1, 2, 3, Queue())
47 except:
46 except:
48 obj = module()
47 obj = module()
49
48
@@ -68,7 +67,7 def getArgs(op):
68 def getDoc(obj):
67 def getDoc(obj):
69 module = locate('schainpy.model.{}'.format(obj))
68 module = locate('schainpy.model.{}'.format(obj))
70 try:
69 try:
71 obj = module(1,2,3,Queue(),5,6)
70 obj = module(1, 2, 3, Queue())
72 except:
71 except:
73 obj = module()
72 obj = module()
74 return obj.__doc__
73 return obj.__doc__
@@ -94,9 +93,9 PREFIX = 'experiment'
94 @click.argument('nextcommand', default=None, required=False, type=str)
93 @click.argument('nextcommand', default=None, required=False, type=str)
95 def main(command, nextcommand, version):
94 def main(command, nextcommand, version):
96 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
95 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
97 Available commands.\n
96 Available commands:\n
98 xml: runs a schain XML generated file\n
97 xml: runs a schain XML generated file\n
99 run: runs any python script starting 'experiment_'\n
98 run: runs any python script'\n
100 generate: generates a template schain script\n
99 generate: generates a template schain script\n
101 list: return a list of available procs and operations\n
100 list: return a list of available procs and operations\n
102 search: return avilable operations, procs or arguments of the given
101 search: return avilable operations, procs or arguments of the given
@@ -156,11 +155,9 def search(nextcommand):
156 try:
155 try:
157 args = getArgs(nextcommand)
156 args = getArgs(nextcommand)
158 doc = getDoc(nextcommand)
157 doc = getDoc(nextcommand)
159 if len(args) == 0:
160 log.success('\n{} has no arguments'.format(nextcommand), '')
161 else:
162 log.success('{}\n{}\n\narguments:\n {}'.format(
158 log.success('{}\n{}\n\narguments:\n {}'.format(
163 nextcommand, doc, ', '.join(args)), '')
159 nextcommand, doc, ', '.join(args)), ''
160 )
164 except Exception as e:
161 except Exception as e:
165 log.error('Module `{}` does not exists'.format(nextcommand), '')
162 log.error('Module `{}` does not exists'.format(nextcommand), '')
166 allModules = getAll()
163 allModules = getAll()
This diff has been collapsed as it changes many lines, (1158 lines changed) Show them Hide them
@@ -1,568 +1,180
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
2 Main routines to create a Signal Chain project
3 Author: Sergio Cortez
4 Created on September , 2012
5 '''
3 '''
6 from platform import python_version
4
5 import re
7 import sys
6 import sys
8 import ast
7 import ast
9 import datetime
8 import datetime
10 import traceback
9 import traceback
11 import math
12 import time
10 import time
13 import zmq
11 from multiprocessing import Process, Queue
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 from threading import Thread
12 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
13 from xml.etree.ElementTree import ElementTree, Element, SubElement
17 from xml.dom import minidom
18
19
14
20 from schainpy.admin import Alarm, SchainWarning
15 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.model import *
16 from schainpy.model import *
22 from schainpy.utils import log
17 from schainpy.utils import log
23
18
24
19
25 DTYPES = {
20 class ConfBase():
26 'Voltage': '.r',
27 'Spectra': '.pdata'
28 }
29
30
31 def MPProject(project, n=cpu_count()):
32 '''
33 Project wrapper to run schain in n processes
34 '''
35
36 rconf = project.getReadUnitObj()
37 op = rconf.getOperationObj('run')
38 dt1 = op.getParameterValue('startDate')
39 dt2 = op.getParameterValue('endDate')
40 tm1 = op.getParameterValue('startTime')
41 tm2 = op.getParameterValue('endTime')
42 days = (dt2 - dt1).days
43
44 for day in range(days + 1):
45 skip = 0
46 cursor = 0
47 processes = []
48 dt = dt1 + datetime.timedelta(day)
49 dt_str = dt.strftime('%Y/%m/%d')
50 reader = JRODataReader()
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 startDate=dt,
53 endDate=dt,
54 startTime=tm1,
55 endTime=tm2,
56 ext=DTYPES[rconf.datatype])
57 nFiles = len(files)
58 if nFiles == 0:
59 continue
60 skip = int(math.ceil(nFiles / n))
61 while nFiles > cursor * skip:
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 skip=skip)
64 p = project.clone()
65 p.start()
66 processes.append(p)
67 cursor += 1
68
69 def beforeExit(exctype, value, trace):
70 for process in processes:
71 process.terminate()
72 process.join()
73 print(traceback.print_tb(trace))
74
75 sys.excepthook = beforeExit
76
77 for process in processes:
78 process.join()
79 process.terminate()
80
81 time.sleep(3)
82
83 def wait(context):
84
85 time.sleep(1)
86 c = zmq.Context()
87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 msg = receiver.recv_multipart()[1]
91 context.terminate()
92
93 class ParameterConf():
94
95 id = None
96 name = None
97 value = None
98 format = None
99
100 __formated_value = None
101
102 ELEMENTNAME = 'Parameter'
103
104 def __init__(self):
105
106 self.format = 'str'
107
108 def getElementName(self):
109
110 return self.ELEMENTNAME
111
112 def getValue(self):
113
114 value = self.value
115 format = self.format
116
117 if self.__formated_value != None:
118
119 return self.__formated_value
120
121 if format == 'obj':
122 return value
123
124 if format == 'str':
125 self.__formated_value = str(value)
126 return self.__formated_value
127
128 if value == '':
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
131 if format == 'list':
132 strList = [s.strip() for s in value.split(',')]
133 self.__formated_value = strList
134
135 return self.__formated_value
136
137 if format == 'intlist':
138 '''
139 Example:
140 value = (0,1,2)
141 '''
142
143 new_value = ast.literal_eval(value)
144
145 if type(new_value) not in (tuple, list):
146 new_value = [int(new_value)]
147
148 self.__formated_value = new_value
149
150 return self.__formated_value
151
152 if format == 'floatlist':
153 '''
154 Example:
155 value = (0.5, 1.4, 2.7)
156 '''
157
158 new_value = ast.literal_eval(value)
159
160 if type(new_value) not in (tuple, list):
161 new_value = [float(new_value)]
162
163 self.__formated_value = new_value
164
165 return self.__formated_value
166
167 if format == 'date':
168 strList = value.split('/')
169 intList = [int(x) for x in strList]
170 date = datetime.date(intList[0], intList[1], intList[2])
171
172 self.__formated_value = date
173
174 return self.__formated_value
175
176 if format == 'time':
177 strList = value.split(':')
178 intList = [int(x) for x in strList]
179 time = datetime.time(intList[0], intList[1], intList[2])
180
181 self.__formated_value = time
182
183 return self.__formated_value
184
185 if format == 'pairslist':
186 '''
187 Example:
188 value = (0,1),(1,2)
189 '''
190
191 new_value = ast.literal_eval(value)
192
193 if type(new_value) not in (tuple, list):
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195
196 if type(new_value[0]) not in (tuple, list):
197 if len(new_value) != 2:
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 new_value = [new_value]
200
201 for thisPair in new_value:
202 if len(thisPair) != 2:
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204
205 self.__formated_value = new_value
206
207 return self.__formated_value
208
209 if format == 'multilist':
210 '''
211 Example:
212 value = (0,1,2),(3,4,5)
213 '''
214 multiList = ast.literal_eval(value)
215
216 if type(multiList[0]) == int:
217 multiList = ast.literal_eval('(' + value + ')')
218
219 self.__formated_value = multiList
220
221 return self.__formated_value
222
223 if format == 'bool':
224 value = int(value)
225
226 if format == 'int':
227 value = float(value)
228
229 format_func = eval(format)
230
231 self.__formated_value = format_func(value)
232
233 return self.__formated_value
234
235 def updateId(self, new_id):
236
237 self.id = str(new_id)
238
239 def setup(self, id, name, value, format='str'):
240 self.id = str(id)
241 self.name = name
242 if format == 'obj':
243 self.value = value
244 else:
245 self.value = str(value)
246 self.format = str.lower(format)
247
248 self.getValue()
249
250 return 1
251
252 def update(self, name, value, format='str'):
253
254 self.name = name
255 self.value = str(value)
256 self.format = format
257
258 def makeXml(self, opElement):
259 if self.name not in ('queue',):
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 parmElement.set('id', str(self.id))
262 parmElement.set('name', self.name)
263 parmElement.set('value', self.value)
264 parmElement.set('format', self.format)
265
266 def readXml(self, parmElement):
267
268 self.id = parmElement.get('id')
269 self.name = parmElement.get('name')
270 self.value = parmElement.get('value')
271 self.format = str.lower(parmElement.get('format'))
272
273 # Compatible with old signal chain version
274 if self.format == 'int' and self.name == 'idfigure':
275 self.name = 'id'
276
277 def printattr(self):
278
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280
281 class OperationConf():
282
283 ELEMENTNAME = 'Operation'
284
21
285 def __init__(self):
22 def __init__(self):
286
23
287 self.id = '0'
24 self.id = '0'
288 self.name = None
25 self.name = None
289 self.priority = None
26 self.priority = None
290 self.topic = None
27 self.parameters = {}
291
28 self.object = None
292 def __getNewId(self):
29 self.operations = []
293
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295
30
296 def getId(self):
31 def getId(self):
32
297 return self.id
33 return self.id
298
34
35 def getNewId(self):
36
37 return int(self.id) * 10 + len(self.operations) + 1
38
299 def updateId(self, new_id):
39 def updateId(self, new_id):
300
40
301 self.id = str(new_id)
41 self.id = str(new_id)
302
42
303 n = 1
43 n = 1
304 for parmObj in self.parmConfObjList:
44 for conf in self.operations:
305
45 conf_id = str(int(new_id) * 10 + n)
306 idParm = str(int(new_id) * 10 + n)
46 conf.updateId(conf_id)
307 parmObj.updateId(idParm)
308
309 n += 1
47 n += 1
310
48
311 def getElementName(self):
49 def getKwargs(self):
312
313 return self.ELEMENTNAME
314
315 def getParameterObjList(self):
316
50
317 return self.parmConfObjList
51 params = {}
318
52
319 def getParameterObj(self, parameterName):
53 for key, value in self.parameters.items():
54 if value not in (None, '', ' '):
55 params[key] = value
320
56
321 for parmConfObj in self.parmConfObjList:
57 return params
322
58
323 if parmConfObj.name != parameterName:
59 def update(self, **kwargs):
324 continue
325
60
326 return parmConfObj
61 for key, value in kwargs.items():
62 self.addParameter(name=key, value=value)
327
63
328 return None
64 def addParameter(self, name, value, format=None):
65 '''
66 '''
329
67
330 def getParameterObjfromValue(self, parameterValue):
68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
72 else:
73 try:
74 self.parameters[name] = ast.literal_eval(value)
75 except:
76 if isinstance(value, str) and ',' in value:
77 self.parameters[name] = value.split(',')
78 else:
79 self.parameters[name] = value
331
80
332 for parmConfObj in self.parmConfObjList:
81 def getParameters(self):
333
82
334 if parmConfObj.getValue() != parameterValue:
83 params = {}
335 continue
84 for key, value in self.parameters.items():
85 s = type(value).__name__
86 if s == 'date':
87 params[key] = value.strftime('%Y/%m/%d')
88 elif s == 'time':
89 params[key] = value.strftime('%H:%M:%S')
90 else:
91 params[key] = str(value)
336
92
337 return parmConfObj.getValue()
93 return params
338
94
339 return None
95 def makeXml(self, element):
340
96
341 def getParameterValue(self, parameterName):
97 xml = SubElement(element, self.ELEMENTNAME)
98 for label in self.xml_labels:
99 xml.set(label, str(getattr(self, label)))
342
100
343 parameterObj = self.getParameterObj(parameterName)
101 for key, value in self.getParameters().items():
102 xml_param = SubElement(xml, 'Parameter')
103 xml_param.set('name', key)
104 xml_param.set('value', value)
344
105
345 # if not parameterObj:
106 for conf in self.operations:
346 # return None
107 conf.makeXml(xml)
347
108
348 value = parameterObj.getValue()
109 def __str__(self):
349
110
350 return value
111 if self.ELEMENTNAME == 'Operation':
112 s = ' {}[id={}]\n'.format(self.name, self.id)
113 else:
114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
351
115
352 def getKwargs(self):
116 for key, value in self.parameters.items():
117 if self.ELEMENTNAME == 'Operation':
118 s += ' {}: {}\n'.format(key, value)
119 else:
120 s += ' {}: {}\n'.format(key, value)
353
121
354 kwargs = {}
122 for conf in self.operations:
123 s += str(conf)
355
124
356 for parmConfObj in self.parmConfObjList:
125 return s
357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 continue
359
126
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
127 class OperationConf(ConfBase):
361
128
362 return kwargs
129 ELEMENTNAME = 'Operation'
130 xml_labels = ['id', 'name']
363
131
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
132 def setup(self, id, name, priority, project_id, err_queue):
365
133
366 self.id = str(id)
134 self.id = str(id)
367 self.project_id = project_id
135 self.project_id = project_id
368 self.name = name
136 self.name = name
369 self.type = type
137 self.type = 'other'
370 self.priority = priority
371 self.err_queue = err_queue
138 self.err_queue = err_queue
372 self.lock = lock
373 self.parmConfObjList = []
374
375 def removeParameters(self):
376
377 for obj in self.parmConfObjList:
378 del obj
379
380 self.parmConfObjList = []
381
139
382 def addParameter(self, name, value, format='str'):
140 def readXml(self, element, project_id, err_queue):
383
141
384 if value is None:
142 self.id = element.get('id')
385 return None
143 self.name = element.get('name')
386 id = self.__getNewId()
144 self.type = 'other'
387
388 parmConfObj = ParameterConf()
389 if not parmConfObj.setup(id, name, value, format):
390 return None
391
392 self.parmConfObjList.append(parmConfObj)
393
394 return parmConfObj
395
396 def changeParameter(self, name, value, format='str'):
397
398 parmConfObj = self.getParameterObj(name)
399 parmConfObj.update(name, value, format)
400
401 return parmConfObj
402
403 def makeXml(self, procUnitElement):
404
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
406 opElement.set('id', str(self.id))
407 opElement.set('name', self.name)
408 opElement.set('type', self.type)
409 opElement.set('priority', str(self.priority))
410
411 for parmConfObj in self.parmConfObjList:
412 parmConfObj.makeXml(opElement)
413
414 def readXml(self, opElement, project_id):
415
416 self.id = opElement.get('id')
417 self.name = opElement.get('name')
418 self.type = opElement.get('type')
419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
145 self.project_id = str(project_id)
146 self.err_queue = err_queue
421
147
422 # Compatible with old signal chain version
148 for elm in element.iter('Parameter'):
423 # Use of 'run' method instead 'init'
149 self.addParameter(elm.get('name'), elm.get('value'))
424 if self.type == 'self' and self.name == 'init':
425 self.name = 'run'
426
427 self.parmConfObjList = []
428
429 parmElementList = opElement.iter(ParameterConf().getElementName())
430
431 for parmElement in parmElementList:
432 parmConfObj = ParameterConf()
433 parmConfObj.readXml(parmElement)
434
435 # Compatible with old signal chain version
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
437 if self.type != 'self' and self.name == 'Plot':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 self.name = parmConfObj.value
440 continue
441
442 self.parmConfObjList.append(parmConfObj)
443
444 def printattr(self):
445
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 self.id,
448 self.name,
449 self.type,
450 self.priority,
451 self.project_id))
452
453 for parmConfObj in self.parmConfObjList:
454 parmConfObj.printattr()
455
150
456 def createObject(self):
151 def createObject(self):
457
152
458 className = eval(self.name)
153 className = eval(self.name)
459
154
460 if self.type == 'other':
155 if 'Plot' in self.name or 'Writer' in self.name:
461 opObj = className()
462 elif self.type == 'external':
463 kwargs = self.getKwargs()
156 kwargs = self.getKwargs()
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
465 opObj.start()
158 opObj.start()
466 self.opObj = opObj
159 self.type = 'external'
160 else:
161 opObj = className()
467
162
163 self.object = opObj
468 return opObj
164 return opObj
469
165
470 class ProcUnitConf():
166 class ProcUnitConf(ConfBase):
471
167
472 ELEMENTNAME = 'ProcUnit'
168 ELEMENTNAME = 'ProcUnit'
169 xml_labels = ['id', 'inputId', 'name']
473
170
474 def __init__(self):
171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
475
476 self.id = None
477 self.datatype = None
478 self.name = None
479 self.inputId = None
480 self.opConfObjList = []
481 self.procUnitObj = None
482 self.opObjDict = {}
483
484 def __getPriority(self):
485
486 return len(self.opConfObjList) + 1
487
488 def __getNewId(self):
489
490 return int(self.id) * 10 + len(self.opConfObjList) + 1
491
492 def getElementName(self):
493
494 return self.ELEMENTNAME
495
496 def getId(self):
497
498 return self.id
499
500 def updateId(self, new_id):
501 '''
502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504
505 # If this proc unit has not inputs
506 #if self.inputId == '0':
507 #new_inputId = 0
508
509 n = 1
510 for opConfObj in self.opConfObjList:
511
512 idOp = str(int(new_id) * 10 + n)
513 opConfObj.updateId(idOp)
514
515 n += 1
516
517 self.parentId = str(parentId)
518 self.id = str(new_id)
519 #self.inputId = str(new_inputId)
520 '''
521 n = 1
522
523 def getInputId(self):
524
525 return self.inputId
526
527 def getOperationObjList(self):
528
529 return self.opConfObjList
530
531 def getOperationObj(self, name=None):
532
533 for opConfObj in self.opConfObjList:
534
535 if opConfObj.name != name:
536 continue
537
538 return opConfObj
539
540 return None
541
542 def getOpObjfromParamValue(self, value=None):
543
544 for opConfObj in self.opConfObjList:
545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 continue
547 return opConfObj
548 return None
549
550 def getProcUnitObj(self):
551
552 return self.procUnitObj
553
554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
555 '''
172 '''
556 id sera el topico a publicar
557 inputId sera el topico a subscribirse
558 '''
173 '''
559
174
560 # Compatible with old signal chain version
561 if datatype == None and name == None:
175 if datatype == None and name == None:
562 raise ValueError('datatype or name should be defined')
176 raise ValueError('datatype or name should be defined')
563
177
564 #Definir una condicion para inputId cuando sea 0
565
566 if name == None:
178 if name == None:
567 if 'Proc' in datatype:
179 if 'Proc' in datatype:
568 name = datatype
180 name = datatype
@@ -578,100 +190,49 class ProcUnitConf():
578 self.datatype = datatype
190 self.datatype = datatype
579 self.inputId = inputId
191 self.inputId = inputId
580 self.err_queue = err_queue
192 self.err_queue = err_queue
581 self.lock = lock
193 self.operations = []
582 self.opConfObjList = []
194 self.parameters = {}
583
584 self.addOperation(name='run', optype='self')
585
586 def removeOperations(self):
587
588 for obj in self.opConfObjList:
589 del obj
590
195
591 self.opConfObjList = []
196 def removeOperation(self, id):
592 self.addOperation(name='run')
593
197
594 def addParameter(self, **kwargs):
198 i = [1 if x.id==id else 0 for x in self.operations]
595 '''
199 self.operations.pop(i.index(1))
596 Add parameters to 'run' operation
597 '''
598 opObj = self.opConfObjList[0]
599
200
600 opObj.addParameter(**kwargs)
201 def getOperation(self, id):
601
202
602 return opObj
203 for conf in self.operations:
204 if conf.id == id:
205 return conf
603
206
604 def addOperation(self, name, optype='self'):
207 def addOperation(self, name, optype='self'):
605 '''
208 '''
606 Actualizacion - > proceso comunicacion
607 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
608 definir el tipoc de socket o comunicacion ipc++
609
610 '''
209 '''
611
210
612 id = self.__getNewId()
211 id = self.getNewId()
613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
212 conf = OperationConf()
614 opConfObj = OperationConf()
213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
214 self.operations.append(conf)
616 self.opConfObjList.append(opConfObj)
617
618 return opConfObj
619
215
620 def makeXml(self, projectElement):
216 return conf
621
217
622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
218 def readXml(self, element, project_id, err_queue):
623 procUnitElement.set('id', str(self.id))
624 procUnitElement.set('name', self.name)
625 procUnitElement.set('datatype', self.datatype)
626 procUnitElement.set('inputId', str(self.inputId))
627
219
628 for opConfObj in self.opConfObjList:
220 self.id = element.get('id')
629 opConfObj.makeXml(procUnitElement)
221 self.name = element.get('name')
630
222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
631 def readXml(self, upElement, project_id):
223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
632
633 self.id = upElement.get('id')
634 self.name = upElement.get('name')
635 self.datatype = upElement.get('datatype')
636 self.inputId = upElement.get('inputId')
637 self.project_id = str(project_id)
224 self.project_id = str(project_id)
225 self.err_queue = err_queue
226 self.operations = []
227 self.parameters = {}
638
228
639 if self.ELEMENTNAME == 'ReadUnit':
229 for elm in element:
640 self.datatype = self.datatype.replace('Reader', '')
230 if elm.tag == 'Parameter':
641
231 self.addParameter(elm.get('name'), elm.get('value'))
642 if self.ELEMENTNAME == 'ProcUnit':
232 elif elm.tag == 'Operation':
643 self.datatype = self.datatype.replace('Proc', '')
233 conf = OperationConf()
644
234 conf.readXml(elm, project_id, err_queue)
645 if self.inputId == 'None':
235 self.operations.append(conf)
646 self.inputId = '0'
647
648 self.opConfObjList = []
649
650 opElementList = upElement.iter(OperationConf().getElementName())
651
652 for opElement in opElementList:
653 opConfObj = OperationConf()
654 opConfObj.readXml(opElement, project_id)
655 self.opConfObjList.append(opConfObj)
656
657 def printattr(self):
658
659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 self.id,
661 self.name,
662 self.datatype,
663 self.inputId,
664 self.project_id))
665
666 for opConfObj in self.opConfObjList:
667 opConfObj.printattr()
668
669 def getKwargs(self):
670
671 opObj = self.opConfObjList[0]
672 kwargs = opObj.getKwargs()
673
674 return kwargs
675
236
676 def createObjects(self):
237 def createObjects(self):
677 '''
238 '''
@@ -680,39 +241,27 class ProcUnitConf():
680
241
681 className = eval(self.name)
242 className = eval(self.name)
682 kwargs = self.getKwargs()
243 kwargs = self.getKwargs()
683 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
244 procUnitObj = className()
245 procUnitObj.name = self.name
684 log.success('creating process...', self.name)
246 log.success('creating process...', self.name)
685
247
686 for opConfObj in self.opConfObjList:
248 for conf in self.operations:
687
249
688 if opConfObj.type == 'self' and opConfObj.name == 'run':
250 opObj = conf.createObject()
689 continue
690 elif opConfObj.type == 'self':
691 opObj = getattr(procUnitObj, opConfObj.name)
692 else:
693 opObj = opConfObj.createObject()
694
251
695 log.success('adding operation: {}, type:{}'.format(
252 log.success('adding operation: {}, type:{}'.format(
696 opConfObj.name,
253 conf.name,
697 opConfObj.type), self.name)
254 conf.type), self.name)
698
699 procUnitObj.addOperation(opConfObj, opObj)
700
701 procUnitObj.start()
702 self.procUnitObj = procUnitObj
703
255
704 def close(self):
256 procUnitObj.addOperation(conf, opObj)
705
257
706 for opConfObj in self.opConfObjList:
258 self.object = procUnitObj
707 if opConfObj.type == 'self':
708 continue
709
710 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 opObj.close()
712
259
713 self.procUnitObj.close()
260 def run(self):
261 '''
262 '''
714
263
715 return
264 return self.object.call(**self.getKwargs())
716
265
717
266
718 class ReadUnitConf(ProcUnitConf):
267 class ReadUnitConf(ProcUnitConf):
@@ -725,28 +274,12 class ReadUnitConf(ProcUnitConf):
725 self.datatype = None
274 self.datatype = None
726 self.name = None
275 self.name = None
727 self.inputId = None
276 self.inputId = None
728 self.opConfObjList = []
277 self.operations = []
729 self.lock = Event()
278 self.parameters = {}
730 self.lock.set()
731 self.lock.n = Value('d', 0)
732
733 def getElementName(self):
734
735 return self.ELEMENTNAME
736
279
737 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 startTime='', endTime='', server=None, **kwargs):
281 startTime='', endTime='', server=None, **kwargs):
739
282
740
741 '''
742 *****el id del proceso sera el Topico
743
744 Adicion de {topic}, si no esta presente -> error
745 kwargs deben ser trasmitidos en la instanciacion
746
747 '''
748
749 # Compatible with old signal chain version
750 if datatype == None and name == None:
283 if datatype == None and name == None:
751 raise ValueError('datatype or name should be defined')
284 raise ValueError('datatype or name should be defined')
752 if name == None:
285 if name == None:
@@ -766,112 +299,16 class ReadUnitConf(ProcUnitConf):
766 self.project_id = project_id
299 self.project_id = project_id
767 self.name = name
300 self.name = name
768 self.datatype = datatype
301 self.datatype = datatype
769 if path != '':
770 self.path = os.path.abspath(path)
771 self.startDate = startDate
772 self.endDate = endDate
773 self.startTime = startTime
774 self.endTime = endTime
775 self.server = server
776 self.err_queue = err_queue
302 self.err_queue = err_queue
777 self.addRunOperation(**kwargs)
778
779 def update(self, **kwargs):
780
781 if 'datatype' in kwargs:
782 datatype = kwargs.pop('datatype')
783 if 'Reader' in datatype:
784 self.name = datatype
785 else:
786 self.name = '%sReader' % (datatype)
787 self.datatype = self.name.replace('Reader', '')
788
789 attrs = ('path', 'startDate', 'endDate',
790 'startTime', 'endTime')
791
792 for attr in attrs:
793 if attr in kwargs:
794 setattr(self, attr, kwargs.pop(attr))
795
796 self.updateRunOperation(**kwargs)
797
303
798 def removeOperations(self):
304 self.addParameter(name='path', value=path)
305 self.addParameter(name='startDate', value=startDate)
306 self.addParameter(name='endDate', value=endDate)
307 self.addParameter(name='startTime', value=startTime)
308 self.addParameter(name='endTime', value=endTime)
799
309
800 for obj in self.opConfObjList:
310 for key, value in kwargs.items():
801 del obj
311 self.addParameter(name=key, value=value)
802
803 self.opConfObjList = []
804
805 def addRunOperation(self, **kwargs):
806
807 opObj = self.addOperation(name='run', optype='self')
808
809 if self.server is None:
810 opObj.addParameter(
811 name='datatype', value=self.datatype, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
813 opObj.addParameter(
814 name='startDate', value=self.startDate, format='date')
815 opObj.addParameter(
816 name='endDate', value=self.endDate, format='date')
817 opObj.addParameter(
818 name='startTime', value=self.startTime, format='time')
819 opObj.addParameter(
820 name='endTime', value=self.endTime, format='time')
821
822 for key, value in list(kwargs.items()):
823 opObj.addParameter(name=key, value=value,
824 format=type(value).__name__)
825 else:
826 opObj.addParameter(name='server', value=self.server, format='str')
827
828 return opObj
829
830 def updateRunOperation(self, **kwargs):
831
832 opObj = self.getOperationObj(name='run')
833 opObj.removeParameters()
834
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
837 opObj.addParameter(
838 name='startDate', value=self.startDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 opObj.addParameter(
841 name='startTime', value=self.startTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843
844 for key, value in list(kwargs.items()):
845 opObj.addParameter(name=key, value=value,
846 format=type(value).__name__)
847
848 return opObj
849
850 def readXml(self, upElement, project_id):
851
852 self.id = upElement.get('id')
853 self.name = upElement.get('name')
854 self.datatype = upElement.get('datatype')
855 self.project_id = str(project_id) #yong
856
857 if self.ELEMENTNAME == 'ReadUnit':
858 self.datatype = self.datatype.replace('Reader', '')
859
860 self.opConfObjList = []
861
862 opElementList = upElement.iter(OperationConf().getElementName())
863
864 for opElement in opElementList:
865 opConfObj = OperationConf()
866 opConfObj.readXml(opElement, project_id)
867 self.opConfObjList.append(opConfObj)
868
869 if opConfObj.name == 'run':
870 self.path = opConfObj.getParameterValue('path')
871 self.startDate = opConfObj.getParameterValue('startDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
873 self.startTime = opConfObj.getParameterValue('startTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
875
312
876
313
877 class Project(Process):
314 class Project(Process):
@@ -885,13 +322,15 class Project(Process):
885 self.filename = None
322 self.filename = None
886 self.description = None
323 self.description = None
887 self.email = None
324 self.email = None
888 self.alarm = None
325 self.alarm = []
889 self.procUnitConfObjDict = {}
326 self.configurations = {}
890 self.err_queue = Queue()
327 # self.err_queue = Queue()
328 self.err_queue = None
329 self.started = False
891
330
892 def __getNewId(self):
331 def getNewId(self):
893
332
894 idList = list(self.procUnitConfObjDict.keys())
333 idList = list(self.configurations.keys())
895 id = int(self.id) * 10
334 id = int(self.id) * 10
896
335
897 while True:
336 while True:
@@ -904,43 +343,28 class Project(Process):
904
343
905 return str(id)
344 return str(id)
906
345
907 def getElementName(self):
908
909 return self.ELEMENTNAME
910
911 def getId(self):
912
913 return self.id
914
915 def updateId(self, new_id):
346 def updateId(self, new_id):
916
347
917 self.id = str(new_id)
348 self.id = str(new_id)
918
349
919 keyList = list(self.procUnitConfObjDict.keys())
350 keyList = list(self.configurations.keys())
920 keyList.sort()
351 keyList.sort()
921
352
922 n = 1
353 n = 1
923 newProcUnitConfObjDict = {}
354 new_confs = {}
924
355
925 for procKey in keyList:
356 for procKey in keyList:
926
357
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
358 conf = self.configurations[procKey]
928 idProcUnit = str(int(self.id) * 10 + n)
359 idProcUnit = str(int(self.id) * 10 + n)
929 procUnitConfObj.updateId(idProcUnit)
360 conf.updateId(idProcUnit)
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
361 new_confs[idProcUnit] = conf
931 n += 1
362 n += 1
932
363
933 self.procUnitConfObjDict = newProcUnitConfObjDict
364 self.configurations = new_confs
934
365
935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936
367
937 print(' ')
938 print('*' * 60)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 print('*' * 60)
941 print("* Python " + python_version() + " *")
942 print('*' * 19)
943 print(' ')
944 self.id = str(id)
368 self.id = str(id)
945 self.description = description
369 self.description = description
946 self.email = email
370 self.email = email
@@ -950,108 +374,91 class Project(Process):
950
374
951 def update(self, **kwargs):
375 def update(self, **kwargs):
952
376
953 for key, value in list(kwargs.items()):
377 for key, value in kwargs.items():
954 setattr(self, key, value)
378 setattr(self, key, value)
955
379
956 def clone(self):
380 def clone(self):
957
381
958 p = Project()
382 p = Project()
959 p.procUnitConfObjDict = self.procUnitConfObjDict
383 p.id = self.id
384 p.name = self.name
385 p.description = self.description
386 p.configurations = self.configurations.copy()
387
960 return p
388 return p
961
389
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963
391
964 '''
392 '''
965 Actualizacion:
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
967
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
969
970 '''
393 '''
971
394
972 if id is None:
395 if id is None:
973 idReadUnit = self.__getNewId()
396 idReadUnit = self.getNewId()
974 else:
397 else:
975 idReadUnit = str(id)
398 idReadUnit = str(id)
976
399
977 readUnitConfObj = ReadUnitConf()
400 conf = ReadUnitConf()
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
402 self.configurations[conf.id] = conf
980
403
981 return readUnitConfObj
404 return conf
982
405
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
984
407
985 '''
408 '''
986 Actualizacion:
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
988 Deberia reemplazar a "inputId"
989
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
992
993 '''
409 '''
994
410
995 idProcUnit = self.__getNewId()
411 if id is None:
996 procUnitConfObj = ProcUnitConf()
412 idProcUnit = self.getNewId()
997 input_proc = self.procUnitConfObjDict[inputId]
413 else:
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
414 idProcUnit = id
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000
1001 return procUnitConfObj
1002
1003 def removeProcUnit(self, id):
1004
415
1005 if id in list(self.procUnitConfObjDict.keys()):
416 conf = ProcUnitConf()
1006 self.procUnitConfObjDict.pop(id)
417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
418 self.configurations[conf.id] = conf
1007
419
1008 def getReadUnitId(self):
420 return conf
1009
421
1010 readUnitConfObj = self.getReadUnitObj()
422 def removeProcUnit(self, id):
1011
423
1012 return readUnitConfObj.id
424 if id in self.configurations:
425 self.configurations.pop(id)
1013
426
1014 def getReadUnitObj(self):
427 def getReadUnit(self):
1015
428
1016 for obj in list(self.procUnitConfObjDict.values()):
429 for obj in list(self.configurations.values()):
1017 if obj.getElementName() == 'ReadUnit':
430 if obj.ELEMENTNAME == 'ReadUnit':
1018 return obj
431 return obj
1019
432
1020 return None
433 return None
1021
434
1022 def getProcUnitObj(self, id=None, name=None):
435 def getProcUnit(self, id):
1023
1024 if id != None:
1025 return self.procUnitConfObjDict[id]
1026
436
1027 if name != None:
437 return self.configurations[id]
1028 return self.getProcUnitObjByName(name)
1029
438
1030 return None
439 def getUnits(self):
1031
440
1032 def getProcUnitObjByName(self, name):
441 keys = list(self.configurations)
1033
442 keys.sort()
1034 for obj in list(self.procUnitConfObjDict.values()):
1035 if obj.name == name:
1036 return obj
1037
443
1038 return None
444 for key in keys:
445 yield self.configurations[key]
1039
446
1040 def procUnitItems(self):
447 def updateUnit(self, id, **kwargs):
1041
448
1042 return list(self.procUnitConfObjDict.items())
449 conf = self.configurations[id].update(**kwargs)
1043
450
1044 def makeXml(self):
451 def makeXml(self):
1045
452
1046 projectElement = Element('Project')
453 xml = Element('Project')
1047 projectElement.set('id', str(self.id))
454 xml.set('id', str(self.id))
1048 projectElement.set('name', self.name)
455 xml.set('name', self.name)
1049 projectElement.set('description', self.description)
456 xml.set('description', self.description)
1050
457
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
458 for conf in self.configurations.values():
1052 procUnitConfObj.makeXml(projectElement)
459 conf.makeXml(xml)
1053
460
1054 self.projectElement = projectElement
461 self.xml = xml
1055
462
1056 def writeXml(self, filename=None):
463 def writeXml(self, filename=None):
1057
464
@@ -1077,54 +484,38 class Project(Process):
1077
484
1078 self.makeXml()
485 self.makeXml()
1079
486
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
487 ElementTree(self.xml).write(abs_file, method='xml')
1081
488
1082 self.filename = abs_file
489 self.filename = abs_file
1083
490
1084 return 1
491 return 1
1085
492
1086 def readXml(self, filename=None):
493 def readXml(self, filename):
1087
1088 if not filename:
1089 print('filename is not defined')
1090 return 0
1091
494
1092 abs_file = os.path.abspath(filename)
495 abs_file = os.path.abspath(filename)
1093
496
1094 if not os.path.isfile(abs_file):
497 self.configurations = {}
1095 print('%s file does not exist' % abs_file)
1096 return 0
1097
1098 self.projectElement = None
1099 self.procUnitConfObjDict = {}
1100
498
1101 try:
499 try:
1102 self.projectElement = ElementTree().parse(abs_file)
500 self.xml = ElementTree().parse(abs_file)
1103 except:
501 except:
1104 print('Error reading %s, verify file format' % filename)
502 log.error('Error reading %s, verify file format' % filename)
1105 return 0
503 return 0
1106
504
1107 self.project = self.projectElement.tag
505 self.id = self.xml.get('id')
1108
506 self.name = self.xml.get('name')
1109 self.id = self.projectElement.get('id')
507 self.description = self.xml.get('description')
1110 self.name = self.projectElement.get('name')
508
1111 self.description = self.projectElement.get('description')
509 for element in self.xml:
1112
510 if element.tag == 'ReadUnit':
1113 readUnitElementList = self.projectElement.iter(
511 conf = ReadUnitConf()
1114 ReadUnitConf().getElementName())
512 conf.readXml(element, self.id, self.err_queue)
1115
513 self.configurations[conf.id] = conf
1116 for readUnitElement in readUnitElementList:
514 elif element.tag == 'ProcUnit':
1117 readUnitConfObj = ReadUnitConf()
515 conf = ProcUnitConf()
1118 readUnitConfObj.readXml(readUnitElement, self.id)
516 input_proc = self.configurations[element.get('inputId')]
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
517 conf.readXml(element, self.id, self.err_queue)
1120
518 self.configurations[conf.id] = conf
1121 procUnitElementList = self.projectElement.iter(
1122 ProcUnitConf().getElementName())
1123
1124 for procUnitElement in procUnitElementList:
1125 procUnitConfObj = ProcUnitConf()
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1128
519
1129 self.filename = abs_file
520 self.filename = abs_file
1130
521
@@ -1132,28 +523,33 class Project(Process):
1132
523
1133 def __str__(self):
524 def __str__(self):
1134
525
1135 print('Project: name = %s, description = %s, id = %s' % (
526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
527 self.id,
1136 self.name,
528 self.name,
1137 self.description,
529 self.description,
1138 self.id))
530 )
1139
531
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
532 for conf in self.configurations.values():
1141 print(procUnitConfObj)
533 text += '{}'.format(conf)
1142
534
1143 def createObjects(self):
535 return text
1144
536
537 def createObjects(self):
1145
538
1146 keys = list(self.procUnitConfObjDict.keys())
539 keys = list(self.configurations.keys())
1147 keys.sort()
540 keys.sort()
1148 for key in keys:
541 for key in keys:
1149 self.procUnitConfObjDict[key].createObjects()
542 conf = self.configurations[key]
543 conf.createObjects()
544 if conf.inputId is not None:
545 conf.object.setInput(self.configurations[conf.inputId].object)
1150
546
1151 def monitor(self):
547 def monitor(self):
1152
548
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
1154 t.start()
550 t.start()
1155
551
1156 def __monitor(self, queue, ctx):
552 def _monitor(self, queue, ctx):
1157
553
1158 import socket
554 import socket
1159
555
@@ -1184,13 +580,7 class Project(Process):
1184 else:
580 else:
1185 name, err = self.name, err_msg
581 name, err = self.name, err_msg
1186
582
1187 time.sleep(2)
583 time.sleep(1)
1188
1189 for conf in self.procUnitConfObjDict.values():
1190 for confop in conf.opConfObjList:
1191 if confop.type == 'external':
1192 confop.opObj.terminate()
1193 conf.procUnitObj.terminate()
1194
584
1195 ctx.term()
585 ctx.term()
1196
586
@@ -1206,15 +596,14 class Project(Process):
1206 subtitle += 'Configuration file: %s\n' % self.filename
596 subtitle += 'Configuration file: %s\n' % self.filename
1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208
598
1209 readUnitConfObj = self.getReadUnitObj()
599 readUnitConfObj = self.getReadUnit()
1210 if readUnitConfObj:
600 if readUnitConfObj:
1211 subtitle += '\nInput parameters:\n'
601 subtitle += '\nInput parameters:\n'
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218
607
1219 a = Alarm(
608 a = Alarm(
1220 modes=self.alarm,
609 modes=self.alarm,
@@ -1227,64 +616,33 class Project(Process):
1227
616
1228 a.start()
617 a.start()
1229
618
1230 def isPaused(self):
1231 return 0
1232
1233 def isStopped(self):
1234 return 0
1235
1236 def runController(self):
1237 '''
1238 returns 0 when this process has been stopped, 1 otherwise
1239 '''
1240
1241 if self.isPaused():
1242 print('Process suspended')
1243
1244 while True:
1245 time.sleep(0.1)
1246
1247 if not self.isPaused():
1248 break
1249
1250 if self.isStopped():
1251 break
1252
1253 print('Process reinitialized')
1254
1255 if self.isStopped():
1256 print('Process stopped')
1257 return 0
1258
1259 return 1
1260
1261 def setFilename(self, filename):
619 def setFilename(self, filename):
1262
620
1263 self.filename = filename
621 self.filename = filename
1264
622
1265 def setProxy(self):
623 def runProcs(self):
1266
624
1267 if not os.path.exists('/tmp/schain'):
625 err = False
1268 os.mkdir('/tmp/schain')
626 n = len(self.configurations)
1269
627
1270 self.ctx = zmq.Context()
628 while not err:
1271 xpub = self.ctx.socket(zmq.XPUB)
629 for conf in self.getUnits():
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
630 ok = conf.run()
1273 xsub = self.ctx.socket(zmq.XSUB)
631 if ok is 'Error':
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
632 n -= 1
1275 self.monitor()
633 continue
1276 try:
634 elif not ok:
1277 zmq.proxy(xpub, xsub)
635 break
1278 except zmq.ContextTerminated:
636 if n == 0:
1279 xpub.close()
637 err = True
1280 xsub.close()
1281
638
1282 def run(self):
639 def run(self):
1283
640
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
642 self.started = True
1285 self.start_time = time.time()
643 self.start_time = time.time()
1286 self.createObjects()
644 self.createObjects()
1287 self.setProxy()
645 self.runProcs()
1288 log.success('{} Done (Time: {}s)'.format(
646 log.success('{} Done (Time: {:4.2f}s)'.format(
1289 self.name,
647 self.name,
1290 time.time()-self.start_time), '')
648 time.time()-self.start_time), '')
@@ -84,7 +84,7 DATA_STRUCTURE = numpy.dtype([
84 ('sea_algorithm', '<u4')
84 ('sea_algorithm', '<u4')
85 ])
85 ])
86
86
87 @MPDecorator
87
88 class BLTRParamReader(JRODataReader, ProcessingUnit):
88 class BLTRParamReader(JRODataReader, ProcessingUnit):
89 '''
89 '''
90 Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR
90 Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR
@@ -247,7 +247,7 class RecordHeaderBLTR():
247
247
248 return 1
248 return 1
249
249
250 @MPDecorator
250
251 class BLTRSpectraReader (ProcessingUnit):
251 class BLTRSpectraReader (ProcessingUnit):
252
252
253 def __init__(self):
253 def __init__(self):
@@ -14,6 +14,7 import time
14 import datetime
14 import datetime
15 import zmq
15 import zmq
16
16
17 from schainpy.model.proc.jroproc_base import Operation
17 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 from schainpy.utils import log
20 from schainpy.utils import log
@@ -724,9 +725,9 class JRODataReader(Reader):
724 firstHeaderSize = 0
725 firstHeaderSize = 0
725 basicHeaderSize = 24
726 basicHeaderSize = 24
726 __isFirstTimeOnline = 1
727 __isFirstTimeOnline = 1
727 __printInfo = True
728 filefmt = "*%Y%j***"
728 filefmt = "*%Y%j***"
729 folderfmt = "*%Y%j"
729 folderfmt = "*%Y%j"
730 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
730
731
731 def getDtypeWidth(self):
732 def getDtypeWidth(self):
732
733
@@ -1214,26 +1215,6 class JRODataReader(Reader):
1214
1215
1215 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216
1217
1217 def printNumberOfBlock(self):
1218 'SPAM!'
1219
1220 # if self.flagIsNewBlock:
1221 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1222 # self.processingHeaderObj.dataBlocksPerFile,
1223 # self.dataOut.datatime.ctime())
1224
1225 def printInfo(self):
1226
1227 if self.__printInfo == False:
1228 return
1229
1230 self.basicHeaderObj.printInfo()
1231 self.systemHeaderObj.printInfo()
1232 self.radarControllerHeaderObj.printInfo()
1233 self.processingHeaderObj.printInfo()
1234
1235 self.__printInfo = False
1236
1237 def run(self, **kwargs):
1218 def run(self, **kwargs):
1238 """
1219 """
1239
1220
@@ -1573,3 +1554,27 class JRODataWriter(Reader):
1573 self.dataOut = dataOut
1554 self.dataOut = dataOut
1574 self.putData()
1555 self.putData()
1575 return self.dataOut
1556 return self.dataOut
1557
1558 class printInfo(Operation):
1559
1560 def __init__(self):
1561
1562 Operation.__init__(self)
1563 self.__printInfo = True
1564
1565 def run(self, dataOut, headers = ['systemHeaderObj', 'radarControllerHeaderObj', 'processingHeaderObj']):
1566 if self.__printInfo == False:
1567 return dataOut
1568
1569 for header in headers:
1570 if hasattr(dataOut, header):
1571 obj = getattr(dataOut, header)
1572 if hasattr(obj, 'printInfo'):
1573 obj.printInfo()
1574 else:
1575 print(obj)
1576 else:
1577 log.warning('Header {} Not found in object'.format(header))
1578
1579 self.__printInfo = False
1580 return dataOut No newline at end of file
@@ -31,7 +31,7 try:
31 except:
31 except:
32 pass
32 pass
33
33
34 @MPDecorator
34
35 class DigitalRFReader(ProcessingUnit):
35 class DigitalRFReader(ProcessingUnit):
36 '''
36 '''
37 classdocs
37 classdocs
@@ -633,7 +633,7 class DigitalRFReader(ProcessingUnit):
633
633
634 return
634 return
635
635
636
636 @MPDecorator
637 class DigitalRFWriter(Operation):
637 class DigitalRFWriter(Operation):
638 '''
638 '''
639 classdocs
639 classdocs
@@ -120,6 +120,7 class Metadata(object):
120 parmConfObj.readXml(parmElement)
120 parmConfObj.readXml(parmElement)
121 self.parmConfObjList.append(parmConfObj)
121 self.parmConfObjList.append(parmConfObj)
122
122
123 @MPDecorator
123 class FitsWriter(Operation):
124 class FitsWriter(Operation):
124 def __init__(self, **kwargs):
125 def __init__(self, **kwargs):
125 Operation.__init__(self, **kwargs)
126 Operation.__init__(self, **kwargs)
@@ -283,7 +284,7 class FitsWriter(Operation):
283 self.isConfig = True
284 self.isConfig = True
284 self.putData()
285 self.putData()
285
286
286 @MPDecorator
287
287 class FitsReader(ProcessingUnit):
288 class FitsReader(ProcessingUnit):
288
289
289 # __TIMEZONE = time.timezone
290 # __TIMEZONE = time.timezone
@@ -78,7 +78,7 def load_json(obj):
78
78
79 return iterable
79 return iterable
80
80
81 @MPDecorator
81
82 class MADReader(Reader, ProcessingUnit):
82 class MADReader(Reader, ProcessingUnit):
83
83
84 def __init__(self):
84 def __init__(self):
@@ -11,7 +11,7 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecora
11 from schainpy.model.io.jroIO_base import *
11 from schainpy.model.io.jroIO_base import *
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14 @MPDecorator
14
15 class ParamReader(JRODataReader,ProcessingUnit):
15 class ParamReader(JRODataReader,ProcessingUnit):
16 '''
16 '''
17 Reads HDF5 format files
17 Reads HDF5 format files
@@ -965,7 +965,7 class ParamWriter(Operation):
965 return
965 return
966
966
967
967
968 @MPDecorator
968
969 class ParameterReader(Reader, ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 '''
970 '''
971 Reads HDF5 format files
971 Reads HDF5 format files
@@ -11,7 +11,7 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader,
11 from schainpy.model.data.jrodata import Spectra
11 from schainpy.model.data.jrodata import Spectra
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14 @MPDecorator
14
15 class SpectraReader(JRODataReader, ProcessingUnit):
15 class SpectraReader(JRODataReader, ProcessingUnit):
16 """
16 """
17 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
17 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
@@ -14,7 +14,7 except:
14
14
15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 from schainpy.model.data.jrodata import Voltage
16 from schainpy.model.data.jrodata import Voltage
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
18
18
19 try:
19 try:
20 import digital_rf_hdf5
20 import digital_rf_hdf5
@@ -546,6 +546,8 class USRPReader(ProcessingUnit):
546
546
547 return
547 return
548
548
549
550 @MPDecorator
549 class USRPWriter(Operation):
551 class USRPWriter(Operation):
550 '''
552 '''
551 classdocs
553 classdocs
@@ -10,12 +10,8 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 from schainpy.model.data.jrodata import Voltage
12 from schainpy.model.data.jrodata import Voltage
13 import zmq
14 import tempfile
15 from io import StringIO
16 # from _sha import blocksize
17
13
18 @MPDecorator
14
19 class VoltageReader(JRODataReader, ProcessingUnit):
15 class VoltageReader(JRODataReader, ProcessingUnit):
20 """
16 """
21 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
17 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
@@ -15,7 +15,7 from numpy import transpose
15 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
15 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
16 from schainpy.model.data.jrodata import Parameters
16 from schainpy.model.data.jrodata import Parameters
17
17
18 @MPDecorator
18
19 class BLTRParametersProc(ProcessingUnit):
19 class BLTRParametersProc(ProcessingUnit):
20 '''
20 '''
21 Processing unit for BLTR parameters data (winds)
21 Processing unit for BLTR parameters data (winds)
@@ -76,7 +76,7 class BLTRParametersProc(ProcessingUnit):
76 self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan
76 self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan
77
77
78 # TODO
78 # TODO
79 @MPDecorator
79
80 class OutliersFilter(Operation):
80 class OutliersFilter(Operation):
81
81
82 def __init__(self):
82 def __init__(self):
@@ -16,7 +16,7 class AMISRProc(ProcessingUnit):
16 self.dataOut.copy(self.dataIn)
16 self.dataOut.copy(self.dataIn)
17
17
18
18
19 class PrintInfo(Operation):
19 class PrintInfoAMISR(Operation):
20 def __init__(self, **kwargs):
20 def __init__(self, **kwargs):
21 Operation.__init__(self, **kwargs)
21 Operation.__init__(self, **kwargs)
22 self.__isPrinted = False
22 self.__isPrinted = False
@@ -1,19 +1,9
1 '''
1 '''
2 Updated for multiprocessing
2 Base clases to create Processing units and operations, the MPDecorator
3 Author : Sergio Cortez
3 must be used in plotting and writing operations to allow to run as an
4 Jan 2018
4 external process.
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
10 Based on:
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
5 '''
14
6
15 import os
16 import sys
17 import inspect
7 import inspect
18 import zmq
8 import zmq
19 import time
9 import time
@@ -24,27 +14,16 try:
24 except:
14 except:
25 from Queue import Queue
15 from Queue import Queue
26 from threading import Thread
16 from threading import Thread
27 from multiprocessing import Process
17 from multiprocessing import Process, Queue
28
29 from schainpy.utils import log
18 from schainpy.utils import log
30
19
31
20
32 class ProcessingUnit(object):
21 class ProcessingUnit(object):
22 '''
23 Base class to create Signal Chain Units
24 '''
33
25
34 """
35 Update - Jan 2018 - MULTIPROCESSING
36 All the "call" methods present in the previous base were removed.
37 The majority of operations are independant processes, thus
38 the decorator is in charge of communicate the operation processes
39 with the proccessing unit via IPC.
40
41 The constructor does not receive any argument. The remaining methods
42 are related with the operations to execute.
43
44
45 """
46 proc_type = 'processing'
26 proc_type = 'processing'
47 __attrs__ = []
48
27
49 def __init__(self):
28 def __init__(self):
50
29
@@ -52,7 +31,10 class ProcessingUnit(object):
52 self.dataOut = None
31 self.dataOut = None
53 self.isConfig = False
32 self.isConfig = False
54 self.operations = []
33 self.operations = []
55 self.plots = []
34
35 def setInput(self, unit):
36
37 self.dataIn = unit.dataOut
56
38
57 def getAllowedArgs(self):
39 def getAllowedArgs(self):
58 if hasattr(self, '__attrs__'):
40 if hasattr(self, '__attrs__'):
@@ -61,27 +43,10 class ProcessingUnit(object):
61 return inspect.getargspec(self.run).args
43 return inspect.getargspec(self.run).args
62
44
63 def addOperation(self, conf, operation):
45 def addOperation(self, conf, operation):
64 """
46 '''
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
47 '''
66 posses the id of the operation process (IPC purposes)
67
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
69 identificador asociado a este objeto.
70
71 Input:
72
73 object : objeto de la clase "Operation"
74
75 Return:
76
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
78 """
79
80 self.operations.append(
81 (operation, conf.type, conf.id, conf.getKwargs()))
82
48
83 if 'plot' in self.name.lower():
49 self.operations.append((operation, conf.type, conf.getKwargs()))
84 self.plots.append(operation.CODE)
85
50
86 def getOperationObj(self, objId):
51 def getOperationObj(self, objId):
87
52
@@ -90,17 +55,37 class ProcessingUnit(object):
90
55
91 return self.operations[objId]
56 return self.operations[objId]
92
57
93 def operation(self, **kwargs):
58 def call(self, **kwargs):
94 """
59 '''
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
60 '''
96 atributos del objeto dataOut
97
61
98 Input:
62 try:
63 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
64 return self.dataIn.isReady()
65 elif self.dataIn is None or not self.dataIn.error:
66 self.run(**kwargs)
67 elif self.dataIn.error:
68 self.dataOut.error = self.dataIn.error
69 self.dataOut.flagNoData = True
70 except:
71 err = traceback.format_exc()
72 if 'SchainWarning' in err:
73 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
74 elif 'SchainError' in err:
75 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
76 else:
77 log.error(err, self.name)
78 self.dataOut.error = True
99
79
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
80 for op, optype, opkwargs in self.operations:
101 """
81 if optype == 'other' and not self.dataOut.flagNoData:
82 self.dataOut = op.run(self.dataOut, **opkwargs)
83 elif optype == 'external' and not self.dataOut.flagNoData:
84 op.queue.put(self.dataOut)
85 elif optype == 'external' and self.dataOut.error:
86 op.queue.put(self.dataOut)
102
87
103 raise NotImplementedError
88 return 'Error' if self.dataOut.error else self.dataOut.isReady()
104
89
105 def setup(self):
90 def setup(self):
106
91
@@ -117,22 +102,10 class ProcessingUnit(object):
117
102
118 class Operation(object):
103 class Operation(object):
119
104
120 """
105 '''
121 Update - Jan 2018 - MULTIPROCESSING
106 '''
122
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 The constructor doe snot receive any argument, neither the baseclass.
125
126
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 acumulacion dentro de esta clase
130
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132
107
133 """
134 proc_type = 'operation'
108 proc_type = 'operation'
135 __attrs__ = []
136
109
137 def __init__(self):
110 def __init__(self):
138
111
@@ -180,58 +153,12 class Operation(object):
180
153
181 return
154 return
182
155
183 class InputQueue(Thread):
184
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
188
189 def __init__(self, project_id, inputId, lock=None):
190
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.islocked = False
197 self.size = 0
198
199 def run(self):
200
201 c = zmq.Context()
202 self.receiver = c.socket(zmq.SUB)
203 self.receiver.connect(
204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206
207 while True:
208 obj = self.receiver.recv_multipart()[1]
209 self.size += sys.getsizeof(obj)
210 self.queue.put(obj)
211
212 def get(self):
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
217 self.lock.clear()
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
222 self.lock.set()
223
224 obj = self.queue.get()
225 self.size -= sys.getsizeof(obj)
226 return pickle.loads(obj)
227
228
156
229 def MPDecorator(BaseClass):
157 def MPDecorator(BaseClass):
230 """
158 """
231 Multiprocessing class decorator
159 Multiprocessing class decorator
232
160
233 This function add multiprocessing features to a BaseClass. Also, it handle
161 This function add multiprocessing features to a BaseClass.
234 the communication beetween processes (readers, procUnits and operations).
235 """
162 """
236
163
237 class MPClass(BaseClass, Process):
164 class MPClass(BaseClass, Process):
@@ -239,13 +166,11 def MPDecorator(BaseClass):
239 def __init__(self, *args, **kwargs):
166 def __init__(self, *args, **kwargs):
240 super(MPClass, self).__init__()
167 super(MPClass, self).__init__()
241 Process.__init__(self)
168 Process.__init__(self)
242 self.operationKwargs = {}
169
243 self.args = args
170 self.args = args
244 self.kwargs = kwargs
171 self.kwargs = kwargs
245 self.sender = None
246 self.receiver = None
247 self.i = 0
248 self.t = time.time()
172 self.t = time.time()
173 self.op_type = 'external'
249 self.name = BaseClass.__name__
174 self.name = BaseClass.__name__
250 self.__doc__ = BaseClass.__doc__
175 self.__doc__ = BaseClass.__doc__
251
176
@@ -253,177 +178,30 def MPDecorator(BaseClass):
253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
178 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254
179
255 self.start_time = time.time()
180 self.start_time = time.time()
256 self.id = args[0]
257 self.inputId = args[1]
258 self.project_id = args[2]
259 self.err_queue = args[3]
181 self.err_queue = args[3]
260 self.lock = args[4]
182 self.queue = Queue(maxsize=1)
261 self.typeProc = args[5]
183 self.myrun = BaseClass.run
262 self.err_queue.put('#_start_#')
263 if self.inputId is not None:
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265
266 def subscribe(self):
267 '''
268 Start the zmq socket receiver and subcribe to input ID.
269 '''
270
271 self.queue.start()
272
184
273 def listen(self):
185 def run(self):
274 '''
275 This function waits for objects
276 '''
277
278 return self.queue.get()
279
280 def set_publisher(self):
281 '''
282 This function create a zmq socket for publishing objects.
283 '''
284
285 time.sleep(0.5)
286
287 c = zmq.Context()
288 self.sender = c.socket(zmq.PUB)
289 self.sender.connect(
290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291
292 def publish(self, data, id):
293 '''
294 This function publish an object, to an specific topic.
295 It blocks publishing when receiver queue is full to avoid data loss
296 '''
297
298 if self.inputId is None:
299 self.lock.wait()
300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301
302 def runReader(self):
303 '''
304 Run fuction for read units
305 '''
306 while True:
307
308 try:
309 BaseClass.run(self, **self.kwargs)
310 except:
311 err = traceback.format_exc()
312 if 'No more files' in err:
313 log.warning('No more files to read', self.name)
314 else:
315 self.err_queue.put('{}|{}'.format(self.name, err))
316 self.dataOut.error = True
317
318 for op, optype, opId, kwargs in self.operations:
319 if optype == 'self' and not self.dataOut.flagNoData:
320 op(**kwargs)
321 elif optype == 'other' and not self.dataOut.flagNoData:
322 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 elif optype == 'external':
324 self.publish(self.dataOut, opId)
325
326 if self.dataOut.flagNoData and not self.dataOut.error:
327 continue
328
329 self.publish(self.dataOut, self.id)
330
331 if self.dataOut.error:
332 break
333
334 time.sleep(0.5)
335
336 def runProc(self):
337 '''
338 Run function for proccessing units
339 '''
340
341 while True:
342 self.dataIn = self.listen()
343
344 if self.dataIn.flagNoData and self.dataIn.error is None:
345 continue
346 elif not self.dataIn.error:
347 try:
348 BaseClass.run(self, **self.kwargs)
349 except:
350 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 self.dataOut.error = True
352 elif self.dataIn.error:
353 self.dataOut.error = self.dataIn.error
354 self.dataOut.flagNoData = True
355
356 for op, optype, opId, kwargs in self.operations:
357 if optype == 'self' and not self.dataOut.flagNoData:
358 op(**kwargs)
359 elif optype == 'other' and not self.dataOut.flagNoData:
360 self.dataOut = op.run(self.dataOut, **kwargs)
361 elif optype == 'external' and not self.dataOut.flagNoData:
362 self.publish(self.dataOut, opId)
363
364 self.publish(self.dataOut, self.id)
365 for op, optype, opId, kwargs in self.operations:
366 if optype == 'external' and self.dataOut.error:
367 self.publish(self.dataOut, opId)
368
369 if self.dataOut.error:
370 break
371
372 time.sleep(0.5)
373
374 def runOp(self):
375 '''
376 Run function for external operations (this operations just receive data
377 ex: plots, writers, publishers)
378 '''
379
186
380 while True:
187 while True:
381
188
382 dataOut = self.listen()
189 dataOut = self.queue.get()
383
190
384 if not dataOut.error:
191 if not dataOut.error:
385 try:
192 try:
386 BaseClass.run(self, dataOut, **self.kwargs)
193 BaseClass.run(self, dataOut, **self.kwargs)
387 except:
194 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
195 err = traceback.format_exc()
389 dataOut.error = True
196 log.error(err.split('\n')[-2], self.name)
390 else:
197 else:
391 break
198 break
392
199
393 def run(self):
394 if self.typeProc is "ProcUnit":
395
396 if self.inputId is not None:
397 self.subscribe()
398
399 self.set_publisher()
400
401 if 'Reader' not in BaseClass.__name__:
402 self.runProc()
403 else:
404 self.runReader()
405
406 elif self.typeProc is "Operation":
407
408 self.subscribe()
409 self.runOp()
410
411 else:
412 raise ValueError("Unknown type")
413
414 self.close()
200 self.close()
415
201
416 def close(self):
202 def close(self):
417
203
418 BaseClass.close(self)
204 BaseClass.close(self)
419 self.err_queue.put('#_end_#')
420
421 if self.sender:
422 self.sender.close()
423
424 if self.receiver:
425 self.receiver.close()
426
427 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
205 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428
206
429 return MPClass
207 return MPClass
@@ -1,7 +1,7
1 import numpy
1 import numpy
2
2
3 from .jroproc_base import ProcessingUnit, Operation
3 from .jroproc_base import ProcessingUnit, Operation
4 from schainpy.model.data.jrodata import Correlation, hildebrand_sekhon
4 from schainpy.model.data.jrodata import Correlation
5
5
6 class CorrelationProc(ProcessingUnit):
6 class CorrelationProc(ProcessingUnit):
7
7
@@ -5,7 +5,7 from schainpy.model.data.jrodata import SpectraHeis
5 from schainpy.utils import log
5 from schainpy.utils import log
6
6
7
7
8 @MPDecorator
8
9 class SpectraHeisProc(ProcessingUnit):
9 class SpectraHeisProc(ProcessingUnit):
10
10
11 def __init__(self):#, **kwargs):
11 def __init__(self):#, **kwargs):
@@ -46,7 +46,7 def _unpickle_method(func_name, obj, cls):
46 break
46 break
47 return func.__get__(obj, cls)
47 return func.__get__(obj, cls)
48
48
49 @MPDecorator
49
50 class ParametersProc(ProcessingUnit):
50 class ParametersProc(ProcessingUnit):
51
51
52 METHODS = {}
52 METHODS = {}
@@ -1329,7 +1329,6 class SpectralMoments(Operation):
1329
1329
1330 def run(self, dataOut):
1330 def run(self, dataOut):
1331
1331
1332 #dataOut.data_pre = dataOut.data_pre[0]
1333 data = dataOut.data_pre[0]
1332 data = dataOut.data_pre[0]
1334 absc = dataOut.abscissaList[:-1]
1333 absc = dataOut.abscissaList[:-1]
1335 noise = dataOut.noise
1334 noise = dataOut.noise
@@ -1344,6 +1343,7 class SpectralMoments(Operation):
1344 dataOut.data_POW = data_param[:,1]
1343 dataOut.data_POW = data_param[:,1]
1345 dataOut.data_DOP = data_param[:,2]
1344 dataOut.data_DOP = data_param[:,2]
1346 dataOut.data_WIDTH = data_param[:,3]
1345 dataOut.data_WIDTH = data_param[:,3]
1346
1347 return dataOut
1347 return dataOut
1348
1348
1349 def __calculateMoments(self, oldspec, oldfreq, n0,
1349 def __calculateMoments(self, oldspec, oldfreq, n0,
@@ -1370,25 +1370,27 class SpectralMoments(Operation):
1370 vec_w = numpy.zeros(oldspec.shape[1])
1370 vec_w = numpy.zeros(oldspec.shape[1])
1371 vec_snr = numpy.zeros(oldspec.shape[1])
1371 vec_snr = numpy.zeros(oldspec.shape[1])
1372
1372
1373 oldspec = numpy.ma.masked_invalid(oldspec)
1373 # oldspec = numpy.ma.masked_invalid(oldspec)
1374
1374
1375 for ind in range(oldspec.shape[1]):
1375 for ind in range(oldspec.shape[1]):
1376
1376
1377 spec = oldspec[:,ind]
1377 spec = oldspec[:,ind]
1378 aux = spec*fwindow
1378 aux = spec*fwindow
1379 max_spec = aux.max()
1379 max_spec = aux.max()
1380 m = list(aux).index(max_spec)
1380 m = aux.tolist().index(max_spec)
1381
1381
1382 #Smooth
1382 #Smooth
1383 if (smooth == 0): spec2 = spec
1383 if (smooth == 0):
1384 else: spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1384 spec2 = spec
1385 else:
1386 spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1385
1387
1386 # Calculo de Momentos
1388 # Calculo de Momentos
1387 bb = spec2[list(range(m,spec2.size))]
1389 bb = spec2[numpy.arange(m,spec2.size)]
1388 bb = (bb<n0).nonzero()
1390 bb = (bb<n0).nonzero()
1389 bb = bb[0]
1391 bb = bb[0]
1390
1392
1391 ss = spec2[list(range(0,m + 1))]
1393 ss = spec2[numpy.arange(0,m + 1)]
1392 ss = (ss<n0).nonzero()
1394 ss = (ss<n0).nonzero()
1393 ss = ss[0]
1395 ss = ss[0]
1394
1396
@@ -1399,17 +1401,20 class SpectralMoments(Operation):
1399 if (bb0 < 0):
1401 if (bb0 < 0):
1400 bb0 = 0
1402 bb0 = 0
1401
1403
1402 if (ss.size == 0): ss1 = 1
1404 if (ss.size == 0):
1403 else: ss1 = max(ss) + 1
1405 ss1 = 1
1406 else:
1407 ss1 = max(ss) + 1
1408
1409 if (ss1 > m):
1410 ss1 = m
1404
1411
1405 if (ss1 > m): ss1 = m
1412 valid = numpy.arange(int(m + bb0 - ss1 + 1)) + ss1
1406
1413
1407 valid = numpy.asarray(list(range(int(m + bb0 - ss1 + 1)))) + ss1
1408 power = ((spec2[valid] - n0) * fwindow[valid]).sum()
1414 power = ((spec2[valid] - n0) * fwindow[valid]).sum()
1409 fd = ((spec2[valid]- n0)*freq[valid] * fwindow[valid]).sum() / power
1415 fd = ((spec2[valid]- n0)*freq[valid] * fwindow[valid]).sum() / power
1410 w = math.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum()/power)
1416 w = numpy.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum() / power)
1411 snr = (spec2.mean()-n0)/n0
1417 snr = (spec2.mean()-n0)/n0
1412
1413 if (snr < 1.e-20) :
1418 if (snr < 1.e-20) :
1414 snr = 1.e-20
1419 snr = 1.e-20
1415
1420
@@ -1418,8 +1423,7 class SpectralMoments(Operation):
1418 vec_w[ind] = w
1423 vec_w[ind] = w
1419 vec_snr[ind] = snr
1424 vec_snr[ind] = snr
1420
1425
1421 moments = numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1426 return numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1422 return moments
1423
1427
1424 #------------------ Get SA Parameters --------------------------
1428 #------------------ Get SA Parameters --------------------------
1425
1429
@@ -1,3 +1,4
1 import time
1 import itertools
2 import itertools
2
3
3 import numpy
4 import numpy
@@ -7,7 +8,7 from schainpy.model.data.jrodata import Spectra
7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.model.data.jrodata import hildebrand_sekhon
8 from schainpy.utils import log
9 from schainpy.utils import log
9
10
10 @MPDecorator
11
11 class SpectraProc(ProcessingUnit):
12 class SpectraProc(ProcessingUnit):
12
13
13
14
@@ -220,81 +221,6 class SpectraProc(ProcessingUnit):
220
221
221 return
222 return
222
223
223 def __selectPairsByChannel(self, channelList=None):
224
225 if channelList == None:
226 return
227
228 pairsIndexListSelected = []
229 for pairIndex in self.dataOut.pairsIndexList:
230 # First pair
231 if self.dataOut.pairsList[pairIndex][0] not in channelList:
232 continue
233 # Second pair
234 if self.dataOut.pairsList[pairIndex][1] not in channelList:
235 continue
236
237 pairsIndexListSelected.append(pairIndex)
238
239 if not pairsIndexListSelected:
240 self.dataOut.data_cspc = None
241 self.dataOut.pairsList = []
242 return
243
244 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
245 self.dataOut.pairsList = [self.dataOut.pairsList[i]
246 for i in pairsIndexListSelected]
247
248 return
249
250 def selectChannels(self, channelList):
251
252 channelIndexList = []
253
254 for channel in channelList:
255 if channel not in self.dataOut.channelList:
256 raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % (
257 channel, str(self.dataOut.channelList)))
258
259 index = self.dataOut.channelList.index(channel)
260 channelIndexList.append(index)
261
262 self.selectChannelsByIndex(channelIndexList)
263
264 def selectChannelsByIndex(self, channelIndexList):
265 """
266 Selecciona un bloque de datos en base a canales segun el channelIndexList
267
268 Input:
269 channelIndexList : lista sencilla de canales a seleccionar por ej. [2,3,7]
270
271 Affected:
272 self.dataOut.data_spc
273 self.dataOut.channelIndexList
274 self.dataOut.nChannels
275
276 Return:
277 None
278 """
279
280 for channelIndex in channelIndexList:
281 if channelIndex not in self.dataOut.channelIndexList:
282 raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = " % (
283 channelIndex, self.dataOut.channelIndexList))
284
285 data_spc = self.dataOut.data_spc[channelIndexList, :]
286 data_dc = self.dataOut.data_dc[channelIndexList, :]
287
288 self.dataOut.data_spc = data_spc
289 self.dataOut.data_dc = data_dc
290
291 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
292 self.dataOut.channelList = range(len(channelIndexList))
293 self.__selectPairsByChannel(channelIndexList)
294
295 return 1
296
297
298 def selectFFTs(self, minFFT, maxFFT ):
224 def selectFFTs(self, minFFT, maxFFT ):
299 """
225 """
300 Selecciona un bloque de datos en base a un grupo de valores de puntos FFTs segun el rango
226 Selecciona un bloque de datos en base a un grupo de valores de puntos FFTs segun el rango
@@ -331,67 +257,6 class SpectraProc(ProcessingUnit):
331
257
332 return 1
258 return 1
333
259
334
335 def setH0(self, h0, deltaHeight = None):
336
337 if not deltaHeight:
338 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
339
340 nHeights = self.dataOut.nHeights
341
342 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
343
344 self.dataOut.heightList = newHeiRange
345
346
347 def selectHeights(self, minHei, maxHei):
348 """
349 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
350 minHei <= height <= maxHei
351
352 Input:
353 minHei : valor minimo de altura a considerar
354 maxHei : valor maximo de altura a considerar
355
356 Affected:
357 Indirectamente son cambiados varios valores a travez del metodo selectHeightsByIndex
358
359 Return:
360 1 si el metodo se ejecuto con exito caso contrario devuelve 0
361 """
362
363
364 if (minHei > maxHei):
365 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei))
366
367 if (minHei < self.dataOut.heightList[0]):
368 minHei = self.dataOut.heightList[0]
369
370 if (maxHei > self.dataOut.heightList[-1]):
371 maxHei = self.dataOut.heightList[-1]
372
373 minIndex = 0
374 maxIndex = 0
375 heights = self.dataOut.heightList
376
377 inda = numpy.where(heights >= minHei)
378 indb = numpy.where(heights <= maxHei)
379
380 try:
381 minIndex = inda[0][0]
382 except:
383 minIndex = 0
384
385 try:
386 maxIndex = indb[0][-1]
387 except:
388 maxIndex = len(heights)
389
390 self.selectHeightsByIndex(minIndex, maxIndex)
391
392
393 return 1
394
395 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
260 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
396 newheis = numpy.where(
261 newheis = numpy.where(
397 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
262 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
@@ -466,54 +331,100 class SpectraProc(ProcessingUnit):
466
331
467 return 1
332 return 1
468
333
334 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
335 # validacion de rango
336 if minHei == None:
337 minHei = self.dataOut.heightList[0]
469
338
339 if maxHei == None:
340 maxHei = self.dataOut.heightList[-1]
470
341
471 def selectHeightsByIndex(self, minIndex, maxIndex):
342 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
472 """
343 print('minHei: %.2f is out of the heights range' % (minHei))
473 Selecciona un bloque de datos en base a un grupo indices de alturas segun el rango
344 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
474 minIndex <= index <= maxIndex
345 minHei = self.dataOut.heightList[0]
475
346
476 Input:
347 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
477 minIndex : valor de indice minimo de altura a considerar
348 print('maxHei: %.2f is out of the heights range' % (maxHei))
478 maxIndex : valor de indice maximo de altura a considerar
349 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
350 maxHei = self.dataOut.heightList[-1]
479
351
480 Affected:
352 # validacion de velocidades
481 self.dataOut.data_spc
353 velrange = self.dataOut.getVelRange(1)
482 self.dataOut.data_cspc
483 self.dataOut.data_dc
484 self.dataOut.heightList
485
354
486 Return:
355 if minVel == None:
487 1 si el metodo se ejecuto con exito caso contrario devuelve 0
356 minVel = velrange[0]
488 """
357
358 if maxVel == None:
359 maxVel = velrange[-1]
360
361 if (minVel < velrange[0]) or (minVel > maxVel):
362 print('minVel: %.2f is out of the velocity range' % (minVel))
363 print('minVel is setting to %.2f' % (velrange[0]))
364 minVel = velrange[0]
365
366 if (maxVel > velrange[-1]) or (maxVel < minVel):
367 print('maxVel: %.2f is out of the velocity range' % (maxVel))
368 print('maxVel is setting to %.2f' % (velrange[-1]))
369 maxVel = velrange[-1]
370
371 # seleccion de indices para rango
372 minIndex = 0
373 maxIndex = 0
374 heights = self.dataOut.heightList
375
376 inda = numpy.where(heights >= minHei)
377 indb = numpy.where(heights <= maxHei)
378
379 try:
380 minIndex = inda[0][0]
381 except:
382 minIndex = 0
383
384 try:
385 maxIndex = indb[0][-1]
386 except:
387 maxIndex = len(heights)
489
388
490 if (minIndex < 0) or (minIndex > maxIndex):
389 if (minIndex < 0) or (minIndex > maxIndex):
491 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
390 raise ValueError("some value in (%d,%d) is not valid" % (
492 minIndex, maxIndex))
391 minIndex, maxIndex))
493
392
494 if (maxIndex >= self.dataOut.nHeights):
393 if (maxIndex >= self.dataOut.nHeights):
495 maxIndex = self.dataOut.nHeights - 1
394 maxIndex = self.dataOut.nHeights - 1
496
395
497 # Spectra
396 # seleccion de indices para velocidades
498 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
397 indminvel = numpy.where(velrange >= minVel)
398 indmaxvel = numpy.where(velrange <= maxVel)
399 try:
400 minIndexVel = indminvel[0][0]
401 except:
402 minIndexVel = 0
499
403
500 data_cspc = None
404 try:
501 if self.dataOut.data_cspc is not None:
405 maxIndexVel = indmaxvel[0][-1]
502 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
406 except:
407 maxIndexVel = len(velrange)
503
408
504 data_dc = None
409 # seleccion del espectro
505 if self.dataOut.data_dc is not None:
410 data_spc = self.dataOut.data_spc[:,
506 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
411 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
412 # estimacion de ruido
413 noise = numpy.zeros(self.dataOut.nChannels)
507
414
508 self.dataOut.data_spc = data_spc
415 for channel in range(self.dataOut.nChannels):
509 self.dataOut.data_cspc = data_cspc
416 daux = data_spc[channel, :, :]
510 self.dataOut.data_dc = data_dc
417 sortdata = numpy.sort(daux, axis=None)
418 noise[channel] = hildebrand_sekhon(sortdata, self.dataOut.nIncohInt)
511
419
512 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
420 self.dataOut.noise_estimation = noise.copy()
513
421
514 return 1
422 return 1
515
423
516 def removeDC(self, mode=2):
424 class removeDC(Operation):
425
426 def run(self, dataOut, mode=2):
427 self.dataOut = dataOut
517 jspectra = self.dataOut.data_spc
428 jspectra = self.dataOut.data_spc
518 jcspectra = self.dataOut.data_cspc
429 jcspectra = self.dataOut.data_cspc
519
430
@@ -571,7 +482,9 class SpectraProc(ProcessingUnit):
571 self.dataOut.data_spc = jspectra
482 self.dataOut.data_spc = jspectra
572 self.dataOut.data_cspc = jcspectra
483 self.dataOut.data_cspc = jcspectra
573
484
574 return 1
485 return self.dataOut
486
487 class removeInterference(Operation):
575
488
576 def removeInterference2(self):
489 def removeInterference2(self):
577
490
@@ -594,11 +507,9 class SpectraProc(ProcessingUnit):
594 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
507 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
595 cspc[i,InterferenceRange,:] = numpy.NaN
508 cspc[i,InterferenceRange,:] = numpy.NaN
596
509
597
598
599 self.dataOut.data_cspc = cspc
510 self.dataOut.data_cspc = cspc
600
511
601 def removeInterference(self, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None):
512 def removeInterference(self):
602
513
603 jspectra = self.dataOut.data_spc
514 jspectra = self.dataOut.data_spc
604 jcspectra = self.dataOut.data_cspc
515 jcspectra = self.dataOut.data_cspc
@@ -781,101 +692,16 class SpectraProc(ProcessingUnit):
781
692
782 return 1
693 return 1
783
694
784 def setRadarFrequency(self, frequency=None):
695 def run(self, dataOut, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None, mode=1):
785
786 if frequency != None:
787 self.dataOut.frequency = frequency
788
789 return 1
790
791 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
792 # validacion de rango
793 if minHei == None:
794 minHei = self.dataOut.heightList[0]
795
796 if maxHei == None:
797 maxHei = self.dataOut.heightList[-1]
798
799 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
800 print('minHei: %.2f is out of the heights range' % (minHei))
801 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
802 minHei = self.dataOut.heightList[0]
803
804 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
805 print('maxHei: %.2f is out of the heights range' % (maxHei))
806 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
807 maxHei = self.dataOut.heightList[-1]
808
809 # validacion de velocidades
810 velrange = self.dataOut.getVelRange(1)
811
696
812 if minVel == None:
697 self.dataOut = dataOut
813 minVel = velrange[0]
814
698
815 if maxVel == None:
699 if mode == 1:
816 maxVel = velrange[-1]
700 self.removeInterference(interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None)
817
701 elif mode == 2:
818 if (minVel < velrange[0]) or (minVel > maxVel):
702 self.removeInterference2()
819 print('minVel: %.2f is out of the velocity range' % (minVel))
820 print('minVel is setting to %.2f' % (velrange[0]))
821 minVel = velrange[0]
822
823 if (maxVel > velrange[-1]) or (maxVel < minVel):
824 print('maxVel: %.2f is out of the velocity range' % (maxVel))
825 print('maxVel is setting to %.2f' % (velrange[-1]))
826 maxVel = velrange[-1]
827
828 # seleccion de indices para rango
829 minIndex = 0
830 maxIndex = 0
831 heights = self.dataOut.heightList
832
833 inda = numpy.where(heights >= minHei)
834 indb = numpy.where(heights <= maxHei)
835
836 try:
837 minIndex = inda[0][0]
838 except:
839 minIndex = 0
840
841 try:
842 maxIndex = indb[0][-1]
843 except:
844 maxIndex = len(heights)
845
846 if (minIndex < 0) or (minIndex > maxIndex):
847 raise ValueError("some value in (%d,%d) is not valid" % (
848 minIndex, maxIndex))
849
850 if (maxIndex >= self.dataOut.nHeights):
851 maxIndex = self.dataOut.nHeights - 1
852
853 # seleccion de indices para velocidades
854 indminvel = numpy.where(velrange >= minVel)
855 indmaxvel = numpy.where(velrange <= maxVel)
856 try:
857 minIndexVel = indminvel[0][0]
858 except:
859 minIndexVel = 0
860
861 try:
862 maxIndexVel = indmaxvel[0][-1]
863 except:
864 maxIndexVel = len(velrange)
865
866 # seleccion del espectro
867 data_spc = self.dataOut.data_spc[:,
868 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
869 # estimacion de ruido
870 noise = numpy.zeros(self.dataOut.nChannels)
871
872 for channel in range(self.dataOut.nChannels):
873 daux = data_spc[channel, :, :]
874 noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt)
875
876 self.dataOut.noise_estimation = noise.copy()
877
703
878 return 1
704 return self.dataOut
879
705
880
706
881 class IncohInt(Operation):
707 class IncohInt(Operation):
@@ -1031,7 +857,7 class IncohInt(Operation):
1031
857
1032 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
858 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
1033 if n == 1:
859 if n == 1:
1034 return
860 return dataOut
1035
861
1036 dataOut.flagNoData = True
862 dataOut.flagNoData = True
1037
863
@@ -7,7 +7,7 from schainpy.utils import log
7 from time import time
7 from time import time
8
8
9
9
10 @MPDecorator
10
11 class VoltageProc(ProcessingUnit):
11 class VoltageProc(ProcessingUnit):
12
12
13 def __init__(self):
13 def __init__(self):
@@ -26,8 +26,6 class VoltageProc(ProcessingUnit):
26 if self.dataIn.type == 'Voltage':
26 if self.dataIn.type == 'Voltage':
27 self.dataOut.copy(self.dataIn)
27 self.dataOut.copy(self.dataIn)
28
28
29 # self.dataOut.copy(self.dataIn)
30
31 def __updateObjFromAmisrInput(self):
29 def __updateObjFromAmisrInput(self):
32
30
33 self.dataOut.timeZone = self.dataIn.timeZone
31 self.dataOut.timeZone = self.dataIn.timeZone
@@ -53,23 +51,14 class VoltageProc(ProcessingUnit):
53 self.dataOut.beam.codeList = self.dataIn.beam.codeList
51 self.dataOut.beam.codeList = self.dataIn.beam.codeList
54 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
52 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
55 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
53 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
56 #
57 # pass#
58 #
59 # def init(self):
60 #
61 #
62 # if self.dataIn.type == 'AMISR':
63 # self.__updateObjFromAmisrInput()
64 #
65 # if self.dataIn.type == 'Voltage':
66 # self.dataOut.copy(self.dataIn)
67 # # No necesita copiar en cada init() los atributos de dataIn
68 # # la copia deberia hacerse por cada nuevo bloque de datos
69
54
70 def selectChannels(self, channelList):
55
56 class selectChannels(Operation):
57
58 def run(self, dataOut, channelList):
71
59
72 channelIndexList = []
60 channelIndexList = []
61 self.dataOut = dataOut
73
62
74 for channel in channelList:
63 for channel in channelList:
75 if channel not in self.dataOut.channelList:
64 if channel not in self.dataOut.channelList:
@@ -79,6 +68,7 class VoltageProc(ProcessingUnit):
79 channelIndexList.append(index)
68 channelIndexList.append(index)
80
69
81 self.selectChannelsByIndex(channelIndexList)
70 self.selectChannelsByIndex(channelIndexList)
71 return self.dataOut
82
72
83 def selectChannelsByIndex(self, channelIndexList):
73 def selectChannelsByIndex(self, channelIndexList):
84 """
74 """
@@ -101,9 +91,9 class VoltageProc(ProcessingUnit):
101
91
102 for channelIndex in channelIndexList:
92 for channelIndex in channelIndexList:
103 if channelIndex not in self.dataOut.channelIndexList:
93 if channelIndex not in self.dataOut.channelIndexList:
104 print(channelIndexList)
105 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
94 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
106
95
96 if self.dataOut.type == 'Voltage':
107 if self.dataOut.flagDataAsBlock:
97 if self.dataOut.flagDataAsBlock:
108 """
98 """
109 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
99 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
@@ -115,10 +105,49 class VoltageProc(ProcessingUnit):
115 self.dataOut.data = data
105 self.dataOut.data = data
116 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
106 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
117 self.dataOut.channelList = range(len(channelIndexList))
107 self.dataOut.channelList = range(len(channelIndexList))
108 elif self.dataOut.type == 'Spectra':
109 data_spc = self.dataOut.data_spc[channelIndexList, :]
110 data_dc = self.dataOut.data_dc[channelIndexList, :]
111
112 self.dataOut.data_spc = data_spc
113 self.dataOut.data_dc = data_dc
114
115 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
116 self.dataOut.channelList = range(len(channelIndexList))
117 self.__selectPairsByChannel(channelIndexList)
118
118
119 return 1
119 return 1
120
120
121 def selectHeights(self, minHei=None, maxHei=None):
121 def __selectPairsByChannel(self, channelList=None):
122
123 if channelList == None:
124 return
125
126 pairsIndexListSelected = []
127 for pairIndex in self.dataOut.pairsIndexList:
128 # First pair
129 if self.dataOut.pairsList[pairIndex][0] not in channelList:
130 continue
131 # Second pair
132 if self.dataOut.pairsList[pairIndex][1] not in channelList:
133 continue
134
135 pairsIndexListSelected.append(pairIndex)
136
137 if not pairsIndexListSelected:
138 self.dataOut.data_cspc = None
139 self.dataOut.pairsList = []
140 return
141
142 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
143 self.dataOut.pairsList = [self.dataOut.pairsList[i]
144 for i in pairsIndexListSelected]
145
146 return
147
148 class selectHeights(Operation):
149
150 def run(self, dataOut, minHei=None, maxHei=None):
122 """
151 """
123 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
152 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
124 minHei <= height <= maxHei
153 minHei <= height <= maxHei
@@ -134,6 +163,8 class VoltageProc(ProcessingUnit):
134 1 si el metodo se ejecuto con exito caso contrario devuelve 0
163 1 si el metodo se ejecuto con exito caso contrario devuelve 0
135 """
164 """
136
165
166 self.dataOut = dataOut
167
137 if minHei == None:
168 if minHei == None:
138 minHei = self.dataOut.heightList[0]
169 minHei = self.dataOut.heightList[0]
139
170
@@ -165,8 +196,7 class VoltageProc(ProcessingUnit):
165
196
166 self.selectHeightsByIndex(minIndex, maxIndex)
197 self.selectHeightsByIndex(minIndex, maxIndex)
167
198
168 return 1
199 return self.dataOut
169
170
200
171 def selectHeightsByIndex(self, minIndex, maxIndex):
201 def selectHeightsByIndex(self, minIndex, maxIndex):
172 """
202 """
@@ -185,6 +215,7 class VoltageProc(ProcessingUnit):
185 1 si el metodo se ejecuto con exito caso contrario devuelve 0
215 1 si el metodo se ejecuto con exito caso contrario devuelve 0
186 """
216 """
187
217
218 if self.dataOut.type == 'Voltage':
188 if (minIndex < 0) or (minIndex > maxIndex):
219 if (minIndex < 0) or (minIndex > maxIndex):
189 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
220 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
190
221
@@ -207,59 +238,95 class VoltageProc(ProcessingUnit):
207
238
208 if self.dataOut.nHeights <= 1:
239 if self.dataOut.nHeights <= 1:
209 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
240 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
241 elif self.dataOut.type == 'Spectra':
242 if (minIndex < 0) or (minIndex > maxIndex):
243 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
244 minIndex, maxIndex))
245
246 if (maxIndex >= self.dataOut.nHeights):
247 maxIndex = self.dataOut.nHeights - 1
248
249 # Spectra
250 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
251
252 data_cspc = None
253 if self.dataOut.data_cspc is not None:
254 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
255
256 data_dc = None
257 if self.dataOut.data_dc is not None:
258 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
259
260 self.dataOut.data_spc = data_spc
261 self.dataOut.data_cspc = data_cspc
262 self.dataOut.data_dc = data_dc
263
264 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
210
265
211 return 1
266 return 1
212
267
213
268
214 def filterByHeights(self, window):
269 class filterByHeights(Operation):
215
270
216 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
271 def run(self, dataOut, window):
272
273 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
217
274
218 if window == None:
275 if window == None:
219 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
276 window = (dataOut.radarControllerHeaderObj.txA/dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
220
277
221 newdelta = deltaHeight * window
278 newdelta = deltaHeight * window
222 r = self.dataOut.nHeights % window
279 r = dataOut.nHeights % window
223 newheights = (self.dataOut.nHeights-r)/window
280 newheights = (dataOut.nHeights-r)/window
224
281
225 if newheights <= 1:
282 if newheights <= 1:
226 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
283 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(dataOut.nHeights, window))
227
284
228 if self.dataOut.flagDataAsBlock:
285 if dataOut.flagDataAsBlock:
229 """
286 """
230 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
287 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
231 """
288 """
232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
289 buffer = dataOut.data[:, :, 0:int(dataOut.nHeights-r)]
233 buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window)
290 buffer = buffer.reshape(dataOut.nChannels, dataOut.nProfiles, int(dataOut.nHeights/window), window)
234 buffer = numpy.sum(buffer,3)
291 buffer = numpy.sum(buffer,3)
235
292
236 else:
293 else:
237 buffer = self.dataOut.data[:,0:int(self.dataOut.nHeights-r)]
294 buffer = dataOut.data[:,0:int(dataOut.nHeights-r)]
238 buffer = buffer.reshape(self.dataOut.nChannels,int(self.dataOut.nHeights/window),int(window))
295 buffer = buffer.reshape(dataOut.nChannels,int(dataOut.nHeights/window),int(window))
239 buffer = numpy.sum(buffer,2)
296 buffer = numpy.sum(buffer,2)
240
297
241 self.dataOut.data = buffer
298 dataOut.data = buffer
242 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
299 dataOut.heightList = dataOut.heightList[0] + numpy.arange( newheights )*newdelta
243 self.dataOut.windowOfFilter = window
300 dataOut.windowOfFilter = window
301
302 return dataOut
244
303
245 def setH0(self, h0, deltaHeight = None):
304
305 class setH0(Operation):
306
307 def run(self, dataOut, h0, deltaHeight = None):
246
308
247 if not deltaHeight:
309 if not deltaHeight:
248 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
310 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
249
311
250 nHeights = self.dataOut.nHeights
312 nHeights = dataOut.nHeights
251
313
252 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
314 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
253
315
254 self.dataOut.heightList = newHeiRange
316 dataOut.heightList = newHeiRange
317
318 return dataOut
255
319
256 def deFlip(self, channelList = []):
257
320
258 data = self.dataOut.data.copy()
321 class deFlip(Operation):
259
322
260 if self.dataOut.flagDataAsBlock:
323 def run(self, dataOut, channelList = []):
324
325 data = dataOut.data.copy()
326
327 if dataOut.flagDataAsBlock:
261 flip = self.flip
328 flip = self.flip
262 profileList = list(range(self.dataOut.nProfiles))
329 profileList = list(range(dataOut.nProfiles))
263
330
264 if not channelList:
331 if not channelList:
265 for thisProfile in profileList:
332 for thisProfile in profileList:
@@ -267,7 +334,7 class VoltageProc(ProcessingUnit):
267 flip *= -1.0
334 flip *= -1.0
268 else:
335 else:
269 for thisChannel in channelList:
336 for thisChannel in channelList:
270 if thisChannel not in self.dataOut.channelList:
337 if thisChannel not in dataOut.channelList:
271 continue
338 continue
272
339
273 for thisProfile in profileList:
340 for thisProfile in profileList:
@@ -281,41 +348,57 class VoltageProc(ProcessingUnit):
281 data[:,:] = data[:,:]*self.flip
348 data[:,:] = data[:,:]*self.flip
282 else:
349 else:
283 for thisChannel in channelList:
350 for thisChannel in channelList:
284 if thisChannel not in self.dataOut.channelList:
351 if thisChannel not in dataOut.channelList:
285 continue
352 continue
286
353
287 data[thisChannel,:] = data[thisChannel,:]*self.flip
354 data[thisChannel,:] = data[thisChannel,:]*self.flip
288
355
289 self.flip *= -1.
356 self.flip *= -1.
290
357
291 self.dataOut.data = data
358 dataOut.data = data
292
359
293 def setRadarFrequency(self, frequency=None):
360 return dataOut
294
361
295 if frequency != None:
296 self.dataOut.frequency = frequency
297
362
298 return 1
363 class setAttribute(Operation):
364 '''
365 Set an arbitrary attribute to dataOut
366 '''
367
368 def __init__(self):
369
370 Operation.__init__(self)
371 self._ready = False
372
373 def run(self, dataOut, **kwargs):
299
374
300 def interpolateHeights(self, topLim, botLim):
375 for key, value in kwargs.items():
376 setattr(dataOut, key, value)
377
378 return dataOut
379
380
381 class interpolateHeights(Operation):
382
383 def run(self, dataOut, topLim, botLim):
301 #69 al 72 para julia
384 #69 al 72 para julia
302 #82-84 para meteoros
385 #82-84 para meteoros
303 if len(numpy.shape(self.dataOut.data))==2:
386 if len(numpy.shape(dataOut.data))==2:
304 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
387 sampInterp = (dataOut.data[:,botLim-1] + dataOut.data[:,topLim+1])/2
305 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
388 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
306 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
389 #dataOut.data[:,botLim:limSup+1] = sampInterp
307 self.dataOut.data[:,botLim:topLim+1] = sampInterp
390 dataOut.data[:,botLim:topLim+1] = sampInterp
308 else:
391 else:
309 nHeights = self.dataOut.data.shape[2]
392 nHeights = dataOut.data.shape[2]
310 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
393 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
311 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
394 y = dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
312 f = interpolate.interp1d(x, y, axis = 2)
395 f = interpolate.interp1d(x, y, axis = 2)
313 xnew = numpy.arange(botLim,topLim+1)
396 xnew = numpy.arange(botLim,topLim+1)
314 ynew = f(xnew)
397 ynew = f(xnew)
398 dataOut.data[:,:,botLim:topLim+1] = ynew
315
399
316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
400 return dataOut
317
401
318 # import collections
319
402
320 class CohInt(Operation):
403 class CohInt(Operation):
321
404
@@ -979,8 +1062,6 class ProfileSelector(Operation):
979
1062
980 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
1063 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981
1064
982 #return False
983 return dataOut
984
1065
985 class Reshaper(Operation):
1066 class Reshaper(Operation):
986
1067
General Comments 0
You need to be logged in to leave comments. Login now