##// END OF EJS Templates
Rewrite controller, remove MPDecorator to units (keep for plots an writers) use of queues for interproc comm instead of zmq, self operations are no longer supported
Juan C. Espinoza -
r1287:af11e4aac00c
parent child
Show More
@@ -1,10 +1,9
1 1 import click
2 import schainpy
3 2 import subprocess
4 3 import os
5 4 import sys
6 5 import glob
7 from multiprocessing import cpu_count
6 import schainpy
8 7 from schainpy.controller import Project
9 8 from schainpy.model import Operation, ProcessingUnit
10 9 from schainpy.utils import log
@@ -43,7 +42,7 def getOperations():
43 42 def getArgs(op):
44 43 module = locate('schainpy.model.{}'.format(op))
45 44 try:
46 obj = module(1,2,3,Queue(),5,6)
45 obj = module(1, 2, 3, Queue())
47 46 except:
48 47 obj = module()
49 48
@@ -68,7 +67,7 def getArgs(op):
68 67 def getDoc(obj):
69 68 module = locate('schainpy.model.{}'.format(obj))
70 69 try:
71 obj = module(1,2,3,Queue(),5,6)
70 obj = module(1, 2, 3, Queue())
72 71 except:
73 72 obj = module()
74 73 return obj.__doc__
@@ -94,9 +93,9 PREFIX = 'experiment'
94 93 @click.argument('nextcommand', default=None, required=False, type=str)
95 94 def main(command, nextcommand, version):
96 95 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
97 Available commands.\n
96 Available commands:\n
98 97 xml: runs a schain XML generated file\n
99 run: runs any python script starting 'experiment_'\n
98 run: runs any python script'\n
100 99 generate: generates a template schain script\n
101 100 list: return a list of available procs and operations\n
102 101 search: return avilable operations, procs or arguments of the given
@@ -156,11 +155,9 def search(nextcommand):
156 155 try:
157 156 args = getArgs(nextcommand)
158 157 doc = getDoc(nextcommand)
159 if len(args) == 0:
160 log.success('\n{} has no arguments'.format(nextcommand), '')
161 else:
162 log.success('{}\n{}\n\narguments:\n {}'.format(
163 nextcommand, doc, ', '.join(args)), '')
158 log.success('{}\n{}\n\narguments:\n {}'.format(
159 nextcommand, doc, ', '.join(args)), ''
160 )
164 161 except Exception as e:
165 162 log.error('Module `{}` does not exists'.format(nextcommand), '')
166 163 allModules = getAll()
This diff has been collapsed as it changes many lines, (1196 lines changed) Show them Hide them
@@ -1,568 +1,180
1 1 '''
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
4 Created on September , 2012
2 Main routines to create a Signal Chain project
5 3 '''
6 from platform import python_version
4
5 import re
7 6 import sys
8 7 import ast
9 8 import datetime
10 9 import traceback
11 import math
12 10 import time
13 import zmq
14 from multiprocessing import Process, Queue, Event, Value, cpu_count
11 from multiprocessing import Process, Queue
15 12 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
18
13 from xml.etree.ElementTree import ElementTree, Element, SubElement
19 14
20 15 from schainpy.admin import Alarm, SchainWarning
21 16 from schainpy.model import *
22 17 from schainpy.utils import log
23 18
24 19
25 DTYPES = {
26 'Voltage': '.r',
27 'Spectra': '.pdata'
28 }
29
30
31 def MPProject(project, n=cpu_count()):
32 '''
33 Project wrapper to run schain in n processes
34 '''
35
36 rconf = project.getReadUnitObj()
37 op = rconf.getOperationObj('run')
38 dt1 = op.getParameterValue('startDate')
39 dt2 = op.getParameterValue('endDate')
40 tm1 = op.getParameterValue('startTime')
41 tm2 = op.getParameterValue('endTime')
42 days = (dt2 - dt1).days
43
44 for day in range(days + 1):
45 skip = 0
46 cursor = 0
47 processes = []
48 dt = dt1 + datetime.timedelta(day)
49 dt_str = dt.strftime('%Y/%m/%d')
50 reader = JRODataReader()
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 startDate=dt,
53 endDate=dt,
54 startTime=tm1,
55 endTime=tm2,
56 ext=DTYPES[rconf.datatype])
57 nFiles = len(files)
58 if nFiles == 0:
59 continue
60 skip = int(math.ceil(nFiles / n))
61 while nFiles > cursor * skip:
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 skip=skip)
64 p = project.clone()
65 p.start()
66 processes.append(p)
67 cursor += 1
68
69 def beforeExit(exctype, value, trace):
70 for process in processes:
71 process.terminate()
72 process.join()
73 print(traceback.print_tb(trace))
74
75 sys.excepthook = beforeExit
76
77 for process in processes:
78 process.join()
79 process.terminate()
80
81 time.sleep(3)
82
83 def wait(context):
84
85 time.sleep(1)
86 c = zmq.Context()
87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 msg = receiver.recv_multipart()[1]
91 context.terminate()
92
93 class ParameterConf():
94
95 id = None
96 name = None
97 value = None
98 format = None
99
100 __formated_value = None
101
102 ELEMENTNAME = 'Parameter'
103
104 def __init__(self):
105
106 self.format = 'str'
107
108 def getElementName(self):
109
110 return self.ELEMENTNAME
111
112 def getValue(self):
113
114 value = self.value
115 format = self.format
116
117 if self.__formated_value != None:
118
119 return self.__formated_value
120
121 if format == 'obj':
122 return value
123
124 if format == 'str':
125 self.__formated_value = str(value)
126 return self.__formated_value
127
128 if value == '':
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
131 if format == 'list':
132 strList = [s.strip() for s in value.split(',')]
133 self.__formated_value = strList
134
135 return self.__formated_value
136
137 if format == 'intlist':
138 '''
139 Example:
140 value = (0,1,2)
141 '''
142
143 new_value = ast.literal_eval(value)
144
145 if type(new_value) not in (tuple, list):
146 new_value = [int(new_value)]
147
148 self.__formated_value = new_value
149
150 return self.__formated_value
151
152 if format == 'floatlist':
153 '''
154 Example:
155 value = (0.5, 1.4, 2.7)
156 '''
157
158 new_value = ast.literal_eval(value)
159
160 if type(new_value) not in (tuple, list):
161 new_value = [float(new_value)]
162
163 self.__formated_value = new_value
164
165 return self.__formated_value
166
167 if format == 'date':
168 strList = value.split('/')
169 intList = [int(x) for x in strList]
170 date = datetime.date(intList[0], intList[1], intList[2])
171
172 self.__formated_value = date
173
174 return self.__formated_value
175
176 if format == 'time':
177 strList = value.split(':')
178 intList = [int(x) for x in strList]
179 time = datetime.time(intList[0], intList[1], intList[2])
180
181 self.__formated_value = time
182
183 return self.__formated_value
184
185 if format == 'pairslist':
186 '''
187 Example:
188 value = (0,1),(1,2)
189 '''
190
191 new_value = ast.literal_eval(value)
192
193 if type(new_value) not in (tuple, list):
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195
196 if type(new_value[0]) not in (tuple, list):
197 if len(new_value) != 2:
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 new_value = [new_value]
200
201 for thisPair in new_value:
202 if len(thisPair) != 2:
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204
205 self.__formated_value = new_value
206
207 return self.__formated_value
208
209 if format == 'multilist':
210 '''
211 Example:
212 value = (0,1,2),(3,4,5)
213 '''
214 multiList = ast.literal_eval(value)
215
216 if type(multiList[0]) == int:
217 multiList = ast.literal_eval('(' + value + ')')
218
219 self.__formated_value = multiList
220
221 return self.__formated_value
222
223 if format == 'bool':
224 value = int(value)
225
226 if format == 'int':
227 value = float(value)
228
229 format_func = eval(format)
230
231 self.__formated_value = format_func(value)
232
233 return self.__formated_value
234
235 def updateId(self, new_id):
236
237 self.id = str(new_id)
238
239 def setup(self, id, name, value, format='str'):
240 self.id = str(id)
241 self.name = name
242 if format == 'obj':
243 self.value = value
244 else:
245 self.value = str(value)
246 self.format = str.lower(format)
247
248 self.getValue()
249
250 return 1
251
252 def update(self, name, value, format='str'):
253
254 self.name = name
255 self.value = str(value)
256 self.format = format
257
258 def makeXml(self, opElement):
259 if self.name not in ('queue',):
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 parmElement.set('id', str(self.id))
262 parmElement.set('name', self.name)
263 parmElement.set('value', self.value)
264 parmElement.set('format', self.format)
265
266 def readXml(self, parmElement):
267
268 self.id = parmElement.get('id')
269 self.name = parmElement.get('name')
270 self.value = parmElement.get('value')
271 self.format = str.lower(parmElement.get('format'))
272
273 # Compatible with old signal chain version
274 if self.format == 'int' and self.name == 'idfigure':
275 self.name = 'id'
276
277 def printattr(self):
278
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280
281 class OperationConf():
282
283 ELEMENTNAME = 'Operation'
20 class ConfBase():
284 21
285 22 def __init__(self):
286 23
287 24 self.id = '0'
288 25 self.name = None
289 26 self.priority = None
290 self.topic = None
291
292 def __getNewId(self):
293
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
27 self.parameters = {}
28 self.object = None
29 self.operations = []
295 30
296 31 def getId(self):
32
297 33 return self.id
34
35 def getNewId(self):
36
37 return int(self.id) * 10 + len(self.operations) + 1
298 38
299 39 def updateId(self, new_id):
300 40
301 41 self.id = str(new_id)
302 42
303 43 n = 1
304 for parmObj in self.parmConfObjList:
305
306 idParm = str(int(new_id) * 10 + n)
307 parmObj.updateId(idParm)
308
44 for conf in self.operations:
45 conf_id = str(int(new_id) * 10 + n)
46 conf.updateId(conf_id)
309 47 n += 1
310 48
311 def getElementName(self):
312
313 return self.ELEMENTNAME
314
315 def getParameterObjList(self):
316
317 return self.parmConfObjList
318
319 def getParameterObj(self, parameterName):
320
321 for parmConfObj in self.parmConfObjList:
322
323 if parmConfObj.name != parameterName:
324 continue
325
326 return parmConfObj
327
328 return None
329
330 def getParameterObjfromValue(self, parameterValue):
331
332 for parmConfObj in self.parmConfObjList:
49 def getKwargs(self):
333 50
334 if parmConfObj.getValue() != parameterValue:
335 continue
51 params = {}
336 52
337 return parmConfObj.getValue()
53 for key, value in self.parameters.items():
54 if value not in (None, '', ' '):
55 params[key] = value
56
57 return params
338 58
339 return None
59 def update(self, **kwargs):
340 60
341 def getParameterValue(self, parameterName):
61 for key, value in kwargs.items():
62 self.addParameter(name=key, value=value)
342 63
343 parameterObj = self.getParameterObj(parameterName)
64 def addParameter(self, name, value, format=None):
65 '''
66 '''
344 67
345 # if not parameterObj:
346 # return None
68 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
69 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
70 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
71 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
72 else:
73 try:
74 self.parameters[name] = ast.literal_eval(value)
75 except:
76 if isinstance(value, str) and ',' in value:
77 self.parameters[name] = value.split(',')
78 else:
79 self.parameters[name] = value
80
81 def getParameters(self):
82
83 params = {}
84 for key, value in self.parameters.items():
85 s = type(value).__name__
86 if s == 'date':
87 params[key] = value.strftime('%Y/%m/%d')
88 elif s == 'time':
89 params[key] = value.strftime('%H:%M:%S')
90 else:
91 params[key] = str(value)
347 92
348 value = parameterObj.getValue()
93 return params
94
95 def makeXml(self, element):
349 96
350 return value
97 xml = SubElement(element, self.ELEMENTNAME)
98 for label in self.xml_labels:
99 xml.set(label, str(getattr(self, label)))
100
101 for key, value in self.getParameters().items():
102 xml_param = SubElement(xml, 'Parameter')
103 xml_param.set('name', key)
104 xml_param.set('value', value)
105
106 for conf in self.operations:
107 conf.makeXml(xml)
108
109 def __str__(self):
351 110
352 def getKwargs(self):
111 if self.ELEMENTNAME == 'Operation':
112 s = ' {}[id={}]\n'.format(self.name, self.id)
113 else:
114 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
353 115
354 kwargs = {}
116 for key, value in self.parameters.items():
117 if self.ELEMENTNAME == 'Operation':
118 s += ' {}: {}\n'.format(key, value)
119 else:
120 s += ' {}: {}\n'.format(key, value)
121
122 for conf in self.operations:
123 s += str(conf)
355 124
356 for parmConfObj in self.parmConfObjList:
357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 continue
125 return s
359 126
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
127 class OperationConf(ConfBase):
361 128
362 return kwargs
129 ELEMENTNAME = 'Operation'
130 xml_labels = ['id', 'name']
363 131
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
132 def setup(self, id, name, priority, project_id, err_queue):
365 133
366 134 self.id = str(id)
367 135 self.project_id = project_id
368 136 self.name = name
369 self.type = type
370 self.priority = priority
137 self.type = 'other'
371 138 self.err_queue = err_queue
372 self.lock = lock
373 self.parmConfObjList = []
374
375 def removeParameters(self):
376
377 for obj in self.parmConfObjList:
378 del obj
379
380 self.parmConfObjList = []
381
382 def addParameter(self, name, value, format='str'):
383
384 if value is None:
385 return None
386 id = self.__getNewId()
387
388 parmConfObj = ParameterConf()
389 if not parmConfObj.setup(id, name, value, format):
390 return None
391
392 self.parmConfObjList.append(parmConfObj)
393
394 return parmConfObj
395
396 def changeParameter(self, name, value, format='str'):
397
398 parmConfObj = self.getParameterObj(name)
399 parmConfObj.update(name, value, format)
400 139
401 return parmConfObj
140 def readXml(self, element, project_id, err_queue):
402 141
403 def makeXml(self, procUnitElement):
404
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
406 opElement.set('id', str(self.id))
407 opElement.set('name', self.name)
408 opElement.set('type', self.type)
409 opElement.set('priority', str(self.priority))
410
411 for parmConfObj in self.parmConfObjList:
412 parmConfObj.makeXml(opElement)
413
414 def readXml(self, opElement, project_id):
415
416 self.id = opElement.get('id')
417 self.name = opElement.get('name')
418 self.type = opElement.get('type')
419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
421
422 # Compatible with old signal chain version
423 # Use of 'run' method instead 'init'
424 if self.type == 'self' and self.name == 'init':
425 self.name = 'run'
426
427 self.parmConfObjList = []
428
429 parmElementList = opElement.iter(ParameterConf().getElementName())
430
431 for parmElement in parmElementList:
432 parmConfObj = ParameterConf()
433 parmConfObj.readXml(parmElement)
434
435 # Compatible with old signal chain version
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
437 if self.type != 'self' and self.name == 'Plot':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 self.name = parmConfObj.value
440 continue
441
442 self.parmConfObjList.append(parmConfObj)
443
444 def printattr(self):
445
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 self.id,
448 self.name,
449 self.type,
450 self.priority,
451 self.project_id))
142 self.id = element.get('id')
143 self.name = element.get('name')
144 self.type = 'other'
145 self.project_id = str(project_id)
146 self.err_queue = err_queue
452 147
453 for parmConfObj in self.parmConfObjList:
454 parmConfObj.printattr()
148 for elm in element.iter('Parameter'):
149 self.addParameter(elm.get('name'), elm.get('value'))
455 150
456 151 def createObject(self):
457 152
458 153 className = eval(self.name)
459 154
460 if self.type == 'other':
461 opObj = className()
462 elif self.type == 'external':
155 if 'Plot' in self.name or 'Writer' in self.name:
463 156 kwargs = self.getKwargs()
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
157 opObj = className(self.id, self.id, self.project_id, self.err_queue, **kwargs)
465 158 opObj.start()
466 self.opObj = opObj
159 self.type = 'external'
160 else:
161 opObj = className()
467 162
163 self.object = opObj
468 164 return opObj
469 165
470 class ProcUnitConf():
166 class ProcUnitConf(ConfBase):
471 167
472 168 ELEMENTNAME = 'ProcUnit'
169 xml_labels = ['id', 'inputId', 'name']
473 170
474 def __init__(self):
475
476 self.id = None
477 self.datatype = None
478 self.name = None
479 self.inputId = None
480 self.opConfObjList = []
481 self.procUnitObj = None
482 self.opObjDict = {}
483
484 def __getPriority(self):
485
486 return len(self.opConfObjList) + 1
487
488 def __getNewId(self):
489
490 return int(self.id) * 10 + len(self.opConfObjList) + 1
491
492 def getElementName(self):
493
494 return self.ELEMENTNAME
495
496 def getId(self):
497
498 return self.id
499
500 def updateId(self, new_id):
171 def setup(self, project_id, id, name, datatype, inputId, err_queue):
501 172 '''
502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504
505 # If this proc unit has not inputs
506 #if self.inputId == '0':
507 #new_inputId = 0
508
509 n = 1
510 for opConfObj in self.opConfObjList:
511
512 idOp = str(int(new_id) * 10 + n)
513 opConfObj.updateId(idOp)
514
515 n += 1
516
517 self.parentId = str(parentId)
518 self.id = str(new_id)
519 #self.inputId = str(new_inputId)
520 '''
521 n = 1
522
523 def getInputId(self):
524
525 return self.inputId
526
527 def getOperationObjList(self):
528
529 return self.opConfObjList
530
531 def getOperationObj(self, name=None):
532
533 for opConfObj in self.opConfObjList:
534
535 if opConfObj.name != name:
536 continue
537
538 return opConfObj
539
540 return None
541
542 def getOpObjfromParamValue(self, value=None):
543
544 for opConfObj in self.opConfObjList:
545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 continue
547 return opConfObj
548 return None
549
550 def getProcUnitObj(self):
551
552 return self.procUnitObj
553
554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
555 '''
556 id sera el topico a publicar
557 inputId sera el topico a subscribirse
558 173 '''
559 174
560 # Compatible with old signal chain version
561 175 if datatype == None and name == None:
562 176 raise ValueError('datatype or name should be defined')
563 177
564 #Definir una condicion para inputId cuando sea 0
565
566 178 if name == None:
567 179 if 'Proc' in datatype:
568 180 name = datatype
@@ -578,100 +190,49 class ProcUnitConf():
578 190 self.datatype = datatype
579 191 self.inputId = inputId
580 192 self.err_queue = err_queue
581 self.lock = lock
582 self.opConfObjList = []
583
584 self.addOperation(name='run', optype='self')
585
586 def removeOperations(self):
587
588 for obj in self.opConfObjList:
589 del obj
193 self.operations = []
194 self.parameters = {}
590 195
591 self.opConfObjList = []
592 self.addOperation(name='run')
196 def removeOperation(self, id):
593 197
594 def addParameter(self, **kwargs):
595 '''
596 Add parameters to 'run' operation
597 '''
598 opObj = self.opConfObjList[0]
599
600 opObj.addParameter(**kwargs)
198 i = [1 if x.id==id else 0 for x in self.operations]
199 self.operations.pop(i.index(1))
200
201 def getOperation(self, id):
601 202
602 return opObj
203 for conf in self.operations:
204 if conf.id == id:
205 return conf
603 206
604 207 def addOperation(self, name, optype='self'):
605 208 '''
606 Actualizacion - > proceso comunicacion
607 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
608 definir el tipoc de socket o comunicacion ipc++
609
610 209 '''
611 210
612 id = self.__getNewId()
613 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
614 opConfObj = OperationConf()
615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
616 self.opConfObjList.append(opConfObj)
617
618 return opConfObj
619
620 def makeXml(self, projectElement):
211 id = self.getNewId()
212 conf = OperationConf()
213 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
214 self.operations.append(conf)
621 215
622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
623 procUnitElement.set('id', str(self.id))
624 procUnitElement.set('name', self.name)
625 procUnitElement.set('datatype', self.datatype)
626 procUnitElement.set('inputId', str(self.inputId))
216 return conf
627 217
628 for opConfObj in self.opConfObjList:
629 opConfObj.makeXml(procUnitElement)
218 def readXml(self, element, project_id, err_queue):
630 219
631 def readXml(self, upElement, project_id):
632
633 self.id = upElement.get('id')
634 self.name = upElement.get('name')
635 self.datatype = upElement.get('datatype')
636 self.inputId = upElement.get('inputId')
220 self.id = element.get('id')
221 self.name = element.get('name')
222 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
223 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
637 224 self.project_id = str(project_id)
638
639 if self.ELEMENTNAME == 'ReadUnit':
640 self.datatype = self.datatype.replace('Reader', '')
641
642 if self.ELEMENTNAME == 'ProcUnit':
643 self.datatype = self.datatype.replace('Proc', '')
644
645 if self.inputId == 'None':
646 self.inputId = '0'
647
648 self.opConfObjList = []
649
650 opElementList = upElement.iter(OperationConf().getElementName())
651
652 for opElement in opElementList:
653 opConfObj = OperationConf()
654 opConfObj.readXml(opElement, project_id)
655 self.opConfObjList.append(opConfObj)
656
657 def printattr(self):
658
659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 self.id,
661 self.name,
662 self.datatype,
663 self.inputId,
664 self.project_id))
665
666 for opConfObj in self.opConfObjList:
667 opConfObj.printattr()
668
669 def getKwargs(self):
670
671 opObj = self.opConfObjList[0]
672 kwargs = opObj.getKwargs()
673
674 return kwargs
225 self.err_queue = err_queue
226 self.operations = []
227 self.parameters = {}
228
229 for elm in element:
230 if elm.tag == 'Parameter':
231 self.addParameter(elm.get('name'), elm.get('value'))
232 elif elm.tag == 'Operation':
233 conf = OperationConf()
234 conf.readXml(elm, project_id, err_queue)
235 self.operations.append(conf)
675 236
676 237 def createObjects(self):
677 238 '''
@@ -680,39 +241,27 class ProcUnitConf():
680 241
681 242 className = eval(self.name)
682 243 kwargs = self.getKwargs()
683 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
244 procUnitObj = className()
245 procUnitObj.name = self.name
684 246 log.success('creating process...', self.name)
685 247
686 for opConfObj in self.opConfObjList:
248 for conf in self.operations:
687 249
688 if opConfObj.type == 'self' and opConfObj.name == 'run':
689 continue
690 elif opConfObj.type == 'self':
691 opObj = getattr(procUnitObj, opConfObj.name)
692 else:
693 opObj = opConfObj.createObject()
250 opObj = conf.createObject()
694 251
695 252 log.success('adding operation: {}, type:{}'.format(
696 opConfObj.name,
697 opConfObj.type), self.name)
253 conf.name,
254 conf.type), self.name)
698 255
699 procUnitObj.addOperation(opConfObj, opObj)
256 procUnitObj.addOperation(conf, opObj)
700 257
701 procUnitObj.start()
702 self.procUnitObj = procUnitObj
703
704 def close(self):
705
706 for opConfObj in self.opConfObjList:
707 if opConfObj.type == 'self':
708 continue
709
710 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 opObj.close()
712
713 self.procUnitObj.close()
258 self.object = procUnitObj
714 259
715 return
260 def run(self):
261 '''
262 '''
263
264 return self.object.call(**self.getKwargs())
716 265
717 266
718 267 class ReadUnitConf(ProcUnitConf):
@@ -725,28 +274,12 class ReadUnitConf(ProcUnitConf):
725 274 self.datatype = None
726 275 self.name = None
727 276 self.inputId = None
728 self.opConfObjList = []
729 self.lock = Event()
730 self.lock.set()
731 self.lock.n = Value('d', 0)
732
733 def getElementName(self):
734
735 return self.ELEMENTNAME
277 self.operations = []
278 self.parameters = {}
736 279
737 280 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 281 startTime='', endTime='', server=None, **kwargs):
739
740
741 '''
742 *****el id del proceso sera el Topico
743
744 Adicion de {topic}, si no esta presente -> error
745 kwargs deben ser trasmitidos en la instanciacion
746
747 '''
748 282
749 # Compatible with old signal chain version
750 283 if datatype == None and name == None:
751 284 raise ValueError('datatype or name should be defined')
752 285 if name == None:
@@ -766,112 +299,16 class ReadUnitConf(ProcUnitConf):
766 299 self.project_id = project_id
767 300 self.name = name
768 301 self.datatype = datatype
769 if path != '':
770 self.path = os.path.abspath(path)
771 self.startDate = startDate
772 self.endDate = endDate
773 self.startTime = startTime
774 self.endTime = endTime
775 self.server = server
776 302 self.err_queue = err_queue
777 self.addRunOperation(**kwargs)
778
779 def update(self, **kwargs):
780
781 if 'datatype' in kwargs:
782 datatype = kwargs.pop('datatype')
783 if 'Reader' in datatype:
784 self.name = datatype
785 else:
786 self.name = '%sReader' % (datatype)
787 self.datatype = self.name.replace('Reader', '')
788
789 attrs = ('path', 'startDate', 'endDate',
790 'startTime', 'endTime')
791
792 for attr in attrs:
793 if attr in kwargs:
794 setattr(self, attr, kwargs.pop(attr))
795
796 self.updateRunOperation(**kwargs)
797
798 def removeOperations(self):
799
800 for obj in self.opConfObjList:
801 del obj
802
803 self.opConfObjList = []
804
805 def addRunOperation(self, **kwargs):
806
807 opObj = self.addOperation(name='run', optype='self')
808
809 if self.server is None:
810 opObj.addParameter(
811 name='datatype', value=self.datatype, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
813 opObj.addParameter(
814 name='startDate', value=self.startDate, format='date')
815 opObj.addParameter(
816 name='endDate', value=self.endDate, format='date')
817 opObj.addParameter(
818 name='startTime', value=self.startTime, format='time')
819 opObj.addParameter(
820 name='endTime', value=self.endTime, format='time')
821
822 for key, value in list(kwargs.items()):
823 opObj.addParameter(name=key, value=value,
824 format=type(value).__name__)
825 else:
826 opObj.addParameter(name='server', value=self.server, format='str')
827
828 return opObj
829
830 def updateRunOperation(self, **kwargs):
831
832 opObj = self.getOperationObj(name='run')
833 opObj.removeParameters()
834
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
837 opObj.addParameter(
838 name='startDate', value=self.startDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 opObj.addParameter(
841 name='startTime', value=self.startTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843
844 for key, value in list(kwargs.items()):
845 opObj.addParameter(name=key, value=value,
846 format=type(value).__name__)
847
848 return opObj
849
850 def readXml(self, upElement, project_id):
851
852 self.id = upElement.get('id')
853 self.name = upElement.get('name')
854 self.datatype = upElement.get('datatype')
855 self.project_id = str(project_id) #yong
856
857 if self.ELEMENTNAME == 'ReadUnit':
858 self.datatype = self.datatype.replace('Reader', '')
859
860 self.opConfObjList = []
861
862 opElementList = upElement.iter(OperationConf().getElementName())
863
864 for opElement in opElementList:
865 opConfObj = OperationConf()
866 opConfObj.readXml(opElement, project_id)
867 self.opConfObjList.append(opConfObj)
303
304 self.addParameter(name='path', value=path)
305 self.addParameter(name='startDate', value=startDate)
306 self.addParameter(name='endDate', value=endDate)
307 self.addParameter(name='startTime', value=startTime)
308 self.addParameter(name='endTime', value=endTime)
868 309
869 if opConfObj.name == 'run':
870 self.path = opConfObj.getParameterValue('path')
871 self.startDate = opConfObj.getParameterValue('startDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
873 self.startTime = opConfObj.getParameterValue('startTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
310 for key, value in kwargs.items():
311 self.addParameter(name=key, value=value)
875 312
876 313
877 314 class Project(Process):
@@ -885,13 +322,15 class Project(Process):
885 322 self.filename = None
886 323 self.description = None
887 324 self.email = None
888 self.alarm = None
889 self.procUnitConfObjDict = {}
890 self.err_queue = Queue()
325 self.alarm = []
326 self.configurations = {}
327 # self.err_queue = Queue()
328 self.err_queue = None
329 self.started = False
891 330
892 def __getNewId(self):
331 def getNewId(self):
893 332
894 idList = list(self.procUnitConfObjDict.keys())
333 idList = list(self.configurations.keys())
895 334 id = int(self.id) * 10
896 335
897 336 while True:
@@ -904,43 +343,28 class Project(Process):
904 343
905 344 return str(id)
906 345
907 def getElementName(self):
908
909 return self.ELEMENTNAME
910
911 def getId(self):
912
913 return self.id
914
915 346 def updateId(self, new_id):
916 347
917 348 self.id = str(new_id)
918 349
919 keyList = list(self.procUnitConfObjDict.keys())
350 keyList = list(self.configurations.keys())
920 351 keyList.sort()
921 352
922 353 n = 1
923 newProcUnitConfObjDict = {}
354 new_confs = {}
924 355
925 356 for procKey in keyList:
926 357
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
358 conf = self.configurations[procKey]
928 359 idProcUnit = str(int(self.id) * 10 + n)
929 procUnitConfObj.updateId(idProcUnit)
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
360 conf.updateId(idProcUnit)
361 new_confs[idProcUnit] = conf
931 362 n += 1
932 363
933 self.procUnitConfObjDict = newProcUnitConfObjDict
364 self.configurations = new_confs
934 365
935 366 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936 367
937 print(' ')
938 print('*' * 60)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 print('*' * 60)
941 print("* Python " + python_version() + " *")
942 print('*' * 19)
943 print(' ')
944 368 self.id = str(id)
945 369 self.description = description
946 370 self.email = email
@@ -950,108 +374,91 class Project(Process):
950 374
951 375 def update(self, **kwargs):
952 376
953 for key, value in list(kwargs.items()):
377 for key, value in kwargs.items():
954 378 setattr(self, key, value)
955 379
956 380 def clone(self):
957 381
958 382 p = Project()
959 p.procUnitConfObjDict = self.procUnitConfObjDict
383 p.id = self.id
384 p.name = self.name
385 p.description = self.description
386 p.configurations = self.configurations.copy()
387
960 388 return p
961 389
962 390 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963 391
964 392 '''
965 Actualizacion:
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
967
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
969
970 393 '''
971 394
972 395 if id is None:
973 idReadUnit = self.__getNewId()
396 idReadUnit = self.getNewId()
974 397 else:
975 398 idReadUnit = str(id)
976 399
977 readUnitConfObj = ReadUnitConf()
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
400 conf = ReadUnitConf()
401 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
402 self.configurations[conf.id] = conf
980 403
981 return readUnitConfObj
404 return conf
982 405
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
406 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
984 407
985 408 '''
986 Actualizacion:
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
988 Deberia reemplazar a "inputId"
989
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
992
993 409 '''
994 410
995 idProcUnit = self.__getNewId()
996 procUnitConfObj = ProcUnitConf()
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
411 if id is None:
412 idProcUnit = self.getNewId()
413 else:
414 idProcUnit = id
415
416 conf = ProcUnitConf()
417 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
418 self.configurations[conf.id] = conf
1000 419
1001 return procUnitConfObj
420 return conf
1002 421
1003 422 def removeProcUnit(self, id):
1004 423
1005 if id in list(self.procUnitConfObjDict.keys()):
1006 self.procUnitConfObjDict.pop(id)
1007
1008 def getReadUnitId(self):
1009
1010 readUnitConfObj = self.getReadUnitObj()
424 if id in self.configurations:
425 self.configurations.pop(id)
1011 426
1012 return readUnitConfObj.id
427 def getReadUnit(self):
1013 428
1014 def getReadUnitObj(self):
1015
1016 for obj in list(self.procUnitConfObjDict.values()):
1017 if obj.getElementName() == 'ReadUnit':
429 for obj in list(self.configurations.values()):
430 if obj.ELEMENTNAME == 'ReadUnit':
1018 431 return obj
1019 432
1020 433 return None
1021 434
1022 def getProcUnitObj(self, id=None, name=None):
1023
1024 if id != None:
1025 return self.procUnitConfObjDict[id]
435 def getProcUnit(self, id):
1026 436
1027 if name != None:
1028 return self.getProcUnitObjByName(name)
437 return self.configurations[id]
1029 438
1030 return None
1031
1032 def getProcUnitObjByName(self, name):
439 def getUnits(self):
1033 440
1034 for obj in list(self.procUnitConfObjDict.values()):
1035 if obj.name == name:
1036 return obj
1037
1038 return None
441 keys = list(self.configurations)
442 keys.sort()
1039 443
1040 def procUnitItems(self):
444 for key in keys:
445 yield self.configurations[key]
1041 446
1042 return list(self.procUnitConfObjDict.items())
447 def updateUnit(self, id, **kwargs):
1043 448
449 conf = self.configurations[id].update(**kwargs)
450
1044 451 def makeXml(self):
1045 452
1046 projectElement = Element('Project')
1047 projectElement.set('id', str(self.id))
1048 projectElement.set('name', self.name)
1049 projectElement.set('description', self.description)
453 xml = Element('Project')
454 xml.set('id', str(self.id))
455 xml.set('name', self.name)
456 xml.set('description', self.description)
1050 457
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1052 procUnitConfObj.makeXml(projectElement)
458 for conf in self.configurations.values():
459 conf.makeXml(xml)
1053 460
1054 self.projectElement = projectElement
461 self.xml = xml
1055 462
1056 463 def writeXml(self, filename=None):
1057 464
@@ -1077,83 +484,72 class Project(Process):
1077 484
1078 485 self.makeXml()
1079 486
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
487 ElementTree(self.xml).write(abs_file, method='xml')
1081 488
1082 489 self.filename = abs_file
1083 490
1084 491 return 1
1085 492
1086 def readXml(self, filename=None):
1087
1088 if not filename:
1089 print('filename is not defined')
1090 return 0
493 def readXml(self, filename):
1091 494
1092 495 abs_file = os.path.abspath(filename)
1093 496
1094 if not os.path.isfile(abs_file):
1095 print('%s file does not exist' % abs_file)
1096 return 0
1097
1098 self.projectElement = None
1099 self.procUnitConfObjDict = {}
497 self.configurations = {}
1100 498
1101 499 try:
1102 self.projectElement = ElementTree().parse(abs_file)
500 self.xml = ElementTree().parse(abs_file)
1103 501 except:
1104 print('Error reading %s, verify file format' % filename)
502 log.error('Error reading %s, verify file format' % filename)
1105 503 return 0
1106 504
1107 self.project = self.projectElement.tag
1108
1109 self.id = self.projectElement.get('id')
1110 self.name = self.projectElement.get('name')
1111 self.description = self.projectElement.get('description')
1112
1113 readUnitElementList = self.projectElement.iter(
1114 ReadUnitConf().getElementName())
1115
1116 for readUnitElement in readUnitElementList:
1117 readUnitConfObj = ReadUnitConf()
1118 readUnitConfObj.readXml(readUnitElement, self.id)
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1120
1121 procUnitElementList = self.projectElement.iter(
1122 ProcUnitConf().getElementName())
1123
1124 for procUnitElement in procUnitElementList:
1125 procUnitConfObj = ProcUnitConf()
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
505 self.id = self.xml.get('id')
506 self.name = self.xml.get('name')
507 self.description = self.xml.get('description')
508
509 for element in self.xml:
510 if element.tag == 'ReadUnit':
511 conf = ReadUnitConf()
512 conf.readXml(element, self.id, self.err_queue)
513 self.configurations[conf.id] = conf
514 elif element.tag == 'ProcUnit':
515 conf = ProcUnitConf()
516 input_proc = self.configurations[element.get('inputId')]
517 conf.readXml(element, self.id, self.err_queue)
518 self.configurations[conf.id] = conf
1128 519
1129 520 self.filename = abs_file
1130
521
1131 522 return 1
1132 523
1133 524 def __str__(self):
1134 525
1135 print('Project: name = %s, description = %s, id = %s' % (
1136 self.name,
1137 self.description,
1138 self.id))
526 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
527 self.id,
528 self.name,
529 self.description,
530 )
531
532 for conf in self.configurations.values():
533 text += '{}'.format(conf)
1139 534
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
1141 print(procUnitConfObj)
535 return text
1142 536
1143 537 def createObjects(self):
1144 538
1145
1146 keys = list(self.procUnitConfObjDict.keys())
539 keys = list(self.configurations.keys())
1147 540 keys.sort()
1148 541 for key in keys:
1149 self.procUnitConfObjDict[key].createObjects()
542 conf = self.configurations[key]
543 conf.createObjects()
544 if conf.inputId is not None:
545 conf.object.setInput(self.configurations[conf.inputId].object)
1150 546
1151 547 def monitor(self):
1152 548
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
549 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
1154 550 t.start()
1155 551
1156 def __monitor(self, queue, ctx):
552 def _monitor(self, queue, ctx):
1157 553
1158 554 import socket
1159 555
@@ -1184,13 +580,7 class Project(Process):
1184 580 else:
1185 581 name, err = self.name, err_msg
1186 582
1187 time.sleep(2)
1188
1189 for conf in self.procUnitConfObjDict.values():
1190 for confop in conf.opConfObjList:
1191 if confop.type == 'external':
1192 confop.opObj.terminate()
1193 conf.procUnitObj.terminate()
583 time.sleep(1)
1194 584
1195 585 ctx.term()
1196 586
@@ -1206,15 +596,14 class Project(Process):
1206 596 subtitle += 'Configuration file: %s\n' % self.filename
1207 597 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208 598
1209 readUnitConfObj = self.getReadUnitObj()
599 readUnitConfObj = self.getReadUnit()
1210 600 if readUnitConfObj:
1211 601 subtitle += '\nInput parameters:\n'
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
602 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
603 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
604 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
605 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
606 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
1218 607
1219 608 a = Alarm(
1220 609 modes=self.alarm,
@@ -1227,64 +616,33 class Project(Process):
1227 616
1228 617 a.start()
1229 618
1230 def isPaused(self):
1231 return 0
1232
1233 def isStopped(self):
1234 return 0
1235
1236 def runController(self):
1237 '''
1238 returns 0 when this process has been stopped, 1 otherwise
1239 '''
1240
1241 if self.isPaused():
1242 print('Process suspended')
1243
1244 while True:
1245 time.sleep(0.1)
1246
1247 if not self.isPaused():
1248 break
1249
1250 if self.isStopped():
1251 break
1252
1253 print('Process reinitialized')
1254
1255 if self.isStopped():
1256 print('Process stopped')
1257 return 0
1258
1259 return 1
1260
1261 619 def setFilename(self, filename):
1262 620
1263 621 self.filename = filename
1264 622
1265 def setProxy(self):
623 def runProcs(self):
1266 624
1267 if not os.path.exists('/tmp/schain'):
1268 os.mkdir('/tmp/schain')
625 err = False
626 n = len(self.configurations)
627
628 while not err:
629 for conf in self.getUnits():
630 ok = conf.run()
631 if ok is 'Error':
632 n -= 1
633 continue
634 elif not ok:
635 break
636 if n == 0:
637 err = True
1269 638
1270 self.ctx = zmq.Context()
1271 xpub = self.ctx.socket(zmq.XPUB)
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1273 xsub = self.ctx.socket(zmq.XSUB)
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1275 self.monitor()
1276 try:
1277 zmq.proxy(xpub, xsub)
1278 except zmq.ContextTerminated:
1279 xpub.close()
1280 xsub.close()
1281
1282 639 def run(self):
1283 640
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
641 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
642 self.started = True
1285 643 self.start_time = time.time()
1286 self.createObjects()
1287 self.setProxy()
1288 log.success('{} Done (Time: {}s)'.format(
644 self.createObjects()
645 self.runProcs()
646 log.success('{} Done (Time: {:4.2f}s)'.format(
1289 647 self.name,
1290 648 time.time()-self.start_time), '')
@@ -84,7 +84,7 DATA_STRUCTURE = numpy.dtype([
84 84 ('sea_algorithm', '<u4')
85 85 ])
86 86
87 @MPDecorator
87
88 88 class BLTRParamReader(JRODataReader, ProcessingUnit):
89 89 '''
90 90 Boundary Layer and Tropospheric Radar (BLTR) reader, Wind velocities and SNR
@@ -247,7 +247,7 class RecordHeaderBLTR():
247 247
248 248 return 1
249 249
250 @MPDecorator
250
251 251 class BLTRSpectraReader (ProcessingUnit):
252 252
253 253 def __init__(self):
@@ -14,6 +14,7 import time
14 14 import datetime
15 15 import zmq
16 16
17 from schainpy.model.proc.jroproc_base import Operation
17 18 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 19 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 20 from schainpy.utils import log
@@ -724,9 +725,9 class JRODataReader(Reader):
724 725 firstHeaderSize = 0
725 726 basicHeaderSize = 24
726 727 __isFirstTimeOnline = 1
727 __printInfo = True
728 728 filefmt = "*%Y%j***"
729 729 folderfmt = "*%Y%j"
730 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
730 731
731 732 def getDtypeWidth(self):
732 733
@@ -1214,26 +1215,6 class JRODataReader(Reader):
1214 1215
1215 1216 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216 1217
1217 def printNumberOfBlock(self):
1218 'SPAM!'
1219
1220 # if self.flagIsNewBlock:
1221 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1222 # self.processingHeaderObj.dataBlocksPerFile,
1223 # self.dataOut.datatime.ctime())
1224
1225 def printInfo(self):
1226
1227 if self.__printInfo == False:
1228 return
1229
1230 self.basicHeaderObj.printInfo()
1231 self.systemHeaderObj.printInfo()
1232 self.radarControllerHeaderObj.printInfo()
1233 self.processingHeaderObj.printInfo()
1234
1235 self.__printInfo = False
1236
1237 1218 def run(self, **kwargs):
1238 1219 """
1239 1220
@@ -1573,3 +1554,27 class JRODataWriter(Reader):
1573 1554 self.dataOut = dataOut
1574 1555 self.putData()
1575 1556 return self.dataOut
1557
1558 class printInfo(Operation):
1559
1560 def __init__(self):
1561
1562 Operation.__init__(self)
1563 self.__printInfo = True
1564
1565 def run(self, dataOut, headers = ['systemHeaderObj', 'radarControllerHeaderObj', 'processingHeaderObj']):
1566 if self.__printInfo == False:
1567 return dataOut
1568
1569 for header in headers:
1570 if hasattr(dataOut, header):
1571 obj = getattr(dataOut, header)
1572 if hasattr(obj, 'printInfo'):
1573 obj.printInfo()
1574 else:
1575 print(obj)
1576 else:
1577 log.warning('Header {} Not found in object'.format(header))
1578
1579 self.__printInfo = False
1580 return dataOut No newline at end of file
@@ -31,7 +31,7 try:
31 31 except:
32 32 pass
33 33
34 @MPDecorator
34
35 35 class DigitalRFReader(ProcessingUnit):
36 36 '''
37 37 classdocs
@@ -633,7 +633,7 class DigitalRFReader(ProcessingUnit):
633 633
634 634 return
635 635
636
636 @MPDecorator
637 637 class DigitalRFWriter(Operation):
638 638 '''
639 639 classdocs
@@ -120,6 +120,7 class Metadata(object):
120 120 parmConfObj.readXml(parmElement)
121 121 self.parmConfObjList.append(parmConfObj)
122 122
123 @MPDecorator
123 124 class FitsWriter(Operation):
124 125 def __init__(self, **kwargs):
125 126 Operation.__init__(self, **kwargs)
@@ -283,7 +284,7 class FitsWriter(Operation):
283 284 self.isConfig = True
284 285 self.putData()
285 286
286 @MPDecorator
287
287 288 class FitsReader(ProcessingUnit):
288 289
289 290 # __TIMEZONE = time.timezone
@@ -78,7 +78,7 def load_json(obj):
78 78
79 79 return iterable
80 80
81 @MPDecorator
81
82 82 class MADReader(Reader, ProcessingUnit):
83 83
84 84 def __init__(self):
@@ -11,7 +11,7 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecora
11 11 from schainpy.model.io.jroIO_base import *
12 12 from schainpy.utils import log
13 13
14 @MPDecorator
14
15 15 class ParamReader(JRODataReader,ProcessingUnit):
16 16 '''
17 17 Reads HDF5 format files
@@ -965,7 +965,7 class ParamWriter(Operation):
965 965 return
966 966
967 967
968 @MPDecorator
968
969 969 class ParameterReader(Reader, ProcessingUnit):
970 970 '''
971 971 Reads HDF5 format files
@@ -11,7 +11,7 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader,
11 11 from schainpy.model.data.jrodata import Spectra
12 12 from schainpy.utils import log
13 13
14 @MPDecorator
14
15 15 class SpectraReader(JRODataReader, ProcessingUnit):
16 16 """
17 17 Esta clase permite leer datos de espectros desde archivos procesados (.pdata). La lectura
@@ -14,7 +14,7 except:
14 14
15 15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 16 from schainpy.model.data.jrodata import Voltage
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
18 18
19 19 try:
20 20 import digital_rf_hdf5
@@ -546,6 +546,8 class USRPReader(ProcessingUnit):
546 546
547 547 return
548 548
549
550 @MPDecorator
549 551 class USRPWriter(Operation):
550 552 '''
551 553 classdocs
@@ -10,12 +10,8 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 import zmq
14 import tempfile
15 from io import StringIO
16 # from _sha import blocksize
17 13
18 @MPDecorator
14
19 15 class VoltageReader(JRODataReader, ProcessingUnit):
20 16 """
21 17 Esta clase permite leer datos de voltage desde archivos en formato rawdata (.r). La lectura
@@ -15,7 +15,7 from numpy import transpose
15 15 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
16 16 from schainpy.model.data.jrodata import Parameters
17 17
18 @MPDecorator
18
19 19 class BLTRParametersProc(ProcessingUnit):
20 20 '''
21 21 Processing unit for BLTR parameters data (winds)
@@ -76,7 +76,7 class BLTRParametersProc(ProcessingUnit):
76 76 self.dataOut.data_param[i][SNRavgdB <= snr_threshold] = numpy.nan
77 77
78 78 # TODO
79 @MPDecorator
79
80 80 class OutliersFilter(Operation):
81 81
82 82 def __init__(self):
@@ -16,7 +16,7 class AMISRProc(ProcessingUnit):
16 16 self.dataOut.copy(self.dataIn)
17 17
18 18
19 class PrintInfo(Operation):
19 class PrintInfoAMISR(Operation):
20 20 def __init__(self, **kwargs):
21 21 Operation.__init__(self, **kwargs)
22 22 self.__isPrinted = False
@@ -1,19 +1,9
1 1 '''
2 Updated for multiprocessing
3 Author : Sergio Cortez
4 Jan 2018
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
10 Based on:
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
2 Base clases to create Processing units and operations, the MPDecorator
3 must be used in plotting and writing operations to allow to run as an
4 external process.
13 5 '''
14 6
15 import os
16 import sys
17 7 import inspect
18 8 import zmq
19 9 import time
@@ -24,27 +14,16 try:
24 14 except:
25 15 from Queue import Queue
26 16 from threading import Thread
27 from multiprocessing import Process
28
17 from multiprocessing import Process, Queue
29 18 from schainpy.utils import log
30 19
31 20
32 21 class ProcessingUnit(object):
22 '''
23 Base class to create Signal Chain Units
24 '''
33 25
34 """
35 Update - Jan 2018 - MULTIPROCESSING
36 All the "call" methods present in the previous base were removed.
37 The majority of operations are independant processes, thus
38 the decorator is in charge of communicate the operation processes
39 with the proccessing unit via IPC.
40
41 The constructor does not receive any argument. The remaining methods
42 are related with the operations to execute.
43
44
45 """
46 26 proc_type = 'processing'
47 __attrs__ = []
48 27
49 28 def __init__(self):
50 29
@@ -52,8 +31,11 class ProcessingUnit(object):
52 31 self.dataOut = None
53 32 self.isConfig = False
54 33 self.operations = []
55 self.plots = []
34
35 def setInput(self, unit):
56 36
37 self.dataIn = unit.dataOut
38
57 39 def getAllowedArgs(self):
58 40 if hasattr(self, '__attrs__'):
59 41 return self.__attrs__
@@ -61,27 +43,10 class ProcessingUnit(object):
61 43 return inspect.getargspec(self.run).args
62 44
63 45 def addOperation(self, conf, operation):
64 """
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
66 posses the id of the operation process (IPC purposes)
67
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
69 identificador asociado a este objeto.
70
71 Input:
72
73 object : objeto de la clase "Operation"
74
75 Return:
76
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
78 """
79
80 self.operations.append(
81 (operation, conf.type, conf.id, conf.getKwargs()))
46 '''
47 '''
82 48
83 if 'plot' in self.name.lower():
84 self.plots.append(operation.CODE)
49 self.operations.append((operation, conf.type, conf.getKwargs()))
85 50
86 51 def getOperationObj(self, objId):
87 52
@@ -90,17 +55,37 class ProcessingUnit(object):
90 55
91 56 return self.operations[objId]
92 57
93 def operation(self, **kwargs):
94 """
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
96 atributos del objeto dataOut
97
98 Input:
99
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
101 """
58 def call(self, **kwargs):
59 '''
60 '''
61
62 try:
63 if self.dataIn is not None and self.dataIn.flagNoData and not self.dataIn.error:
64 return self.dataIn.isReady()
65 elif self.dataIn is None or not self.dataIn.error:
66 self.run(**kwargs)
67 elif self.dataIn.error:
68 self.dataOut.error = self.dataIn.error
69 self.dataOut.flagNoData = True
70 except:
71 err = traceback.format_exc()
72 if 'SchainWarning' in err:
73 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), self.name)
74 elif 'SchainError' in err:
75 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), self.name)
76 else:
77 log.error(err, self.name)
78 self.dataOut.error = True
79
80 for op, optype, opkwargs in self.operations:
81 if optype == 'other' and not self.dataOut.flagNoData:
82 self.dataOut = op.run(self.dataOut, **opkwargs)
83 elif optype == 'external' and not self.dataOut.flagNoData:
84 op.queue.put(self.dataOut)
85 elif optype == 'external' and self.dataOut.error:
86 op.queue.put(self.dataOut)
102 87
103 raise NotImplementedError
88 return 'Error' if self.dataOut.error else self.dataOut.isReady()
104 89
105 90 def setup(self):
106 91
@@ -117,22 +102,10 class ProcessingUnit(object):
117 102
118 103 class Operation(object):
119 104
120 """
121 Update - Jan 2018 - MULTIPROCESSING
122
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 The constructor doe snot receive any argument, neither the baseclass.
125
126
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 acumulacion dentro de esta clase
130
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132
133 """
105 '''
106 '''
107
134 108 proc_type = 'operation'
135 __attrs__ = []
136 109
137 110 def __init__(self):
138 111
@@ -180,58 +153,12 class Operation(object):
180 153
181 154 return
182 155
183 class InputQueue(Thread):
184
185 '''
186 Class to hold input data for Proccessing Units and external Operations,
187 '''
188
189 def __init__(self, project_id, inputId, lock=None):
190
191 Thread.__init__(self)
192 self.queue = Queue()
193 self.project_id = project_id
194 self.inputId = inputId
195 self.lock = lock
196 self.islocked = False
197 self.size = 0
198
199 def run(self):
200
201 c = zmq.Context()
202 self.receiver = c.socket(zmq.SUB)
203 self.receiver.connect(
204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206
207 while True:
208 obj = self.receiver.recv_multipart()[1]
209 self.size += sys.getsizeof(obj)
210 self.queue.put(obj)
211
212 def get(self):
213
214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
216 self.islocked = True
217 self.lock.clear()
218 elif self.islocked and self.size/1000000 <= 512:
219 self.islocked = False
220 self.lock.n.value -= 1
221 if self.lock.n.value == 0:
222 self.lock.set()
223
224 obj = self.queue.get()
225 self.size -= sys.getsizeof(obj)
226 return pickle.loads(obj)
227
228 156
229 157 def MPDecorator(BaseClass):
230 158 """
231 159 Multiprocessing class decorator
232 160
233 This function add multiprocessing features to a BaseClass. Also, it handle
234 the communication beetween processes (readers, procUnits and operations).
161 This function add multiprocessing features to a BaseClass.
235 162 """
236 163
237 164 class MPClass(BaseClass, Process):
@@ -239,191 +166,42 def MPDecorator(BaseClass):
239 166 def __init__(self, *args, **kwargs):
240 167 super(MPClass, self).__init__()
241 168 Process.__init__(self)
242 self.operationKwargs = {}
169
243 170 self.args = args
244 171 self.kwargs = kwargs
245 self.sender = None
246 self.receiver = None
247 self.i = 0
248 172 self.t = time.time()
173 self.op_type = 'external'
249 174 self.name = BaseClass.__name__
250 175 self.__doc__ = BaseClass.__doc__
251 176
252 177 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 178 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254 179
255 self.start_time = time.time()
256 self.id = args[0]
257 self.inputId = args[1]
258 self.project_id = args[2]
180 self.start_time = time.time()
259 181 self.err_queue = args[3]
260 self.lock = args[4]
261 self.typeProc = args[5]
262 self.err_queue.put('#_start_#')
263 if self.inputId is not None:
264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265
266 def subscribe(self):
267 '''
268 Start the zmq socket receiver and subcribe to input ID.
269 '''
270
271 self.queue.start()
272
273 def listen(self):
274 '''
275 This function waits for objects
276 '''
277
278 return self.queue.get()
279
280 def set_publisher(self):
281 '''
282 This function create a zmq socket for publishing objects.
283 '''
284
285 time.sleep(0.5)
286
287 c = zmq.Context()
288 self.sender = c.socket(zmq.PUB)
289 self.sender.connect(
290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291
292 def publish(self, data, id):
293 '''
294 This function publish an object, to an specific topic.
295 It blocks publishing when receiver queue is full to avoid data loss
296 '''
297
298 if self.inputId is None:
299 self.lock.wait()
300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301
302 def runReader(self):
303 '''
304 Run fuction for read units
305 '''
306 while True:
307
308 try:
309 BaseClass.run(self, **self.kwargs)
310 except:
311 err = traceback.format_exc()
312 if 'No more files' in err:
313 log.warning('No more files to read', self.name)
314 else:
315 self.err_queue.put('{}|{}'.format(self.name, err))
316 self.dataOut.error = True
317
318 for op, optype, opId, kwargs in self.operations:
319 if optype == 'self' and not self.dataOut.flagNoData:
320 op(**kwargs)
321 elif optype == 'other' and not self.dataOut.flagNoData:
322 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 elif optype == 'external':
324 self.publish(self.dataOut, opId)
325
326 if self.dataOut.flagNoData and not self.dataOut.error:
327 continue
328
329 self.publish(self.dataOut, self.id)
330
331 if self.dataOut.error:
332 break
333
334 time.sleep(0.5)
335
336 def runProc(self):
337 '''
338 Run function for proccessing units
339 '''
340
341 while True:
342 self.dataIn = self.listen()
343
344 if self.dataIn.flagNoData and self.dataIn.error is None:
345 continue
346 elif not self.dataIn.error:
347 try:
348 BaseClass.run(self, **self.kwargs)
349 except:
350 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 self.dataOut.error = True
352 elif self.dataIn.error:
353 self.dataOut.error = self.dataIn.error
354 self.dataOut.flagNoData = True
355
356 for op, optype, opId, kwargs in self.operations:
357 if optype == 'self' and not self.dataOut.flagNoData:
358 op(**kwargs)
359 elif optype == 'other' and not self.dataOut.flagNoData:
360 self.dataOut = op.run(self.dataOut, **kwargs)
361 elif optype == 'external' and not self.dataOut.flagNoData:
362 self.publish(self.dataOut, opId)
363
364 self.publish(self.dataOut, self.id)
365 for op, optype, opId, kwargs in self.operations:
366 if optype == 'external' and self.dataOut.error:
367 self.publish(self.dataOut, opId)
368
369 if self.dataOut.error:
370 break
371
372 time.sleep(0.5)
182 self.queue = Queue(maxsize=1)
183 self.myrun = BaseClass.run
373 184
374 def runOp(self):
375 '''
376 Run function for external operations (this operations just receive data
377 ex: plots, writers, publishers)
378 '''
185 def run(self):
379 186
380 187 while True:
381 188
382 dataOut = self.listen()
189 dataOut = self.queue.get()
383 190
384 191 if not dataOut.error:
385 192 try:
386 193 BaseClass.run(self, dataOut, **self.kwargs)
387 194 except:
388 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 dataOut.error = True
390 else:
391 break
392
393 def run(self):
394 if self.typeProc is "ProcUnit":
395
396 if self.inputId is not None:
397 self.subscribe()
398
399 self.set_publisher()
400
401 if 'Reader' not in BaseClass.__name__:
402 self.runProc()
195 err = traceback.format_exc()
196 log.error(err.split('\n')[-2], self.name)
403 197 else:
404 self.runReader()
405
406 elif self.typeProc is "Operation":
407
408 self.subscribe()
409 self.runOp()
410
411 else:
412 raise ValueError("Unknown type")
198 break
413 199
414 200 self.close()
415 201
416 202 def close(self):
417 203
418 204 BaseClass.close(self)
419 self.err_queue.put('#_end_#')
420
421 if self.sender:
422 self.sender.close()
423
424 if self.receiver:
425 self.receiver.close()
426
427 205 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428 206
429 207 return MPClass
@@ -1,7 +1,7
1 1 import numpy
2 2
3 3 from .jroproc_base import ProcessingUnit, Operation
4 from schainpy.model.data.jrodata import Correlation, hildebrand_sekhon
4 from schainpy.model.data.jrodata import Correlation
5 5
6 6 class CorrelationProc(ProcessingUnit):
7 7
@@ -5,7 +5,7 from schainpy.model.data.jrodata import SpectraHeis
5 5 from schainpy.utils import log
6 6
7 7
8 @MPDecorator
8
9 9 class SpectraHeisProc(ProcessingUnit):
10 10
11 11 def __init__(self):#, **kwargs):
@@ -46,7 +46,7 def _unpickle_method(func_name, obj, cls):
46 46 break
47 47 return func.__get__(obj, cls)
48 48
49 @MPDecorator
49
50 50 class ParametersProc(ProcessingUnit):
51 51
52 52 METHODS = {}
@@ -1329,13 +1329,12 class SpectralMoments(Operation):
1329 1329
1330 1330 def run(self, dataOut):
1331 1331
1332 #dataOut.data_pre = dataOut.data_pre[0]
1333 1332 data = dataOut.data_pre[0]
1334 1333 absc = dataOut.abscissaList[:-1]
1335 1334 noise = dataOut.noise
1336 1335 nChannel = data.shape[0]
1337 1336 data_param = numpy.zeros((nChannel, 4, data.shape[2]))
1338
1337
1339 1338 for ind in range(nChannel):
1340 1339 data_param[ind,:,:] = self.__calculateMoments( data[ind,:,:] , absc , noise[ind] )
1341 1340
@@ -1344,6 +1343,7 class SpectralMoments(Operation):
1344 1343 dataOut.data_POW = data_param[:,1]
1345 1344 dataOut.data_DOP = data_param[:,2]
1346 1345 dataOut.data_WIDTH = data_param[:,3]
1346
1347 1347 return dataOut
1348 1348
1349 1349 def __calculateMoments(self, oldspec, oldfreq, n0,
@@ -1370,25 +1370,27 class SpectralMoments(Operation):
1370 1370 vec_w = numpy.zeros(oldspec.shape[1])
1371 1371 vec_snr = numpy.zeros(oldspec.shape[1])
1372 1372
1373 oldspec = numpy.ma.masked_invalid(oldspec)
1374
1373 # oldspec = numpy.ma.masked_invalid(oldspec)
1374
1375 1375 for ind in range(oldspec.shape[1]):
1376
1376
1377 1377 spec = oldspec[:,ind]
1378 1378 aux = spec*fwindow
1379 1379 max_spec = aux.max()
1380 m = list(aux).index(max_spec)
1381
1380 m = aux.tolist().index(max_spec)
1381
1382 1382 #Smooth
1383 if (smooth == 0): spec2 = spec
1384 else: spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1385
1383 if (smooth == 0):
1384 spec2 = spec
1385 else:
1386 spec2 = scipy.ndimage.filters.uniform_filter1d(spec,size=smooth)
1387
1386 1388 # Calculo de Momentos
1387 bb = spec2[list(range(m,spec2.size))]
1389 bb = spec2[numpy.arange(m,spec2.size)]
1388 1390 bb = (bb<n0).nonzero()
1389 1391 bb = bb[0]
1390 1392
1391 ss = spec2[list(range(0,m + 1))]
1393 ss = spec2[numpy.arange(0,m + 1)]
1392 1394 ss = (ss<n0).nonzero()
1393 1395 ss = ss[0]
1394 1396
@@ -1399,17 +1401,20 class SpectralMoments(Operation):
1399 1401 if (bb0 < 0):
1400 1402 bb0 = 0
1401 1403
1402 if (ss.size == 0): ss1 = 1
1403 else: ss1 = max(ss) + 1
1404 if (ss.size == 0):
1405 ss1 = 1
1406 else:
1407 ss1 = max(ss) + 1
1404 1408
1405 if (ss1 > m): ss1 = m
1409 if (ss1 > m):
1410 ss1 = m
1406 1411
1407 valid = numpy.asarray(list(range(int(m + bb0 - ss1 + 1)))) + ss1
1408 power = ((spec2[valid] - n0)*fwindow[valid]).sum()
1409 fd = ((spec2[valid]- n0)*freq[valid]*fwindow[valid]).sum()/power
1410 w = math.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum()/power)
1411 snr = (spec2.mean()-n0)/n0
1412 valid = numpy.arange(int(m + bb0 - ss1 + 1)) + ss1
1412 1413
1414 power = ((spec2[valid] - n0) * fwindow[valid]).sum()
1415 fd = ((spec2[valid]- n0)*freq[valid] * fwindow[valid]).sum() / power
1416 w = numpy.sqrt(((spec2[valid] - n0)*fwindow[valid]*(freq[valid]- fd)**2).sum() / power)
1417 snr = (spec2.mean()-n0)/n0
1413 1418 if (snr < 1.e-20) :
1414 1419 snr = 1.e-20
1415 1420
@@ -1417,9 +1422,8 class SpectralMoments(Operation):
1417 1422 vec_fd[ind] = fd
1418 1423 vec_w[ind] = w
1419 1424 vec_snr[ind] = snr
1420
1421 moments = numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1422 return moments
1425
1426 return numpy.vstack((vec_snr, vec_power, vec_fd, vec_w))
1423 1427
1424 1428 #------------------ Get SA Parameters --------------------------
1425 1429
@@ -1,3 +1,4
1 import time
1 2 import itertools
2 3
3 4 import numpy
@@ -7,7 +8,7 from schainpy.model.data.jrodata import Spectra
7 8 from schainpy.model.data.jrodata import hildebrand_sekhon
8 9 from schainpy.utils import log
9 10
10 @MPDecorator
11
11 12 class SpectraProc(ProcessingUnit):
12 13
13 14
@@ -120,7 +121,7 class SpectraProc(ProcessingUnit):
120 121 self.dataOut.flagShiftFFT = False
121 122
122 123 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
123
124
124 125 if self.dataIn.type == "Spectra":
125 126 self.dataOut.copy(self.dataIn)
126 127 if shift_fft:
@@ -219,81 +220,6 class SpectraProc(ProcessingUnit):
219 220 self.dataOut.pairsList = pairs
220 221
221 222 return
222
223 def __selectPairsByChannel(self, channelList=None):
224
225 if channelList == None:
226 return
227
228 pairsIndexListSelected = []
229 for pairIndex in self.dataOut.pairsIndexList:
230 # First pair
231 if self.dataOut.pairsList[pairIndex][0] not in channelList:
232 continue
233 # Second pair
234 if self.dataOut.pairsList[pairIndex][1] not in channelList:
235 continue
236
237 pairsIndexListSelected.append(pairIndex)
238
239 if not pairsIndexListSelected:
240 self.dataOut.data_cspc = None
241 self.dataOut.pairsList = []
242 return
243
244 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
245 self.dataOut.pairsList = [self.dataOut.pairsList[i]
246 for i in pairsIndexListSelected]
247
248 return
249
250 def selectChannels(self, channelList):
251
252 channelIndexList = []
253
254 for channel in channelList:
255 if channel not in self.dataOut.channelList:
256 raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % (
257 channel, str(self.dataOut.channelList)))
258
259 index = self.dataOut.channelList.index(channel)
260 channelIndexList.append(index)
261
262 self.selectChannelsByIndex(channelIndexList)
263
264 def selectChannelsByIndex(self, channelIndexList):
265 """
266 Selecciona un bloque de datos en base a canales segun el channelIndexList
267
268 Input:
269 channelIndexList : lista sencilla de canales a seleccionar por ej. [2,3,7]
270
271 Affected:
272 self.dataOut.data_spc
273 self.dataOut.channelIndexList
274 self.dataOut.nChannels
275
276 Return:
277 None
278 """
279
280 for channelIndex in channelIndexList:
281 if channelIndex not in self.dataOut.channelIndexList:
282 raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = " % (
283 channelIndex, self.dataOut.channelIndexList))
284
285 data_spc = self.dataOut.data_spc[channelIndexList, :]
286 data_dc = self.dataOut.data_dc[channelIndexList, :]
287
288 self.dataOut.data_spc = data_spc
289 self.dataOut.data_dc = data_dc
290
291 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
292 self.dataOut.channelList = range(len(channelIndexList))
293 self.__selectPairsByChannel(channelIndexList)
294
295 return 1
296
297 223
298 224 def selectFFTs(self, minFFT, maxFFT ):
299 225 """
@@ -331,67 +257,6 class SpectraProc(ProcessingUnit):
331 257
332 258 return 1
333 259
334
335 def setH0(self, h0, deltaHeight = None):
336
337 if not deltaHeight:
338 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
339
340 nHeights = self.dataOut.nHeights
341
342 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
343
344 self.dataOut.heightList = newHeiRange
345
346
347 def selectHeights(self, minHei, maxHei):
348 """
349 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
350 minHei <= height <= maxHei
351
352 Input:
353 minHei : valor minimo de altura a considerar
354 maxHei : valor maximo de altura a considerar
355
356 Affected:
357 Indirectamente son cambiados varios valores a travez del metodo selectHeightsByIndex
358
359 Return:
360 1 si el metodo se ejecuto con exito caso contrario devuelve 0
361 """
362
363
364 if (minHei > maxHei):
365 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei))
366
367 if (minHei < self.dataOut.heightList[0]):
368 minHei = self.dataOut.heightList[0]
369
370 if (maxHei > self.dataOut.heightList[-1]):
371 maxHei = self.dataOut.heightList[-1]
372
373 minIndex = 0
374 maxIndex = 0
375 heights = self.dataOut.heightList
376
377 inda = numpy.where(heights >= minHei)
378 indb = numpy.where(heights <= maxHei)
379
380 try:
381 minIndex = inda[0][0]
382 except:
383 minIndex = 0
384
385 try:
386 maxIndex = indb[0][-1]
387 except:
388 maxIndex = len(heights)
389
390 self.selectHeightsByIndex(minIndex, maxIndex)
391
392
393 return 1
394
395 260 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
396 261 newheis = numpy.where(
397 262 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
@@ -466,54 +331,100 class SpectraProc(ProcessingUnit):
466 331
467 332 return 1
468 333
334 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
335 # validacion de rango
336 if minHei == None:
337 minHei = self.dataOut.heightList[0]
469 338
339 if maxHei == None:
340 maxHei = self.dataOut.heightList[-1]
470 341
471 def selectHeightsByIndex(self, minIndex, maxIndex):
472 """
473 Selecciona un bloque de datos en base a un grupo indices de alturas segun el rango
474 minIndex <= index <= maxIndex
342 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
343 print('minHei: %.2f is out of the heights range' % (minHei))
344 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
345 minHei = self.dataOut.heightList[0]
475 346
476 Input:
477 minIndex : valor de indice minimo de altura a considerar
478 maxIndex : valor de indice maximo de altura a considerar
347 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
348 print('maxHei: %.2f is out of the heights range' % (maxHei))
349 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
350 maxHei = self.dataOut.heightList[-1]
479 351
480 Affected:
481 self.dataOut.data_spc
482 self.dataOut.data_cspc
483 self.dataOut.data_dc
484 self.dataOut.heightList
352 # validacion de velocidades
353 velrange = self.dataOut.getVelRange(1)
485 354
486 Return:
487 1 si el metodo se ejecuto con exito caso contrario devuelve 0
488 """
355 if minVel == None:
356 minVel = velrange[0]
357
358 if maxVel == None:
359 maxVel = velrange[-1]
360
361 if (minVel < velrange[0]) or (minVel > maxVel):
362 print('minVel: %.2f is out of the velocity range' % (minVel))
363 print('minVel is setting to %.2f' % (velrange[0]))
364 minVel = velrange[0]
365
366 if (maxVel > velrange[-1]) or (maxVel < minVel):
367 print('maxVel: %.2f is out of the velocity range' % (maxVel))
368 print('maxVel is setting to %.2f' % (velrange[-1]))
369 maxVel = velrange[-1]
370
371 # seleccion de indices para rango
372 minIndex = 0
373 maxIndex = 0
374 heights = self.dataOut.heightList
375
376 inda = numpy.where(heights >= minHei)
377 indb = numpy.where(heights <= maxHei)
378
379 try:
380 minIndex = inda[0][0]
381 except:
382 minIndex = 0
383
384 try:
385 maxIndex = indb[0][-1]
386 except:
387 maxIndex = len(heights)
489 388
490 389 if (minIndex < 0) or (minIndex > maxIndex):
491 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
390 raise ValueError("some value in (%d,%d) is not valid" % (
492 391 minIndex, maxIndex))
493 392
494 393 if (maxIndex >= self.dataOut.nHeights):
495 394 maxIndex = self.dataOut.nHeights - 1
496 395
497 # Spectra
498 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
396 # seleccion de indices para velocidades
397 indminvel = numpy.where(velrange >= minVel)
398 indmaxvel = numpy.where(velrange <= maxVel)
399 try:
400 minIndexVel = indminvel[0][0]
401 except:
402 minIndexVel = 0
499 403
500 data_cspc = None
501 if self.dataOut.data_cspc is not None:
502 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
404 try:
405 maxIndexVel = indmaxvel[0][-1]
406 except:
407 maxIndexVel = len(velrange)
503 408
504 data_dc = None
505 if self.dataOut.data_dc is not None:
506 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
409 # seleccion del espectro
410 data_spc = self.dataOut.data_spc[:,
411 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
412 # estimacion de ruido
413 noise = numpy.zeros(self.dataOut.nChannels)
507 414
508 self.dataOut.data_spc = data_spc
509 self.dataOut.data_cspc = data_cspc
510 self.dataOut.data_dc = data_dc
415 for channel in range(self.dataOut.nChannels):
416 daux = data_spc[channel, :, :]
417 sortdata = numpy.sort(daux, axis=None)
418 noise[channel] = hildebrand_sekhon(sortdata, self.dataOut.nIncohInt)
511 419
512 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
420 self.dataOut.noise_estimation = noise.copy()
513 421
514 422 return 1
515 423
516 def removeDC(self, mode=2):
424 class removeDC(Operation):
425
426 def run(self, dataOut, mode=2):
427 self.dataOut = dataOut
517 428 jspectra = self.dataOut.data_spc
518 429 jcspectra = self.dataOut.data_cspc
519 430
@@ -571,7 +482,9 class SpectraProc(ProcessingUnit):
571 482 self.dataOut.data_spc = jspectra
572 483 self.dataOut.data_cspc = jcspectra
573 484
574 return 1
485 return self.dataOut
486
487 class removeInterference(Operation):
575 488
576 489 def removeInterference2(self):
577 490
@@ -594,11 +507,9 class SpectraProc(ProcessingUnit):
594 507 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
595 508 cspc[i,InterferenceRange,:] = numpy.NaN
596 509
597
598
599 510 self.dataOut.data_cspc = cspc
600 511
601 def removeInterference(self, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None):
512 def removeInterference(self):
602 513
603 514 jspectra = self.dataOut.data_spc
604 515 jcspectra = self.dataOut.data_cspc
@@ -781,101 +692,16 class SpectraProc(ProcessingUnit):
781 692
782 693 return 1
783 694
784 def setRadarFrequency(self, frequency=None):
785
786 if frequency != None:
787 self.dataOut.frequency = frequency
788
789 return 1
790
791 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
792 # validacion de rango
793 if minHei == None:
794 minHei = self.dataOut.heightList[0]
795
796 if maxHei == None:
797 maxHei = self.dataOut.heightList[-1]
798
799 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
800 print('minHei: %.2f is out of the heights range' % (minHei))
801 print('minHei is setting to %.2f' % (self.dataOut.heightList[0]))
802 minHei = self.dataOut.heightList[0]
803
804 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
805 print('maxHei: %.2f is out of the heights range' % (maxHei))
806 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
807 maxHei = self.dataOut.heightList[-1]
808
809 # validacion de velocidades
810 velrange = self.dataOut.getVelRange(1)
695 def run(self, dataOut, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None, mode=1):
811 696
812 if minVel == None:
813 minVel = velrange[0]
697 self.dataOut = dataOut
814 698
815 if maxVel == None:
816 maxVel = velrange[-1]
817
818 if (minVel < velrange[0]) or (minVel > maxVel):
819 print('minVel: %.2f is out of the velocity range' % (minVel))
820 print('minVel is setting to %.2f' % (velrange[0]))
821 minVel = velrange[0]
822
823 if (maxVel > velrange[-1]) or (maxVel < minVel):
824 print('maxVel: %.2f is out of the velocity range' % (maxVel))
825 print('maxVel is setting to %.2f' % (velrange[-1]))
826 maxVel = velrange[-1]
827
828 # seleccion de indices para rango
829 minIndex = 0
830 maxIndex = 0
831 heights = self.dataOut.heightList
832
833 inda = numpy.where(heights >= minHei)
834 indb = numpy.where(heights <= maxHei)
835
836 try:
837 minIndex = inda[0][0]
838 except:
839 minIndex = 0
840
841 try:
842 maxIndex = indb[0][-1]
843 except:
844 maxIndex = len(heights)
845
846 if (minIndex < 0) or (minIndex > maxIndex):
847 raise ValueError("some value in (%d,%d) is not valid" % (
848 minIndex, maxIndex))
849
850 if (maxIndex >= self.dataOut.nHeights):
851 maxIndex = self.dataOut.nHeights - 1
852
853 # seleccion de indices para velocidades
854 indminvel = numpy.where(velrange >= minVel)
855 indmaxvel = numpy.where(velrange <= maxVel)
856 try:
857 minIndexVel = indminvel[0][0]
858 except:
859 minIndexVel = 0
860
861 try:
862 maxIndexVel = indmaxvel[0][-1]
863 except:
864 maxIndexVel = len(velrange)
865
866 # seleccion del espectro
867 data_spc = self.dataOut.data_spc[:,
868 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
869 # estimacion de ruido
870 noise = numpy.zeros(self.dataOut.nChannels)
871
872 for channel in range(self.dataOut.nChannels):
873 daux = data_spc[channel, :, :]
874 noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt)
875
876 self.dataOut.noise_estimation = noise.copy()
699 if mode == 1:
700 self.removeInterference(interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None)
701 elif mode == 2:
702 self.removeInterference2()
877 703
878 return 1
704 return self.dataOut
879 705
880 706
881 707 class IncohInt(Operation):
@@ -1031,7 +857,7 class IncohInt(Operation):
1031 857
1032 858 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
1033 859 if n == 1:
1034 return
860 return dataOut
1035 861
1036 862 dataOut.flagNoData = True
1037 863
@@ -7,7 +7,7 from schainpy.utils import log
7 7 from time import time
8 8
9 9
10 @MPDecorator
10
11 11 class VoltageProc(ProcessingUnit):
12 12
13 13 def __init__(self):
@@ -26,8 +26,6 class VoltageProc(ProcessingUnit):
26 26 if self.dataIn.type == 'Voltage':
27 27 self.dataOut.copy(self.dataIn)
28 28
29 # self.dataOut.copy(self.dataIn)
30
31 29 def __updateObjFromAmisrInput(self):
32 30
33 31 self.dataOut.timeZone = self.dataIn.timeZone
@@ -53,23 +51,14 class VoltageProc(ProcessingUnit):
53 51 self.dataOut.beam.codeList = self.dataIn.beam.codeList
54 52 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
55 53 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
56 #
57 # pass#
58 #
59 # def init(self):
60 #
61 #
62 # if self.dataIn.type == 'AMISR':
63 # self.__updateObjFromAmisrInput()
64 #
65 # if self.dataIn.type == 'Voltage':
66 # self.dataOut.copy(self.dataIn)
67 # # No necesita copiar en cada init() los atributos de dataIn
68 # # la copia deberia hacerse por cada nuevo bloque de datos
69
70 def selectChannels(self, channelList):
54
55
56 class selectChannels(Operation):
57
58 def run(self, dataOut, channelList):
71 59
72 60 channelIndexList = []
61 self.dataOut = dataOut
73 62
74 63 for channel in channelList:
75 64 if channel not in self.dataOut.channelList:
@@ -79,6 +68,7 class VoltageProc(ProcessingUnit):
79 68 channelIndexList.append(index)
80 69
81 70 self.selectChannelsByIndex(channelIndexList)
71 return self.dataOut
82 72
83 73 def selectChannelsByIndex(self, channelIndexList):
84 74 """
@@ -101,24 +91,63 class VoltageProc(ProcessingUnit):
101 91
102 92 for channelIndex in channelIndexList:
103 93 if channelIndex not in self.dataOut.channelIndexList:
104 print(channelIndexList)
105 94 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
106 95
107 if self.dataOut.flagDataAsBlock:
108 """
109 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
110 """
111 data = self.dataOut.data[channelIndexList,:,:]
112 else:
113 data = self.dataOut.data[channelIndexList,:]
96 if self.dataOut.type == 'Voltage':
97 if self.dataOut.flagDataAsBlock:
98 """
99 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
100 """
101 data = self.dataOut.data[channelIndexList,:,:]
102 else:
103 data = self.dataOut.data[channelIndexList,:]
104
105 self.dataOut.data = data
106 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
107 self.dataOut.channelList = range(len(channelIndexList))
108 elif self.dataOut.type == 'Spectra':
109 data_spc = self.dataOut.data_spc[channelIndexList, :]
110 data_dc = self.dataOut.data_dc[channelIndexList, :]
111
112 self.dataOut.data_spc = data_spc
113 self.dataOut.data_dc = data_dc
114
115 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
116 self.dataOut.channelList = range(len(channelIndexList))
117 self.__selectPairsByChannel(channelIndexList)
114 118
115 self.dataOut.data = data
116 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
117 self.dataOut.channelList = range(len(channelIndexList))
118
119 119 return 1
120 120
121 def selectHeights(self, minHei=None, maxHei=None):
121 def __selectPairsByChannel(self, channelList=None):
122
123 if channelList == None:
124 return
125
126 pairsIndexListSelected = []
127 for pairIndex in self.dataOut.pairsIndexList:
128 # First pair
129 if self.dataOut.pairsList[pairIndex][0] not in channelList:
130 continue
131 # Second pair
132 if self.dataOut.pairsList[pairIndex][1] not in channelList:
133 continue
134
135 pairsIndexListSelected.append(pairIndex)
136
137 if not pairsIndexListSelected:
138 self.dataOut.data_cspc = None
139 self.dataOut.pairsList = []
140 return
141
142 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
143 self.dataOut.pairsList = [self.dataOut.pairsList[i]
144 for i in pairsIndexListSelected]
145
146 return
147
148 class selectHeights(Operation):
149
150 def run(self, dataOut, minHei=None, maxHei=None):
122 151 """
123 152 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
124 153 minHei <= height <= maxHei
@@ -134,6 +163,8 class VoltageProc(ProcessingUnit):
134 163 1 si el metodo se ejecuto con exito caso contrario devuelve 0
135 164 """
136 165
166 self.dataOut = dataOut
167
137 168 if minHei == None:
138 169 minHei = self.dataOut.heightList[0]
139 170
@@ -165,8 +196,7 class VoltageProc(ProcessingUnit):
165 196
166 197 self.selectHeightsByIndex(minIndex, maxIndex)
167 198
168 return 1
169
199 return self.dataOut
170 200
171 201 def selectHeightsByIndex(self, minIndex, maxIndex):
172 202 """
@@ -185,81 +215,118 class VoltageProc(ProcessingUnit):
185 215 1 si el metodo se ejecuto con exito caso contrario devuelve 0
186 216 """
187 217
188 if (minIndex < 0) or (minIndex > maxIndex):
189 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
218 if self.dataOut.type == 'Voltage':
219 if (minIndex < 0) or (minIndex > maxIndex):
220 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
190 221
191 if (maxIndex >= self.dataOut.nHeights):
192 maxIndex = self.dataOut.nHeights
222 if (maxIndex >= self.dataOut.nHeights):
223 maxIndex = self.dataOut.nHeights
193 224
194 #voltage
195 if self.dataOut.flagDataAsBlock:
196 """
197 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
198 """
199 data = self.dataOut.data[:,:, minIndex:maxIndex]
200 else:
201 data = self.dataOut.data[:, minIndex:maxIndex]
225 #voltage
226 if self.dataOut.flagDataAsBlock:
227 """
228 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
229 """
230 data = self.dataOut.data[:,:, minIndex:maxIndex]
231 else:
232 data = self.dataOut.data[:, minIndex:maxIndex]
233
234 # firstHeight = self.dataOut.heightList[minIndex]
235
236 self.dataOut.data = data
237 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
238
239 if self.dataOut.nHeights <= 1:
240 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
241 elif self.dataOut.type == 'Spectra':
242 if (minIndex < 0) or (minIndex > maxIndex):
243 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
244 minIndex, maxIndex))
202 245
203 # firstHeight = self.dataOut.heightList[minIndex]
246 if (maxIndex >= self.dataOut.nHeights):
247 maxIndex = self.dataOut.nHeights - 1
204 248
205 self.dataOut.data = data
206 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
249 # Spectra
250 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
207 251
208 if self.dataOut.nHeights <= 1:
209 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
252 data_cspc = None
253 if self.dataOut.data_cspc is not None:
254 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
210 255
256 data_dc = None
257 if self.dataOut.data_dc is not None:
258 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
259
260 self.dataOut.data_spc = data_spc
261 self.dataOut.data_cspc = data_cspc
262 self.dataOut.data_dc = data_dc
263
264 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
265
211 266 return 1
212 267
213 268
214 def filterByHeights(self, window):
269 class filterByHeights(Operation):
270
271 def run(self, dataOut, window):
215 272
216 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
273 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
217 274
218 275 if window == None:
219 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
276 window = (dataOut.radarControllerHeaderObj.txA/dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
220 277
221 278 newdelta = deltaHeight * window
222 r = self.dataOut.nHeights % window
223 newheights = (self.dataOut.nHeights-r)/window
279 r = dataOut.nHeights % window
280 newheights = (dataOut.nHeights-r)/window
224 281
225 282 if newheights <= 1:
226 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
283 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(dataOut.nHeights, window))
227 284
228 if self.dataOut.flagDataAsBlock:
285 if dataOut.flagDataAsBlock:
229 286 """
230 287 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
231 288 """
232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
233 buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window)
289 buffer = dataOut.data[:, :, 0:int(dataOut.nHeights-r)]
290 buffer = buffer.reshape(dataOut.nChannels, dataOut.nProfiles, int(dataOut.nHeights/window), window)
234 291 buffer = numpy.sum(buffer,3)
235 292
236 293 else:
237 buffer = self.dataOut.data[:,0:int(self.dataOut.nHeights-r)]
238 buffer = buffer.reshape(self.dataOut.nChannels,int(self.dataOut.nHeights/window),int(window))
294 buffer = dataOut.data[:,0:int(dataOut.nHeights-r)]
295 buffer = buffer.reshape(dataOut.nChannels,int(dataOut.nHeights/window),int(window))
239 296 buffer = numpy.sum(buffer,2)
240 297
241 self.dataOut.data = buffer
242 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
243 self.dataOut.windowOfFilter = window
298 dataOut.data = buffer
299 dataOut.heightList = dataOut.heightList[0] + numpy.arange( newheights )*newdelta
300 dataOut.windowOfFilter = window
301
302 return dataOut
303
304
305 class setH0(Operation):
244 306
245 def setH0(self, h0, deltaHeight = None):
307 def run(self, dataOut, h0, deltaHeight = None):
246 308
247 309 if not deltaHeight:
248 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
310 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
249 311
250 nHeights = self.dataOut.nHeights
312 nHeights = dataOut.nHeights
251 313
252 314 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
253 315
254 self.dataOut.heightList = newHeiRange
316 dataOut.heightList = newHeiRange
317
318 return dataOut
319
320
321 class deFlip(Operation):
255 322
256 def deFlip(self, channelList = []):
323 def run(self, dataOut, channelList = []):
257 324
258 data = self.dataOut.data.copy()
325 data = dataOut.data.copy()
259 326
260 if self.dataOut.flagDataAsBlock:
327 if dataOut.flagDataAsBlock:
261 328 flip = self.flip
262 profileList = list(range(self.dataOut.nProfiles))
329 profileList = list(range(dataOut.nProfiles))
263 330
264 331 if not channelList:
265 332 for thisProfile in profileList:
@@ -267,7 +334,7 class VoltageProc(ProcessingUnit):
267 334 flip *= -1.0
268 335 else:
269 336 for thisChannel in channelList:
270 if thisChannel not in self.dataOut.channelList:
337 if thisChannel not in dataOut.channelList:
271 338 continue
272 339
273 340 for thisProfile in profileList:
@@ -281,41 +348,57 class VoltageProc(ProcessingUnit):
281 348 data[:,:] = data[:,:]*self.flip
282 349 else:
283 350 for thisChannel in channelList:
284 if thisChannel not in self.dataOut.channelList:
351 if thisChannel not in dataOut.channelList:
285 352 continue
286 353
287 354 data[thisChannel,:] = data[thisChannel,:]*self.flip
288 355
289 356 self.flip *= -1.
290 357
291 self.dataOut.data = data
358 dataOut.data = data
359
360 return dataOut
292 361
293 def setRadarFrequency(self, frequency=None):
294 362
295 if frequency != None:
296 self.dataOut.frequency = frequency
363 class setAttribute(Operation):
364 '''
365 Set an arbitrary attribute to dataOut
366 '''
297 367
298 return 1
368 def __init__(self):
369
370 Operation.__init__(self)
371 self._ready = False
299 372
300 def interpolateHeights(self, topLim, botLim):
373 def run(self, dataOut, **kwargs):
374
375 for key, value in kwargs.items():
376 setattr(dataOut, key, value)
377
378 return dataOut
379
380
381 class interpolateHeights(Operation):
382
383 def run(self, dataOut, topLim, botLim):
301 384 #69 al 72 para julia
302 385 #82-84 para meteoros
303 if len(numpy.shape(self.dataOut.data))==2:
304 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
386 if len(numpy.shape(dataOut.data))==2:
387 sampInterp = (dataOut.data[:,botLim-1] + dataOut.data[:,topLim+1])/2
305 388 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
306 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
307 self.dataOut.data[:,botLim:topLim+1] = sampInterp
389 #dataOut.data[:,botLim:limSup+1] = sampInterp
390 dataOut.data[:,botLim:topLim+1] = sampInterp
308 391 else:
309 nHeights = self.dataOut.data.shape[2]
392 nHeights = dataOut.data.shape[2]
310 393 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
311 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
394 y = dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
312 395 f = interpolate.interp1d(x, y, axis = 2)
313 396 xnew = numpy.arange(botLim,topLim+1)
314 397 ynew = f(xnew)
398 dataOut.data[:,:,botLim:topLim+1] = ynew
315 399
316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
400 return dataOut
317 401
318 # import collections
319 402
320 403 class CohInt(Operation):
321 404
@@ -979,8 +1062,6 class ProfileSelector(Operation):
979 1062
980 1063 raise ValueError("ProfileSelector needs profileList, profileRangeList or rangeList parameter")
981 1064
982 #return False
983 return dataOut
984 1065
985 1066 class Reshaper(Operation):
986 1067
General Comments 0
You need to be logged in to leave comments. Login now