Review MP changes: three types of operations (self, other, and external)
Juan C. Espinoza - r1177:b013e28003fc
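The heart of this changeset is that operations are now created according to their type. In ProcUnitConf.createObjects a 'self' operation is looked up as a bound method of the processing unit (getattr(procUnitObj, opConfObj.name)), while OperationConf.createObject instantiates 'other' operations with no arguments and 'external' operations with (id, project_id, **kwargs); both of the latter are start()-ed, and every operation is registered through procUnitObj.addOperation(opConfObj, opObj). The snippet below is only a minimal, self-contained sketch of that dispatch for reference while reviewing; the Dummy* classes and the create_operation helper are hypothetical stand-ins, not part of schainpy.

# Sketch of the 'self' / 'other' / 'external' dispatch (hypothetical stand-in classes)
from multiprocessing import Process


class DummyProcUnit:
    # a 'self' operation is simply a method of the processing unit
    def removeDC(self, **kwargs):
        print('removeDC (self operation) called with', kwargs)


class DummyOtherOp:
    # an 'other' operation is a plain object created with no arguments
    def start(self):
        print('other operation started in-process')


class DummyExternalOp(Process):
    # an 'external' operation receives its id and the project id and runs as a Process
    def __init__(self, op_id, project_id, **kwargs):
        Process.__init__(self)
        self.op_id, self.project_id, self.kwargs = op_id, project_id, kwargs

    def run(self):
        print('external operation', self.op_id, 'of project', self.project_id, 'running')


def create_operation(op_type, name, proc_unit, op_id='111', project_id='1', **kwargs):
    classes = {'DummyOtherOp': DummyOtherOp, 'DummyExternalOp': DummyExternalOp}
    if op_type == 'self':
        # resolved later as a direct method call on the processing unit
        return getattr(proc_unit, name)
    if op_type == 'other':
        op = classes[name]()
    elif op_type == 'external':
        op = classes[name](op_id, project_id, **kwargs)
    else:
        raise ValueError('unknown operation type: %s' % op_type)
    op.start()  # both 'other' and 'external' operations are started
    return op


if __name__ == '__main__':
    unit = DummyProcUnit()
    create_operation('self', 'removeDC', unit)(mode=1)
    create_operation('other', 'DummyOtherOp', unit)
    create_operation('external', 'DummyExternalOp', unit, zplot=True).join()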
@@ -1,503 +1,506
1 1 """
2 2 The admin module contains all administrative classes relating to the schain python api.
3 3
4 4 The main role of this module is to send some reports. It contains a
5 5 notification class and a standard error handling class.
6 6
7 7 $Id: admin.py 3966 2015-12-01 14:32:29Z miguel.urco $
8 8 """
9 9 import os
10 10 import sys
11 11 import time
12 12 import traceback
13 13 import smtplib
14 import configparser
14 if sys.version[0] == '3':
15 from configparser import ConfigParser
16 else:
17 from ConfigParser import ConfigParser
15 18 import io
16 19 from threading import Thread
17 20 from multiprocessing import Process
18 21 from email.mime.text import MIMEText
19 22 from email.mime.application import MIMEApplication
20 23 from email.mime.multipart import MIMEMultipart
21 24
22 25 import schainpy
23 26 from schainpy.utils import log
24 27 from schainpy.model.graphics.jroplot_data import popup
25 28
26 29 def get_path():
27 30 '''
28 31 Return schainpy path
29 32 '''
30 33
31 34 try:
32 35 root = __file__
33 36 if os.path.islink(root):
34 37 root = os.path.realpath(root)
35 38
36 39 return os.path.dirname(os.path.abspath(root))
37 40 except:
38 41 log.error('I am sorry, but something is wrong... __file__ not found')
39 42
40 43 class Alarm(Process):
41 44 '''
42 45 modes:
43 46 0 - All
44 47 1 - Send email
45 48 2 - Popup message
46 49 3 - Sound alarm
47 50 4 - Send to alarm system TODO
48 51 '''
49 52
50 53 def __init__(self, modes=[], **kwargs):
51 54 Process.__init__(self)
52 55 self.modes = modes
53 56 self.kwargs = kwargs
54 57
55 58 @staticmethod
56 59 def play_sound():
57 60 sound = os.path.join(get_path(), 'alarm1.oga')
58 61 if os.path.exists(sound):
59 62 for __ in range(2):
60 63 os.system('paplay {}'.format(sound))
61 64 time.sleep(0.5)
62 65 else:
63 66 log.warning('Unable to play alarm, sound file not found', 'ADMIN')
64 67
65 68 @staticmethod
66 69 def send_email(**kwargs):
67 70 notifier = SchainNotify()
68 71 print(kwargs)
69 72 notifier.notify(**kwargs)
70 73
71 74 @staticmethod
72 75 def show_popup(message):
73 76 if isinstance(message, list):
74 77 message = message[-1]
75 78 popup(message)
76 79
77 80 @staticmethod
78 81 def send_alarm():
79 82 pass
80 83
81 84 @staticmethod
82 85 def get_kwargs(kwargs, keys):
83 86 ret = {}
84 87 for key in keys:
85 88 ret[key] = kwargs[key]
86 89 return ret
87 90
88 91 def run(self):
89 92 tasks = {
90 93 1 : self.send_email,
91 94 2 : self.show_popup,
92 95 3 : self.play_sound,
93 96 4 : self.send_alarm,
94 97 }
95 98
96 99 tasks_args = {
97 100 1: ['email', 'message', 'subject', 'subtitle', 'filename'],
98 101 2: ['message'],
99 102 3: [],
100 103 4: [],
101 104 }
102 105 procs = []
103 106 for mode in self.modes:
104 107 if 0 in self.modes:
105 108 for x in tasks:
106 109 t = Thread(target=tasks[x], kwargs=self.get_kwargs(self.kwargs, tasks_args[x]))
107 110 t.start()
108 111 procs.append(t)
109 112 break
110 113 else:
111 114 t = Thread(target=tasks[mode], kwargs=self.get_kwargs(self.kwargs, tasks_args[mode]))
112 115 t.start()
113 116 procs.append(t)
114 117 for t in procs:
115 118 t.join()
116 119
117 120
118 121 class SchainConfigure():
119 122
120 123 __DEFAULT_ADMINISTRATOR_EMAIL = "juan.espinoza@jro.igp.gob.pe"
121 124 __DEFAULT_EMAIL_SERVER = "jro-zimbra.igp.gob.pe"
122 125 __DEFAULT_SENDER_EMAIL = "notifier-schain@jro.igp.gob.pe"
123 126 __DEFAULT_SENDER_PASS = ""
124 127
125 128 __SCHAIN_ADMINISTRATOR_EMAIL = "CONTACT"
126 129 __SCHAIN_EMAIL_SERVER = "MAILSERVER"
127 130 __SCHAIN_SENDER_EMAIL = "MAILSERVER_ACCOUNT"
128 131 __SCHAIN_SENDER_PASS = "MAILSERVER_PASSWORD"
129 132
130 133 def __init__(self, initFile = None):
131 134
132 135 # Set configuration file
133 136 if (initFile == None):
134 137 self.__confFilePath = "/etc/schain.conf"
135 138 else:
136 139 self.__confFilePath = initFile
137 140
138 141 # open configuration file
139 142 try:
140 143 self.__confFile = open(self.__confFilePath, "r")
141 144 except IOError:
142 145 # can't read from file - use all hard-coded values
143 146 self.__initFromHardCode()
144 147 return
145 148
146 149 # create Parser using standard module ConfigParser
147 self.__parser = configparser.ConfigParser()
150 self.__parser = ConfigParser()
148 151
149 152 # read conf file into a StringIO with "[schain]\n" section heading prepended
150 153 strConfFile = io.StringIO("[schain]\n" + self.__confFile.read())
151 154
152 155 # parse StringIO configuration file
153 156 self.__parser.readfp(strConfFile)
154 157
155 158 # read information from configuration file
156 159 self.__readConfFile()
157 160
158 161 # close conf file
159 162 self.__confFile.close()
160 163
161 164
162 165 def __initFromHardCode(self):
163 166
164 167 self.__sender_email = self.__DEFAULT_SENDER_EMAIL
165 168 self.__sender_pass = self.__DEFAULT_SENDER_PASS
166 169 self.__admin_email = self.__DEFAULT_ADMINISTRATOR_EMAIL
167 170 self.__email_server = self.__DEFAULT_EMAIL_SERVER
168 171
169 172 def __readConfFile(self):
170 173 """__readConfFile is a private helper function that reads information from the parsed config file.
171 174
172 175 Inputs: None
173 176
174 177 Returns: Void.
175 178
176 179 Affects: Initializes class member variables that are found in the config file.
177 180
178 181 Exceptions: MadrigalError thrown if any key not found.
179 182 """
180 183
181 184 # get the sender email
182 185 try:
183 186 self.__sender_email = self.__parser.get("schain", self.__SCHAIN_SENDER_EMAIL)
184 187 except:
185 188 self.__sender_email = self.__DEFAULT_SENDER_EMAIL
186 189
187 190 # get the sender password
188 191 try:
189 192 self.__sender_pass = self.__parser.get("schain", self.__SCHAIN_SENDER_PASS)
190 193 except:
191 194 self.__sender_pass = self.__DEFAULT_SENDER_PASS
192 195
193 196 # get the administrator email
194 197 try:
195 198 self.__admin_email = self.__parser.get("schain", self.__SCHAIN_ADMINISTRATOR_EMAIL)
196 199 except:
197 200 self.__admin_email = self.__DEFAULT_ADMINISTRATOR_EMAIL
198 201
199 202 # get the server email
200 203 try:
201 204 self.__email_server = self.__parser.get("schain", self.__SCHAIN_EMAIL_SERVER)
202 205 except:
203 206 self.__email_server = self.__DEFAULT_EMAIL_SERVER
204 207
205 208 def getEmailServer(self):
206 209
207 210 return self.__email_server
208 211
209 212 def getSenderEmail(self):
210 213
211 214 return self.__sender_email
212 215
213 216 def getSenderPass(self):
214 217
215 218 return self.__sender_pass
216 219
217 220 def getAdminEmail(self):
218 221
219 222 return self.__admin_email
220 223
221 224 class SchainNotify:
222 225 """SchainNotify is an object used to send messages to an administrator about a Schain software.
223 226
224 227 This object provides functions needed to send messages to an administrator about a Schain , for now
225 228 only sendAlert, which sends an email to the site administrator found is ADMIN_EMAIL
226 229
227 230 Usage example:
228 231
229 232 import schainpy.admin
230 233
231 234 try:
232 235
233 236 adminObj = schainpy.admin.SchainNotify()
234 237 adminObj.sendAlert('This is important!', 'Important Message')
235 238
236 239 except schainpy.admin.SchainError as e:
237 240
238 241 print(e.getExceptionStr())
239 242
240 243
241 244 Non-standard Python modules used:
242 245 None
243 246
244 247 Exceptions thrown: None - Note that SchainNotify tries every trick it knows to avoid
245 248 throwing exceptions, since this is the class that will generally be called when there is a problem.
246 249
247 250 Change history:
248 251
249 252 Written by "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Dec. 1, 2015
250 253 """
251 254
252 255 #constants
253 256
254 257 def __init__(self):
255 258 """__init__ initializes SchainNotify by getting some basic information from SchainDB and SchainSite.
256 259
257 260 Note that SchainNotify tries every trick it knows to avoid throwing exceptions, since
258 261 this is the class that will generally be called when there is a problem.
259 262
260 263 Inputs: Existing SchainDB object, by default = None.
261 264
262 265 Returns: void
263 266
264 267 Affects: Initializes self.__binDir.
265 268
266 269 Exceptions: None.
267 270 """
268 271
269 272 # note that the main configuration file is unavailable
270 273 # the best that can be done is send an email to root using localhost mailserver
271 274 confObj = SchainConfigure()
272 275
273 276 self.__emailFromAddress = confObj.getSenderEmail()
274 277 self.__emailPass = confObj.getSenderPass()
275 278 self.__emailToAddress = confObj.getAdminEmail()
276 279 self.__emailServer = confObj.getEmailServer()
277 280
278 281 def sendEmail(self, email_from, email_to, subject='Error running ...', message="", subtitle="", filename="", html_format=True):
279 282
280 283 if not email_to:
281 284 return 0
282 285
283 286 if not self.__emailServer:
284 287 return 0
285 288
286 289 log.success('Sending email to {}...'.format(email_to), 'System')
287 290
288 291 msg = MIMEMultipart()
289 292 msg['Subject'] = subject
290 293 msg['From'] = "(Python SChain API): " + email_from
291 294 msg['Reply-to'] = email_from
292 295 msg['To'] = email_to
293 296
294 297 # This is what you see if you don't have an email reader:
295 298 msg.preamble = 'SChainPy'
296 299
297 300 if html_format:
298 301 message = "<h1> %s </h1>" %subject + "<h3>" + subtitle.replace("\n", "</h3><h3>\n") + "</h3>" + message.replace("\n", "<br>\n")
299 302 message = "<html>\n" + message + '</html>'
300 303
301 304 # This is the textual part:
302 305 part = MIMEText(message, "html")
303 306 else:
304 307 message = subject + "\n" + subtitle + "\n" + message
305 308 part = MIMEText(message)
306 309
307 310 msg.attach(part)
308 311
309 312 if filename and os.path.isfile(filename):
310 313 # This is the binary part(The Attachment):
311 314 part = MIMEApplication(open(filename,"rb").read())
312 315 part.add_header('Content-Disposition',
313 316 'attachment',
314 317 filename=os.path.basename(filename))
315 318 msg.attach(part)
316 319
317 320 # Create an instance in SMTP server
318 321 try:
319 322 smtp = smtplib.SMTP(self.__emailServer)
320 323 except:
321 324 log.error('Could not connect to server {}'.format(self.__emailServer), 'System')
322 325 return 0
323 326
324 327 # Start the server:
325 328 # smtp.ehlo()
326 329 if self.__emailPass:
327 330 smtp.login(self.__emailFromAddress, self.__emailPass)
328 331
329 332 # Send the email
330 333 try:
331 334 smtp.sendmail(msg['From'], msg['To'], msg.as_string())
332 335 except:
333 336 log.error('Could not send the email to {}'.format(msg['To']), 'System')
334 337 smtp.quit()
335 338 return 0
336 339
337 340 smtp.quit()
338 341
339 342 log.success('Email sent ', 'System')
340 343
341 344 return 1
342 345
343 346 def sendAlert(self, message, subject = "", subtitle="", filename=""):
344 347 """sendAlert sends an email with the given message and optional title.
345 348
346 349 Inputs: message (string), and optional title (string)
347 350
348 351 Returns: void
349 352
350 353 Affects: none
351 354
352 355 Exceptions: None.
353 356 """
354 357
355 358 if not self.__emailToAddress:
356 359 return 0
357 360
358 361 print("***** Sending alert to %s *****" %self.__emailToAddress)
359 362 # set up message
360 363
361 364 sent=self.sendEmail(email_from=self.__emailFromAddress,
362 365 email_to=self.__emailToAddress,
363 366 subject=subject,
364 367 message=message,
365 368 subtitle=subtitle,
366 369 filename=filename)
367 370
368 371 if not sent:
369 372 return 0
370 373
371 374 return 1
372 375
373 376 def notify(self, email, message, subject = "", subtitle="", filename=""):
374 377 """notify sends an email with the given message and title to email.
375 378
376 379 Inputs: email (string), message (string), and subject (string)
377 380
378 381 Returns: void
379 382
380 383 Affects: none
381 384
382 385 Exceptions: None.
383 386 """
384 387
385 388 if email is None:
386 389 email = self.__emailToAddress
387 390
388 391 self.sendEmail(
389 392 email_from=self.__emailFromAddress,
390 393 email_to=email,
391 394 subject=subject,
392 395 message=message,
393 396 subtitle=subtitle,
394 397 filename=filename
395 398 )
396 399
397 400
398 401 class SchainError(Exception):
399 402 """SchainError is an exception class that is thrown for all known errors using Schain Py lib.
400 403
401 404 Usage example:
402 405
403 406 import sys, traceback
404 407 import schainpy.admin
405 408
406 409 try:
407 410
408 411 test = open('ImportantFile.txt', 'r')
409 412
410 413 except:
411 414
412 415 raise schainpy.admin.SchainError('ImportantFile.txt not opened!',
413 416 traceback.format_exception(sys.exc_info()[0],
414 417 sys.exc_info()[1],
415 418 sys.exc_info()[2]))
416 419 """
417 420
418 421
419 422 def __init__(self, strInterpretation, exceptionList=None):
420 423 """ __init__ gathers the interpretation string along with all information from sys.exc_info().
421 424
422 425 Inputs:
423 426 strInterpretation - A string representing the programmer's interpretation of
424 427 why the exception occurred
425 428
426 429 exceptionList - a list of strings completely describing the exception.
427 430 Generated by traceback.format_exception(sys.exc_info()[0],
428 431 sys.exc_info()[1],
429 432 sys.exc_info()[2])
430 433
431 434 Returns: Void.
432 435
433 436 Affects: Initializes class member variables _strInterp, _strExcList.
434 437
435 438 Exceptions: None.
436 439 """
437 440
438 441 if not exceptionList:
439 442 exceptionList = traceback.format_exception(sys.exc_info()[0],
440 443 sys.exc_info()[1],
441 444 sys.exc_info()[2])
442 445
443 446 self._strInterp = strInterpretation
444 447 self._strExcList = exceptionList
445 448
446 449
447 450 def getExceptionStr(self):
448 451 """ getExceptionStr returns a formatted string ready for printing completely describing the exception.
449 452
450 453 Inputs: None
451 454
452 455 Returns: A formatted string ready for printing completely describing the exception.
453 456
454 457 Affects: None
455 458
456 459 Exceptions: None.
457 460 """
458 461 excStr = ''
459 462 excStr = excStr + self._strInterp + '\n\n'
460 463
461 464 if self._strExcList != None:
462 465 for item in self._strExcList:
463 466 excStr = excStr + str(item) + '\n'
464 467
465 468 return excStr
466 469
467 470 def __str__(self):
468 471
469 472 return(self.getExceptionStr())
470 473
471 474
472 475 def getExceptionHtml(self):
473 476 """ getExceptionHtml returns an Html formatted string completely describing the exception.
474 477
475 478 Inputs: None
476 479
477 480 Returns: A formatted string ready for printing completely describing the exception.
478 481
479 482 Affects: None
480 483
481 484 Exceptions: None.
482 485 """
483 486
484 487 excStr = '<BR>The following Schain Python exception has occurred:\n<BR>'
485 488 excStr = excStr + self._strInterp + '\n<BR>\n'
486 489
487 490 if self._strExcList != None:
488 491 for item in self._strExcList:
489 492 excStr = excStr + str(item) + '\n<BR>'
490 493
491 494 return excStr
492 495
493 496 class SchainWarning(Exception):
494 497 pass
495 498
496 499
497 500 if __name__ == '__main__':
498 501
499 502 test = SchainNotify()
500 503
501 504 test.sendAlert('This is a message from the python module SchainNotify', 'Test from SchainNotify')
502 505
503 506 print('Hopefully message sent - check.')
\ No newline at end of file
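The import hunk above keys off sys.version[0] to pick ConfigParser. An equally common 2/3-compatible idiom, shown here only as a reference sketch and not as what the patch does, is to try the Python 3 module first and fall back on ImportError; note also that readfp(), used later in SchainConfigure, is deprecated in Python 3 (read_file() is the replacement).

# Alternative compatibility import (reference sketch only)
try:
    from configparser import ConfigParser   # Python 3
except ImportError:
    from ConfigParser import ConfigParser   # Python 2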
@@ -1,1333 +1,1259
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, cpu_count
15
15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21
22 22 ### Temporary imports!!!
23 23 # from schainpy.model import *
24 24 from schainpy.model.io import *
25 25 from schainpy.model.graphics import *
26 26 from schainpy.model.proc.jroproc_base import *
27 27 from schainpy.model.proc.bltrproc_parameters import *
28 28 from schainpy.model.proc.jroproc_spectra import *
29 29 from schainpy.model.proc.jroproc_voltage import *
30 30 from schainpy.model.proc.jroproc_parameters import *
31 31 from schainpy.model.utils.jroutils_publish import *
32 32 from schainpy.utils import log
33 33 ###
34 34
35 35 DTYPES = {
36 36 'Voltage': '.r',
37 37 'Spectra': '.pdata'
38 38 }
39 39
40 40
41 41 def MPProject(project, n=cpu_count()):
42 42 '''
43 43 Project wrapper to run schain in n processes
44 44 '''
45 45
46 46 rconf = project.getReadUnitObj()
47 47 op = rconf.getOperationObj('run')
48 48 dt1 = op.getParameterValue('startDate')
49 49 dt2 = op.getParameterValue('endDate')
50 50 tm1 = op.getParameterValue('startTime')
51 51 tm2 = op.getParameterValue('endTime')
52 52 days = (dt2 - dt1).days
53 53
54 54 for day in range(days + 1):
55 55 skip = 0
56 56 cursor = 0
57 57 processes = []
58 58 dt = dt1 + datetime.timedelta(day)
59 59 dt_str = dt.strftime('%Y/%m/%d')
60 60 reader = JRODataReader()
61 61 paths, files = reader.searchFilesOffLine(path=rconf.path,
62 62 startDate=dt,
63 63 endDate=dt,
64 64 startTime=tm1,
65 65 endTime=tm2,
66 66 ext=DTYPES[rconf.datatype])
67 67 nFiles = len(files)
68 68 if nFiles == 0:
69 69 continue
70 70 skip = int(math.ceil(nFiles / n))
71 71 while nFiles > cursor * skip:
72 72 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
73 73 skip=skip)
74 74 p = project.clone()
75 75 p.start()
76 76 processes.append(p)
77 77 cursor += 1
78 78
79 79 def beforeExit(exctype, value, trace):
80 80 for process in processes:
81 81 process.terminate()
82 82 process.join()
83 83 print(traceback.print_tb(trace))
84 84
85 85 sys.excepthook = beforeExit
86 86
87 87 for process in processes:
88 88 process.join()
89 89 process.terminate()
90 90
91 91 time.sleep(3)
92 92
93 def wait(context):
94
95 time.sleep(1)
96 c = zmq.Context()
97 receiver = c.socket(zmq.SUB)
98 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
99 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
100 log.error('startinggg')
101 msg = receiver.recv_multipart()[1]
102 #log.error(msg)
103 context.terminate()
104
93 105 class ParameterConf():
94 106
95 107 id = None
96 108 name = None
97 109 value = None
98 110 format = None
99 111
100 112 __formated_value = None
101 113
102 114 ELEMENTNAME = 'Parameter'
103 115
104 116 def __init__(self):
105 117
106 118 self.format = 'str'
107 119
108 120 def getElementName(self):
109 121
110 122 return self.ELEMENTNAME
111 123
112 124 def getValue(self):
113 125
114 126 value = self.value
115 127 format = self.format
116 128
117 129 if self.__formated_value != None:
118 130
119 131 return self.__formated_value
120 132
121 133 if format == 'obj':
122 134 return value
123 135
124 136 if format == 'str':
125 137 self.__formated_value = str(value)
126 138 return self.__formated_value
127 139
128 140 if value == '':
129 141 raise ValueError('%s: This parameter value is empty' % self.name)
130 142
131 143 if format == 'list':
132 144 strList = value.split(',')
133 145
134 146 self.__formated_value = strList
135 147
136 148 return self.__formated_value
137 149
138 150 if format == 'intlist':
139 151 '''
140 152 Example:
141 153 value = (0,1,2)
142 154 '''
143 155
144 156 new_value = ast.literal_eval(value)
145 157
146 158 if type(new_value) not in (tuple, list):
147 159 new_value = [int(new_value)]
148 160
149 161 self.__formated_value = new_value
150 162
151 163 return self.__formated_value
152 164
153 165 if format == 'floatlist':
154 166 '''
155 167 Example:
156 168 value = (0.5, 1.4, 2.7)
157 169 '''
158 170
159 171 new_value = ast.literal_eval(value)
160 172
161 173 if type(new_value) not in (tuple, list):
162 174 new_value = [float(new_value)]
163 175
164 176 self.__formated_value = new_value
165 177
166 178 return self.__formated_value
167 179
168 180 if format == 'date':
169 181 strList = value.split('/')
170 182 intList = [int(x) for x in strList]
171 183 date = datetime.date(intList[0], intList[1], intList[2])
172 184
173 185 self.__formated_value = date
174 186
175 187 return self.__formated_value
176 188
177 189 if format == 'time':
178 190 strList = value.split(':')
179 191 intList = [int(x) for x in strList]
180 192 time = datetime.time(intList[0], intList[1], intList[2])
181 193
182 194 self.__formated_value = time
183 195
184 196 return self.__formated_value
185 197
186 198 if format == 'pairslist':
187 199 '''
188 200 Example:
189 201 value = (0,1),(1,2)
190 202 '''
191 203
192 204 new_value = ast.literal_eval(value)
193 205
194 206 if type(new_value) not in (tuple, list):
195 207 raise ValueError('%s has to be a tuple or list of pairs' % value)
196 208
197 209 if type(new_value[0]) not in (tuple, list):
198 210 if len(new_value) != 2:
199 211 raise ValueError('%s has to be a tuple or list of pairs' % value)
200 212 new_value = [new_value]
201 213
202 214 for thisPair in new_value:
203 215 if len(thisPair) != 2:
204 216 raise ValueError('%s has to be a tuple or list of pairs' % value)
205 217
206 218 self.__formated_value = new_value
207 219
208 220 return self.__formated_value
209 221
210 222 if format == 'multilist':
211 223 '''
212 224 Example:
213 225 value = (0,1,2),(3,4,5)
214 226 '''
215 227 multiList = ast.literal_eval(value)
216 228
217 229 if type(multiList[0]) == int:
218 230 multiList = ast.literal_eval('(' + value + ')')
219 231
220 232 self.__formated_value = multiList
221 233
222 234 return self.__formated_value
223 235
224 236 if format == 'bool':
225 237 value = int(value)
226 238
227 239 if format == 'int':
228 240 value = float(value)
229 241
230 242 format_func = eval(format)
231 243
232 244 self.__formated_value = format_func(value)
233 245
234 246 return self.__formated_value
235 247
236 248 def updateId(self, new_id):
237 249
238 250 self.id = str(new_id)
239 251
240 252 def setup(self, id, name, value, format='str'):
241 253 self.id = str(id)
242 254 self.name = name
243 255 if format == 'obj':
244 256 self.value = value
245 257 else:
246 258 self.value = str(value)
247 259 self.format = str.lower(format)
248 260
249 261 self.getValue()
250 262
251 263 return 1
252 264
253 265 def update(self, name, value, format='str'):
254 266
255 267 self.name = name
256 268 self.value = str(value)
257 269 self.format = format
258 270
259 271 def makeXml(self, opElement):
260 272 if self.name not in ('queue',):
261 273 parmElement = SubElement(opElement, self.ELEMENTNAME)
262 274 parmElement.set('id', str(self.id))
263 275 parmElement.set('name', self.name)
264 276 parmElement.set('value', self.value)
265 277 parmElement.set('format', self.format)
266 278
267 279 def readXml(self, parmElement):
268 280
269 281 self.id = parmElement.get('id')
270 282 self.name = parmElement.get('name')
271 283 self.value = parmElement.get('value')
272 284 self.format = str.lower(parmElement.get('format'))
273 285
274 286 # Compatible with old signal chain version
275 287 if self.format == 'int' and self.name == 'idfigure':
276 288 self.name = 'id'
277 289
278 290 def printattr(self):
279 291
280 292 print('Parameter[%s]: name = %s, value = %s, format = %s' % (self.id, self.name, self.value, self.format))
281 293
282 294 class OperationConf():
283 295
284 id = None
285 name = None
286 priority = None
287 type = None
288
289 parmConfObjList = []
290
291 296 ELEMENTNAME = 'Operation'
292 297
293 298 def __init__(self):
294 299
295 300 self.id = '0'
296 301 self.name = None
297 302 self.priority = None
298 303 self.topic = None
299 304
300 305 def __getNewId(self):
301 306
302 307 return int(self.id) * 10 + len(self.parmConfObjList) + 1
303 308
304 309 def getId(self):
305 310 return self.id
306 311
307 312 def updateId(self, new_id):
308 313
309 314 self.id = str(new_id)
310 315
311 316 n = 1
312 317 for parmObj in self.parmConfObjList:
313 318
314 319 idParm = str(int(new_id) * 10 + n)
315 320 parmObj.updateId(idParm)
316 321
317 322 n += 1
318 323
319 324 def getElementName(self):
320 325
321 326 return self.ELEMENTNAME
322 327
323 328 def getParameterObjList(self):
324 329
325 330 return self.parmConfObjList
326 331
327 332 def getParameterObj(self, parameterName):
328 333
329 334 for parmConfObj in self.parmConfObjList:
330 335
331 336 if parmConfObj.name != parameterName:
332 337 continue
333 338
334 339 return parmConfObj
335 340
336 341 return None
337 342
338 343 def getParameterObjfromValue(self, parameterValue):
339 344
340 345 for parmConfObj in self.parmConfObjList:
341 346
342 347 if parmConfObj.getValue() != parameterValue:
343 348 continue
344 349
345 350 return parmConfObj.getValue()
346 351
347 352 return None
348 353
349 354 def getParameterValue(self, parameterName):
350 355
351 356 parameterObj = self.getParameterObj(parameterName)
352 357
353 358 # if not parameterObj:
354 359 # return None
355 360
356 361 value = parameterObj.getValue()
357 362
358 363 return value
359 364
360 365 def getKwargs(self):
361 366
362 367 kwargs = {}
363 368
364 369 for parmConfObj in self.parmConfObjList:
365 370 if self.name == 'run' and parmConfObj.name == 'datatype':
366 371 continue
367 372
368 373 kwargs[parmConfObj.name] = parmConfObj.getValue()
369 374
370 375 return kwargs
371 376
372 def setup(self, id, name, priority, type):
377 def setup(self, id, name, priority, type, project_id):
373 378
374 379 self.id = str(id)
380 self.project_id = project_id
375 381 self.name = name
376 382 self.type = type
377 383 self.priority = priority
378 384 self.parmConfObjList = []
379 385
380 386 def removeParameters(self):
381 387
382 388 for obj in self.parmConfObjList:
383 389 del obj
384 390
385 391 self.parmConfObjList = []
386 392
387 393 def addParameter(self, name, value, format='str'):
388 394
389 395 if value is None:
390 396 return None
391 397 id = self.__getNewId()
392 398
393 399 parmConfObj = ParameterConf()
394 400 if not parmConfObj.setup(id, name, value, format):
395 401 return None
396 402
397 403 self.parmConfObjList.append(parmConfObj)
398 404
399 405 return parmConfObj
400 406
401 407 def changeParameter(self, name, value, format='str'):
402 408
403 409 parmConfObj = self.getParameterObj(name)
404 410 parmConfObj.update(name, value, format)
405 411
406 412 return parmConfObj
407 413
408 414 def makeXml(self, procUnitElement):
409 415
410 416 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
411 417 opElement.set('id', str(self.id))
412 418 opElement.set('name', self.name)
413 419 opElement.set('type', self.type)
414 420 opElement.set('priority', str(self.priority))
415 421
416 422 for parmConfObj in self.parmConfObjList:
417 423 parmConfObj.makeXml(opElement)
418 424
419 425 def readXml(self, opElement):
420 426
421 427 self.id = opElement.get('id')
422 428 self.name = opElement.get('name')
423 429 self.type = opElement.get('type')
424 430 self.priority = opElement.get('priority')
425 431
426 432 # Compatible with old signal chain version
427 433 # Use of 'run' method instead 'init'
428 434 if self.type == 'self' and self.name == 'init':
429 435 self.name = 'run'
430 436
431 437 self.parmConfObjList = []
432 438
433 439 parmElementList = opElement.iter(ParameterConf().getElementName())
434 440
435 441 for parmElement in parmElementList:
436 442 parmConfObj = ParameterConf()
437 443 parmConfObj.readXml(parmElement)
438 444
439 445 # Compatible with old signal chain version
440 446 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
441 447 if self.type != 'self' and self.name == 'Plot':
442 448 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
443 449 self.name = parmConfObj.value
444 450 continue
445 451
446 452 self.parmConfObjList.append(parmConfObj)
447 453
448 454 def printattr(self):
449 455
450 456 print('%s[%s]: name = %s, type = %s, priority = %s' % (self.ELEMENTNAME,
451 457 self.id,
452 458 self.name,
453 459 self.type,
454 460 self.priority))
455 461
456 462 for parmConfObj in self.parmConfObjList:
457 463 parmConfObj.printattr()
458 464
459 465 def createObject(self):
460 466
461 467 className = eval(self.name)
462 kwargs = self.getKwargs()
463
464 opObj = className(self.id, **kwargs)
465 468
469 if self.type == 'other':
470 opObj = className()
471 elif self.type == 'external':
472 kwargs = self.getKwargs()
473 opObj = className(self.id, self.project_id, **kwargs)
466 474 opObj.start()
467 475
468 print(' Operation created')
469
470 476 return opObj
471 477
472 478 class ProcUnitConf():
473 479
474 id = None
475 name = None
476 datatype = None
477 inputId = None
478 parentId = None
479
480 opConfObjList = []
481
482 procUnitObj = None
483 opObjList = []
484
485 480 ELEMENTNAME = 'ProcUnit'
486 481
487 482 def __init__(self):
488 483
489 484 self.id = None
490 485 self.datatype = None
491 486 self.name = None
492 487 self.inputId = None
493
494 488 self.opConfObjList = []
495
496 489 self.procUnitObj = None
497 490 self.opObjDict = {}
498 491
499 492 def __getPriority(self):
500 493
501 494 return len(self.opConfObjList) + 1
502 495
503 496 def __getNewId(self):
504 497
505 498 return int(self.id) * 10 + len(self.opConfObjList) + 1
506 499
507 500 def getElementName(self):
508 501
509 502 return self.ELEMENTNAME
510 503
511 504 def getId(self):
512 505
513 506 return self.id
514 507
515 def updateId(self, new_id, parentId=parentId):
508 def updateId(self, new_id):
516 509 '''
517 510 new_id = int(parentId) * 10 + (int(self.id) % 10)
518 511 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
519 512
520 513 # If this proc unit has not inputs
521 514 #if self.inputId == '0':
522 515 #new_inputId = 0
523 516
524 517 n = 1
525 518 for opConfObj in self.opConfObjList:
526 519
527 520 idOp = str(int(new_id) * 10 + n)
528 521 opConfObj.updateId(idOp)
529 522
530 523 n += 1
531 524
532 525 self.parentId = str(parentId)
533 526 self.id = str(new_id)
534 527 #self.inputId = str(new_inputId)
535 528 '''
536 529 n = 1
530
537 531 def getInputId(self):
538 532
539 533 return self.inputId
540 534
541 535 def getOperationObjList(self):
542 536
543 537 return self.opConfObjList
544 538
545 539 def getOperationObj(self, name=None):
546 540
547 541 for opConfObj in self.opConfObjList:
548 542
549 543 if opConfObj.name != name:
550 544 continue
551 545
552 546 return opConfObj
553 547
554 548 return None
555 549
556 550 def getOpObjfromParamValue(self, value=None):
557 551
558 552 for opConfObj in self.opConfObjList:
559 553 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
560 554 continue
561 555 return opConfObj
562 556 return None
563 557
564 558 def getProcUnitObj(self):
565 559
566 560 return self.procUnitObj
567 561
568 def setup(self, id, name, datatype, inputId, parentId=None):
562 def setup(self, project_id, id, name, datatype, inputId):
569 563 '''
570 564 id sera el topico a publicar
571 565 inputId sera el topico a subscribirse
572 566 '''
573 567
574 568 # Compatible with old signal chain version
575 569 if datatype == None and name == None:
576 570 raise ValueError('datatype or name should be defined')
577 571
578 572 # Define a condition for inputId when it is 0
579 573
580 574 if name == None:
581 575 if 'Proc' in datatype:
582 576 name = datatype
583 577 else:
584 578 name = '%sProc' % (datatype)
585 579
586 580 if datatype == None:
587 581 datatype = name.replace('Proc', '')
588 582
589 583 self.id = str(id)
584 self.project_id = project_id
590 585 self.name = name
591 586 self.datatype = datatype
592 587 self.inputId = inputId
593 self.parentId = parentId
594 588 self.opConfObjList = []
595 589
596 590 self.addOperation(name='run', optype='self')
597 591
598 592 def removeOperations(self):
599 593
600 594 for obj in self.opConfObjList:
601 595 del obj
602 596
603 597 self.opConfObjList = []
604 598 self.addOperation(name='run')
605 599
606 600 def addParameter(self, **kwargs):
607 601 '''
608 602 Add parameters to 'run' operation
609 603 '''
610 604 opObj = self.opConfObjList[0]
611 605
612 606 opObj.addParameter(**kwargs)
613 607
614 608 return opObj
615 609
616 610 def addOperation(self, name, optype = 'self'):
617 611 '''
618 612 Update -> communication process
619 613 In the case of optype='self', remove it. Define IPC communication -> Topic
620 614 define the type of socket or IPC communication
621 615
622 616 '''
623 617
624 618 id = self.__getNewId()
625 619 priority = self.__getPriority() # Not very meaningful, but it can be used
626
627 620 opConfObj = OperationConf()
628 opConfObj.setup(id, name=name, priority=priority, type=optype)
629
621 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id)
630 622 self.opConfObjList.append(opConfObj)
631 623
632 624 return opConfObj
633 625
634 626 def makeXml(self, projectElement):
635 627
636 628 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
637 629 procUnitElement.set('id', str(self.id))
638 630 procUnitElement.set('name', self.name)
639 631 procUnitElement.set('datatype', self.datatype)
640 632 procUnitElement.set('inputId', str(self.inputId))
641 633
642 634 for opConfObj in self.opConfObjList:
643 635 opConfObj.makeXml(procUnitElement)
644 636
645 637 def readXml(self, upElement):
646 638
647 639 self.id = upElement.get('id')
648 640 self.name = upElement.get('name')
649 641 self.datatype = upElement.get('datatype')
650 642 self.inputId = upElement.get('inputId')
651 643
652 644 if self.ELEMENTNAME == 'ReadUnit':
653 645 self.datatype = self.datatype.replace('Reader', '')
654 646
655 647 if self.ELEMENTNAME == 'ProcUnit':
656 648 self.datatype = self.datatype.replace('Proc', '')
657 649
658 650 if self.inputId == 'None':
659 651 self.inputId = '0'
660 652
661 653 self.opConfObjList = []
662 654
663 655 opElementList = upElement.iter(OperationConf().getElementName())
664 656
665 657 for opElement in opElementList:
666 658 opConfObj = OperationConf()
667 659 opConfObj.readXml(opElement)
668 660 self.opConfObjList.append(opConfObj)
669 661
670 662 def printattr(self):
671 663
672 664 print('%s[%s]: name = %s, datatype = %s, inputId = %s' % (self.ELEMENTNAME,
673 665 self.id,
674 666 self.name,
675 667 self.datatype,
676 668 self.inputId))
677 669
678 670 for opConfObj in self.opConfObjList:
679 671 opConfObj.printattr()
680 672
681 673 def getKwargs(self):
682 674
683 675 opObj = self.opConfObjList[0]
684 676 kwargs = opObj.getKwargs()
685 677
686 678 return kwargs
687 679
688 def createObjects(self, dictUnits):
680 def createObjects(self):
689 681 '''
690 682 Instantiate the processing units.
691
692 683 '''
693 684 className = eval(self.name)
694 685 kwargs = self.getKwargs()
695 procUnitObj = className(self.id, self.inputId, dictUnits, **kwargs) # they need to know their id and input for IPC purposes
696
686 procUnitObj = className(self.id, self.inputId, self.project_id, **kwargs) # they need to know their id and input for IPC purposes
687 log.success('creating process...', self.name)
697 688
698 689 for opConfObj in self.opConfObjList:
699 690
700 if opConfObj.type == 'self' and self.name == 'run':
691 if opConfObj.type == 'self' and opConfObj.name == 'run':
701 692 continue
702 693 elif opConfObj.type == 'self':
703 procUnitObj.addOperationKwargs(
704 opConfObj.id, **opConfObj.getKwargs())
705 continue
706 print("Creating operation process:", opConfObj.name, "for", self.name)
694 opObj = getattr(procUnitObj, opConfObj.name)
695 else:
707 696 opObj = opConfObj.createObject()
708 697
698 log.success('creating operation: {}, type:{}'.format(
699 opConfObj.name,
700 opConfObj.type), self.name)
709 701
710 #self.opObjDict[opConfObj.id] = opObj.name
711
712 procUnitObj.addOperation(opConfObj.name, opConfObj.id)
702 procUnitObj.addOperation(opConfObj, opObj)
713 703
714 704 procUnitObj.start()
715
716 705 self.procUnitObj = procUnitObj
717 706
718
719 return procUnitObj
720
721 def run(self):
722
723 is_ok = True
724 """
725 for opConfObj in self.opConfObjList:
726
727 kwargs = {}
728 for parmConfObj in opConfObj.getParameterObjList():
729 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
730 continue
731
732 kwargs[parmConfObj.name] = parmConfObj.getValue()
733
734 sts = self.procUnitObj.call(opType=opConfObj.type,
735 opName=opConfObj.name,
736 opId=opConfObj.id)
737
738 is_ok = is_ok or sts
739
740 """
741 return is_ok
742
743
744 707 def close(self):
745 708
746 709 for opConfObj in self.opConfObjList:
747 710 if opConfObj.type == 'self':
748 711 continue
749 712
750 713 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
751 714 opObj.close()
752 715
753 716 self.procUnitObj.close()
754 717
755 718 return
756 719
757 720
758 721 class ReadUnitConf(ProcUnitConf):
759 722
760 path = None
761 startDate = None
762 endDate = None
763 startTime = None
764 endTime = None
765
766 723 ELEMENTNAME = 'ReadUnit'
767 724
768 725 def __init__(self):
769 726
770 727 self.id = None
771 728 self.datatype = None
772 729 self.name = None
773 730 self.inputId = None
774
775 self.parentId = None
776
777 731 self.opConfObjList = []
778 self.opObjList = []
779 732
780 733 def getElementName(self):
781 734
782 735 return self.ELEMENTNAME
783 736
784 def setup(self, id, name, datatype, path='', startDate='', endDate='',
785 startTime='', endTime='', parentId=None, server=None, **kwargs):
737 def setup(self, project_id, id, name, datatype, path='', startDate='', endDate='',
738 startTime='', endTime='', server=None, **kwargs):
786 739
787 740
788 741 '''
789 742 ***** the process id will be the Topic
790 743
791 744 Addition of {topic}; if it is not present -> error
792 745 kwargs must be passed at instantiation
793 746
794 747 '''
795 748
796 749 # Compatible with old signal chain version
797 750 if datatype == None and name == None:
798 751 raise ValueError('datatype or name should be defined')
799 752 if name == None:
800 753 if 'Reader' in datatype:
801 754 name = datatype
802 755 datatype = name.replace('Reader','')
803 756 else:
804 757 name = '{}Reader'.format(datatype)
805 758 if datatype == None:
806 759 if 'Reader' in name:
807 760 datatype = name.replace('Reader','')
808 761 else:
809 762 datatype = name
810 763 name = '{}Reader'.format(name)
811 764
812 765 self.id = id
766 self.project_id = project_id
813 767 self.name = name
814 768 self.datatype = datatype
815 769 if path != '':
816 770 self.path = os.path.abspath(path)
817 771 self.startDate = startDate
818 772 self.endDate = endDate
819 773 self.startTime = startTime
820 774 self.endTime = endTime
821 self.inputId = '0'
822 self.parentId = parentId
823 775 self.server = server
824 776 self.addRunOperation(**kwargs)
825 777
826 778 def update(self, **kwargs):
827 779
828 780 if 'datatype' in kwargs:
829 781 datatype = kwargs.pop('datatype')
830 782 if 'Reader' in datatype:
831 783 self.name = datatype
832 784 else:
833 785 self.name = '%sReader' % (datatype)
834 786 self.datatype = self.name.replace('Reader', '')
835 787
836 788 attrs = ('path', 'startDate', 'endDate',
837 'startTime', 'endTime', 'parentId')
789 'startTime', 'endTime')
838 790
839 791 for attr in attrs:
840 792 if attr in kwargs:
841 793 setattr(self, attr, kwargs.pop(attr))
842 794
843 self.inputId = '0'
844 795 self.updateRunOperation(**kwargs)
845 796
846 797 def removeOperations(self):
847 798
848 799 for obj in self.opConfObjList:
849 800 del obj
850 801
851 802 self.opConfObjList = []
852 803
853 804 def addRunOperation(self, **kwargs):
854 805
855 806 opObj = self.addOperation(name='run', optype='self')
856 807
857 808 if self.server is None:
858 809 opObj.addParameter(
859 810 name='datatype', value=self.datatype, format='str')
860 811 opObj.addParameter(name='path', value=self.path, format='str')
861 812 opObj.addParameter(
862 813 name='startDate', value=self.startDate, format='date')
863 814 opObj.addParameter(
864 815 name='endDate', value=self.endDate, format='date')
865 816 opObj.addParameter(
866 817 name='startTime', value=self.startTime, format='time')
867 818 opObj.addParameter(
868 819 name='endTime', value=self.endTime, format='time')
869 820
870 821 for key, value in list(kwargs.items()):
871 822 opObj.addParameter(name=key, value=value,
872 823 format=type(value).__name__)
873 824 else:
874 825 opObj.addParameter(name='server', value=self.server, format='str')
875 826
876 827 return opObj
877 828
878 829 def updateRunOperation(self, **kwargs):
879 830
880 831 opObj = self.getOperationObj(name='run')
881 832 opObj.removeParameters()
882 833
883 834 opObj.addParameter(name='datatype', value=self.datatype, format='str')
884 835 opObj.addParameter(name='path', value=self.path, format='str')
885 836 opObj.addParameter(
886 837 name='startDate', value=self.startDate, format='date')
887 838 opObj.addParameter(name='endDate', value=self.endDate, format='date')
888 839 opObj.addParameter(
889 840 name='startTime', value=self.startTime, format='time')
890 841 opObj.addParameter(name='endTime', value=self.endTime, format='time')
891 842
892 843 for key, value in list(kwargs.items()):
893 844 opObj.addParameter(name=key, value=value,
894 845 format=type(value).__name__)
895 846
896 847 return opObj
897 848
898 849 def readXml(self, upElement):
899 850
900 851 self.id = upElement.get('id')
901 852 self.name = upElement.get('name')
902 853 self.datatype = upElement.get('datatype')
903 self.inputId = upElement.get('inputId')
904 854
905 855 if self.ELEMENTNAME == 'ReadUnit':
906 856 self.datatype = self.datatype.replace('Reader', '')
907 857
908 if self.inputId == 'None':
909 self.inputId = '0'
910
911 858 self.opConfObjList = []
912 859
913 860 opElementList = upElement.iter(OperationConf().getElementName())
914 861
915 862 for opElement in opElementList:
916 863 opConfObj = OperationConf()
917 864 opConfObj.readXml(opElement)
918 865 self.opConfObjList.append(opConfObj)
919 866
920 867 if opConfObj.name == 'run':
921 868 self.path = opConfObj.getParameterValue('path')
922 869 self.startDate = opConfObj.getParameterValue('startDate')
923 870 self.endDate = opConfObj.getParameterValue('endDate')
924 871 self.startTime = opConfObj.getParameterValue('startTime')
925 872 self.endTime = opConfObj.getParameterValue('endTime')
926 873
927 874
928 875 class Project(Process):
929 876
930 id = None
931 description = None
932 filename = None
933
934 procUnitConfObjDict = None
935
936 877 ELEMENTNAME = 'Project'
937 878
938
939
940 879 def __init__(self):
941 880
942 881 Process.__init__(self)
943 882 self.id = None
883 self.filename = None
944 884 self.description = None
945 885 self.email = None
946 886 self.alarm = None
947 887 self.procUnitConfObjDict = {}
948 888
949 889 def __getNewId(self):
950 890
951 891 idList = list(self.procUnitConfObjDict.keys())
952
953 892 id = int(self.id) * 10
954 893
955 894 while True:
956 895 id += 1
957 896
958 897 if str(id) in idList:
959 898 continue
960 899
961 900 break
962 901
963 902 return str(id)
964 903
965 904 def getElementName(self):
966 905
967 906 return self.ELEMENTNAME
968 907
969 908 def getId(self):
970 909
971 910 return self.id
972 911
973 912 def updateId(self, new_id):
974 913
975 914 self.id = str(new_id)
976 915
977 916 keyList = list(self.procUnitConfObjDict.keys())
978 917 keyList.sort()
979 918
980 919 n = 1
981 920 newProcUnitConfObjDict = {}
982 921
983 922 for procKey in keyList:
984 923
985 924 procUnitConfObj = self.procUnitConfObjDict[procKey]
986 925 idProcUnit = str(int(self.id) * 10 + n)
987 procUnitConfObj.updateId(idProcUnit, parentId=self.id)
926 procUnitConfObj.updateId(idProcUnit)
988 927 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
989 928 n += 1
990 929
991 930 self.procUnitConfObjDict = newProcUnitConfObjDict
992 931
993 def setup(self, id, name='', description='', email=None, alarm=[]):
932 def setup(self, id=1, name='', description='', email=None, alarm=[]):
994 933
995 934 print(' ')
996 935 print('*' * 60)
997 936 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
998 937 print('*' * 60)
999 938 print("* Python " + python_version() + " *")
1000 939 print('*' * 19)
1001 940 print(' ')
1002 941 self.id = str(id)
1003 942 self.description = description
1004 943 self.email = email
1005 944 self.alarm = alarm
1006 945
1007 946 def update(self, **kwargs):
1008 947
1009 948 for key, value in list(kwargs.items()):
1010 949 setattr(self, key, value)
1011 950
1012 951 def clone(self):
1013 952
1014 953 p = Project()
1015 954 p.procUnitConfObjDict = self.procUnitConfObjDict
1016 955 return p
1017 956
1018 957 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
1019 958
1020 959 '''
1021 960 Update:
1022 961 A new argument was added: topic - related to the way the simultaneous processes communicate
1023 962
1024 963 * The process id will be the topic the procUnits must subscribe to in order to receive the information (data)
1025 964
1026 965 '''
1027 966
1028 967 if id is None:
1029 968 idReadUnit = self.__getNewId()
1030 969 else:
1031 970 idReadUnit = str(id)
1032 971
1033 972 readUnitConfObj = ReadUnitConf()
1034 readUnitConfObj.setup(idReadUnit, name, datatype,
1035 parentId=self.id, **kwargs)
1036
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, **kwargs)
1037 974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1038 975
1039 976 return readUnitConfObj
1040 977
1041 978 def addProcUnit(self, inputId='0', datatype=None, name=None):
1042 979
1043 980 '''
1044 981 Update:
1045 982 Two new arguments were added: topic_read (reads data from another procUnit) and topic_write (writes or sends data to another procUnit)
1046 983 It should replace "inputId"
1047 984
1048 985 ** In order to keep inputId, it will represent the topic to subscribe to. The instance's (process's) own ID
1049 986 will be the publication topic; everything will be assigned dynamically.
1050 987
1051 988 '''
1052 989
1053 990 idProcUnit = self.__getNewId() # Topic for subscription
1054
1055 991 procUnitConfObj = ProcUnitConf()
1056 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, #topic_read, topic_write,
1057 parentId=self.id)
1058
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId) #topic_read, topic_write,
1059 993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1060 994
1061 995 return procUnitConfObj
1062 996
1063 997 def removeProcUnit(self, id):
1064 998
1065 999 if id in list(self.procUnitConfObjDict.keys()):
1066 1000 self.procUnitConfObjDict.pop(id)
1067 1001
1068 1002 def getReadUnitId(self):
1069 1003
1070 1004 readUnitConfObj = self.getReadUnitObj()
1071 1005
1072 1006 return readUnitConfObj.id
1073 1007
1074 1008 def getReadUnitObj(self):
1075 1009
1076 1010 for obj in list(self.procUnitConfObjDict.values()):
1077 1011 if obj.getElementName() == 'ReadUnit':
1078 1012 return obj
1079 1013
1080 1014 return None
1081 1015
1082 1016 def getProcUnitObj(self, id=None, name=None):
1083 1017
1084 1018 if id != None:
1085 1019 return self.procUnitConfObjDict[id]
1086 1020
1087 1021 if name != None:
1088 1022 return self.getProcUnitObjByName(name)
1089 1023
1090 1024 return None
1091 1025
1092 1026 def getProcUnitObjByName(self, name):
1093 1027
1094 1028 for obj in list(self.procUnitConfObjDict.values()):
1095 1029 if obj.name == name:
1096 1030 return obj
1097 1031
1098 1032 return None
1099 1033
1100 1034 def procUnitItems(self):
1101 1035
1102 1036 return list(self.procUnitConfObjDict.items())
1103 1037
1104 1038 def makeXml(self):
1105 1039
1106 1040 projectElement = Element('Project')
1107 1041 projectElement.set('id', str(self.id))
1108 1042 projectElement.set('name', self.name)
1109 1043 projectElement.set('description', self.description)
1110 1044
1111 1045 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1112 1046 procUnitConfObj.makeXml(projectElement)
1113 1047
1114 1048 self.projectElement = projectElement
1115 1049
1116 1050 def writeXml(self, filename=None):
1117 1051
1118 1052 if filename == None:
1119 1053 if self.filename:
1120 1054 filename = self.filename
1121 1055 else:
1122 1056 filename = 'schain.xml'
1123 1057
1124 1058 if not filename:
1125 1059 print('filename has not been defined. Use setFilename(filename) for do it.')
1126 1060 return 0
1127 1061
1128 1062 abs_file = os.path.abspath(filename)
1129 1063
1130 1064 if not os.access(os.path.dirname(abs_file), os.W_OK):
1131 1065 print('No write permission on %s' % os.path.dirname(abs_file))
1132 1066 return 0
1133 1067
1134 1068 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1135 1069 print('File %s already exists and it could not be overwriten' % abs_file)
1136 1070 return 0
1137 1071
1138 1072 self.makeXml()
1139 1073
1140 1074 ElementTree(self.projectElement).write(abs_file, method='xml')
1141 1075
1142 1076 self.filename = abs_file
1143 1077
1144 1078 return 1
1145 1079
1146 1080 def readXml(self, filename=None):
1147 1081
1148 1082 if not filename:
1149 1083 print('filename is not defined')
1150 1084 return 0
1151 1085
1152 1086 abs_file = os.path.abspath(filename)
1153 1087
1154 1088 if not os.path.isfile(abs_file):
1155 1089 print('%s file does not exist' % abs_file)
1156 1090 return 0
1157 1091
1158 1092 self.projectElement = None
1159 1093 self.procUnitConfObjDict = {}
1160 1094
1161 1095 try:
1162 1096 self.projectElement = ElementTree().parse(abs_file)
1163 1097 except:
1164 1098 print('Error reading %s, verify file format' % filename)
1165 1099 return 0
1166 1100
1167 1101 self.project = self.projectElement.tag
1168 1102
1169 1103 self.id = self.projectElement.get('id')
1170 1104 self.name = self.projectElement.get('name')
1171 1105 self.description = self.projectElement.get('description')
1172 1106
1173 1107 readUnitElementList = self.projectElement.iter(
1174 1108 ReadUnitConf().getElementName())
1175 1109
1176 1110 for readUnitElement in readUnitElementList:
1177 1111 readUnitConfObj = ReadUnitConf()
1178 1112 readUnitConfObj.readXml(readUnitElement)
1179
1180 if readUnitConfObj.parentId == None:
1181 readUnitConfObj.parentId = self.id
1182
1183 1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1184 1114
1185 1115 procUnitElementList = self.projectElement.iter(
1186 1116 ProcUnitConf().getElementName())
1187 1117
1188 1118 for procUnitElement in procUnitElementList:
1189 1119 procUnitConfObj = ProcUnitConf()
1190 1120 procUnitConfObj.readXml(procUnitElement)
1191
1192 if procUnitConfObj.parentId == None:
1193 procUnitConfObj.parentId = self.id
1194
1195 1121 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1196 1122
1197 1123 self.filename = abs_file
1198 1124
1199 1125 return 1
1200 1126
1201 def printattr(self):
1127 def __str__(self):
1202 1128
1203 1129 print('Project[%s]: name = %s, description = %s' % (self.id,
1204 1130 self.name,
1205 1131 self.description))
1206 1132
1207 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1208 procUnitConfObj.printattr()
1133 for procUnitConfObj in self.procUnitConfObjDict.values():
1134 print(procUnitConfObj)
1209 1135
1210 1136 def createObjects(self):
1211 1137
1212 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1213 print("Creating process:", procUnitConfObj.name)
1214 procUnitConfObj.createObjects(self.procUnitConfObjDict)
1215
1216
1217 print('All processes were created')
1138 for procUnitConfObj in self.procUnitConfObjDict.values():
1139 procUnitConfObj.createObjects()
1218 1140
1219 1141 def __handleError(self, procUnitConfObj, modes=None, stdout=True):
1220 1142
1221 1143 import socket
1222 1144
1223 1145 if modes is None:
1224 1146 modes = self.alarm
1225 1147
1226 1148 if not self.alarm:
1227 1149 modes = []
1228 1150
1229 1151 err = traceback.format_exception(sys.exc_info()[0],
1230 1152 sys.exc_info()[1],
1231 1153 sys.exc_info()[2])
1232 1154
1233 1155 log.error('{}'.format(err[-1]), procUnitConfObj.name)
1234 1156
1235 1157 message = ''.join(err)
1236 1158
1237 1159 if stdout:
1238 1160 sys.stderr.write(message)
1239 1161
1240 1162 subject = 'SChain v%s: Error running %s\n' % (
1241 1163 schainpy.__version__, procUnitConfObj.name)
1242 1164
1243 1165 subtitle = '%s: %s\n' % (
1244 1166 procUnitConfObj.getElementName(), procUnitConfObj.name)
1245 1167 subtitle += 'Hostname: %s\n' % socket.gethostbyname(
1246 1168 socket.gethostname())
1247 1169 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1248 1170 subtitle += 'Configuration file: %s\n' % self.filename
1249 1171 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1250 1172
1251 1173 readUnitConfObj = self.getReadUnitObj()
1252 1174 if readUnitConfObj:
1253 1175 subtitle += '\nInput parameters:\n'
1254 1176 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1255 1177 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1256 1178 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1257 1179 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1258 1180 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1259 1181 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1260 1182
1261 1183 a = Alarm(
1262 1184 modes=modes,
1263 1185 email=self.email,
1264 1186 message=message,
1265 1187 subject=subject,
1266 1188 subtitle=subtitle,
1267 1189 filename=self.filename
1268 1190 )
1269 1191
1270 1192 return a
1271 1193
1272 1194 def isPaused(self):
1273 1195 return 0
1274 1196
1275 1197 def isStopped(self):
1276 1198 return 0
1277 1199
1278 1200 def runController(self):
1279 1201 '''
1280 1202 returns 0 when this process has been stopped, 1 otherwise
1281 1203 '''
1282 1204
1283 1205 if self.isPaused():
1284 1206 print('Process suspended')
1285 1207
1286 1208 while True:
1287 1209 time.sleep(0.1)
1288 1210
1289 1211 if not self.isPaused():
1290 1212 break
1291 1213
1292 1214 if self.isStopped():
1293 1215 break
1294 1216
1295 1217 print('Process reinitialized')
1296 1218
1297 1219 if self.isStopped():
1298 1220 print('Process stopped')
1299 1221 return 0
1300 1222
1301 1223 return 1
1302 1224
1303 1225 def setFilename(self, filename):
1304 1226
1305 1227 self.filename = filename
1306 1228
1307 1229 def setProxyCom(self):
1308 1230
1309 ctx = zmq.Context()
1310 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
1311 xsub = ctx.socket(zmq.XSUB)
1312 xsub.bind('ipc:///tmp/socketTmp/a')
1313 xpub = ctx.socket(zmq.XPUB)
1314 xpub.bind('ipc:///tmp/socketTmp/b')
1315
1316 print("Controller Ready: Processes and proxy created")
1317 zmq.proxy(xsub, xpub)
1231 if not os.path.exists('/tmp/schain'):
1232 os.mkdir('/tmp/schain')
1318 1233
1234 self.ctx = zmq.Context()
1235 xpub = self.ctx.socket(zmq.XPUB)
1236 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1237 xsub = self.ctx.socket(zmq.XSUB)
1238 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1319 1239
1240 try:
1241 zmq.proxy(xpub, xsub)
1242 except zmq.ContextTerminated:
1243 xpub.close()
1244 xsub.close()
1320 1245
1321 1246 def run(self):
1322 1247
1323 log.success('Starting {}'.format(self.name), tag='')
1248 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1324 1249 self.start_time = time.time()
1325 1250 self.createObjects()
1251 # t = Thread(target=wait, args=(self.ctx, ))
1252 # t.start()
1326 1253 self.setProxyCom()
1327 1254
1328 1255 # Start all processes with .start() and monitor them. Remove the code below
1329 1256
1330 # Closing every process
1331 1257 log.success('{} finished (time: {}s)'.format(
1332 1258 self.name,
1333 1259 time.time()-self.start_time))
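In the controller changes above, setProxyCom() now binds one XPUB/XSUB pair per project under /tmp/schain/ and runs zmq.proxy() between them, so each unit can publish on its own id and subscribe to its inputId; the proxy is shut down by catching zmq.ContextTerminated. One review observation: the new wait() helper (whose invocation in run() is commented out) still connects to ipc:///tmp/schain_<id>_pub, which does not match the ipc:///tmp/schain/<id>_pub endpoint bound in setProxyCom(). The snippet below is a minimal, self-contained illustration of the relay pattern only; the TCP endpoints, the b'10' topic and the message text are invented for this example and are not part of the patch.

# Minimal XPUB/XSUB relay sketch (hypothetical endpoints and payloads)
import threading
import time
import zmq

ctx = zmq.Context()

def relay():
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind('tcp://127.0.0.1:5570')   # subscribers (SUB) connect here
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind('tcp://127.0.0.1:5571')   # publishers (PUB) connect here
    try:
        zmq.proxy(xpub, xsub)           # blocks until the context terminates
    except zmq.ContextTerminated:
        xpub.close()
        xsub.close()

threading.Thread(target=relay, daemon=True).start()

sub = ctx.socket(zmq.SUB)
sub.connect('tcp://127.0.0.1:5570')
sub.setsockopt(zmq.SUBSCRIBE, b'10')    # subscribe to a made-up unit id / topic

pub = ctx.socket(zmq.PUB)
pub.connect('tcp://127.0.0.1:5571')

time.sleep(0.5)                         # let the subscription propagate through the proxy
pub.send_multipart([b'10', b'hello from unit 10'])
print(sub.recv_multipart())             # [b'10', b'hello from unit 10']

sub.close()
pub.close()
ctx.term()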
@@ -1,1826 +1,1826
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import traceback
16 16 import zmq
17 17
18 18 try:
19 19 from gevent import sleep
20 20 except:
21 21 from time import sleep
22 22
23 23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
24 24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
25 25 from schainpy.utils import log
26 26 import schainpy.admin
27 27
28 28 LOCALTIME = True
29 29
30 30
31 31 def isNumber(cad):
32 32 """
33 33 Checks whether the characters that make up a string can be converted to a number.
34 34
35 35 Exceptions:
36 36 If a given string cannot be converted to a number
37 37 Input:
38 38 str, the string analyzed to determine whether or not it can be converted to a number
39 39
40 40 Return:
41 41 True : if the string is numeric
42 42 False : it is not a numeric string
43 43 """
44 44 try:
45 45 float(cad)
46 46 return True
47 47 except:
48 48 return False
49 49
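# Editor's note: a brief illustrative use of isNumber (not part of the original source);
# the sample strings are made up.
assert isNumber('2015')        # plain integers convert cleanly
assert isNumber('3.5e-2')      # float() also accepts scientific notation
assert not isNumber('305a')    # mixed alphanumeric strings are rejected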
50 50
51 51 def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
52 52 """
53 53     Determines whether a data file falls within the specified date range.
54 54
55 55     Inputs:
56 56         filename : full name of the data file in Jicamarca format (.r)
57 57
58 58         startUTSeconds : start date of the selected range, given in
59 59                          seconds counted from 01/01/1970.
60 60         endUTSeconds : end date of the selected range, given in
61 61                        seconds counted from 01/01/1970.
62 62
63 63     Return:
64 64         Boolean : returns True if the data file contains data within the specified
65 65                   date range, otherwise returns False.
66 66
67 67     Exceptions:
68 68         If the file does not exist or cannot be opened
69 69         If the header cannot be read.
70 70
71 71 """
72 72 basicHeaderObj = BasicHeader(LOCALTIME)
73 73
74 74 try:
75 75 fp = open(filename, 'rb')
76 76 except IOError:
77 77 print("The file %s can't be opened" % (filename))
78 78 return 0
79 79
80 80 sts = basicHeaderObj.read(fp)
81 81 fp.close()
82 82
83 83 if not(sts):
84 84         print("Skipping the file %s because it does not have a valid header" % (filename))
85 85 return 0
86 86
87 87 if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
88 88 return 0
89 89
90 90 return 1
91 91
92 92
93 93 def isTimeInRange(thisTime, startTime, endTime):
94 94 if endTime >= startTime:
95 95 if (thisTime < startTime) or (thisTime > endTime):
96 96 return 0
97 97 return 1
98 98 else:
99 99 if (thisTime < startTime) and (thisTime > endTime):
100 100 return 0
101 101 return 1
102 102
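# Editor's note: a hypothetical illustration (not part of the original source) of how
# isTimeInRange handles a range that crosses midnight (endTime < startTime).
_night_start, _night_end = datetime.time(22, 0, 0), datetime.time(2, 0, 0)
assert isTimeInRange(datetime.time(23, 30, 0), _night_start, _night_end) == 1   # inside the range
assert isTimeInRange(datetime.time(12, 0, 0), _night_start, _night_end) == 0    # outside the range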
103 103
104 104 def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
105 105 """
106 106     Returns 1 if the data file falls within the specified time range.
107 107
108 108     Inputs:
109 109         filename : full name of the data file in Jicamarca format (.r)
110 110
111 111         startDate : start date of the selected range, as a datetime.date
112 112
113 113         endDate : end date of the selected range, as a datetime.date
114 114
115 115         startTime : start time of the selected range, as a datetime.time
116 116
117 117         endTime : end time of the selected range, as a datetime.time
118 118
119 119     Return:
120 120         Boolean : returns True if the data file contains data within the specified
121 121                   time range, otherwise returns False.
122 122
123 123     Exceptions:
124 124         If the file does not exist or cannot be opened
125 125         If the header cannot be read.
126 126
127 127 """
128 128
129 129 try:
130 130 fp = open(filename, 'rb')
131 131 except IOError:
132 132 print("The file %s can't be opened" % (filename))
133 133 return None
134 134
135 135 firstBasicHeaderObj = BasicHeader(LOCALTIME)
136 136 systemHeaderObj = SystemHeader()
137 137 radarControllerHeaderObj = RadarControllerHeader()
138 138 processingHeaderObj = ProcessingHeader()
139 139
140 140 lastBasicHeaderObj = BasicHeader(LOCALTIME)
141 141
142 142 sts = firstBasicHeaderObj.read(fp)
143 143
144 144 if not(sts):
145 145         print("[Reading] Skipping the file %s because it does not have a valid header" % (filename))
146 146 return None
147 147
148 148 if not systemHeaderObj.read(fp):
149 149 return None
150 150
151 151 if not radarControllerHeaderObj.read(fp):
152 152 return None
153 153
154 154 if not processingHeaderObj.read(fp):
155 155 return None
156 156
157 157 filesize = os.path.getsize(filename)
158 158
159 159 offset = processingHeaderObj.blockSize + 24 # header size
160 160
161 161 if filesize <= offset:
162 162         print("[Reading] %s: This file does not have enough data" % filename)
163 163 return None
164 164
165 165 fp.seek(-offset, 2)
166 166
167 167 sts = lastBasicHeaderObj.read(fp)
168 168
169 169 fp.close()
170 170
171 171 thisDatetime = lastBasicHeaderObj.datatime
172 172 thisTime_last_block = thisDatetime.time()
173 173
174 174 thisDatetime = firstBasicHeaderObj.datatime
175 175 thisDate = thisDatetime.date()
176 176 thisTime_first_block = thisDatetime.time()
177 177
178 178 # General case
179 179 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
180 180 #-----------o----------------------------o-----------
181 181 # startTime endTime
182 182
183 183 if endTime >= startTime:
184 184 if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
185 185 return None
186 186
187 187 return thisDatetime
188 188
189 189 # If endTime < startTime then endTime belongs to the next day
190 190
191 191 #<<<<<<<<<<<o o>>>>>>>>>>>
192 192 #-----------o----------------------------o-----------
193 193 # endTime startTime
194 194
195 195 if (thisDate == startDate) and (thisTime_last_block < startTime):
196 196 return None
197 197
198 198 if (thisDate == endDate) and (thisTime_first_block > endTime):
199 199 return None
200 200
201 201 if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
202 202 return None
203 203
204 204 return thisDatetime
205 205
206 206
207 207 def isFolderInDateRange(folder, startDate=None, endDate=None):
208 208 """
209 209     Returns 1 if the data folder falls within the specified date range.
210 210
211 211     Inputs:
212 212         folder : full name of the directory.
213 213                  Its format should be "/path_root/?YYYYDDD"
214 214
215 215                  where:
216 216                      YYYY : year (e.g. 2015)
217 217                      DDD : day of the year (e.g. 305)
218 218
219 219         startDate : start date of the selected range, as a datetime.date
220 220
221 221         endDate : end date of the selected range, as a datetime.date
222 222
223 223     Return:
224 224         Boolean : returns True if the folder contains data within the specified
225 225                   date range, otherwise returns False.
226 226     Exceptions:
227 227         If the directory does not have the expected format
228 228 """
229 229
230 230 basename = os.path.basename(folder)
231 231
232 232 if not isRadarFolder(basename):
233 233         print("The folder %s does not have the right format" % folder)
234 234 return 0
235 235
236 236 if startDate and endDate:
237 237 thisDate = getDateFromRadarFolder(basename)
238 238
239 239 if thisDate < startDate:
240 240 return 0
241 241
242 242 if thisDate > endDate:
243 243 return 0
244 244
245 245 return 1
246 246
247 247
248 248 def isFileInDateRange(filename, startDate=None, endDate=None):
249 249 """
250 250     Returns 1 if the data file falls within the specified date range.
251 251
252 252     Inputs:
253 253         filename : full name of the data file in Jicamarca format (.r)
254 254
255 255                    Its format should be "?YYYYDDDsss"
256 256
257 257                    where:
258 258                        YYYY : year (e.g. 2015)
259 259                        DDD : day of the year (e.g. 305)
260 260                        sss : set
261 261
262 262         startDate : start date of the selected range, as a datetime.date
263 263
264 264         endDate : end date of the selected range, as a datetime.date
265 265
266 266     Return:
267 267         Boolean : returns True if the data file contains data within the specified
268 268                   date range, otherwise returns False.
269 269     Exceptions:
270 270         If the file does not have the expected format
271 271 """
272 272
273 273 basename = os.path.basename(filename)
274 274
275 275 if not isRadarFile(basename):
276 276         print("The filename %s does not have the right format" % filename)
277 277 return 0
278 278
279 279 if startDate and endDate:
280 280 thisDate = getDateFromRadarFile(basename)
281 281
282 282 if thisDate < startDate:
283 283 return 0
284 284
285 285 if thisDate > endDate:
286 286 return 0
287 287
288 288 return 1
289 289
290 290
291 291 def getFileFromSet(path, ext, set):
292 292 validFilelist = []
293 293 fileList = os.listdir(path)
294 294
295 295 # 0 1234 567 89A BCDE
296 296 # H YYYY DDD SSS .ext
297 297
298 298 for thisFile in fileList:
299 299 try:
300 300 year = int(thisFile[1:5])
301 301 doy = int(thisFile[5:8])
302 302 except:
303 303 continue
304 304
305 305 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
306 306 continue
307 307
308 308 validFilelist.append(thisFile)
309 309
310 310 myfile = fnmatch.filter(
311 311 validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))
312 312
313 313 if len(myfile) != 0:
314 314 return myfile[0]
315 315 else:
316 316 filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
317 317         print('The filename %s does not exist' % filename)
318 318         print('...falling back to the last file:')
319 319
320 320 if validFilelist:
321 321 validFilelist = sorted(validFilelist, key=str.lower)
322 322 return validFilelist[-1]
323 323
324 324 return None
325 325
326 326
327 327 def getlastFileFromPath(path, ext):
328 328 """
329 329     Filters the file list, keeping only the files that match the "PYYYYDDDSSS.ext" format,
330 330     and returns the last file of the remaining list.
331 331
332 332     Input:
333 333         path : folder containing the files
334 334         ext : extension of the files contained in the folder
335 335
336 336     Return:
337 337         The last file of the given folder, without the path.
338 338 """
339 339 validFilelist = []
340 340 fileList = os.listdir(path)
341 341
342 342 # 0 1234 567 89A BCDE
343 343 # H YYYY DDD SSS .ext
344 344
345 345 for thisFile in fileList:
346 346
347 347 year = thisFile[1:5]
348 348 if not isNumber(year):
349 349 continue
350 350
351 351 doy = thisFile[5:8]
352 352 if not isNumber(doy):
353 353 continue
354 354
355 355 year = int(year)
356 356 doy = int(doy)
357 357
358 358 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
359 359 continue
360 360
361 361 validFilelist.append(thisFile)
362 362
363 363 if validFilelist:
364 364 validFilelist = sorted(validFilelist, key=str.lower)
365 365 return validFilelist[-1]
366 366
367 367 return None
368 368
369 369
370 370 def checkForRealPath(path, foldercounter, year, doy, set, ext):
371 371 """
372 372     Since Linux is case sensitive, checkForRealPath finds the correct name of a path.
373 373     It tries several upper/lower-case name combinations to determine
374 374     the exact path of a given file.
375 375
376 376     Example :
377 377         the correct file name is .../.../D2009307/P2009307367.ext
378 378
379 379         The function then tries the following combinations
380 380             .../.../y2009307367.ext
381 381             .../.../Y2009307367.ext
382 382             .../.../x2009307/y2009307367.ext
383 383             .../.../x2009307/Y2009307367.ext
384 384             .../.../X2009307/y2009307367.ext
385 385             .../.../X2009307/Y2009307367.ext
386 386         where, in this case, the last letter combination is identical to the file being searched for
387 387
388 388     Return:
389 389         If it finds the right combination it returns the full path and the file name;
390 390         otherwise it returns None as the path and the last upper-case name combination
391 391         as the filename
392 392 """
393 393 fullfilename = None
394 394 find_flag = False
395 395 filename = None
396 396
397 397 prefixDirList = [None, 'd', 'D']
398 398 if ext.lower() == ".r": # voltage
399 399 prefixFileList = ['d', 'D']
400 400 elif ext.lower() == ".pdata": # spectra
401 401 prefixFileList = ['p', 'P']
402 402 else:
403 403 return None, filename
404 404
405 405     # sweep over the possible combinations
406 406 for prefixDir in prefixDirList:
407 407 thispath = path
408 408 if prefixDir != None:
409 409             # build the directory name xYYYYDDD (x=d or x=D)
410 410 if foldercounter == 0:
411 411 thispath = os.path.join(path, "%s%04d%03d" %
412 412 (prefixDir, year, doy))
413 413 else:
414 414 thispath = os.path.join(path, "%s%04d%03d_%02d" % (
415 415 prefixDir, year, doy, foldercounter))
416 416         for prefixFile in prefixFileList:  # sweep over the two possible combinations of "D"
417 417             # build the file name xYYYYDDDSSS.ext
418 418             filename = "%s%04d%03d%03d%s" % (prefixFile, year, doy, set, ext)
419 419             fullfilename = os.path.join(
420 420                 thispath, filename)  # build the full path
421 421
422 422             if os.path.exists(fullfilename):  # check that it exists
423 423 find_flag = True
424 424 break
425 425 if find_flag:
426 426 break
427 427
428 428 if not(find_flag):
429 429 return None, filename
430 430
431 431 return fullfilename, filename
432 432
433 433
434 434 def isRadarFolder(folder):
435 435 try:
436 436 year = int(folder[1:5])
437 437 doy = int(folder[5:8])
438 438 except:
439 439 return 0
440 440
441 441 return 1
442 442
443 443
444 444 def isRadarFile(file):
445 445 try:
446 446 year = int(file[1:5])
447 447 doy = int(file[5:8])
448 448 set = int(file[8:11])
449 449 except:
450 450 return 0
451 451
452 452 return 1
453 453
454 454
455 455 def getDateFromRadarFile(file):
456 456 try:
457 457 year = int(file[1:5])
458 458 doy = int(file[5:8])
459 459 set = int(file[8:11])
460 460 except:
461 461 return None
462 462
463 463 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
464 464 return thisDate
465 465
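# Editor's note: a hypothetical example (not part of the original source) of the
# ?YYYYDDDsss naming convention parsed above; the file name is made up.
assert getDateFromRadarFile('D2015305000.r') == datetime.date(2015, 11, 1)   # day 305 of 2015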
466 466
467 467 def getDateFromRadarFolder(folder):
468 468 try:
469 469 year = int(folder[1:5])
470 470 doy = int(folder[5:8])
471 471 except:
472 472 return None
473 473
474 474 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
475 475 return thisDate
476 476
477 477
478 478 class JRODataIO:
479 479
480 480 c = 3E8
481 481
482 482 isConfig = False
483 483
484 484 basicHeaderObj = None
485 485
486 486 systemHeaderObj = None
487 487
488 488 radarControllerHeaderObj = None
489 489
490 490 processingHeaderObj = None
491 491
492 492 dtype = None
493 493
494 494 pathList = []
495 495
496 496 filenameList = []
497 497
498 498 filename = None
499 499
500 500 ext = None
501 501
502 502 flagIsNewFile = 1
503 503
504 504 flagDiscontinuousBlock = 0
505 505
506 506 flagIsNewBlock = 0
507 507
508 508 fp = None
509 509
510 510 firstHeaderSize = 0
511 511
512 512 basicHeaderSize = 24
513 513
514 514 versionFile = 1103
515 515
516 516 fileSize = None
517 517
518 518 # ippSeconds = None
519 519
520 520 fileSizeByHeader = None
521 521
522 522 fileIndex = None
523 523
524 524 profileIndex = None
525 525
526 526 blockIndex = None
527 527
528 528 nTotalBlocks = None
529 529
530 530 maxTimeStep = 30
531 531
532 532 lastUTTime = None
533 533
534 534 datablock = None
535 535
536 536 dataOut = None
537 537
538 538 blocksize = None
539 539
540 540 getByBlock = False
541 541
542 542 def __init__(self):
543 543
544 544 raise NotImplementedError
545 545
546 546 def run(self):
547 547
548 548 raise NotImplementedError
549 549
550 550 def getDtypeWidth(self):
551 551
552 552 dtype_index = get_dtype_index(self.dtype)
553 553 dtype_width = get_dtype_width(dtype_index)
554 554
555 555 return dtype_width
556 556
557 557 def getAllowedArgs(self):
558 558 if hasattr(self, '__attrs__'):
559 559 return self.__attrs__
560 560 else:
561 561 return inspect.getargspec(self.run).args
562 562
563 563
564 564 class JRODataReader(JRODataIO):
565 565
566 566 online = 0
567 567
568 568 realtime = 0
569 569
570 570 nReadBlocks = 0
571 571
572 572     delay = 10  # number of seconds to wait for a new file
573 573
574 574     nTries = 3  # number of retries
575 575
576 576     nFiles = 3  # number of files to search
577 577
578 578 path = None
579 579
580 580 foldercounter = 0
581 581
582 582 flagNoMoreFiles = 0
583 583
584 584 datetimeList = []
585 585
586 586 __isFirstTimeOnline = 1
587 587
588 588 __printInfo = True
589 589
590 590 profileIndex = None
591 591
592 592 nTxs = 1
593 593
594 594 txIndex = None
595 595
596 596 # Added--------------------
597 597
598 598 selBlocksize = None
599 599
600 600 selBlocktime = None
601 601
602 602 def __init__(self):
603 603 """
604 604 This class is used to find data files
605 605
606 606 Example:
607 607 reader = JRODataReader()
608 608 fileList = reader.findDataFiles()
609 609
610 610 """
611 611 pass
612 612
613 613 def createObjByDefault(self):
614 614 """
615 615
616 616 """
617 617 raise NotImplementedError
618 618
619 619 def getBlockDimension(self):
620 620
621 621 raise NotImplementedError
622 622
623 623 def searchFilesOffLine(self,
624 624 path,
625 625 startDate=None,
626 626 endDate=None,
627 627 startTime=datetime.time(0, 0, 0),
628 628 endTime=datetime.time(23, 59, 59),
629 629 set=None,
630 630 expLabel='',
631 631 ext='.r',
632 632 cursor=None,
633 633 skip=None,
634 634 walk=True):
635 635
636 636 self.filenameList = []
637 637 self.datetimeList = []
638 638
639 639 pathList = []
640 640
641 641 dateList, pathList = self.findDatafiles(
642 642 path, startDate, endDate, expLabel, ext, walk, include_path=True)
643 643
644 644 if dateList == []:
645 645 return [], []
646 646
647 647 if len(dateList) > 1:
648 648 print("[Reading] Data found for date range [%s - %s]: total days = %d" % (startDate, endDate, len(dateList)))
649 649 else:
650 650 print("[Reading] Data found for date range [%s - %s]: date = %s" % (startDate, endDate, dateList[0]))
651 651
652 652 filenameList = []
653 653 datetimeList = []
654 654
655 655 for thisPath in pathList:
656 656
657 657 fileList = glob.glob1(thisPath, "*%s" % ext)
658 658 fileList.sort()
659 659
660 660 for file in fileList:
661 661
662 662 filename = os.path.join(thisPath, file)
663 663
664 664 if not isFileInDateRange(filename, startDate, endDate):
665 665 continue
666 666
667 667 thisDatetime = isFileInTimeRange(
668 668 filename, startDate, endDate, startTime, endTime)
669 669
670 670 if not(thisDatetime):
671 671 continue
672 672
673 673 filenameList.append(filename)
674 674 datetimeList.append(thisDatetime)
675 675
676 676 if cursor is not None and skip is not None:
677 677 filenameList = filenameList[cursor * skip:cursor * skip + skip]
678 678 datetimeList = datetimeList[cursor * skip:cursor * skip + skip]
679 679
680 680 if not(filenameList):
681 681             print("[Reading] Invalid time range selected [%s - %s]: no *%s files in %s" % (startTime, endTime, ext, path))
682 682 return [], []
683 683
684 684         print("[Reading] %d file(s) found in time range: %s - %s" % (len(filenameList), startTime, endTime))
685 685
686 686 # for i in range(len(filenameList)):
687 687 # print "[Reading] %s -> [%s]" %(filenameList[i], datetimeList[i].ctime())
688 688
689 689 self.filenameList = filenameList
690 690 self.datetimeList = datetimeList
691 691
692 692 return pathList, filenameList
693 693
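# Editor's note: a hedged usage sketch (not part of the original source); the path and
# dates are hypothetical and `reader` stands for any concrete JRODataReader subclass.
#
#     pathList, fileList = reader.searchFilesOffLine(
#         '/data/jicamarca',
#         startDate=datetime.date(2015, 11, 1), endDate=datetime.date(2015, 11, 2),
#         startTime=datetime.time(6, 0, 0), endTime=datetime.time(18, 0, 0),
#         ext='.r', walk=True)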
694 694 def __searchFilesOnLine(self, path, expLabel="", ext=None, walk=True, set=None):
695 695 """
696 696         Searches for the last file in the last folder (whether or not it is determined by startDateTime)
697 697         and returns the file found along with other data.
698 698
699 699         Input:
700 700             path : folder that contains the data files
701 701
702 702             expLabel : name of the sub-experiment (subfolder)
703 703
704 704             ext : extension of the files
705 705
706 706             walk : if enabled, the search descends into the day subdirectories (doypath)
707 707
708 708         Return:
709 709             directory : the directory where the found file is located
710 710             filename : the last file of the given folder
711 711             year : the year
712 712             doy : the day of the year
713 713             set : the set of the file
714 714
715 715
716 716 """
717 717 if not os.path.isdir(path):
718 718 return None, None, None, None, None, None
719 719
720 720 dirList = []
721 721
722 722 if not walk:
723 723 fullpath = path
724 724 foldercounter = 0
725 725 else:
726 726             # keep only the directories
727 727 for thisPath in os.listdir(path):
728 728 if not os.path.isdir(os.path.join(path, thisPath)):
729 729 continue
730 730 if not isRadarFolder(thisPath):
731 731 continue
732 732
733 733 dirList.append(thisPath)
734 734
735 735 if not(dirList):
736 736 return None, None, None, None, None, None
737 737
738 738 dirList = sorted(dirList, key=str.lower)
739 739
740 740 doypath = dirList[-1]
741 741 foldercounter = int(doypath.split('_')[1]) if len(
742 742 doypath.split('_')) > 1 else 0
743 743 fullpath = os.path.join(path, doypath, expLabel)
744 744
745 745 print("[Reading] %s folder was found: " % (fullpath))
746 746
747 747 if set == None:
748 748 filename = getlastFileFromPath(fullpath, ext)
749 749 else:
750 750 filename = getFileFromSet(fullpath, ext, set)
751 751
752 752 if not(filename):
753 753 return None, None, None, None, None, None
754 754
755 755 print("[Reading] %s file was found" % (filename))
756 756
757 757 if not(self.__verifyFile(os.path.join(fullpath, filename))):
758 758 return None, None, None, None, None, None
759 759
760 760 year = int(filename[1:5])
761 761 doy = int(filename[5:8])
762 762 set = int(filename[8:11])
763 763
764 764 return fullpath, foldercounter, filename, year, doy, set
765 765
766 766 def __setNextFileOffline(self):
767 767
768 768 idFile = self.fileIndex
769 769
770 770 while (True):
771 771 idFile += 1
772 772 if not(idFile < len(self.filenameList)):
773 773 self.flagNoMoreFiles = 1
774 774 # print "[Reading] No more Files"
775 775 return 0
776 776
777 777 filename = self.filenameList[idFile]
778 778
779 779 if not(self.__verifyFile(filename)):
780 780 continue
781 781
782 782 fileSize = os.path.getsize(filename)
783 783 fp = open(filename, 'rb')
784 784 break
785 785
786 786 self.flagIsNewFile = 1
787 787 self.fileIndex = idFile
788 788 self.filename = filename
789 789 self.fileSize = fileSize
790 790 self.fp = fp
791 791
792 792 # print "[Reading] Setting the file: %s"%self.filename
793 793
794 794 return 1
795 795
796 796 def __setNextFileOnline(self):
797 797 """
798 798         Searches for the next file with enough data to be read inside a specific folder; if
799 799         no valid file is found it waits for a given time and then looks at the next n possible
800 800         files.
801 801
802 802         Affected:
803 803             self.flagIsNewFile
804 804             self.filename
805 805             self.fileSize
806 806             self.fp
807 807             self.set
808 808             self.flagNoMoreFiles
809 809
810 810         Return:
811 811             0 : if, after searching, the next valid file could not be found
812 812             1 : if the file was opened successfully and is ready to be read
813 813
814 814         Exceptions:
815 815             If a given file cannot be opened
816 816 """
817 817 nFiles = 0
818 818 fileOk_flag = False
819 819 firstTime_flag = True
820 820
821 821 self.set += 1
822 822
823 823 if self.set > 999:
824 824 self.set = 0
825 825 self.foldercounter += 1
826 826
827 827         # look for the first available file
828 828 fullfilename, filename = checkForRealPath(
829 829 self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
830 830 if fullfilename:
831 831 if self.__verifyFile(fullfilename, False):
832 832 fileOk_flag = True
833 833
834 834         # if no file is found, wait and search again
835 835         if not(fileOk_flag):
836 836             # look in the next self.nFiles+1 possible files
837 837             for nFiles in range(self.nFiles + 1):
838 838
839 839                 if firstTime_flag:  # if this is the first time, loop self.nTries times
840 840                     tries = self.nTries
841 841                 else:
842 842                     tries = 1  # otherwise only try once
843 843
844 844 for nTries in range(tries):
845 845 if firstTime_flag:
846 846 print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1))
847 847 sleep(self.delay)
848 848 else:
849 849 print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext))
850 850
851 851 fullfilename, filename = checkForRealPath(
852 852 self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
853 853 if fullfilename:
854 854 if self.__verifyFile(fullfilename):
855 855 fileOk_flag = True
856 856 break
857 857
858 858 if fileOk_flag:
859 859 break
860 860
861 861 firstTime_flag = False
862 862
863 863             log.warning('Skipping the file {} because it does not exist'.format(filename))
864 864 self.set += 1
865 865
866 866             # if the expected file is not found, switch folders and search in the next one
867 867 if nFiles == (self.nFiles - 1):
868 868 self.set = 0
869 869 self.doy += 1
870 870 self.foldercounter = 0
871 871
872 872 if fileOk_flag:
873 873 self.fileSize = os.path.getsize(fullfilename)
874 874 self.filename = fullfilename
875 875 self.flagIsNewFile = 1
876 876 if self.fp != None:
877 877 self.fp.close()
878 878 self.fp = open(fullfilename, 'rb')
879 879 self.flagNoMoreFiles = 0
880 880 # print '[Reading] Setting the file: %s' % fullfilename
881 881 else:
882 882 self.fileSize = 0
883 883 self.filename = None
884 884 self.flagIsNewFile = 0
885 885 self.fp = None
886 886 self.flagNoMoreFiles = 1
887 887 # print '[Reading] No more files to read'
888 888
889 889 return fileOk_flag
890 890
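# Editor's note: a schematic, hypothetical sketch (not part of the original source) of the
# set/folder rollover handled above, assuming voltage files with prefix 'd' on day 305 of 2015.
#
#     # ...d2015305998.r, d2015305999.r, then the set counter wraps to 000 and
#     # foldercounter is incremented, so the search moves on to d2015305_01/;
#     # after self.nFiles misses the day of year itself is incremented.
#     next_name = "%s%04d%03d%03d%s" % ('d', 2015, 305, 0, '.r')   # -> 'd2015305000.r'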
891 891 def setNextFile(self):
892 892 if self.fp != None:
893 893 self.fp.close()
894 894
895 895 if self.online:
896 896 newFile = self.__setNextFileOnline()
897 897 else:
898 898 newFile = self.__setNextFileOffline()
899 899
900 900 if not(newFile):
901 raise(schainpy.admin.SchainWarning('No more files to read'))
901 self.dataOut.error = (-1, 'No more files to read')
902 902 return 0
903 903
904 904 if self.verbose:
905 905 print('[Reading] Setting the file: %s' % self.filename)
906 906
907 907 self.__readFirstHeader()
908 908 self.nReadBlocks = 0
909 909 return 1
910 910
911 911 def __waitNewBlock(self):
912 912 """
913 913         Returns 1 if a new data block was found, 0 otherwise.
914 914
915 915         If the reading mode is offline it always returns 0.
916 916 """
917 917 if not self.online:
918 918 return 0
919 919
920 920 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
921 921 return 0
922 922
923 923 currentPointer = self.fp.tell()
924 924
925 925 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
926 926
927 927 for nTries in range(self.nTries):
928 928
929 929 self.fp.close()
930 930 self.fp = open(self.filename, 'rb')
931 931 self.fp.seek(currentPointer)
932 932
933 933 self.fileSize = os.path.getsize(self.filename)
934 934 currentSize = self.fileSize - currentPointer
935 935
936 936 if (currentSize >= neededSize):
937 937 self.basicHeaderObj.read(self.fp)
938 938 return 1
939 939
940 940 if self.fileSize == self.fileSizeByHeader:
941 941 # self.flagEoF = True
942 942 return 0
943 943
944 944 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
945 945 sleep(self.delay)
946 946
947 947 return 0
948 948
949 949 def waitDataBlock(self, pointer_location):
950 950
951 951 currentPointer = pointer_location
952 952
953 953 neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize
954 954
955 955 for nTries in range(self.nTries):
956 956 self.fp.close()
957 957 self.fp = open(self.filename, 'rb')
958 958 self.fp.seek(currentPointer)
959 959
960 960 self.fileSize = os.path.getsize(self.filename)
961 961 currentSize = self.fileSize - currentPointer
962 962
963 963 if (currentSize >= neededSize):
964 964 return 1
965 965
966 966 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
967 967 sleep(self.delay)
968 968
969 969 return 0
970 970
971 971 def __jumpToLastBlock(self):
972 972
973 973 if not(self.__isFirstTimeOnline):
974 974 return
975 975
976 976 csize = self.fileSize - self.fp.tell()
977 977 blocksize = self.processingHeaderObj.blockSize
978 978
979 979         # skip the first data block
980 980 if csize > self.processingHeaderObj.blockSize:
981 981 self.fp.seek(self.fp.tell() + blocksize)
982 982 else:
983 983 return
984 984
985 985 csize = self.fileSize - self.fp.tell()
986 986 neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
987 987 while True:
988 988
989 989 if self.fp.tell() < self.fileSize:
990 990 self.fp.seek(self.fp.tell() + neededsize)
991 991 else:
992 992 self.fp.seek(self.fp.tell() - neededsize)
993 993 break
994 994
995 995 # csize = self.fileSize - self.fp.tell()
996 996 # neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
997 997 # factor = int(csize/neededsize)
998 998 # if factor > 0:
999 999 # self.fp.seek(self.fp.tell() + factor*neededsize)
1000 1000
1001 1001 self.flagIsNewFile = 0
1002 1002 self.__isFirstTimeOnline = 0
1003 1003
1004 1004 def __setNewBlock(self):
1005 1005 # if self.server is None:
1006 1006 if self.fp == None:
1007 1007 return 0
1008 1008
1009 1009 # if self.online:
1010 1010 # self.__jumpToLastBlock()
1011 1011
1012 1012 if self.flagIsNewFile:
1013 1013 self.lastUTTime = self.basicHeaderObj.utc
1014 1014 return 1
1015 1015
1016 1016 if self.realtime:
1017 1017 self.flagDiscontinuousBlock = 1
1018 1018 if not(self.setNextFile()):
1019 1019 return 0
1020 1020 else:
1021 1021 return 1
1022 1022 # if self.server is None:
1023 1023 currentSize = self.fileSize - self.fp.tell()
1024 1024 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
1025 1025 if (currentSize >= neededSize):
1026 1026 self.basicHeaderObj.read(self.fp)
1027 1027 self.lastUTTime = self.basicHeaderObj.utc
1028 1028 return 1
1029 1029 # else:
1030 1030 # self.basicHeaderObj.read(self.zHeader)
1031 1031 # self.lastUTTime = self.basicHeaderObj.utc
1032 1032 # return 1
1033 1033 if self.__waitNewBlock():
1034 1034 self.lastUTTime = self.basicHeaderObj.utc
1035 1035 return 1
1036 1036 # if self.server is None:
1037 1037 if not(self.setNextFile()):
1038 1038 return 0
1039 1039
1040 1040 deltaTime = self.basicHeaderObj.utc - self.lastUTTime
1041 1041 self.lastUTTime = self.basicHeaderObj.utc
1042 1042
1043 1043 self.flagDiscontinuousBlock = 0
1044 1044
1045 1045 if deltaTime > self.maxTimeStep:
1046 1046 self.flagDiscontinuousBlock = 1
1047 1047
1048 1048 return 1
1049 1049
1050 1050 def readNextBlock(self):
1051 1051
1052 1052 # Skip block out of startTime and endTime
1053 1053 while True:
1054 1054 if not(self.__setNewBlock()):
1055 raise(schainpy.admin.SchainWarning('No more files'))
1055 self.dataOut.error = (-1, 'No more files to read')
1056 1056 return 0
1057 1057
1058 1058 if not(self.readBlock()):
1059 1059 return 0
1060 1060
1061 1061 self.getBasicHeader()
1062 1062 if (self.dataOut.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or (self.dataOut.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
1063 1063 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
1064 1064 self.processingHeaderObj.dataBlocksPerFile,
1065 1065 self.dataOut.datatime.ctime()))
1066 1066 continue
1067 1067
1068 1068 break
1069 1069
1070 1070 if self.verbose:
1071 1071 print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
1072 1072 self.processingHeaderObj.dataBlocksPerFile,
1073 1073 self.dataOut.datatime.ctime()))
1074 1074 return 1
1075 1075
1076 1076 def __readFirstHeader(self):
1077 1077
1078 1078 self.basicHeaderObj.read(self.fp)
1079 1079 self.systemHeaderObj.read(self.fp)
1080 1080 self.radarControllerHeaderObj.read(self.fp)
1081 1081 self.processingHeaderObj.read(self.fp)
1082 1082
1083 1083 self.firstHeaderSize = self.basicHeaderObj.size
1084 1084
1085 1085 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
1086 1086 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
1087 1087 if datatype == 0:
1088 1088 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
1089 1089 elif datatype == 1:
1090 1090 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
1091 1091 elif datatype == 2:
1092 1092 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
1093 1093 elif datatype == 3:
1094 1094 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
1095 1095 elif datatype == 4:
1096 1096 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
1097 1097 elif datatype == 5:
1098 1098 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
1099 1099 else:
1100 1100 raise ValueError('Data type was not defined')
1101 1101
1102 1102 self.dtype = datatype_str
1103 1103 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
1104 1104 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
1105 1105 self.firstHeaderSize + self.basicHeaderSize * \
1106 1106 (self.processingHeaderObj.dataBlocksPerFile - 1)
1107 1107 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
1108 1108 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
1109 1109 self.getBlockDimension()
1110 1110
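# Editor's note: an equivalent, hypothetical table of the datatype codes decoded above
# (not part of the original source); each code maps to a complex sample dtype.
#
#     DATATYPE_TO_DTYPE = {
#         0: numpy.dtype([('real', '<i1'), ('imag', '<i1')]),   # int8
#         1: numpy.dtype([('real', '<i2'), ('imag', '<i2')]),   # int16
#         2: numpy.dtype([('real', '<i4'), ('imag', '<i4')]),   # int32
#         3: numpy.dtype([('real', '<i8'), ('imag', '<i8')]),   # int64
#         4: numpy.dtype([('real', '<f4'), ('imag', '<f4')]),   # float32
#         5: numpy.dtype([('real', '<f8'), ('imag', '<f8')]),   # float64
#     }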
1111 1111 def __verifyFile(self, filename, msgFlag=True):
1112 1112
1113 1113 msg = None
1114 1114
1115 1115 try:
1116 1116 fp = open(filename, 'rb')
1117 1117 except IOError:
1118 1118
1119 1119 if msgFlag:
1120 1120 print("[Reading] File %s can't be opened" % (filename))
1121 1121
1122 1122 return False
1123 1123
1124 1124 currentPosition = fp.tell()
1125 1125 neededSize = self.processingHeaderObj.blockSize + self.firstHeaderSize
1126 1126
1127 1127 if neededSize == 0:
1128 1128 basicHeaderObj = BasicHeader(LOCALTIME)
1129 1129 systemHeaderObj = SystemHeader()
1130 1130 radarControllerHeaderObj = RadarControllerHeader()
1131 1131 processingHeaderObj = ProcessingHeader()
1132 1132
1133 1133 if not(basicHeaderObj.read(fp)):
1134 1134 fp.close()
1135 1135 return False
1136 1136
1137 1137 if not(systemHeaderObj.read(fp)):
1138 1138 fp.close()
1139 1139 return False
1140 1140
1141 1141 if not(radarControllerHeaderObj.read(fp)):
1142 1142 fp.close()
1143 1143 return False
1144 1144
1145 1145 if not(processingHeaderObj.read(fp)):
1146 1146 fp.close()
1147 1147 return False
1148 1148
1149 1149 neededSize = processingHeaderObj.blockSize + basicHeaderObj.size
1150 1150 else:
1151 1151             msg = "[Reading] Skipping the file %s because it does not have enough data" % filename
1152 1152
1153 1153 fp.close()
1154 1154
1155 1155 fileSize = os.path.getsize(filename)
1156 1156 currentSize = fileSize - currentPosition
1157 1157
1158 1158 if currentSize < neededSize:
1159 1159 if msgFlag and (msg != None):
1160 1160 print(msg)
1161 1161 return False
1162 1162
1163 1163 return True
1164 1164
1165 1165 def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
1166 1166
1167 1167 path_empty = True
1168 1168
1169 1169 dateList = []
1170 1170 pathList = []
1171 1171
1172 1172 multi_path = path.split(',')
1173 1173
1174 1174 if not walk:
1175 1175
1176 1176 for single_path in multi_path:
1177 1177
1178 1178 if not os.path.isdir(single_path):
1179 1179 continue
1180 1180
1181 1181 fileList = glob.glob1(single_path, "*" + ext)
1182 1182
1183 1183 if not fileList:
1184 1184 continue
1185 1185
1186 1186 path_empty = False
1187 1187
1188 1188 fileList.sort()
1189 1189
1190 1190 for thisFile in fileList:
1191 1191
1192 1192 if not os.path.isfile(os.path.join(single_path, thisFile)):
1193 1193 continue
1194 1194
1195 1195 if not isRadarFile(thisFile):
1196 1196 continue
1197 1197
1198 1198 if not isFileInDateRange(thisFile, startDate, endDate):
1199 1199 continue
1200 1200
1201 1201 thisDate = getDateFromRadarFile(thisFile)
1202 1202
1203 1203 if thisDate in dateList:
1204 1204 continue
1205 1205
1206 1206 dateList.append(thisDate)
1207 1207 pathList.append(single_path)
1208 1208
1209 1209 else:
1210 1210 for single_path in multi_path:
1211 1211
1212 1212 if not os.path.isdir(single_path):
1213 1213 continue
1214 1214
1215 1215 dirList = []
1216 1216
1217 1217 for thisPath in os.listdir(single_path):
1218 1218
1219 1219 if not os.path.isdir(os.path.join(single_path, thisPath)):
1220 1220 continue
1221 1221
1222 1222 if not isRadarFolder(thisPath):
1223 1223 continue
1224 1224
1225 1225 if not isFolderInDateRange(thisPath, startDate, endDate):
1226 1226 continue
1227 1227
1228 1228 dirList.append(thisPath)
1229 1229
1230 1230 if not dirList:
1231 1231 continue
1232 1232
1233 1233 dirList.sort()
1234 1234
1235 1235 for thisDir in dirList:
1236 1236
1237 1237 datapath = os.path.join(single_path, thisDir, expLabel)
1238 1238 fileList = glob.glob1(datapath, "*" + ext)
1239 1239
1240 1240 if not fileList:
1241 1241 continue
1242 1242
1243 1243 path_empty = False
1244 1244
1245 1245 thisDate = getDateFromRadarFolder(thisDir)
1246 1246
1247 1247 pathList.append(datapath)
1248 1248 dateList.append(thisDate)
1249 1249
1250 1250 dateList.sort()
1251 1251
1252 1252 if walk:
1253 1253 pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
1254 1254 else:
1255 1255 pattern_path = multi_path[0]
1256 1256
1257 1257 if path_empty:
1258 1258 print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1259 1259 else:
1260 1260 if not dateList:
1261 1261                 print("[Reading] Invalid date range selected [%s - %s]: no *%s files in %s" % (startDate, endDate, ext, path))
1262 1262
1263 1263 if include_path:
1264 1264 return dateList, pathList
1265 1265
1266 1266 return dateList
1267 1267
1268 1268 def setup(self,
1269 1269 path=None,
1270 1270 startDate=None,
1271 1271 endDate=None,
1272 1272 startTime=datetime.time(0, 0, 0),
1273 1273 endTime=datetime.time(23, 59, 59),
1274 1274 set=None,
1275 1275 expLabel="",
1276 1276 ext=None,
1277 1277 online=False,
1278 1278 delay=60,
1279 1279 walk=True,
1280 1280 getblock=False,
1281 1281 nTxs=1,
1282 1282 realtime=False,
1283 1283 blocksize=None,
1284 1284 blocktime=None,
1285 1285 skip=None,
1286 1286 cursor=None,
1287 1287 warnings=True,
1288 1288 verbose=True,
1289 1289 server=None,
1290 1290 format=None,
1291 1291 oneDDict=None,
1292 1292 twoDDict=None,
1293 1293 ind2DList=None):
1294 1294 if server is not None:
1295 1295 if 'tcp://' in server:
1296 1296 address = server
1297 1297 else:
1298 1298 address = 'ipc:///tmp/%s' % server
1299 1299 self.server = address
1300 1300 self.context = zmq.Context()
1301 1301 self.receiver = self.context.socket(zmq.PULL)
1302 1302 self.receiver.connect(self.server)
1303 1303 time.sleep(0.5)
1304 1304 print('[Starting] ReceiverData from {}'.format(self.server))
1305 1305 else:
1306 1306 self.server = None
1307 1307 if path == None:
1308 1308 raise ValueError("[Reading] The path is not valid")
1309 1309
1310 1310 if ext == None:
1311 1311 ext = self.ext
1312 1312
1313 1313 if online:
1314 1314 print("[Reading] Searching files in online mode...")
1315 1315
1316 1316 for nTries in range(self.nTries):
1317 1317 fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(
1318 1318 path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)
1319 1319
1320 1320 if fullpath:
1321 1321 break
1322 1322
1323 1323                 print('[Reading] Waiting %0.2f sec for a valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
1324 1324 sleep(self.delay)
1325 1325
1326 1326 if not(fullpath):
1327 raise(schainpy.admin.SchainWarning('There isn\'t any valid file in {}'.format(path)))
1327 self.dataOut.error = (-1, 'There isn\'t any valid file in {}'.format(path))
1328 1328 return
1329 1329
1330 1330 self.year = year
1331 1331 self.doy = doy
1332 1332 self.set = set - 1
1333 1333 self.path = path
1334 1334 self.foldercounter = foldercounter
1335 1335 last_set = None
1336 1336 else:
1337 1337 print("[Reading] Searching files in offline mode ...")
1338 1338 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1339 1339 startTime=startTime, endTime=endTime,
1340 1340 set=set, expLabel=expLabel, ext=ext,
1341 1341 walk=walk, cursor=cursor,
1342 1342 skip=skip)
1343 1343
1344 1344 if not(pathList):
1345 1345 self.fileIndex = -1
1346 1346 self.pathList = []
1347 1347 self.filenameList = []
1348 1348 return
1349 1349
1350 1350 self.fileIndex = -1
1351 1351 self.pathList = pathList
1352 1352 self.filenameList = filenameList
1353 1353 file_name = os.path.basename(filenameList[-1])
1354 1354 basename, ext = os.path.splitext(file_name)
1355 1355 last_set = int(basename[-3:])
1356 1356
1357 1357 self.online = online
1358 1358 self.realtime = realtime
1359 1359 self.delay = delay
1360 1360 ext = ext.lower()
1361 1361 self.ext = ext
1362 1362 self.getByBlock = getblock
1363 1363 self.nTxs = nTxs
1364 1364 self.startTime = startTime
1365 1365 self.endTime = endTime
1366 1366 self.endDate = endDate
1367 1367 self.startDate = startDate
1368 1368 # Added-----------------
1369 1369 self.selBlocksize = blocksize
1370 1370 self.selBlocktime = blocktime
1371 1371
1372 1372 # Verbose-----------
1373 1373 self.verbose = verbose
1374 1374 self.warnings = warnings
1375 1375
1376 1376 if not(self.setNextFile()):
1377 1377 if (startDate != None) and (endDate != None):
1378 1378 print("[Reading] No files in range: %s - %s" % (datetime.datetime.combine(startDate, startTime).ctime(), datetime.datetime.combine(endDate, endTime).ctime()))
1379 1379 elif startDate != None:
1380 1380 print("[Reading] No files in range: %s" % (datetime.datetime.combine(startDate, startTime).ctime()))
1381 1381 else:
1382 1382 print("[Reading] No files")
1383 1383
1384 1384 self.fileIndex = -1
1385 1385 self.pathList = []
1386 1386 self.filenameList = []
1387 1387 return
1388 1388
1389 1389 # self.getBasicHeader()
1390 1390
1391 1391 if last_set != None:
1392 1392 self.dataOut.last_block = last_set * \
1393 1393 self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
1394 1394 return
1395 1395
1396 1396 def getBasicHeader(self):
1397 1397
1398 1398 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1399 1399 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1400 1400
1401 1401 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1402 1402
1403 1403 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1404 1404
1405 1405 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1406 1406
1407 1407 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1408 1408
1409 1409 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1410 1410
1411 1411 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1412 1412
1413 1413 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1414 1414
1415 1415 def getFirstHeader(self):
1416 1416
1417 1417 raise NotImplementedError
1418 1418
1419 1419 def getData(self):
1420 1420
1421 1421 raise NotImplementedError
1422 1422
1423 1423 def hasNotDataInBuffer(self):
1424 1424
1425 1425 raise NotImplementedError
1426 1426
1427 1427 def readBlock(self):
1428 1428
1429 1429 raise NotImplementedError
1430 1430
1431 1431 def isEndProcess(self):
1432 1432
1433 1433 return self.flagNoMoreFiles
1434 1434
1435 1435 def printReadBlocks(self):
1436 1436
1437 1437 print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1438 1438
1439 1439 def printTotalBlocks(self):
1440 1440
1441 1441 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1442 1442
1443 1443 def printNumberOfBlock(self):
1444 1444 'SPAM!'
1445 1445
1446 1446 # if self.flagIsNewBlock:
1447 1447 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1448 1448 # self.processingHeaderObj.dataBlocksPerFile,
1449 1449 # self.dataOut.datatime.ctime())
1450 1450
1451 1451 def printInfo(self):
1452 1452
1453 1453 if self.__printInfo == False:
1454 1454 return
1455 1455
1456 1456 self.basicHeaderObj.printInfo()
1457 1457 self.systemHeaderObj.printInfo()
1458 1458 self.radarControllerHeaderObj.printInfo()
1459 1459 self.processingHeaderObj.printInfo()
1460 1460
1461 1461 self.__printInfo = False
1462 1462
1463 1463 def run(self,
1464 1464 path=None,
1465 1465 startDate=None,
1466 1466 endDate=None,
1467 1467 startTime=datetime.time(0, 0, 0),
1468 1468 endTime=datetime.time(23, 59, 59),
1469 1469 set=None,
1470 1470 expLabel="",
1471 1471 ext=None,
1472 1472 online=False,
1473 1473 delay=60,
1474 1474 walk=True,
1475 1475 getblock=False,
1476 1476 nTxs=1,
1477 1477 realtime=False,
1478 1478 blocksize=None,
1479 1479 blocktime=None,
1480 1480 skip=None,
1481 1481 cursor=None,
1482 1482 warnings=True,
1483 1483 server=None,
1484 1484 verbose=True,
1485 1485 format=None,
1486 1486 oneDDict=None,
1487 1487 twoDDict=None,
1488 1488 ind2DList=None, **kwargs):
1489 1489
1490 1490 if not(self.isConfig):
1491 1491 self.setup(path=path,
1492 1492 startDate=startDate,
1493 1493 endDate=endDate,
1494 1494 startTime=startTime,
1495 1495 endTime=endTime,
1496 1496 set=set,
1497 1497 expLabel=expLabel,
1498 1498 ext=ext,
1499 1499 online=online,
1500 1500 delay=delay,
1501 1501 walk=walk,
1502 1502 getblock=getblock,
1503 1503 nTxs=nTxs,
1504 1504 realtime=realtime,
1505 1505 blocksize=blocksize,
1506 1506 blocktime=blocktime,
1507 1507 skip=skip,
1508 1508 cursor=cursor,
1509 1509 warnings=warnings,
1510 1510 server=server,
1511 1511 verbose=verbose,
1512 1512 format=format,
1513 1513 oneDDict=oneDDict,
1514 1514 twoDDict=twoDDict,
1515 1515 ind2DList=ind2DList)
1516 1516 self.isConfig = True
1517 1517 if server is None:
1518 1518 self.getData()
1519 1519 else:
1520 1520 self.getFromServer()
1521 1521
1522 1522
1523 1523 class JRODataWriter(JRODataIO):
1524 1524
1525 1525 """
1526 1526     This class writes data to processed files (.r or .pdata). Data is
1527 1527     always written in blocks.
1528 1528 """
1529 1529
1530 1530 blockIndex = 0
1531 1531
1532 1532 path = None
1533 1533
1534 1534 setFile = None
1535 1535
1536 1536 profilesPerBlock = None
1537 1537
1538 1538 blocksPerFile = None
1539 1539
1540 1540 nWriteBlocks = 0
1541 1541
1542 1542 fileDate = None
1543 1543
1544 1544 def __init__(self, dataOut=None):
1545 1545 raise NotImplementedError
1546 1546
1547 1547 def hasAllDataInBuffer(self):
1548 1548 raise NotImplementedError
1549 1549
1550 1550 def setBlockDimension(self):
1551 1551 raise NotImplementedError
1552 1552
1553 1553 def writeBlock(self):
1554 1554 raise NotImplementedError
1555 1555
1556 1556 def putData(self):
1557 1557 raise NotImplementedError
1558 1558
1559 1559 def getProcessFlags(self):
1560 1560
1561 1561 processFlags = 0
1562 1562
1563 1563 dtype_index = get_dtype_index(self.dtype)
1564 1564 procflag_dtype = get_procflag_dtype(dtype_index)
1565 1565
1566 1566 processFlags += procflag_dtype
1567 1567
1568 1568 if self.dataOut.flagDecodeData:
1569 1569 processFlags += PROCFLAG.DECODE_DATA
1570 1570
1571 1571 if self.dataOut.flagDeflipData:
1572 1572 processFlags += PROCFLAG.DEFLIP_DATA
1573 1573
1574 1574 if self.dataOut.code is not None:
1575 1575 processFlags += PROCFLAG.DEFINE_PROCESS_CODE
1576 1576
1577 1577 if self.dataOut.nCohInt > 1:
1578 1578 processFlags += PROCFLAG.COHERENT_INTEGRATION
1579 1579
1580 1580 if self.dataOut.type == "Spectra":
1581 1581 if self.dataOut.nIncohInt > 1:
1582 1582 processFlags += PROCFLAG.INCOHERENT_INTEGRATION
1583 1583
1584 1584 if self.dataOut.data_dc is not None:
1585 1585 processFlags += PROCFLAG.SAVE_CHANNELS_DC
1586 1586
1587 1587 if self.dataOut.flagShiftFFT:
1588 1588 processFlags += PROCFLAG.SHIFT_FFT_DATA
1589 1589
1590 1590 return processFlags
1591 1591
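# Editor's note: a hypothetical illustration (not part of the original source) of how the
# PROCFLAG bits accumulated above can be combined and tested; the values shown are made up.
#
#     flags = PROCFLAG.DECODE_DATA + PROCFLAG.COHERENT_INTEGRATION
#     bool(flags & PROCFLAG.DECODE_DATA)              # True, decoding was applied
#     bool(flags & PROCFLAG.INCOHERENT_INTEGRATION)   # False, not applied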
1592 1592 def setBasicHeader(self):
1593 1593
1594 1594 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1595 1595 self.basicHeaderObj.version = self.versionFile
1596 1596 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1597 1597
1598 1598 utc = numpy.floor(self.dataOut.utctime)
1599 1599 milisecond = (self.dataOut.utctime - utc) * 1000.0
1600 1600
1601 1601 self.basicHeaderObj.utc = utc
1602 1602 self.basicHeaderObj.miliSecond = milisecond
1603 1603 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1604 1604 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1605 1605 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1606 1606
1607 1607 def setFirstHeader(self):
1608 1608 """
1609 1609         Gets a copy of the First Header
1610 1610
1611 1611         Affected:
1612 1612
1613 1613             self.basicHeaderObj
1614 1614             self.systemHeaderObj
1615 1615             self.radarControllerHeaderObj
1616 1616             self.processingHeaderObj
1617 1617
1618 1618 Return:
1619 1619 None
1620 1620 """
1621 1621
1622 1622 raise NotImplementedError
1623 1623
1624 1624 def __writeFirstHeader(self):
1625 1625 """
1626 1626         Writes the first header of the file, i.e. the Basic header and the Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1627 1627
1628 1628 Affected:
1629 1629 __dataType
1630 1630
1631 1631 Return:
1632 1632 None
1633 1633 """
1634 1634
1635 1635         # COMPUTE PARAMETERS
1636 1636
1637 1637 sizeLongHeader = self.systemHeaderObj.size + \
1638 1638 self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1639 1639 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1640 1640
1641 1641 self.basicHeaderObj.write(self.fp)
1642 1642 self.systemHeaderObj.write(self.fp)
1643 1643 self.radarControllerHeaderObj.write(self.fp)
1644 1644 self.processingHeaderObj.write(self.fp)
1645 1645
1646 1646 def __setNewBlock(self):
1647 1647 """
1648 1648         If this is a new file it writes the First Header, otherwise it writes only the Basic Header
1649 1649
1650 1650         Return:
1651 1651             0 : if nothing could be written
1652 1652             1 : if the Basic or the First Header was written
1653 1653 """
1654 1654 if self.fp == None:
1655 1655 self.setNextFile()
1656 1656
1657 1657 if self.flagIsNewFile:
1658 1658 return 1
1659 1659
1660 1660 if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
1661 1661 self.basicHeaderObj.write(self.fp)
1662 1662 return 1
1663 1663
1664 1664 if not(self.setNextFile()):
1665 1665 return 0
1666 1666
1667 1667 return 1
1668 1668
1669 1669 def writeNextBlock(self):
1670 1670 """
1671 1671         Selects the next block of data and writes it to a file
1672 1672
1673 1673         Return:
1674 1674             0 : if the data block could not be written
1675 1675             1 : if the data block was written
1676 1676 """
1677 1677 if not(self.__setNewBlock()):
1678 1678 return 0
1679 1679
1680 1680 self.writeBlock()
1681 1681
1682 1682 print("[Writing] Block No. %d/%d" % (self.blockIndex,
1683 1683 self.processingHeaderObj.dataBlocksPerFile))
1684 1684
1685 1685 return 1
1686 1686
1687 1687 def setNextFile(self):
1688 1688 """
1689 1689         Determines the next file to be written
1690 1690
1691 1691         Affected:
1692 1692             self.filename
1693 1693             self.subfolder
1694 1694             self.fp
1695 1695             self.setFile
1696 1696             self.flagIsNewFile
1697 1697
1698 1698         Return:
1699 1699             0 : if the file cannot be written
1700 1700             1 : if the file is ready to be written
1701 1701 """
1702 1702 ext = self.ext
1703 1703 path = self.path
1704 1704
1705 1705 if self.fp != None:
1706 1706 self.fp.close()
1707 1707
1708 1708 timeTuple = time.localtime(self.dataOut.utctime)
1709 1709 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
1710 1710
1711 1711 fullpath = os.path.join(path, subfolder)
1712 1712 setFile = self.setFile
1713 1713
1714 1714 if not(os.path.exists(fullpath)):
1715 1715 os.mkdir(fullpath)
1716 1716             setFile = -1  # initialize the set counter
1717 1717 else:
1718 1718 filesList = os.listdir(fullpath)
1719 1719 if len(filesList) > 0:
1720 1720 filesList = sorted(filesList, key=str.lower)
1721 1721 filen = filesList[-1]
1722 1722                 # the filename must have the following format
1723 1723                 # 0 1234 567 89A BCDE (hex)
1724 1724                 # x YYYY DDD SSS .ext
1725 1725                 if isNumber(filen[8:11]):
1726 1726                     # initialize the set counter to the set of the last file
1727 1727                     setFile = int(filen[8:11])
1728 1728                 else:
1729 1729                     setFile = -1
1730 1730             else:
1731 1731                 setFile = -1  # initialize the set counter
1732 1732
1733 1733 setFile += 1
1734 1734
1735 1735 # If this is a new day it resets some values
1736 1736 if self.dataOut.datatime.date() > self.fileDate:
1737 1737 setFile = 0
1738 1738 self.nTotalBlocks = 0
1739 1739
1740 1740 filen = '{}{:04d}{:03d}{:03d}{}'.format(
1741 1741 self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)
1742 1742
1743 1743 filename = os.path.join(path, subfolder, filen)
1744 1744
1745 1745 fp = open(filename, 'wb')
1746 1746
1747 1747 self.blockIndex = 0
1748 1748
1749 1749         # save attributes
1750 1750 self.filename = filename
1751 1751 self.subfolder = subfolder
1752 1752 self.fp = fp
1753 1753 self.setFile = setFile
1754 1754 self.flagIsNewFile = 1
1755 1755 self.fileDate = self.dataOut.datatime.date()
1756 1756
1757 1757 self.setFirstHeader()
1758 1758
1759 1759 print('[Writing] Opening file: %s' % self.filename)
1760 1760
1761 1761 self.__writeFirstHeader()
1762 1762
1763 1763 return 1
1764 1764
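# Editor's note: a made-up example (not part of the original source) of the output file
# name built above, assuming optchar 'D', day 305 of 2015, set 3 and a '.r' extension.
#
#     '{}{:04d}{:03d}{:03d}{}'.format('D', 2015, 305, 3, '.r')   # -> 'D2015305003.r'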
1765 1765 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1766 1766 """
1767 1767         Sets the format in which the data will be saved and writes the First Header
1768 1768
1769 1769 Inputs:
1770 1770 path : directory where data will be saved
1771 1771 profilesPerBlock : number of profiles per block
1772 1772 set : initial file set
1773 1773 datatype : An integer number that defines data type:
1774 1774 0 : int8 (1 byte)
1775 1775 1 : int16 (2 bytes)
1776 1776 2 : int32 (4 bytes)
1777 1777 3 : int64 (8 bytes)
1778 1778 4 : float32 (4 bytes)
1779 1779 5 : double64 (8 bytes)
1780 1780
1781 1781 Return:
1782 1782             0 : if the setup failed
1783 1783             1 : if the setup succeeded
1784 1784 """
1785 1785
1786 1786 if ext == None:
1787 1787 ext = self.ext
1788 1788
1789 1789 self.ext = ext.lower()
1790 1790
1791 1791 self.path = path
1792 1792
1793 1793 if set is None:
1794 1794 self.setFile = -1
1795 1795 else:
1796 1796 self.setFile = set - 1
1797 1797
1798 1798 self.blocksPerFile = blocksPerFile
1799 1799
1800 1800 self.profilesPerBlock = profilesPerBlock
1801 1801
1802 1802 self.dataOut = dataOut
1803 1803 self.fileDate = self.dataOut.datatime.date()
1804 1804 # By default
1805 1805 self.dtype = self.dataOut.dtype
1806 1806
1807 1807 if datatype is not None:
1808 1808 self.dtype = get_numpy_dtype(datatype)
1809 1809
1810 1810 if not(self.setNextFile()):
1811 1811 print("[Writing] There isn't a next file")
1812 1812 return 0
1813 1813
1814 1814 self.setBlockDimension()
1815 1815
1816 1816 return 1
1817 1817
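# Editor's note: a hedged usage sketch (not part of the original source); `writer` stands
# for a concrete JRODataWriter subclass and `dataOut` for the output of an upstream unit.
#
#     writer.setup(dataOut, '/data/output', blocksPerFile=100,
#                  profilesPerBlock=64, datatype=4)   # datatype 4 -> float32 samples
#     writer.putData()                                # called for every new dataOut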
1818 1818 def run(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
1819 1819
1820 1820 if not(self.isConfig):
1821 1821
1822 1822 self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
1823 1823 set=set, ext=ext, datatype=datatype, **kwargs)
1824 1824 self.isConfig = True
1825 1825
1826 1826 self.putData() No newline at end of file
@@ -1,538 +1,367
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14 from platform import python_version
15 15 import inspect
16 16 import zmq
17 17 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21
22 21 from schainpy.utils import log
23 22
24 23
25 24 class ProcessingUnit(object):
26 25
27 26 """
28 27 Update - Jan 2018 - MULTIPROCESSING
29 28 All the "call" methods present in the previous base were removed.
30 29     The majority of operations are independent processes, thus
31 30     the decorator is in charge of communicating the operation processes
32 31     with the processing unit via IPC.
33 32
34 33 The constructor does not receive any argument. The remaining methods
35 34 are related with the operations to execute.
36 35
37 36
38 37 """
39 # objeto de datos de entrada (Voltage, Spectra o Correlation)
38
39 METHODS = {}
40 40 dataIn = None
41 41 dataInList = []
42
43 # objeto de datos de entrada (Voltage, Spectra o Correlation)
44
45 42 id = None
46 43 inputId = None
47
48 44 dataOut = None
49
50 45 dictProcs = None
51
52 operations2RunDict = None
53
54 46 isConfig = False
55 47
56 48 def __init__(self):
57 49
58 50 self.dataIn = None
59 51 self.dataOut = None
60
61 52 self.isConfig = False
53 self.operations = []
62 54
63 55 def getAllowedArgs(self):
64 56 if hasattr(self, '__attrs__'):
65 57 return self.__attrs__
66 58 else:
67 59 return inspect.getargspec(self.run).args
68 60
69 def addOperationKwargs(self, objId, **kwargs):
70 '''
71 '''
72
73 self.operationKwargs[objId] = kwargs
74
75 def addOperation(self, opObj, objId):
61 def addOperation(self, conf, operation):
76 62
77 63 """
78 64         This method is used in the controller; it updates the list containing the operations to execute. Each
79 65         entry keeps the id of the operation process (for IPC purposes).
80 66
81 67         Adds an object of type "Operation" to the operations list (self.operations) and returns the
82 68         identifier associated with this object.
83 69
84 70         Input:
85 71
86 72             object : object of the "Operation" class
87 73
88 74         Return:
89 75
90 76             objId : identifier of the object, needed to communicate with the master (procUnit)
91 77 """
92 78
93 self.operations2RunDict[objId] = opObj
94
95 return objId
96
79 self.operations.append((operation, conf.type, conf.id, conf.getKwargs()))
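For reference, every element appended to self.operations above is a 4-tuple (operation object, type, id, kwargs). A stand-alone illustration with an invented configuration object (FakeConf only mimics the attributes used in the call above):

class FakeConf(object):
    type = 'other'              # one of 'self', 'other' or 'external'
    id = '1113'
    def getKwargs(self):
        return {'n': 8}

conf = FakeConf()
operation = object()            # stands in for an Operation instance
entry = (operation, conf.type, conf.id, conf.getKwargs())
print(entry)                    # (<object ...>, 'other', '1113', {'n': 8})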
97 80
98 81 def getOperationObj(self, objId):
99 82
100 if objId not in list(self.operations2RunDict.keys()):
83 if objId not in [opId for op, optype, opId, kwargs in self.operations]:
101 84 return None
102 85
103 return self.operations2RunDict[objId]
86 return next(op for op, optype, opId, kwargs in self.operations if opId == objId)
104 87
105 88 def operation(self, **kwargs):
106 89
107 90 """
108 91 Direct operation on the data (dataOut.data). The values of the attributes of the
109 92 dataOut object must be updated.
110 93
111 94 Input:
112 95
113 96 **kwargs : dictionary of arguments of the function to execute
114 97 """
115 98
116 99 raise NotImplementedError
117 100
118 101 def setup(self):
119 102
120 103 raise NotImplementedError
121 104
122 105 def run(self):
123 106
124 107 raise NotImplementedError
125 108
126 109 def close(self):
127 110 # Close every thread, queue or any other object here if it is necessary.
128 111 return
129 112
130 113 class Operation(object):
131 114
132 115 """
133 116 Update - Jan 2018 - MULTIPROCESSING
134 117
135 118 Most of the methods remained the same. The decorator parses the arguments and executes the run() method for each process.
136 119 The constructor does not receive any arguments, nor does the base class.
137 120
138 121
139 122 Base class used to define the additional operations that can be attached to a ProcessingUnit
140 123 and that need to accumulate previous information from the data to process. Preferably, use an
141 124 accumulation buffer inside this class.
142 125
143 126 Example: coherent integrations, which need the information of the previous n profiles (buffer)
144 127
145 128 """
146 129 id = None
147 130 __buffer = None
148 131 dest = None
149 132 isConfig = False
150 133 readyFlag = None
151 134
152 135 def __init__(self):
153 136
154 137 self.buffer = None
155 138 self.dest = None
156 139 self.isConfig = False
157 140 self.readyFlag = False
158 141
159 142 if not hasattr(self, 'name'):
160 143 self.name = self.__class__.__name__
161 144
162 145 def getAllowedArgs(self):
163 146 if hasattr(self, '__attrs__'):
164 147 return self.__attrs__
165 148 else:
166 149 return inspect.getargspec(self.run).args
167 150
168 151 def setup(self):
169 152
170 153 self.isConfig = True
171 154
172 155 raise NotImplementedError
173 156
174 157
175 158 def run(self, dataIn, **kwargs):
176 159
177 160 """
178 161 Performs the required operations on dataIn.data and updates the
179 162 attributes of the dataIn object.
180 163
181 164 Input:
182 165
183 166 dataIn : object of type JROData
184 167
185 168 Return:
186 169
187 170 None
188 171
189 172 Affected:
190 173 __buffer : data reception buffer.
191 174
192 175 """
193 176 if not self.isConfig:
194 177 self.setup(**kwargs)
195 178
196 179 raise NotImplementedError
197 180
198 181 def close(self):
199 182
200 183 pass
201 184
202 185
203 ######### Decorator #########
204
205
206 186 def MPDecorator(BaseClass):
207 187
208 188 """
209 "Multiprocessing class decorator"
189 Multiprocessing class decorator
210 190
211 This function add multiprocessing features to the base class. Also,
212 it handle the communication beetween processes (readers, procUnits and operations).
213 Receive the arguments at the moment of instantiation. According to that, discriminates if it
214 is a procUnit or an operation
191 This function adds multiprocessing features to a BaseClass. It also handles
192 the communication between processes (readers, procUnits and operations).
215 193 """
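For orientation, a hedged sketch of how a unit is wrapped and instantiated under this decorator; the class name and the id values are invented, and actually starting the process assumes the project's ZMQ forwarder sockets (ipc:///tmp/schain/<project_id>_pub/_sub) are already up:

from schainpy.model.proc.jroproc_base import MPDecorator, ProcessingUnit

@MPDecorator
class DummyProc(ProcessingUnit):        # illustrative unit, not part of schainpy
    def run(self):
        self.dataOut = self.dataIn      # pass the data through unchanged

# three positional args -> typeProc == "ProcUnit": (id, inputId, project_id)
unit = DummyProc('1102', '1101', '0001')
# two positional args -> typeProc == "Operation": (id, project_id)
# unit.start() would fork the process and enter runReader()/runProc()/runOp(),
# which needs the project's broker running, so it is not called in this sketch.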
216 194
217 195 class MPClass(BaseClass, Process):
218 196
219 "This is the overwritten class"
220 operations2RunDict = None
221 socket_l = None
222 socket_p = None
223 socketOP = None
224 socket_router = None
225 dictProcs = None
226 typeProc = None
227 197 def __init__(self, *args, **kwargs):
228 198 super(MPClass, self).__init__()
229 199 Process.__init__(self)
230
231
232 200 self.operationKwargs = {}
233 201 self.args = args
234
235
236 self.operations2RunDict = {}
237 202 self.kwargs = kwargs
238
239 # The number of arguments (args) determine the type of process
203 self.sender = None
204 self.receiver = None
205 self.name = BaseClass.__name__
240 206
241 207 if len(self.args) == 3:
242 208 self.typeProc = "ProcUnit"
243 self.id = args[0] #topico de publicacion
244 self.inputId = args[1] #topico de subcripcion
245 self.dictProcs = args[2] #diccionario de procesos globales
209 self.id = args[0]
210 self.inputId = args[1]
211 self.project_id = args[2]
246 212 else:
247 213 self.id = args[0]
214 self.inputId = args[0]
215 self.project_id = args[1]
248 216 self.typeProc = "Operation"
249 217
250 def addOperationKwargs(self, objId, **kwargs):
251
252 self.operationKwargs[objId] = kwargs
253
254 218 def getAllowedArgs(self):
255 219
256 220 if hasattr(self, '__attrs__'):
257 221 return self.__attrs__
258 222 else:
259 return inspect.getargspec(self.run).args
260
261
262 def sockListening(self, topic):
263
264 """
265 This function create a socket to receive objects.
266 The 'topic' argument is related to the publisher process from which the self process is
267 listening (data).
268 In the case were the self process is listening to a Reader (proc Unit),
269 special conditions are introduced to maximize parallelism.
270 """
271
272 cont = zmq.Context()
273 zmq_socket = cont.socket(zmq.SUB)
274 if not os.path.exists('/tmp/socketTmp'):
275 os.mkdir('/tmp/socketTmp')
276
277 if 'Reader' in self.dictProcs[self.inputId].name:
278 zmq_socket.connect('ipc:///tmp/socketTmp/b')
279
280 else:
281 zmq_socket.connect('ipc:///tmp/socketTmp/%s' % self.inputId)
282
283 #log.error('RECEIVING FROM {} {}'.format(self.inputId, str(topic).encode()))
284 zmq_socket.setsockopt(zmq.SUBSCRIBE, str(topic).encode()) #yong
285
286 return zmq_socket
223 return inspect.getargspec(BaseClass.run).args
287 224
225 def subscribe(self):
226 '''
227 This function creates a socket to receive objects from the
228 topic `inputId`.
229 '''
288 230
289 def listenProc(self, sock):
231 c = zmq.Context()
232 self.receiver = c.socket(zmq.SUB)
233 self.receiver.connect('ipc:///tmp/schain/{}_pub'.format(self.project_id))
234 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
290 235
291 """
292 This function listen to a ipc addres until a message is recovered. To serialize the
293 data (object), pickle has been use.
294 The 'sock' argument is the socket previously connect to an ipc address and with a topic subscription.
295 """
296
297 a = sock.recv_multipart()
298 a = pickle.loads(a[1])
299 return a
236 def listen(self):
237 '''
238 This function waits for objects and deserializes them using pickle
239 '''
300 240
301 def sockPublishing(self):
241 data = pickle.loads(self.receiver.recv_multipart()[1])
242 return data
302 243
303 """
244 def set_publisher(self):
245 '''
304 246 This function creates a socket for publishing purposes.
305 Depending on the process type from where is created, it binds or connect
306 to special IPC addresses.
307 """
308 time.sleep(4) #yong
309 context = zmq.Context()
310 zmq_socket = context.socket(zmq.PUB)
311 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
312 if 'Reader' in self.dictProcs[self.id].name:
313 zmq_socket.connect('ipc:///tmp/socketTmp/a')
314 else:
315 zmq_socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
316
317 return zmq_socket
318
319 def publishProc(self, sock, data):
320
321 """
322 This function publish a python object (data) under a specific topic in a socket (sock).
323 Usually, the topic is the self id of the process.
324 """
325
326 sock.send_multipart([str(self.id).encode(), pickle.dumps(data)]) #yong
327
328 return True
329
330 def sockOp(self):
331
332 """
333 This function create a socket for communication purposes with operation processes.
334 """
335
336 cont = zmq.Context()
337 zmq_socket = cont.socket(zmq.DEALER)
338
339 if python_version()[0] == '2':
340 zmq_socket.setsockopt(zmq.IDENTITY, self.id)
341 if python_version()[0] == '3':
342 zmq_socket.setsockopt_string(zmq.IDENTITY, self.id)
343
344
345 return zmq_socket
346
347
348 def execOp(self, socket, opId, dataObj):
349
350 """
351 This function 'execute' an operation main routine by establishing a
352 connection with it and sending a python object (dataOut).
353 """
354 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
355 socket.connect('ipc:///tmp/socketTmp/%s' %opId)
356
357
358 socket.send(pickle.dumps(dataObj)) #yong
359
360 argument = socket.recv_multipart()[0]
361
362 argument = pickle.loads(argument)
363
364 return argument
365
366 def sockIO(self):
367
368 """
369 Socket defined for an operation process. It is able to recover the object sent from another process as well as a
370 identifier of who sent it.
371 """
372
373 cont = zmq.Context()
374 if not os.path.exists('/tmp/socketTmp'): os.mkdir('/tmp/socketTmp')
375 socket = cont.socket(zmq.ROUTER)
376 socket.bind('ipc:///tmp/socketTmp/%s' % self.id)
377
378 return socket
379
380 def funIOrec(self, socket):
381
382 """
383 Operation method, recover the id of the process who sent a python object.
384 The 'socket' argument is the socket binded to a specific process ipc.
385 """
386
387 #id_proc = socket.recv()
388
389 #dataObj = socket.recv_pyobj()
390
391 dataObj = socket.recv_multipart()
392
393 dataObj[1] = pickle.loads(dataObj[1])
394 return dataObj[0], dataObj[1]
395
396 def funIOsen(self, socket, data, dest):
397
398 """
399 Operation method, send a python object to a specific destination.
400 The 'dest' argument is the id of a proccesinf unit.
401 """
247 '''
402 248
403 socket.send_multipart([dest, pickle.dumps(data)]) #yong
249 time.sleep(1)
250 c = zmq.Context()
251 self.sender = c.socket(zmq.PUB)
252 self.sender.connect('ipc:///tmp/schain/{}_sub'.format(self.project_id))
404 253
405 return True
254 def publish(self, data, id):
255 '''
256 This function publishes an object to a specific topic.
257 '''
406 258
259 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
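subscribe/listen/set_publisher/publish implement a plain ZMQ PUB/SUB exchange with pickled payloads and the unit id as topic. A self-contained sketch of the same pattern over TCP (addresses, topic and payload are invented; the real code connects both ends to the project's IPC forwarder instead of binding directly):

import pickle
import time
import zmq

context = zmq.Context()

sender = context.socket(zmq.PUB)              # cf. set_publisher()/publish()
sender.bind('tcp://127.0.0.1:5556')

receiver = context.socket(zmq.SUB)            # cf. subscribe()/listen()
receiver.connect('tcp://127.0.0.1:5556')
receiver.setsockopt(zmq.SUBSCRIBE, b'1101')   # topic == unit id

time.sleep(0.5)                               # let the subscription propagate
sender.send_multipart([b'1101', pickle.dumps({'utctime': 0, 'data': [1, 2, 3]})])
topic, payload = receiver.recv_multipart()
print(topic, pickle.loads(payload))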
407 260
408 261 def runReader(self):
409
410 # time.sleep(3)
262 '''
263 Run function for read units
264 '''
411 265 while True:
412 266
413 267 BaseClass.run(self, **self.kwargs)
414 268
415
416 keyList = list(self.operations2RunDict.keys())
417 keyList.sort()
418
419 for key in keyList:
420 self.socketOP = self.sockOp()
421 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
422
423
424 if self.flagNoMoreFiles: #Usar un objeto con flags para saber si termino el proc o hubo un error
425 self.publishProc(self.socket_p, "Finish")
269 if self.dataOut.error[0] == -1:
270 log.error(self.dataOut.error[1])
271 self.publish('end', self.id)
272 #self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
426 273 break
427 274
275 for op, optype, opId, kwargs in self.operations:
276 if optype=='self':
277 op(**kwargs)
278 elif optype=='other':
279 self.dataOut = op.run(self.dataOut, **kwargs)
280 elif optype=='external':
281 self.publish(self.dataOut, opId)
282
428 283 if self.dataOut.flagNoData:
429 284 continue
430 285
431 #print("Publishing data...")
432 self.publishProc(self.socket_p, self.dataOut)
433 # time.sleep(2)
434
435
436 print("%s done" %BaseClass.__name__)
437 return 0
286 self.publish(self.dataOut, self.id)
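The for-loop above is the heart of this changeset: 'self' operations are bound methods of the unit called in place, 'other' operations run inside the same process and return a new dataOut, and 'external' operations live in their own process and only receive the published object. A reduced, ZMQ-free sketch of that dispatch (all names invented, publish stubbed with a list):

published = []

def publish(data, topic):                  # stub for MPClass.publish
    published.append((topic, data))

def dispatch(dataOut, operations):
    for op, optype, opId, kwargs in operations:
        if optype == 'self':               # bound method of the processing unit
            op(**kwargs)
        elif optype == 'other':            # in-process Operation, returns a new dataOut
            dataOut = op.run(dataOut, **kwargs)
        elif optype == 'external':         # separate process, just send it the data
            publish(dataOut, opId)
    return dataOut

class Scale(object):                       # toy 'other' operation
    def run(self, dataOut, factor=1):
        return [x * factor for x in dataOut]

ops = [(lambda: None, 'self', '0', {}),
       (Scale(), 'other', '1', {'factor': 2}),
       (None, 'external', '2', {})]
print(dispatch([1, 2, 3], ops))            # [2, 4, 6]; published now holds one entry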
438 287
439 288 def runProc(self):
440
441 # All the procUnits with kwargs that require a setup initialization must be defined here.
442
443 if self.setupReq:
444 BaseClass.setup(self, **self.kwargs)
289 '''
290 Run function for processing units
291 '''
445 292
446 293 while True:
447 self.dataIn = self.listenProc(self.socket_l)
448 #print("%s received data" %BaseClass.__name__)
294 self.dataIn = self.listen()
449 295
450 if self.dataIn == "Finish":
296 if self.dataIn == 'end':
297 self.publish('end', self.id)
298 for op, optype, opId, kwargs in self.operations:
299 if optype == 'external':
300 self.publish('end', opId)
451 301 break
452 302
453 m_arg = list(self.kwargs.keys())
454 num_arg = list(range(1,int(BaseClass.run.__code__.co_argcount)))
455
456 run_arg = {}
457
458 for var in num_arg:
459 if BaseClass.run.__code__.co_varnames[var] in m_arg:
460 run_arg[BaseClass.run.__code__.co_varnames[var]] = self.kwargs[BaseClass.run.__code__.co_varnames[var]]
461
462 #BaseClass.run(self, **self.kwargs)
463 BaseClass.run(self, **run_arg)
464
465 ## Iterar sobre una serie de data que podrias aplicarse
466
467 for m_name in BaseClass.METHODS:
303 if self.dataIn.flagNoData:
304 continue
468 305
469 met_arg = {}
306 BaseClass.run(self, **self.kwargs)
470 307
471 for arg in m_arg:
472 if arg in BaseClass.METHODS[m_name]:
473 for att in BaseClass.METHODS[m_name]:
474 met_arg[att] = self.kwargs[att]
475
476 method = getattr(BaseClass, m_name)
477 method(self, **met_arg)
478 break
308 for op, optype, opId, kwargs in self.operations:
309 if optype=='self':
310 op(**kwargs)
311 elif optype=='other':
312 self.dataOut = op.run(self.dataOut, **kwargs)
313 elif optype=='external':
314 self.publish(self.dataOut, opId)
479 315
480 316 if self.dataOut.flagNoData:
481 317 continue
482 318
483 keyList = list(self.operations2RunDict.keys())
484 keyList.sort()
485
486 for key in keyList:
487
488 self.socketOP = self.sockOp()
489 self.dataOut = self.execOp(self.socketOP, key, self.dataOut)
490
491
492 self.publishProc(self.socket_p, self.dataOut)
493
494
495 print("%s done" %BaseClass.__name__)
496
497 return 0
319 self.publish(self.dataOut, self.id)
498 320
499 321 def runOp(self):
322 '''
323 Run function for operations
324 '''
500 325
501 326 while True:
502 327
503 [self.dest ,self.buffer] = self.funIOrec(self.socket_router)
328 dataOut = self.listen()
504 329
505 self.buffer = BaseClass.run(self, self.buffer, **self.kwargs)
506
507 self.funIOsen(self.socket_router, self.buffer, self.dest)
508
509 print("%s done" %BaseClass.__name__)
510 return 0
330 if dataOut == 'end':
331 break
511 332
333 BaseClass.run(self, dataOut, **self.kwargs)
512 334
513 335 def run(self):
514 336
515 337 if self.typeProc == "ProcUnit":
516 338
517 self.socket_p = self.sockPublishing()
339 if self.inputId is not None:
340 self.subscribe()
341 self.set_publisher()
518 342
519 if 'Reader' not in self.dictProcs[self.id].name:
520 self.socket_l = self.sockListening(self.inputId)
343 if 'Reader' not in BaseClass.__name__:
521 344 self.runProc()
522
523 345 else:
524
525 346 self.runReader()
526 347
527 348 elif self.typeProc == "Operation":
528 349
529 self.socket_router = self.sockIO()
530
350 self.subscribe()
531 351 self.runOp()
532 352
533 353 else:
534 354 raise ValueError("Unknown type")
535 355
536 return 0
356 print("%s done" % BaseClass.__name__)
357 self.close()
358
359 def close(self):
360
361 if self.sender:
362 self.sender.close()
363
364 if self.receiver:
365 self.receiver.close()
537 366
538 367 return MPClass No newline at end of file
@@ -1,966 +1,964
1 1 import itertools
2 2
3 3 import numpy
4 4
5 5 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
6 6 from schainpy.model.data.jrodata import Spectra
7 7 from schainpy.model.data.jrodata import hildebrand_sekhon
8 8 from schainpy.utils import log
9 9
10 10 @MPDecorator
11 11 class SpectraProc(ProcessingUnit):
12 12
13 13 METHODS = {'selectHeights' : ['minHei', 'maxHei'],
14 14 'selectChannels' : 'channelList',
15 15 'selectChannelsByIndex': 'channelIndexList',
16 16 'getBeaconSignal' : ['tauindex', 'channelindex', 'hei_ref'],
17 17 'selectHeightsByIndex' : ['minIndex', 'maxIndex']
18 18 }
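METHODS maps the names of the unit's 'self'-type operations to the keyword arguments they accept. A small lookup-and-call sketch, under the assumption that the controller passes keyword names matching those lists (Fake and the values are invented):

METHODS = {'selectHeights': ['minHei', 'maxHei']}

class Fake(object):                        # illustrative stand-in for SpectraProc
    def selectHeights(self, minHei, maxHei):
        print('keeping heights between', minHei, 'and', maxHei)

unit, kwargs = Fake(), {'minHei': 80.0, 'maxHei': 120.0, 'unrelated': 1}
name = 'selectHeights'
call_args = {k: kwargs[k] for k in METHODS[name] if k in kwargs}
getattr(unit, name)(**call_args)           # -> keeping heights between 80.0 and 120.0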
19 19
20 20 def __init__(self):#, **kwargs):
21 21
22 22 ProcessingUnit.__init__(self)#, **kwargs)
23 23
24 24 self.buffer = None
25 25 self.firstdatatime = None
26 26 self.profIndex = 0
27 27 self.dataOut = Spectra()
28 28 self.id_min = None
29 29 self.id_max = None
30 30 self.setupReq = False # add this to every processing unit
31 31
32 32 def __updateSpecFromVoltage(self):
33 33
34 34 self.dataOut.timeZone = self.dataIn.timeZone
35 35 self.dataOut.dstFlag = self.dataIn.dstFlag
36 36 self.dataOut.errorCount = self.dataIn.errorCount
37 37 self.dataOut.useLocalTime = self.dataIn.useLocalTime
38 38 try:
39 39 self.dataOut.processingHeaderObj = self.dataIn.processingHeaderObj.copy()
40 40 except:
41 41 pass
42 42 self.dataOut.radarControllerHeaderObj = self.dataIn.radarControllerHeaderObj.copy()
43 43 self.dataOut.systemHeaderObj = self.dataIn.systemHeaderObj.copy()
44 44 self.dataOut.channelList = self.dataIn.channelList
45 45 self.dataOut.heightList = self.dataIn.heightList
46 46 self.dataOut.dtype = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
47 47
48 48 self.dataOut.nBaud = self.dataIn.nBaud
49 49 self.dataOut.nCode = self.dataIn.nCode
50 50 self.dataOut.code = self.dataIn.code
51 51 self.dataOut.nProfiles = self.dataOut.nFFTPoints
52 52
53 53 self.dataOut.flagDiscontinuousBlock = self.dataIn.flagDiscontinuousBlock
54 54 self.dataOut.utctime = self.firstdatatime
55 55 # assume the data is already decoded
56 56 self.dataOut.flagDecodeData = self.dataIn.flagDecodeData
57 57 # assume the data has not been flipped
58 58 self.dataOut.flagDeflipData = self.dataIn.flagDeflipData
59 59 self.dataOut.flagShiftFFT = False
60 60
61 61 self.dataOut.nCohInt = self.dataIn.nCohInt
62 62 self.dataOut.nIncohInt = 1
63 63
64 64 self.dataOut.windowOfFilter = self.dataIn.windowOfFilter
65 65
66 66 self.dataOut.frequency = self.dataIn.frequency
67 67 self.dataOut.realtime = self.dataIn.realtime
68 68
69 69 self.dataOut.azimuth = self.dataIn.azimuth
70 70 self.dataOut.zenith = self.dataIn.zenith
71 71
72 72 self.dataOut.beam.codeList = self.dataIn.beam.codeList
73 73 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
74 74 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
75 75
76 76 def __getFft(self):
77 77 """
78 78 Converts Voltage values into Spectra
79 79
80 80 Affected:
81 81 self.dataOut.data_spc
82 82 self.dataOut.data_cspc
83 83 self.dataOut.data_dc
84 84 self.dataOut.heightList
85 85 self.profIndex
86 86 self.buffer
87 87 self.dataOut.flagNoData
88 88 """
89 89 fft_volt = numpy.fft.fft(
90 90 self.buffer, n=self.dataOut.nFFTPoints, axis=1)
91 91 fft_volt = fft_volt.astype(numpy.dtype('complex'))
92 92 dc = fft_volt[:, 0, :]
93 93
94 94 # self-spectra computation
95 95 fft_volt = numpy.fft.fftshift(fft_volt, axes=(1,))
96 96 spc = fft_volt * numpy.conjugate(fft_volt)
97 97 spc = spc.real
98 98
99 99 blocksize = 0
100 100 blocksize += dc.size
101 101 blocksize += spc.size
102 102
103 103 cspc = None
104 104 pairIndex = 0
105 105 if self.dataOut.pairsList != None:
106 106 # cross-spectra computation
107 107 cspc = numpy.zeros(
108 108 (self.dataOut.nPairs, self.dataOut.nFFTPoints, self.dataOut.nHeights), dtype='complex')
109 109 for pair in self.dataOut.pairsList:
110 110 if pair[0] not in self.dataOut.channelList:
111 111 raise ValueError("Error getting CrossSpectra: pair 0 of %s is not in channelList = %s" % (
112 112 str(pair), str(self.dataOut.channelList)))
113 113 if pair[1] not in self.dataOut.channelList:
114 114 raise ValueError("Error getting CrossSpectra: pair 1 of %s is not in channelList = %s" % (
115 115 str(pair), str(self.dataOut.channelList)))
116 116
117 117 cspc[pairIndex, :, :] = fft_volt[pair[0], :, :] * \
118 118 numpy.conjugate(fft_volt[pair[1], :, :])
119 119 pairIndex += 1
120 120 blocksize += cspc.size
121 121
122 122 self.dataOut.data_spc = spc
123 123 self.dataOut.data_cspc = cspc
124 124 self.dataOut.data_dc = dc
125 125 self.dataOut.blockSize = blocksize
126 126 self.dataOut.flagShiftFFT = True
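As a cross-check on __getFft above: the self- and cross-spectra are elementwise products of the shifted FFT of the voltage buffer. A small numpy sketch with random voltages (all shapes invented):

import numpy

nChannels, nFFTPoints, nHeights = 2, 8, 4
volts = (numpy.random.randn(nChannels, nFFTPoints, nHeights)
         + 1j * numpy.random.randn(nChannels, nFFTPoints, nHeights))

fft_volt = numpy.fft.fft(volts, n=nFFTPoints, axis=1)
dc = fft_volt[:, 0, :]                                      # DC taken before the shift
fft_volt = numpy.fft.fftshift(fft_volt, axes=(1,))
spc = (fft_volt * numpy.conjugate(fft_volt)).real           # self-spectra per channel
cspc = fft_volt[0] * numpy.conjugate(fft_volt[1])           # cross-spectrum of pair (0, 1)
print(spc.shape, cspc.shape, dc.shape)                      # (2, 8, 4) (8, 4) (2, 4)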
127 127
128 128 def run(self, nProfiles=None, nFFTPoints=None, pairsList=[], ippFactor=None, shift_fft=False):
129 129
130 self.dataOut.flagNoData = True
131
132 130 if self.dataIn.type == "Spectra":
133 131 self.dataOut.copy(self.dataIn)
134 132 # if not pairsList:
135 133 # pairsList = itertools.combinations(self.dataOut.channelList, 2)
136 134 # if self.dataOut.data_cspc is not None:
137 135 # self.__selectPairs(pairsList)
138 136 if shift_fft:
139 137 # shifts the spectrum a fixed number of positions to the right along the frequency axis
140 138 shift = int(self.dataOut.nFFTPoints/2)
141 139 self.dataOut.data_spc = numpy.roll(self.dataOut.data_spc, shift , axis=1)
142 140
143 141 if self.dataOut.data_cspc is not None:
144 142 # shifts the cross-spectrum a fixed number of positions to the right along the frequency axis
145 143 self.dataOut.data_cspc = numpy.roll(self.dataOut.data_cspc, shift, axis=1)
146 144
147 145 return True
148 146
149 147 if self.dataIn.type == "Voltage":
150 148
151 149 if nFFTPoints == None:
152 150 raise ValueError("This SpectraProc.run() need nFFTPoints input variable")
153 151
154 152 if nProfiles == None:
155 153 nProfiles = nFFTPoints
156 154
157 155 if ippFactor == None:
158 156 ippFactor = 1
159 157
160 158 self.dataOut.ippFactor = ippFactor
161 159
162 160 self.dataOut.nFFTPoints = nFFTPoints
163 161 self.dataOut.pairsList = pairsList
164 162
165 163 if self.buffer is None:
166 164 self.buffer = numpy.zeros((self.dataIn.nChannels,
167 165 nProfiles,
168 166 self.dataIn.nHeights),
169 167 dtype='complex')
170 168
171 169 if self.dataIn.flagDataAsBlock:
172 170 # data dimension: [nChannels, nProfiles, nSamples]
173 171 nVoltProfiles = self.dataIn.data.shape[1]
174 172 # nVoltProfiles = self.dataIn.nProfiles
175 173
176 174 if nVoltProfiles == nProfiles:
177 175 self.buffer = self.dataIn.data.copy()
178 176 self.profIndex = nVoltProfiles
179 177
180 178 elif nVoltProfiles < nProfiles:
181 179
182 180 if self.profIndex == 0:
183 181 self.id_min = 0
184 182 self.id_max = nVoltProfiles
185 183
186 184 self.buffer[:, self.id_min:self.id_max,
187 185 :] = self.dataIn.data
188 186 self.profIndex += nVoltProfiles
189 187 self.id_min += nVoltProfiles
190 188 self.id_max += nVoltProfiles
191 189 else:
192 190 raise ValueError("The type object %s has %d profiles, it should have just %d profiles" % (
193 191 self.dataIn.type, self.dataIn.data.shape[1], nProfiles))
194 192 self.dataOut.flagNoData = True
195 193 return 0
196 194 else:
197 195 self.buffer[:, self.profIndex, :] = self.dataIn.data.copy()
198 196 self.profIndex += 1
199 197
200 198 if self.firstdatatime == None:
201 199 self.firstdatatime = self.dataIn.utctime
202 200
203 201 if self.profIndex == nProfiles:
204 202 self.__updateSpecFromVoltage()
205 203 self.__getFft()
206 204
207 205 self.dataOut.flagNoData = False
208 206 self.firstdatatime = None
209 207 self.profIndex = 0
210 208
211 209 return True
212 210
213 211 raise ValueError("The type of input object '%s' is not valid" % (
214 212 self.dataIn.type))
215 213
216 214 def __selectPairs(self, pairsList):
217 215
218 216 if not pairsList:
219 217 return
220 218
221 219 pairs = []
222 220 pairsIndex = []
223 221
224 222 for pair in pairsList:
225 223 if pair[0] not in self.dataOut.channelList or pair[1] not in self.dataOut.channelList:
226 224 continue
227 225 pairs.append(pair)
228 226 pairsIndex.append(pairs.index(pair))
229 227
230 228 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndex]
231 229 self.dataOut.pairsList = pairs
232 230
233 231 return
234 232
235 233 def __selectPairsByChannel(self, channelList=None):
236 234
237 235 if channelList == None:
238 236 return
239 237
240 238 pairsIndexListSelected = []
241 239 for pairIndex in self.dataOut.pairsIndexList:
242 240 # First pair
243 241 if self.dataOut.pairsList[pairIndex][0] not in channelList:
244 242 continue
245 243 # Second pair
246 244 if self.dataOut.pairsList[pairIndex][1] not in channelList:
247 245 continue
248 246
249 247 pairsIndexListSelected.append(pairIndex)
250 248
251 249 if not pairsIndexListSelected:
252 250 self.dataOut.data_cspc = None
253 251 self.dataOut.pairsList = []
254 252 return
255 253
256 254 self.dataOut.data_cspc = self.dataOut.data_cspc[pairsIndexListSelected]
257 255 self.dataOut.pairsList = [self.dataOut.pairsList[i]
258 256 for i in pairsIndexListSelected]
259 257
260 258 return
261 259
262 260 def selectChannels(self, channelList):
263 261
264 262 channelIndexList = []
265 263
266 264 for channel in channelList:
267 265 if channel not in self.dataOut.channelList:
268 266 raise ValueError("Error selecting channels, Channel %d is not valid.\nAvailable channels = %s" % (
269 267 channel, str(self.dataOut.channelList)))
270 268
271 269 index = self.dataOut.channelList.index(channel)
272 270 channelIndexList.append(index)
273 271
274 272 self.selectChannelsByIndex(channelIndexList)
275 273
276 274 def selectChannelsByIndex(self, channelIndexList):
277 275 """
278 276 Selects a block of data based on the channels given in channelIndexList
279 277
280 278 Input:
281 279 channelIndexList : plain list of channel indexes to select, e.g. [2,3,7]
282 280
283 281 Affected:
284 282 self.dataOut.data_spc
285 283 self.dataOut.channelIndexList
286 284 self.dataOut.nChannels
287 285
288 286 Return:
289 287 None
290 288 """
291 289
292 290 for channelIndex in channelIndexList:
293 291 if channelIndex not in self.dataOut.channelIndexList:
294 292 raise ValueError("Error selecting channels: The value %d in channelIndexList is not valid.\nAvailable channel indexes = %s" % (
295 293 channelIndex, self.dataOut.channelIndexList))
296 294
297 295 # nChannels = len(channelIndexList)
298 296
299 297 data_spc = self.dataOut.data_spc[channelIndexList, :]
300 298 data_dc = self.dataOut.data_dc[channelIndexList, :]
301 299
302 300 self.dataOut.data_spc = data_spc
303 301 self.dataOut.data_dc = data_dc
304 302
305 303 self.dataOut.channelList = [
306 304 self.dataOut.channelList[i] for i in channelIndexList]
307 305 # self.dataOut.nChannels = nChannels
308 306
309 307 self.__selectPairsByChannel(self.dataOut.channelList)
310 308
311 309 return 1
312 310
313 311 def selectHeights(self, minHei, maxHei):
314 312 """
315 313 Selects a block of data based on a group of height values within the range
316 314 minHei <= height <= maxHei
317 315
318 316 Input:
319 317 minHei : minimum height value to consider
320 318 maxHei : maximum height value to consider
321 319
322 320 Affected:
323 321 Several values are changed indirectly through the selectHeightsByIndex method
324 322
325 323 Return:
326 324 1 if the method ran successfully, otherwise 0
327 325 """
328 326
329 327 if (minHei > maxHei):
330 328 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (
331 329 minHei, maxHei))
332 330
333 331 if (minHei < self.dataOut.heightList[0]):
334 332 minHei = self.dataOut.heightList[0]
335 333
336 334 if (maxHei > self.dataOut.heightList[-1]):
337 335 maxHei = self.dataOut.heightList[-1]
338 336
339 337 minIndex = 0
340 338 maxIndex = 0
341 339 heights = self.dataOut.heightList
342 340
343 341 inda = numpy.where(heights >= minHei)
344 342 indb = numpy.where(heights <= maxHei)
345 343
346 344 try:
347 345 minIndex = inda[0][0]
348 346 except:
349 347 minIndex = 0
350 348
351 349 try:
352 350 maxIndex = indb[0][-1]
353 351 except:
354 352 maxIndex = len(heights)
355 353
356 354 self.selectHeightsByIndex(minIndex, maxIndex)
357 355
358 356 return 1
359 357
360 358 def getBeaconSignal(self, tauindex=0, channelindex=0, hei_ref=None):
361 359 newheis = numpy.where(
362 360 self.dataOut.heightList > self.dataOut.radarControllerHeaderObj.Taus[tauindex])
363 361
364 362 if hei_ref != None:
365 363 newheis = numpy.where(self.dataOut.heightList > hei_ref)
366 364
367 365 minIndex = min(newheis[0])
368 366 maxIndex = max(newheis[0])
369 367 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
370 368 heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
371 369
372 370 # determine indexes
373 371 nheis = int(self.dataOut.radarControllerHeaderObj.txB /
374 372 (self.dataOut.heightList[1] - self.dataOut.heightList[0]))
375 373 avg_dB = 10 * \
376 374 numpy.log10(numpy.sum(data_spc[channelindex, :, :], axis=0))
377 375 beacon_dB = numpy.sort(avg_dB)[-nheis:]
378 376 beacon_heiIndexList = []
379 377 for val in avg_dB.tolist():
380 378 if val >= beacon_dB[0]:
381 379 beacon_heiIndexList.append(avg_dB.tolist().index(val))
382 380
383 381 #data_spc = data_spc[:,:,beacon_heiIndexList]
384 382 data_cspc = None
385 383 if self.dataOut.data_cspc is not None:
386 384 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
387 385 #data_cspc = data_cspc[:,:,beacon_heiIndexList]
388 386
389 387 data_dc = None
390 388 if self.dataOut.data_dc is not None:
391 389 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
392 390 #data_dc = data_dc[:,beacon_heiIndexList]
393 391
394 392 self.dataOut.data_spc = data_spc
395 393 self.dataOut.data_cspc = data_cspc
396 394 self.dataOut.data_dc = data_dc
397 395 self.dataOut.heightList = heightList
398 396 self.dataOut.beacon_heiIndexList = beacon_heiIndexList
399 397
400 398 return 1
401 399
402 400 def selectHeightsByIndex(self, minIndex, maxIndex):
403 401 """
404 402 Selects a block of data based on a group of height indexes within the range
405 403 minIndex <= index <= maxIndex
406 404
407 405 Input:
408 406 minIndex : minimum height index to consider
409 407 maxIndex : maximum height index to consider
410 408
411 409 Affected:
412 410 self.dataOut.data_spc
413 411 self.dataOut.data_cspc
414 412 self.dataOut.data_dc
415 413 self.dataOut.heightList
416 414
417 415 Return:
418 416 1 if the method ran successfully, otherwise 0
419 417 """
420 418
421 419 if (minIndex < 0) or (minIndex > maxIndex):
422 420 raise ValueError("Error selecting heights: Index range (%d,%d) is not valid" % (
423 421 minIndex, maxIndex))
424 422
425 423 if (maxIndex >= self.dataOut.nHeights):
426 424 maxIndex = self.dataOut.nHeights - 1
427 425
428 426 # Spectra
429 427 data_spc = self.dataOut.data_spc[:, :, minIndex:maxIndex + 1]
430 428
431 429 data_cspc = None
432 430 if self.dataOut.data_cspc is not None:
433 431 data_cspc = self.dataOut.data_cspc[:, :, minIndex:maxIndex + 1]
434 432
435 433 data_dc = None
436 434 if self.dataOut.data_dc is not None:
437 435 data_dc = self.dataOut.data_dc[:, minIndex:maxIndex + 1]
438 436
439 437 self.dataOut.data_spc = data_spc
440 438 self.dataOut.data_cspc = data_cspc
441 439 self.dataOut.data_dc = data_dc
442 440
443 441 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex + 1]
444 442
445 443 return 1
446 444
447 445 def removeDC(self, mode=2):
448 446 jspectra = self.dataOut.data_spc
449 447 jcspectra = self.dataOut.data_cspc
450 448
451 449 num_chan = jspectra.shape[0]
452 450 num_hei = jspectra.shape[2]
453 451
454 452 if jcspectra is not None:
455 453 jcspectraExist = True
456 454 num_pairs = jcspectra.shape[0]
457 455 else:
458 456 jcspectraExist = False
459 457
460 458 freq_dc = int(jspectra.shape[1] / 2)
461 459 ind_vel = numpy.array([-2, -1, 1, 2]) + freq_dc
462 460 ind_vel = ind_vel.astype(int)
463 461
464 462 if ind_vel[0] < 0:
465 463 ind_vel[list(range(0, 1))] = ind_vel[list(range(0, 1))] + self.num_prof
466 464
467 465 if mode == 1:
468 466 jspectra[:, freq_dc, :] = (
469 467 jspectra[:, ind_vel[1], :] + jspectra[:, ind_vel[2], :]) / 2 # CORRECCION
470 468
471 469 if jcspectraExist:
472 470 jcspectra[:, freq_dc, :] = (
473 471 jcspectra[:, ind_vel[1], :] + jcspectra[:, ind_vel[2], :]) / 2
474 472
475 473 if mode == 2:
476 474
477 475 vel = numpy.array([-2, -1, 1, 2])
478 476 xx = numpy.zeros([4, 4])
479 477
480 478 for fil in range(4):
481 479 xx[fil, :] = vel[fil]**numpy.asarray(list(range(4)))
482 480
483 481 xx_inv = numpy.linalg.inv(xx)
484 482 xx_aux = xx_inv[0, :]
485 483
486 484 for ich in range(num_chan):
487 485 yy = jspectra[ich, ind_vel, :]
488 486 jspectra[ich, freq_dc, :] = numpy.dot(xx_aux, yy)
489 487
490 488 junkid = jspectra[ich, freq_dc, :] <= 0
491 489 cjunkid = sum(junkid)
492 490
493 491 if cjunkid.any():
494 492 jspectra[ich, freq_dc, junkid.nonzero()] = (
495 493 jspectra[ich, ind_vel[1], junkid] + jspectra[ich, ind_vel[2], junkid]) / 2
496 494
497 495 if jcspectraExist:
498 496 for ip in range(num_pairs):
499 497 yy = jcspectra[ip, ind_vel, :]
500 498 jcspectra[ip, freq_dc, :] = numpy.dot(xx_aux, yy)
501 499
502 500 self.dataOut.data_spc = jspectra
503 501 self.dataOut.data_cspc = jcspectra
504 502
505 503 return 1
506 504
507 505 def removeInterference(self, interf=2, hei_interf=None, nhei_interf=None, offhei_interf=None):
508 506
509 507 jspectra = self.dataOut.data_spc
510 508 jcspectra = self.dataOut.data_cspc
511 509 jnoise = self.dataOut.getNoise()
512 510 num_incoh = self.dataOut.nIncohInt
513 511
514 512 num_channel = jspectra.shape[0]
515 513 num_prof = jspectra.shape[1]
516 514 num_hei = jspectra.shape[2]
517 515
518 516 # hei_interf
519 517 if hei_interf is None:
520 518 count_hei = num_hei / 2 # it does not matter because it is an integer
521 519 hei_interf = numpy.asmatrix(list(range(count_hei))) + num_hei - count_hei
522 520 hei_interf = numpy.asarray(hei_interf)[0]
523 521 # nhei_interf
524 522 if (nhei_interf == None):
525 523 nhei_interf = 5
526 524 if (nhei_interf < 1):
527 525 nhei_interf = 1
528 526 if (nhei_interf > count_hei):
529 527 nhei_interf = count_hei
530 528 if (offhei_interf == None):
531 529 offhei_interf = 0
532 530
533 531 ind_hei = list(range(num_hei))
534 532 # mask_prof = numpy.asarray(range(num_prof - 2)) + 1
535 533 # mask_prof[range(num_prof/2 - 1,len(mask_prof))] += 1
536 534 mask_prof = numpy.asarray(list(range(num_prof)))
537 535 num_mask_prof = mask_prof.size
538 536 comp_mask_prof = [0, num_prof / 2]
539 537
540 538 # noise_exist: determines whether jnoise has been defined and holds the noise information of each channel
541 539 if (jnoise.size < num_channel or numpy.isnan(jnoise).any()):
542 540 jnoise = numpy.nan
543 541 noise_exist = jnoise[0] < numpy.Inf
544 542
545 543 # Interference removal subroutine
546 544 for ich in range(num_channel):
547 545 # Sort the spectra by their power (lowest to highest)
548 546 power = jspectra[ich, mask_prof, :]
549 547 power = power[:, hei_interf]
550 548 power = power.sum(axis=0)
551 549 psort = power.ravel().argsort()
552 550
553 551 # Estimate the average interference in the power spectra using
554 552 junkspc_interf = jspectra[ich, :, hei_interf[psort[list(range(
555 553 offhei_interf, nhei_interf + offhei_interf))]]]
556 554
557 555 if noise_exist:
558 556 # tmp_noise = jnoise[ich] / num_prof
559 557 tmp_noise = jnoise[ich]
560 558 junkspc_interf = junkspc_interf - tmp_noise
561 559 #junkspc_interf[:,comp_mask_prof] = 0
562 560
563 561 jspc_interf = junkspc_interf.sum(axis=0) / nhei_interf
564 562 jspc_interf = jspc_interf.transpose()
565 563 # Compute the average interference spectrum
566 564 noiseid = numpy.where(
567 565 jspc_interf <= tmp_noise / numpy.sqrt(num_incoh))
568 566 noiseid = noiseid[0]
569 567 cnoiseid = noiseid.size
570 568 interfid = numpy.where(
571 569 jspc_interf > tmp_noise / numpy.sqrt(num_incoh))
572 570 interfid = interfid[0]
573 571 cinterfid = interfid.size
574 572
575 573 if (cnoiseid > 0):
576 574 jspc_interf[noiseid] = 0
577 575
578 576 # Expand the profiles to be cleaned
579 577 if (cinterfid > 0):
580 578 new_interfid = (
581 579 numpy.r_[interfid - 1, interfid, interfid + 1] + num_prof) % num_prof
582 580 new_interfid = numpy.asarray(new_interfid)
583 581 new_interfid = {x for x in new_interfid}
584 582 new_interfid = numpy.array(list(new_interfid))
585 583 new_cinterfid = new_interfid.size
586 584 else:
587 585 new_cinterfid = 0
588 586
589 587 for ip in range(new_cinterfid):
590 588 ind = junkspc_interf[:, new_interfid[ip]].ravel().argsort()
591 589 jspc_interf[new_interfid[ip]
592 590 ] = junkspc_interf[ind[nhei_interf / 2], new_interfid[ip]]
593 591
594 592 jspectra[ich, :, ind_hei] = jspectra[ich, :,
595 593 ind_hei] - jspc_interf # Corregir indices
596 594
597 595 # Remove the interference at the point of strongest interference
598 596 ListAux = jspc_interf[mask_prof].tolist()
599 597 maxid = ListAux.index(max(ListAux))
600 598
601 599 if cinterfid > 0:
602 600 for ip in range(cinterfid * (interf == 2) - 1):
603 601 ind = (jspectra[ich, interfid[ip], :] < tmp_noise *
604 602 (1 + 1 / numpy.sqrt(num_incoh))).nonzero()
605 603 cind = len(ind)
606 604
607 605 if (cind > 0):
608 606 jspectra[ich, interfid[ip], ind] = tmp_noise * \
609 607 (1 + (numpy.random.uniform(cind) - 0.5) /
610 608 numpy.sqrt(num_incoh))
611 609
612 610 ind = numpy.array([-2, -1, 1, 2])
613 611 xx = numpy.zeros([4, 4])
614 612
615 613 for id1 in range(4):
616 614 xx[:, id1] = ind[id1]**numpy.asarray(list(range(4)))
617 615
618 616 xx_inv = numpy.linalg.inv(xx)
619 617 xx = xx_inv[:, 0]
620 618 ind = (ind + maxid + num_mask_prof) % num_mask_prof
621 619 yy = jspectra[ich, mask_prof[ind], :]
622 620 jspectra[ich, mask_prof[maxid], :] = numpy.dot(
623 621 yy.transpose(), xx)
624 622
625 623 indAux = (jspectra[ich, :, :] < tmp_noise *
626 624 (1 - 1 / numpy.sqrt(num_incoh))).nonzero()
627 625 jspectra[ich, indAux[0], indAux[1]] = tmp_noise * \
628 626 (1 - 1 / numpy.sqrt(num_incoh))
629 627
630 628 # Interference removal in the cross-spectra
631 629 if jcspectra is None:
632 630 return jspectra, jcspectra
633 631 num_pairs = jcspectra.size / (num_prof * num_hei)
634 632 jcspectra = jcspectra.reshape(num_pairs, num_prof, num_hei)
635 633
636 634 for ip in range(num_pairs):
637 635
638 636 #-------------------------------------------
639 637
640 638 cspower = numpy.abs(jcspectra[ip, mask_prof, :])
641 639 cspower = cspower[:, hei_interf]
642 640 cspower = cspower.sum(axis=0)
643 641
644 642 cspsort = cspower.ravel().argsort()
645 643 junkcspc_interf = jcspectra[ip, :, hei_interf[cspsort[list(range(
646 644 offhei_interf, nhei_interf + offhei_interf))]]]
647 645 junkcspc_interf = junkcspc_interf.transpose()
648 646 jcspc_interf = junkcspc_interf.sum(axis=1) / nhei_interf
649 647
650 648 ind = numpy.abs(jcspc_interf[mask_prof]).ravel().argsort()
651 649
652 650 median_real = numpy.median(numpy.real(
653 651 junkcspc_interf[mask_prof[ind[list(range(3 * num_prof / 4))]], :]))
654 652 median_imag = numpy.median(numpy.imag(
655 653 junkcspc_interf[mask_prof[ind[list(range(3 * num_prof / 4))]], :]))
656 654 junkcspc_interf[comp_mask_prof, :] = numpy.complex(
657 655 median_real, median_imag)
658 656
659 657 for iprof in range(num_prof):
660 658 ind = numpy.abs(junkcspc_interf[iprof, :]).ravel().argsort()
661 659 jcspc_interf[iprof] = junkcspc_interf[iprof,
662 660 ind[nhei_interf / 2]]
663 661
664 662 # Remove the interference
665 663 jcspectra[ip, :, ind_hei] = jcspectra[ip,
666 664 :, ind_hei] - jcspc_interf
667 665
668 666 ListAux = numpy.abs(jcspc_interf[mask_prof]).tolist()
669 667 maxid = ListAux.index(max(ListAux))
670 668
671 669 ind = numpy.array([-2, -1, 1, 2])
672 670 xx = numpy.zeros([4, 4])
673 671
674 672 for id1 in range(4):
675 673 xx[:, id1] = ind[id1]**numpy.asarray(list(range(4)))
676 674
677 675 xx_inv = numpy.linalg.inv(xx)
678 676 xx = xx_inv[:, 0]
679 677
680 678 ind = (ind + maxid + num_mask_prof) % num_mask_prof
681 679 yy = jcspectra[ip, mask_prof[ind], :]
682 680 jcspectra[ip, mask_prof[maxid], :] = numpy.dot(yy.transpose(), xx)
683 681
684 682 # Save results
685 683 self.dataOut.data_spc = jspectra
686 684 self.dataOut.data_cspc = jcspectra
687 685
688 686 return 1
689 687
690 688 def setRadarFrequency(self, frequency=None):
691 689
692 690 if frequency != None:
693 691 self.dataOut.frequency = frequency
694 692
695 693 return 1
696 694
697 695 def getNoise(self, minHei=None, maxHei=None, minVel=None, maxVel=None):
698 696 # height range validation
699 697 if minHei == None:
700 698 minHei = self.dataOut.heightList[0]
701 699
702 700 if maxHei == None:
703 701 maxHei = self.dataOut.heightList[-1]
704 702
705 703 if (minHei < self.dataOut.heightList[0]) or (minHei > maxHei):
706 704 print('minHei: %.2f is out of the heights range' % (minHei))
707 705 print('minHei is set to %.2f' % (self.dataOut.heightList[0]))
708 706 minHei = self.dataOut.heightList[0]
709 707
710 708 if (maxHei > self.dataOut.heightList[-1]) or (maxHei < minHei):
711 709 print('maxHei: %.2f is out of the heights range' % (maxHei))
712 710 print('maxHei is setting to %.2f' % (self.dataOut.heightList[-1]))
713 711 maxHei = self.dataOut.heightList[-1]
714 712
715 713 # velocity range validation
716 714 velrange = self.dataOut.getVelRange(1)
717 715
718 716 if minVel == None:
719 717 minVel = velrange[0]
720 718
721 719 if maxVel == None:
722 720 maxVel = velrange[-1]
723 721
724 722 if (minVel < velrange[0]) or (minVel > maxVel):
725 723 print('minVel: %.2f is out of the velocity range' % (minVel))
726 724 print('minVel is set to %.2f' % (velrange[0]))
727 725 minVel = velrange[0]
728 726
729 727 if (maxVel > velrange[-1]) or (maxVel < minVel):
730 728 print('maxVel: %.2f is out of the velocity range' % (maxVel))
731 729 print('maxVel is set to %.2f' % (velrange[-1]))
732 730 maxVel = velrange[-1]
733 731
734 732 # index selection for the height range
735 733 minIndex = 0
736 734 maxIndex = 0
737 735 heights = self.dataOut.heightList
738 736
739 737 inda = numpy.where(heights >= minHei)
740 738 indb = numpy.where(heights <= maxHei)
741 739
742 740 try:
743 741 minIndex = inda[0][0]
744 742 except:
745 743 minIndex = 0
746 744
747 745 try:
748 746 maxIndex = indb[0][-1]
749 747 except:
750 748 maxIndex = len(heights)
751 749
752 750 if (minIndex < 0) or (minIndex > maxIndex):
753 751 raise ValueError("some value in (%d,%d) is not valid" % (
754 752 minIndex, maxIndex))
755 753
756 754 if (maxIndex >= self.dataOut.nHeights):
757 755 maxIndex = self.dataOut.nHeights - 1
758 756
759 757 # index selection for the velocities
760 758 indminvel = numpy.where(velrange >= minVel)
761 759 indmaxvel = numpy.where(velrange <= maxVel)
762 760 try:
763 761 minIndexVel = indminvel[0][0]
764 762 except:
765 763 minIndexVel = 0
766 764
767 765 try:
768 766 maxIndexVel = indmaxvel[0][-1]
769 767 except:
770 768 maxIndexVel = len(velrange)
771 769
772 770 # spectrum selection
773 771 data_spc = self.dataOut.data_spc[:,
774 772 minIndexVel:maxIndexVel + 1, minIndex:maxIndex + 1]
775 773 # noise estimation
776 774 noise = numpy.zeros(self.dataOut.nChannels)
777 775
778 776 for channel in range(self.dataOut.nChannels):
779 777 daux = data_spc[channel, :, :]
780 778 noise[channel] = hildebrand_sekhon(daux, self.dataOut.nIncohInt)
781 779
782 780 self.dataOut.noise_estimation = noise.copy()
783 781
784 782 return 1
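getNoise feeds each channel's selected spectrum block to hildebrand_sekhon, exactly as in the loop above. A minimal stand-alone use of that call, with random positive data standing in for data_spc (shapes and nIncohInt are invented):

import numpy
from schainpy.model.data.jrodata import hildebrand_sekhon

nChannels, nProf, nHeights, nIncohInt = 2, 64, 10, 8
data_spc = numpy.abs(numpy.random.randn(nChannels, nProf, nHeights))

noise = numpy.zeros(nChannels)
for channel in range(nChannels):
    daux = data_spc[channel, :, :]          # same per-channel slicing as getNoise
    noise[channel] = hildebrand_sekhon(daux, nIncohInt)
print(noise)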
785 783
786 @MPDecorator
784
787 785 class IncohInt(Operation):
788 786
789 787 __profIndex = 0
790 788 __withOverapping = False
791 789
792 790 __byTime = False
793 791 __initime = None
794 792 __lastdatatime = None
795 793 __integrationtime = None
796 794
797 795 __buffer_spc = None
798 796 __buffer_cspc = None
799 797 __buffer_dc = None
800 798
801 799 __dataReady = False
802 800
803 801 __timeInterval = None
804 802
805 803 n = None
806 804
807 805 def __init__(self):#, **kwargs):
808 806
809 807 Operation.__init__(self)#, **kwargs)
810 808
811 809
812 810 # self.isConfig = False
813 811
814 812 def setup(self, n=None, timeInterval=None, overlapping=False):
815 813 """
816 814 Set the parameters of the integration class.
817 815
818 816 Inputs:
819 817
820 818 n : number of incoherent integrations
821 819 timeInterval : integration time in seconds; ignored if the parameter "n" is given
822 820 overlapping :
823 821
824 822 """
825 823
826 824 self.__initime = None
827 825 self.__lastdatatime = 0
828 826
829 827 self.__buffer_spc = 0
830 828 self.__buffer_cspc = 0
831 829 self.__buffer_dc = 0
832 830
833 831 self.__profIndex = 0
834 832 self.__dataReady = False
835 833 self.__byTime = False
836 834
837 835 if n is None and timeInterval is None:
838 836 raise ValueError("n or timeInterval should be specified ...")
839 837
840 838 if n is not None:
841 839 self.n = int(n)
842 840 else:
843 841 # if (type(timeInterval)!=integer) -> change this line
844 842 self.__integrationtime = int(timeInterval)
845 843 self.n = None
846 844 self.__byTime = True
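The byProfiles path boils down to: add each incoming spectrum to a buffer, and after n of them release the sum and reset. A reduced illustration of that accumulation (TinyIncohInt is invented, not the real class):

import numpy

class TinyIncohInt(object):
    def __init__(self, n):
        self.n = n
        self.buffer = 0
        self.count = 0

    def push(self, spc):
        self.buffer += spc
        self.count += 1
        if self.count == self.n:            # integration complete
            out, self.buffer, self.count = self.buffer, 0, 0
            return out
        return None                         # keep accumulating

integrator = TinyIncohInt(n=4)
for i in range(8):
    result = integrator.push(numpy.ones((2, 8, 4)))
    if result is not None:
        print('integrated block ready, peak value:', result.max())   # 4.0, twice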
847 845
848 846 def putData(self, data_spc, data_cspc, data_dc):
849 847 """
850 848 Add a spectrum to __buffer_spc and increase __profIndex by one
851 849
852 850 """
853 851
854 852 self.__buffer_spc += data_spc
855 853
856 854 if data_cspc is None:
857 855 self.__buffer_cspc = None
858 856 else:
859 857 self.__buffer_cspc += data_cspc
860 858
861 859 if data_dc is None:
862 860 self.__buffer_dc = None
863 861 else:
864 862 self.__buffer_dc += data_dc
865 863
866 864 self.__profIndex += 1
867 865
868 866 return
869 867
870 868 def pushData(self):
871 869 """
872 870 Return the sum of the last profiles and the profiles used in the sum.
873 871
874 872 Affected:
875 873
876 874 self.__profileIndex
877 875
878 876 """
879 877
880 878 data_spc = self.__buffer_spc
881 879 data_cspc = self.__buffer_cspc
882 880 data_dc = self.__buffer_dc
883 881 n = self.__profIndex
884 882
885 883 self.__buffer_spc = 0
886 884 self.__buffer_cspc = 0
887 885 self.__buffer_dc = 0
888 886 self.__profIndex = 0
889 887
890 888 return data_spc, data_cspc, data_dc, n
891 889
892 890 def byProfiles(self, *args):
893 891
894 892 self.__dataReady = False
895 893 avgdata_spc = None
896 894 avgdata_cspc = None
897 895 avgdata_dc = None
898 896
899 897 self.putData(*args)
900 898
901 899 if self.__profIndex == self.n:
902 900
903 901 avgdata_spc, avgdata_cspc, avgdata_dc, n = self.pushData()
904 902 self.n = n
905 903 self.__dataReady = True
906 904
907 905 return avgdata_spc, avgdata_cspc, avgdata_dc
908 906
909 907 def byTime(self, datatime, *args):
910 908
911 909 self.__dataReady = False
912 910 avgdata_spc = None
913 911 avgdata_cspc = None
914 912 avgdata_dc = None
915 913
916 914 self.putData(*args)
917 915
918 916 if (datatime - self.__initime) >= self.__integrationtime:
919 917 avgdata_spc, avgdata_cspc, avgdata_dc, n = self.pushData()
920 918 self.n = n
921 919 self.__dataReady = True
922 920
923 921 return avgdata_spc, avgdata_cspc, avgdata_dc
924 922
925 923 def integrate(self, datatime, *args):
926 924
927 925 if self.__profIndex == 0:
928 926 self.__initime = datatime
929 927
930 928 if self.__byTime:
931 929 avgdata_spc, avgdata_cspc, avgdata_dc = self.byTime(
932 930 datatime, *args)
933 931 else:
934 932 avgdata_spc, avgdata_cspc, avgdata_dc = self.byProfiles(*args)
935 933
936 934 if not self.__dataReady:
937 935 return None, None, None, None
938 936
939 937 return self.__initime, avgdata_spc, avgdata_cspc, avgdata_dc
940 938
941 939 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
942 940 if n == 1:
943 941 return
944 942
945 943 dataOut.flagNoData = True
946 944
947 945 if not self.isConfig:
948 946 self.setup(n, timeInterval, overlapping)
949 947 self.isConfig = True
950 948
951 949 avgdatatime, avgdata_spc, avgdata_cspc, avgdata_dc = self.integrate(dataOut.utctime,
952 950 dataOut.data_spc,
953 951 dataOut.data_cspc,
954 952 dataOut.data_dc)
955 953
956 954 if self.__dataReady:
957 955
958 956 dataOut.data_spc = avgdata_spc
959 957 dataOut.data_cspc = avgdata_cspc
960 958 dataOut.data_dc = avgdata_dc
961 959
962 960 dataOut.nIncohInt *= self.n
963 961 dataOut.utctime = avgdatatime
964 962 dataOut.flagNoData = False
965 963
966 964 return dataOut No newline at end of file
@@ -1,1333 +1,1330
1 1 import sys
2 2 import numpy
3 3 from scipy import interpolate
4 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
4 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator, Operation
5 5 from schainpy.model.data.jrodata import Voltage
6 6 from schainpy.utils import log
7 7 from time import time
8 8
9 9
10 10 @MPDecorator
11 11 class VoltageProc(ProcessingUnit):
12 12
13 METHODS = {} #yong
13 def __init__(self):
14 14
15 def __init__(self):#, **kwargs): #yong
15 ProcessingUnit.__init__(self)
16 16
17 ProcessingUnit.__init__(self)#, **kwargs)
18
19 # self.objectDict = {}
20 17 self.dataOut = Voltage()
21 18 self.flip = 1
22 self.setupReq = False #yong
19 self.setupReq = False
23 20
24 21 def run(self):
25 22
26 23 if self.dataIn.type == 'AMISR':
27 24 self.__updateObjFromAmisrInput()
28 25
29 26 if self.dataIn.type == 'Voltage':
30 27 self.dataOut.copy(self.dataIn)
31 28
32 29 # self.dataOut.copy(self.dataIn)
33 30
34 31 def __updateObjFromAmisrInput(self):
35 32
36 33 self.dataOut.timeZone = self.dataIn.timeZone
37 34 self.dataOut.dstFlag = self.dataIn.dstFlag
38 35 self.dataOut.errorCount = self.dataIn.errorCount
39 36 self.dataOut.useLocalTime = self.dataIn.useLocalTime
40 37
41 38 self.dataOut.flagNoData = self.dataIn.flagNoData
42 39 self.dataOut.data = self.dataIn.data
43 40 self.dataOut.utctime = self.dataIn.utctime
44 41 self.dataOut.channelList = self.dataIn.channelList
45 42 # self.dataOut.timeInterval = self.dataIn.timeInterval
46 43 self.dataOut.heightList = self.dataIn.heightList
47 44 self.dataOut.nProfiles = self.dataIn.nProfiles
48 45
49 46 self.dataOut.nCohInt = self.dataIn.nCohInt
50 47 self.dataOut.ippSeconds = self.dataIn.ippSeconds
51 48 self.dataOut.frequency = self.dataIn.frequency
52 49
53 50 self.dataOut.azimuth = self.dataIn.azimuth
54 51 self.dataOut.zenith = self.dataIn.zenith
55 52
56 53 self.dataOut.beam.codeList = self.dataIn.beam.codeList
57 54 self.dataOut.beam.azimuthList = self.dataIn.beam.azimuthList
58 55 self.dataOut.beam.zenithList = self.dataIn.beam.zenithList
59 56 #
60 57 # pass#
61 58 #
62 59 # def init(self):
63 60 #
64 61 #
65 62 # if self.dataIn.type == 'AMISR':
66 63 # self.__updateObjFromAmisrInput()
67 64 #
68 65 # if self.dataIn.type == 'Voltage':
69 66 # self.dataOut.copy(self.dataIn)
70 67 # # No necesita copiar en cada init() los atributos de dataIn
71 68 # # la copia deberia hacerse por cada nuevo bloque de datos
72 69
73 70 def selectChannels(self, channelList):
74 71
75 72 channelIndexList = []
76 73
77 74 for channel in channelList:
78 75 if channel not in self.dataOut.channelList:
79 76 raise ValueError("Channel %d is not in %s" %(channel, str(self.dataOut.channelList)))
80 77
81 78 index = self.dataOut.channelList.index(channel)
82 79 channelIndexList.append(index)
83 80
84 81 self.selectChannelsByIndex(channelIndexList)
85 82
86 83 def selectChannelsByIndex(self, channelIndexList):
87 84 """
88 85 Selects a block of data based on the channels given in channelIndexList
89 86
90 87 Input:
91 88 channelIndexList : plain list of channel indexes to select, e.g. [2,3,7]
92 89
93 90 Affected:
94 91 self.dataOut.data
95 92 self.dataOut.channelIndexList
96 93 self.dataOut.nChannels
97 94 self.dataOut.m_ProcessingHeader.totalSpectra
98 95 self.dataOut.systemHeaderObj.numChannels
99 96 self.dataOut.m_ProcessingHeader.blockSize
100 97
101 98 Return:
102 99 None
103 100 """
104 101
105 102 for channelIndex in channelIndexList:
106 103 if channelIndex not in self.dataOut.channelIndexList:
107 104 print(channelIndexList)
108 105 raise ValueError("The value %d in channelIndexList is not valid" %channelIndex)
109 106
110 107 if self.dataOut.flagDataAsBlock:
111 108 """
112 109 If the data is acquired in blocks, its dimensions are [nChannels, nProfiles, nHeis]
113 110 """
114 111 data = self.dataOut.data[channelIndexList,:,:]
115 112 else:
116 113 data = self.dataOut.data[channelIndexList,:]
117 114
118 115 self.dataOut.data = data
119 116 self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
120 117 # self.dataOut.nChannels = nChannels
121 118
122 119 return 1
123 120
124 121 def selectHeights(self, minHei=None, maxHei=None):
125 122 """
126 123 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
127 124 minHei <= height <= maxHei
128 125
129 126 Input:
130 127 minHei : minimum height value to consider
131 128 maxHei : maximum height value to consider
132 129
133 130 Affected:
134 131 Several values are changed indirectly through the selectHeightsByIndex method
135 132
136 133 Return:
137 134 1 if the method ran successfully, otherwise 0
138 135 """
139 136
140 137 if minHei == None:
141 138 minHei = self.dataOut.heightList[0]
142 139
143 140 if maxHei == None:
144 141 maxHei = self.dataOut.heightList[-1]
145 142
146 143 if (minHei < self.dataOut.heightList[0]):
147 144 minHei = self.dataOut.heightList[0]
148 145
149 146 if (maxHei > self.dataOut.heightList[-1]):
150 147 maxHei = self.dataOut.heightList[-1]
151 148
152 149 minIndex = 0
153 150 maxIndex = 0
154 151 heights = self.dataOut.heightList
155 152
156 153 inda = numpy.where(heights >= minHei)
157 154 indb = numpy.where(heights <= maxHei)
158 155
159 156 try:
160 157 minIndex = inda[0][0]
161 158 except:
162 159 minIndex = 0
163 160
164 161 try:
165 162 maxIndex = indb[0][-1]
166 163 except:
167 164 maxIndex = len(heights)
168 165
169 166 self.selectHeightsByIndex(minIndex, maxIndex)
170 167
171 168 return 1
172 169
173 170
174 171 def selectHeightsByIndex(self, minIndex, maxIndex):
175 172 """
176 173 Selects a block of data based on a group of height indexes within the range
177 174 minIndex <= index <= maxIndex
178 175
179 176 Input:
180 177 minIndex : minimum height index to consider
181 178 maxIndex : maximum height index to consider
182 179
183 180 Affected:
184 181 self.dataOut.data
185 182 self.dataOut.heightList
186 183
187 184 Return:
188 185 1 if the method ran successfully, otherwise 0
189 186 """
190 187
191 188 if (minIndex < 0) or (minIndex > maxIndex):
192 189 raise ValueError("Height index range (%d,%d) is not valid" % (minIndex, maxIndex))
193 190
194 191 if (maxIndex >= self.dataOut.nHeights):
195 192 maxIndex = self.dataOut.nHeights
196 193
197 194 #voltage
198 195 if self.dataOut.flagDataAsBlock:
199 196 """
200 197 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
201 198 """
202 199 data = self.dataOut.data[:,:, minIndex:maxIndex]
203 200 else:
204 201 data = self.dataOut.data[:, minIndex:maxIndex]
205 202
206 203 # firstHeight = self.dataOut.heightList[minIndex]
207 204
208 205 self.dataOut.data = data
209 206 self.dataOut.heightList = self.dataOut.heightList[minIndex:maxIndex]
210 207
211 208 if self.dataOut.nHeights <= 1:
212 209 raise ValueError("selectHeights: Too few heights. Current number of heights is %d" %(self.dataOut.nHeights))
213 210
214 211 return 1
215 212
216 213
217 214 def filterByHeights(self, window):
218 215
219 216 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
220 217
221 218 if window == None:
222 219 window = (self.dataOut.radarControllerHeaderObj.txA/self.dataOut.radarControllerHeaderObj.nBaud) / deltaHeight
223 220
224 221 newdelta = deltaHeight * window
225 222 r = self.dataOut.nHeights % window
226 223 newheights = (self.dataOut.nHeights-r)/window
227 224
228 225 if newheights <= 1:
229 226 raise ValueError("filterByHeights: Too few heights. Current number of heights is %d and window is %d" %(self.dataOut.nHeights, window))
230 227
231 228 if self.dataOut.flagDataAsBlock:
232 229 """
233 230 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
234 231 """
235 232 buffer = self.dataOut.data[:, :, 0:self.dataOut.nHeights-r]
236 233 buffer = buffer.reshape(self.dataOut.nChannels,self.dataOut.nProfiles,self.dataOut.nHeights/window,window)
237 234 buffer = numpy.sum(buffer,3)
238 235
239 236 else:
240 237 buffer = self.dataOut.data[:,0:self.dataOut.nHeights-r]
241 238 buffer = buffer.reshape(self.dataOut.nChannels,self.dataOut.nHeights/window,window)
242 239 buffer = numpy.sum(buffer,2)
243 240
244 241 self.dataOut.data = buffer
245 242 self.dataOut.heightList = self.dataOut.heightList[0] + numpy.arange( newheights )*newdelta
246 243 self.dataOut.windowOfFilter = window
247 244
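filterByHeights decimates the height axis by summing groups of `window` consecutive samples; the leftover r = nHeights % window samples are dropped. The reshape-and-sum trick is easier to see on a toy array (shape values invented):

import numpy

nChannels, nHeights, window = 2, 10, 3
data = numpy.arange(nChannels * nHeights, dtype=float).reshape(nChannels, nHeights)

r = nHeights % window                                   # 1 leftover height is dropped
buffer = data[:, 0:nHeights - r]
buffer = buffer.reshape(nChannels, (nHeights - r) // window, window)
buffer = numpy.sum(buffer, 2)                           # sum every group of `window` heights
print(buffer.shape)                                     # (2, 3)
print(buffer[0])                                        # [ 3. 12. 21.]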
248 245 def setH0(self, h0, deltaHeight = None):
249 246
250 247 if not deltaHeight:
251 248 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
252 249
253 250 nHeights = self.dataOut.nHeights
254 251
255 252 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
256 253
257 254 self.dataOut.heightList = newHeiRange
258 255
259 256 def deFlip(self, channelList = []):
260 257
261 258 data = self.dataOut.data.copy()
262 259
263 260 if self.dataOut.flagDataAsBlock:
264 261 flip = self.flip
265 262 profileList = list(range(self.dataOut.nProfiles))
266 263
267 264 if not channelList:
268 265 for thisProfile in profileList:
269 266 data[:,thisProfile,:] = data[:,thisProfile,:]*flip
270 267 flip *= -1.0
271 268 else:
272 269 for thisChannel in channelList:
273 270 if thisChannel not in self.dataOut.channelList:
274 271 continue
275 272
276 273 for thisProfile in profileList:
277 274 data[thisChannel,thisProfile,:] = data[thisChannel,thisProfile,:]*flip
278 275 flip *= -1.0
279 276
280 277 self.flip = flip
281 278
282 279 else:
283 280 if not channelList:
284 281 data[:,:] = data[:,:]*self.flip
285 282 else:
286 283 for thisChannel in channelList:
287 284 if thisChannel not in self.dataOut.channelList:
288 285 continue
289 286
290 287 data[thisChannel,:] = data[thisChannel,:]*self.flip
291 288
292 289 self.flip *= -1.
293 290
294 291 self.dataOut.data = data
295 292
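    # A hedged sketch of what deFlip does with block data (toy values): the
    # sign of successive profiles is alternated, which shifts the Doppler
    # spectrum by half of the profile sampling frequency.
    #
    #     import numpy
    #     nProfiles, nHeights = 4, 5
    #     block = numpy.ones((1, nProfiles, nHeights))
    #     flip = 1.0
    #     for p in range(nProfiles):
    #         block[:, p, :] *= flip
    #         flip *= -1.0
    #     # block[0, :, 0] -> [ 1., -1.,  1., -1.]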
296 293 def setRadarFrequency(self, frequency=None):
297 294
298 295 if frequency != None:
299 296 self.dataOut.frequency = frequency
300 297
301 298 return 1
302 299
303 300 def interpolateHeights(self, topLim, botLim):
304 301         #69 to 72 for julia
305 302         #82-84 for meteors
306 303 if len(numpy.shape(self.dataOut.data))==2:
307 304 sampInterp = (self.dataOut.data[:,botLim-1] + self.dataOut.data[:,topLim+1])/2
308 305 sampInterp = numpy.transpose(numpy.tile(sampInterp,(topLim-botLim + 1,1)))
309 306 #self.dataOut.data[:,botLim:limSup+1] = sampInterp
310 307 self.dataOut.data[:,botLim:topLim+1] = sampInterp
311 308 else:
312 309 nHeights = self.dataOut.data.shape[2]
313 310 x = numpy.hstack((numpy.arange(botLim),numpy.arange(topLim+1,nHeights)))
314 311 y = self.dataOut.data[:,:,list(range(botLim))+list(range(topLim+1,nHeights))]
315 312 f = interpolate.interp1d(x, y, axis = 2)
316 313 xnew = numpy.arange(botLim,topLim+1)
317 314 ynew = f(xnew)
318 315
319 316 self.dataOut.data[:,:,botLim:topLim+1] = ynew
320 317
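    # A rough sketch (invented 1-D profile) of the gap interpolation above:
    # heights botLim..topLim are replaced by a linear interpolation built from
    # the surrounding valid height indexes.
    #
    #     import numpy
    #     from scipy import interpolate
    #     profile = numpy.array([1., 2., 9., 9., 9., 6., 7.])  # 2..4 are bad
    #     botLim, topLim = 2, 4
    #     x = numpy.hstack((numpy.arange(botLim),
    #                       numpy.arange(topLim + 1, profile.size)))
    #     f = interpolate.interp1d(x, profile[x])
    #     profile[botLim:topLim + 1] = f(numpy.arange(botLim, topLim + 1))
    #     # profile -> [1., 2., 3., 4., 5., 6., 7.]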
321 318 # import collections
322 @MPDecorator
319
323 320 class CohInt(Operation):
324 321
325 322 isConfig = False
326 323 __profIndex = 0
327 324 __byTime = False
328 325 __initime = None
329 326 __lastdatatime = None
330 327 __integrationtime = None
331 328 __buffer = None
332 329 __bufferStride = []
333 330 __dataReady = False
334 331 __profIndexStride = 0
335 332 __dataToPutStride = False
336 333 n = None
337 334
338 335 def __init__(self):#, **kwargs):
339 336
340 337 Operation.__init__(self)#, **kwargs)
341 338
342 339 # self.isConfig = False
343 340
344 341 def setup(self, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False):
345 342 """
346 343 Set the parameters of the integration class.
347 344
348 345 Inputs:
349 346
350 347 n : Number of coherent integrations
351 348             timeInterval : Integration time in seconds. Ignored if the parameter "n" is given
352 349             overlapping  : If True, use a sliding (overlapped) integration buffer
353 350 """
354 351
355 352 self.__initime = None
356 353 self.__lastdatatime = 0
357 354 self.__buffer = None
358 355 self.__dataReady = False
359 356 self.byblock = byblock
360 357 self.stride = stride
361 358
362 359 if n == None and timeInterval == None:
363 360 raise ValueError("n or timeInterval should be specified ...")
364 361
365 362 if n != None:
366 363 self.n = n
367 364 self.__byTime = False
368 365 else:
369 366 self.__integrationtime = timeInterval #* 60. #if (type(timeInterval)!=integer) -> change this line
370 367 self.n = 9999
371 368 self.__byTime = True
372 369
373 370 if overlapping:
374 371 self.__withOverlapping = True
375 372 self.__buffer = None
376 373 else:
377 374 self.__withOverlapping = False
378 375 self.__buffer = 0
379 376
380 377 self.__profIndex = 0
381 378
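    # Hedged usage sketch (parameter values are arbitrary): the integration
    # length is given either as a number of profiles or as a time span.
    #
    #     op = CohInt()
    #     op.setup(n=100)                      # integrate every 100 profiles
    #     # op.setup(timeInterval=2)           # ...or ~2 s worth of profiles
    #     # op.setup(n=100, overlapping=True)  # sliding (overlapped) buffer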
382 379 def putData(self, data):
383 380
384 381 """
385 382         Add a profile to the __buffer and increase the __profileIndex by one
386 383
387 384 """
388 385
389 386 if not self.__withOverlapping:
390 387 self.__buffer += data.copy()
391 388 self.__profIndex += 1
392 389 return
393 390
394 391 #Overlapping data
395 392 nChannels, nHeis = data.shape
396 393 data = numpy.reshape(data, (1, nChannels, nHeis))
397 394
398 395 #If the buffer is empty then it takes the data value
399 396 if self.__buffer is None:
400 397 self.__buffer = data
401 398 self.__profIndex += 1
402 399 return
403 400
404 401 #If the buffer length is lower than n then stakcing the data value
405 402 if self.__profIndex < self.n:
406 403 self.__buffer = numpy.vstack((self.__buffer, data))
407 404 self.__profIndex += 1
408 405 return
409 406
410 407         #If the buffer is already n profiles long, roll it and replace the last value with the new data
411 408 self.__buffer = numpy.roll(self.__buffer, -1, axis=0)
412 409 self.__buffer[self.n-1] = data
413 410 self.__profIndex = self.n
414 411 return
415 412
416 413
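    # Small sketch of the rolling buffer used for overlapping integration
    # (illustrative values): once n profiles are stored, the oldest one is
    # discarded with numpy.roll and the newest profile takes its place.
    #
    #     import numpy
    #     n = 3
    #     buf = numpy.array([1., 2., 3.])   # n profiles already buffered
    #     buf = numpy.roll(buf, -1)
    #     buf[n - 1] = 4.                   # newest profile
    #     # buf -> [2., 3., 4.]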
417 414 def pushData(self):
418 415 """
419 416         Return the sum of the buffered profiles and the number of profiles used in the sum.
420 417
421 418 Affected:
422 419
423 420 self.__profileIndex
424 421
425 422 """
426 423
427 424 if not self.__withOverlapping:
428 425 data = self.__buffer
429 426 n = self.__profIndex
430 427
431 428 self.__buffer = 0
432 429 self.__profIndex = 0
433 430
434 431 return data, n
435 432
436 433 #Integration with Overlapping
437 434 data = numpy.sum(self.__buffer, axis=0)
438 435 # print data
439 436 # raise
440 437 n = self.__profIndex
441 438
442 439 return data, n
443 440
444 441 def byProfiles(self, data):
445 442
446 443 self.__dataReady = False
447 444 avgdata = None
448 445 # n = None
449 446 # print data
450 447 # raise
451 448 self.putData(data)
452 449
453 450 if self.__profIndex == self.n:
454 451 avgdata, n = self.pushData()
455 452 self.__dataReady = True
456 453
457 454 return avgdata
458 455
459 456 def byTime(self, data, datatime):
460 457
461 458 self.__dataReady = False
462 459 avgdata = None
463 460 n = None
464 461
465 462 self.putData(data)
466 463
467 464 if (datatime - self.__initime) >= self.__integrationtime:
468 465 avgdata, n = self.pushData()
469 466 self.n = n
470 467 self.__dataReady = True
471 468
472 469 return avgdata
473 470
474 471 def integrateByStride(self, data, datatime):
475 472 # print data
476 473 if self.__profIndex == 0:
477 474 self.__buffer = [[data.copy(), datatime]]
478 475 else:
479 476 self.__buffer.append([data.copy(),datatime])
480 477 self.__profIndex += 1
481 478 self.__dataReady = False
482 479
483 480 if self.__profIndex == self.n * self.stride :
484 481 self.__dataToPutStride = True
485 482 self.__profIndexStride = 0
486 483 self.__profIndex = 0
487 484 self.__bufferStride = []
488 485 for i in range(self.stride):
489 486 current = self.__buffer[i::self.stride]
490 487 data = numpy.sum([t[0] for t in current], axis=0)
491 488 avgdatatime = numpy.average([t[1] for t in current])
492 489 # print data
493 490 self.__bufferStride.append((data, avgdatatime))
494 491
495 492 if self.__dataToPutStride:
496 493 self.__dataReady = True
497 494 self.__profIndexStride += 1
498 495 if self.__profIndexStride == self.stride:
499 496 self.__dataToPutStride = False
500 497 # print self.__bufferStride[self.__profIndexStride - 1]
501 498 # raise
502 499 return self.__bufferStride[self.__profIndexStride - 1]
503 500
504 501
505 502 return None, None
506 503
507 504 def integrate(self, data, datatime=None):
508 505
509 506 if self.__initime == None:
510 507 self.__initime = datatime
511 508
512 509 if self.__byTime:
513 510 avgdata = self.byTime(data, datatime)
514 511 else:
515 512 avgdata = self.byProfiles(data)
516 513
517 514
518 515 self.__lastdatatime = datatime
519 516
520 517 if avgdata is None:
521 518 return None, None
522 519
523 520 avgdatatime = self.__initime
524 521
525 522 deltatime = datatime - self.__lastdatatime
526 523
527 524 if not self.__withOverlapping:
528 525 self.__initime = datatime
529 526 else:
530 527 self.__initime += deltatime
531 528
532 529 return avgdata, avgdatatime
533 530
534 531 def integrateByBlock(self, dataOut):
535 532
536 533 times = int(dataOut.data.shape[1]/self.n)
537 534 avgdata = numpy.zeros((dataOut.nChannels, times, dataOut.nHeights), dtype=numpy.complex)
538 535
539 536 id_min = 0
540 537 id_max = self.n
541 538
542 539 for i in range(times):
543 540 junk = dataOut.data[:,id_min:id_max,:]
544 541 avgdata[:,i,:] = junk.sum(axis=1)
545 542 id_min += self.n
546 543 id_max += self.n
547 544
548 545 timeInterval = dataOut.ippSeconds*self.n
549 546 avgdatatime = (times - 1) * timeInterval + dataOut.utctime
550 547 self.__dataReady = True
551 548 return avgdata, avgdatatime
552 549
553 550 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
554 551
555 552 if not self.isConfig:
556 553 self.setup(n=n, stride=stride, timeInterval=timeInterval, overlapping=overlapping, byblock=byblock, **kwargs)
557 554 self.isConfig = True
558 555
559 556 if dataOut.flagDataAsBlock:
560 557 """
561 558             If the data is read in blocks, dimension = [nChannels, nProfiles, nHeis]
562 559 """
563 560 avgdata, avgdatatime = self.integrateByBlock(dataOut)
564 561             dataOut.nProfiles = int(dataOut.nProfiles / self.n)
565 562 else:
566 563 if stride is None:
567 564 avgdata, avgdatatime = self.integrate(dataOut.data, dataOut.utctime)
568 565 else:
569 566 avgdata, avgdatatime = self.integrateByStride(dataOut.data, dataOut.utctime)
570 567
571 568
572 569 # dataOut.timeInterval *= n
573 570 dataOut.flagNoData = True
574 571
575 572 if self.__dataReady:
576 573 dataOut.data = avgdata
577 574 dataOut.nCohInt *= self.n
578 575 dataOut.utctime = avgdatatime
579 576 # print avgdata, avgdatatime
580 577 # raise
581 578 # dataOut.timeInterval = dataOut.ippSeconds * dataOut.nCohInt
582 579 dataOut.flagNoData = False
583 580 return dataOut
584 @MPDecorator
581
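# A minimal, self-contained sketch (made-up dimensions) of the effect of the
# coherent integration performed by CohInt: n complex profiles are summed into
# a single output profile, so nCohInt grows by n and the output rate drops by n.
#
#     import numpy
#     n, nHeights = 4, 8
#     profiles = numpy.ones((n, nHeights), dtype=complex)  # n incoming profiles
#     integrated = profiles.sum(axis=0)                    # one output profile
#     # integrated.shape -> (8,)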
585 582 class Decoder(Operation):
586 583
587 584 isConfig = False
588 585 __profIndex = 0
589 586
590 587 code = None
591 588
592 589 nCode = None
593 590 nBaud = None
594 591
595 592 def __init__(self):#, **kwargs):
596 593
597 594 Operation.__init__(self)#, **kwargs)
598 595
599 596 self.times = None
600 597 self.osamp = None
601 598 # self.__setValues = False
602 599 # self.isConfig = False
603 600 self.setupReq = False
604 601 def setup(self, code, osamp, dataOut):
605 602
606 603 self.__profIndex = 0
607 604
608 605 self.code = code
609 606
610 607 self.nCode = len(code)
611 608 self.nBaud = len(code[0])
612 609
613 610 if (osamp != None) and (osamp >1):
614 611 self.osamp = osamp
615 612 self.code = numpy.repeat(code, repeats=self.osamp, axis=1)
616 613 self.nBaud = self.nBaud*self.osamp
617 614
618 615 self.__nChannels = dataOut.nChannels
619 616 self.__nProfiles = dataOut.nProfiles
620 617 self.__nHeis = dataOut.nHeights
621 618
622 619 if self.__nHeis < self.nBaud:
623 620 raise ValueError('Number of heights (%d) should be greater than number of bauds (%d)' %(self.__nHeis, self.nBaud))
624 621
625 622 #Frequency
626 623 __codeBuffer = numpy.zeros((self.nCode, self.__nHeis), dtype=numpy.complex)
627 624
628 625 __codeBuffer[:,0:self.nBaud] = self.code
629 626
630 627 self.fft_code = numpy.conj(numpy.fft.fft(__codeBuffer, axis=1))
631 628
632 629 if dataOut.flagDataAsBlock:
633 630
634 631 self.ndatadec = self.__nHeis #- self.nBaud + 1
635 632
636 633 self.datadecTime = numpy.zeros((self.__nChannels, self.__nProfiles, self.ndatadec), dtype=numpy.complex)
637 634
638 635 else:
639 636
640 637 #Time
641 638 self.ndatadec = self.__nHeis #- self.nBaud + 1
642 639
643 640 self.datadecTime = numpy.zeros((self.__nChannels, self.ndatadec), dtype=numpy.complex)
644 641
645 642 def __convolutionInFreq(self, data):
646 643
647 644 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
648 645
649 646 fft_data = numpy.fft.fft(data, axis=1)
650 647
651 648 conv = fft_data*fft_code
652 649
653 650 data = numpy.fft.ifft(conv,axis=1)
654 651
655 652 return data
656 653
657 654 def __convolutionInFreqOpt(self, data):
658 655
659 656 raise NotImplementedError
660 657
661 658 def __convolutionInTime(self, data):
662 659
663 660 code = self.code[self.__profIndex]
664 661 for i in range(self.__nChannels):
665 662 self.datadecTime[i,:] = numpy.correlate(data[i,:], code, mode='full')[self.nBaud-1:]
666 663
667 664 return self.datadecTime
668 665
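    # Hedged sketch of the time-domain decoding above (code and echo invented):
    # each channel is correlated with the transmitted code and only the samples
    # from index nBaud-1 onwards are kept, so the output keeps nHeis samples.
    #
    #     import numpy
    #     code = numpy.array([1, 1, -1])                     # nBaud = 3
    #     rx = numpy.concatenate((2*code, numpy.zeros(5)))   # echo at range 0
    #     dec = numpy.correlate(rx, code, mode='full')[code.size - 1:]
    #     # dec[0] == 6 -> compressed peak at the echo position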
669 666 def __convolutionByBlockInTime(self, data):
670 667
671 668         repetitions = int(self.__nProfiles / self.nCode)
672 669
673 670 junk = numpy.lib.stride_tricks.as_strided(self.code, (repetitions, self.code.size), (0, self.code.itemsize))
674 671 junk = junk.flatten()
675 672 code_block = numpy.reshape(junk, (self.nCode*repetitions, self.nBaud))
676 673 profilesList = range(self.__nProfiles)
677 674
678 675 for i in range(self.__nChannels):
679 676 for j in profilesList:
680 677 self.datadecTime[i,j,:] = numpy.correlate(data[i,j,:], code_block[j,:], mode='full')[self.nBaud-1:]
681 678 return self.datadecTime
682 679
683 680 def __convolutionByBlockInFreq(self, data):
684 681
685 682         raise NotImplementedError("Decoder by frequency for blocks is not implemented")
686 683
687 684
688 685 fft_code = self.fft_code[self.__profIndex].reshape(1,-1)
689 686
690 687 fft_data = numpy.fft.fft(data, axis=2)
691 688
692 689 conv = fft_data*fft_code
693 690
694 691 data = numpy.fft.ifft(conv,axis=2)
695 692
696 693 return data
697 694
698 695
699 696 def run(self, dataOut, code=None, nCode=None, nBaud=None, mode = 0, osamp=None, times=None):
700 697
701 698 if dataOut.flagDecodeData:
702 699             print("This data is already decoded, decoding again ...")
703 700
704 701 if not self.isConfig:
705 702
706 703 if code is None:
707 704 if dataOut.code is None:
708 705 raise ValueError("Code could not be read from %s instance. Enter a value in Code parameter" %dataOut.type)
709 706
710 707 code = dataOut.code
711 708 else:
712 709 code = numpy.array(code).reshape(nCode,nBaud)
713 710 self.setup(code, osamp, dataOut)
714 711
715 712 self.isConfig = True
716 713
717 714 if mode == 3:
718 715 sys.stderr.write("Decoder Warning: mode=%d is not valid, using mode=0\n" %mode)
719 716
720 717 if times != None:
721 718             sys.stderr.write("Decoder Warning: Argument 'times' is not used anymore\n")
722 719
723 720 if self.code is None:
724 721 print("Fail decoding: Code is not defined.")
725 722 return
726 723
727 724 self.__nProfiles = dataOut.nProfiles
728 725 datadec = None
729 726
730 727 if mode == 3:
731 728 mode = 0
732 729
733 730 if dataOut.flagDataAsBlock:
734 731 """
735 732             Decoding when data has been read as a block
736 733 """
737 734
738 735 if mode == 0:
739 736 datadec = self.__convolutionByBlockInTime(dataOut.data)
740 737 if mode == 1:
741 738 datadec = self.__convolutionByBlockInFreq(dataOut.data)
742 739 else:
743 740 """
744 741             Decoding when data has been read profile by profile
745 742 """
746 743 if mode == 0:
747 744 datadec = self.__convolutionInTime(dataOut.data)
748 745
749 746 if mode == 1:
750 747 datadec = self.__convolutionInFreq(dataOut.data)
751 748
752 749 if mode == 2:
753 750 datadec = self.__convolutionInFreqOpt(dataOut.data)
754 751
755 752 if datadec is None:
756 753 raise ValueError("Codification mode selected is not valid: mode=%d. Try selecting 0 or 1" %mode)
757 754
758 755 dataOut.code = self.code
759 756 dataOut.nCode = self.nCode
760 757 dataOut.nBaud = self.nBaud
761 758
762 759 dataOut.data = datadec
763 760
764 761 dataOut.heightList = dataOut.heightList[0:datadec.shape[-1]]
765 762
766 763         dataOut.flagDecodeData = True # assume the data is decoded
767 764
768 765 if self.__profIndex == self.nCode-1:
769 766 self.__profIndex = 0
770 767 return dataOut
771 768
772 769 self.__profIndex += 1
773 770
774 771 return dataOut
775 772     # dataOut.flagDeflipData = True # assume the data is already flipped
776 773
777 @MPDecorator
774
778 775 class ProfileConcat(Operation):
779 776
780 777 isConfig = False
781 778 buffer = None
782 779
783 780 def __init__(self):#, **kwargs):
784 781
785 782 Operation.__init__(self)#, **kwargs)
786 783 self.profileIndex = 0
787 784
788 785 def reset(self):
789 786 self.buffer = numpy.zeros_like(self.buffer)
790 787 self.start_index = 0
791 788 self.times = 1
792 789
793 790 def setup(self, data, m, n=1):
794 791 self.buffer = numpy.zeros((data.shape[0],data.shape[1]*m),dtype=type(data[0,0]))
795 792 self.nHeights = data.shape[1]#.nHeights
796 793 self.start_index = 0
797 794 self.times = 1
798 795
799 796 def concat(self, data):
800 797
801 798 self.buffer[:,self.start_index:self.nHeights*self.times] = data.copy()
802 799 self.start_index = self.start_index + self.nHeights
803 800
804 801 def run(self, dataOut, m):
805 802
806 803 dataOut.flagNoData = True
807 804
808 805 if not self.isConfig:
809 806 self.setup(dataOut.data, m, 1)
810 807 self.isConfig = True
811 808
812 809 if dataOut.flagDataAsBlock:
813 810 raise ValueError("ProfileConcat can only be used when voltage have been read profile by profile, getBlock = False")
814 811
815 812 else:
816 813 self.concat(dataOut.data)
817 814 self.times += 1
818 815 if self.times > m:
819 816 dataOut.data = self.buffer
820 817 self.reset()
821 818 dataOut.flagNoData = False
822 819 # se deben actualizar mas propiedades del header y del objeto dataOut, por ejemplo, las alturas
823 820 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
824 821 xf = dataOut.heightList[0] + dataOut.nHeights * deltaHeight * m
825 822 dataOut.heightList = numpy.arange(dataOut.heightList[0], xf, deltaHeight)
826 823 dataOut.ippSeconds *= m
827 824 return dataOut
828 @MPDecorator
825
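# Illustrative sketch (toy shapes) of the concatenation done by ProfileConcat:
# m consecutive profiles are placed side by side along the height axis, so the
# height coverage grows by m and ippSeconds grows by the same factor.
#
#     import numpy
#     nChannels, nHeights, m = 1, 4, 3
#     buffer = numpy.zeros((nChannels, nHeights*m))
#     for k in range(m):
#         profile = numpy.full((nChannels, nHeights), k)   # k-th profile
#         buffer[:, k*nHeights:(k + 1)*nHeights] = profile
#     # buffer[0] -> [0. 0. 0. 0. 1. 1. 1. 1. 2. 2. 2. 2.]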
829 826 class ProfileSelector(Operation):
830 827
831 828 profileIndex = None
832 829 # Tamanho total de los perfiles
833 830 nProfiles = None
834 831
835 832 def __init__(self):#, **kwargs):
836 833
837 834 Operation.__init__(self)#, **kwargs)
838 835 self.profileIndex = 0
839 836
840 837 def incProfileIndex(self):
841 838
842 839 self.profileIndex += 1
843 840
844 841 if self.profileIndex >= self.nProfiles:
845 842 self.profileIndex = 0
846 843
847 844 def isThisProfileInRange(self, profileIndex, minIndex, maxIndex):
848 845
849 846 if profileIndex < minIndex:
850 847 return False
851 848
852 849 if profileIndex > maxIndex:
853 850 return False
854 851
855 852 return True
856 853
857 854 def isThisProfileInList(self, profileIndex, profileList):
858 855
859 856 if profileIndex not in profileList:
860 857 return False
861 858
862 859 return True
863 860
864 861 def run(self, dataOut, profileList=None, profileRangeList=None, beam=None, byblock=False, rangeList = None, nProfiles=None):
865 862
866 863 """
867 864 ProfileSelector:
868 865
869 866 Inputs:
870 867 profileList : Index of profiles selected. Example: profileList = (0,1,2,7,8)
871 868
872 869 profileRangeList : Minimum and maximum profile indexes. Example: profileRangeList = (4, 30)
873 870
874 871 rangeList : List of profile ranges. Example: rangeList = ((4, 30), (32, 64), (128, 256))
875 872
876 873 """
877 874
878 875 if rangeList is not None:
879 876 if type(rangeList[0]) not in (tuple, list):
880 877 rangeList = [rangeList]
881 878
882 879 dataOut.flagNoData = True
883 880
884 881 if dataOut.flagDataAsBlock:
885 882 """
886 883 data dimension = [nChannels, nProfiles, nHeis]
887 884 """
888 885 if profileList != None:
889 886 dataOut.data = dataOut.data[:,profileList,:]
890 887
891 888 if profileRangeList != None:
892 889 minIndex = profileRangeList[0]
893 890 maxIndex = profileRangeList[1]
894 891 profileList = list(range(minIndex, maxIndex+1))
895 892
896 893 dataOut.data = dataOut.data[:,minIndex:maxIndex+1,:]
897 894
898 895 if rangeList != None:
899 896
900 897 profileList = []
901 898
902 899 for thisRange in rangeList:
903 900 minIndex = thisRange[0]
904 901 maxIndex = thisRange[1]
905 902
906 903 profileList.extend(list(range(minIndex, maxIndex+1)))
907 904
908 905 dataOut.data = dataOut.data[:,profileList,:]
909 906
910 907 dataOut.nProfiles = len(profileList)
911 908 dataOut.profileIndex = dataOut.nProfiles - 1
912 909 dataOut.flagNoData = False
913 910
914 911             return dataOut
915 912
916 913 """
917 914 data dimension = [nChannels, nHeis]
918 915 """
919 916
920 917 if profileList != None:
921 918
922 919 if self.isThisProfileInList(dataOut.profileIndex, profileList):
923 920
924 921 self.nProfiles = len(profileList)
925 922 dataOut.nProfiles = self.nProfiles
926 923 dataOut.profileIndex = self.profileIndex
927 924 dataOut.flagNoData = False
928 925
929 926 self.incProfileIndex()
930 927             return dataOut
931 928
932 929 if profileRangeList != None:
933 930
934 931 minIndex = profileRangeList[0]
935 932 maxIndex = profileRangeList[1]
936 933
937 934 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
938 935
939 936 self.nProfiles = maxIndex - minIndex + 1
940 937 dataOut.nProfiles = self.nProfiles
941 938 dataOut.profileIndex = self.profileIndex
942 939 dataOut.flagNoData = False
943 940
944 941 self.incProfileIndex()
945 942             return dataOut
946 943
947 944 if rangeList != None:
948 945
949 946 nProfiles = 0
950 947
951 948 for thisRange in rangeList:
952 949 minIndex = thisRange[0]
953 950 maxIndex = thisRange[1]
954 951
955 952 nProfiles += maxIndex - minIndex + 1
956 953
957 954 for thisRange in rangeList:
958 955
959 956 minIndex = thisRange[0]
960 957 maxIndex = thisRange[1]
961 958
962 959 if self.isThisProfileInRange(dataOut.profileIndex, minIndex, maxIndex):
963 960
964 961 self.nProfiles = nProfiles
965 962 dataOut.nProfiles = self.nProfiles
966 963 dataOut.profileIndex = self.profileIndex
967 964 dataOut.flagNoData = False
968 965
969 966 self.incProfileIndex()
970 967
971 968 break
972 969
973 970             return dataOut
974 971
975 972
976 973 if beam != None: #beam is only for AMISR data
977 974 if self.isThisProfileInList(dataOut.profileIndex, dataOut.beamRangeDict[beam]):
978 975 dataOut.flagNoData = False
979 976 dataOut.profileIndex = self.profileIndex
980 977
981 978 self.incProfileIndex()
982 979
983 980             return dataOut
984 981
985 982         raise ValueError("ProfileSelector needs a profileList, profileRangeList or rangeList parameter")
986 983
987 984 #return False
988 985 return dataOut
989 @MPDecorator
986
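# Hedged usage sketch (profile indexes are arbitrary): the selection can be
# given as explicit indexes, as a single inclusive range, or as several ranges.
#
#     op = ProfileSelector()
#     # op.run(dataOut, profileList=(0, 1, 2, 7, 8))
#     # op.run(dataOut, profileRangeList=(4, 30))
#     # op.run(dataOut, rangeList=((4, 30), (32, 64)))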
990 987 class Reshaper(Operation):
991 988
992 989 def __init__(self):#, **kwargs):
993 990
994 991 Operation.__init__(self)#, **kwargs)
995 992
996 993 self.__buffer = None
997 994 self.__nitems = 0
998 995
999 996 def __appendProfile(self, dataOut, nTxs):
1000 997
1001 998 if self.__buffer is None:
1002 999 shape = (dataOut.nChannels, int(dataOut.nHeights/nTxs) )
1003 1000 self.__buffer = numpy.empty(shape, dtype = dataOut.data.dtype)
1004 1001
1005 1002 ini = dataOut.nHeights * self.__nitems
1006 1003 end = ini + dataOut.nHeights
1007 1004
1008 1005 self.__buffer[:, ini:end] = dataOut.data
1009 1006
1010 1007 self.__nitems += 1
1011 1008
1012 1009 return int(self.__nitems*nTxs)
1013 1010
1014 1011 def __getBuffer(self):
1015 1012
1016 1013 if self.__nitems == int(1./self.__nTxs):
1017 1014
1018 1015 self.__nitems = 0
1019 1016
1020 1017 return self.__buffer.copy()
1021 1018
1022 1019 return None
1023 1020
1024 1021 def __checkInputs(self, dataOut, shape, nTxs):
1025 1022
1026 1023 if shape is None and nTxs is None:
1027 1024             raise ValueError("Reshaper: shape or nTxs factor should be defined")
1028 1025
1029 1026 if nTxs:
1030 1027 if nTxs < 0:
1031 1028 raise ValueError("nTxs should be greater than 0")
1032 1029
1033 1030 if nTxs < 1 and dataOut.nProfiles % (1./nTxs) != 0:
1034 1031                 raise ValueError("nProfiles = %d is not divisible by (1./nTxs) = %f" %(dataOut.nProfiles, (1./nTxs)))
1035 1032
1036 1033             shape = [dataOut.nChannels, int(dataOut.nProfiles*nTxs), int(dataOut.nHeights/nTxs)]
1037 1034
1038 1035 return shape, nTxs
1039 1036
1040 1037 if len(shape) != 2 and len(shape) != 3:
1041 1038 raise ValueError("shape dimension should be equal to 2 or 3. shape = (nProfiles, nHeis) or (nChannels, nProfiles, nHeis). Actually shape = (%d, %d, %d)" %(dataOut.nChannels, dataOut.nProfiles, dataOut.nHeights))
1042 1039
1043 1040 if len(shape) == 2:
1044 1041 shape_tuple = [dataOut.nChannels]
1045 1042 shape_tuple.extend(shape)
1046 1043 else:
1047 1044 shape_tuple = list(shape)
1048 1045
1049 1046 nTxs = 1.0*shape_tuple[1]/dataOut.nProfiles
1050 1047
1051 1048 return shape_tuple, nTxs
1052 1049
1053 1050 def run(self, dataOut, shape=None, nTxs=None):
1054 1051
1055 1052 shape_tuple, self.__nTxs = self.__checkInputs(dataOut, shape, nTxs)
1056 1053
1057 1054 dataOut.flagNoData = True
1058 1055 profileIndex = None
1059 1056
1060 1057 if dataOut.flagDataAsBlock:
1061 1058
1062 1059 dataOut.data = numpy.reshape(dataOut.data, shape_tuple)
1063 1060 dataOut.flagNoData = False
1064 1061
1065 1062 profileIndex = int(dataOut.nProfiles*self.__nTxs) - 1
1066 1063
1067 1064 else:
1068 1065
1069 1066 if self.__nTxs < 1:
1070 1067
1071 1068 self.__appendProfile(dataOut, self.__nTxs)
1072 1069 new_data = self.__getBuffer()
1073 1070
1074 1071 if new_data is not None:
1075 1072 dataOut.data = new_data
1076 1073 dataOut.flagNoData = False
1077 1074
1078 1075                     profileIndex = dataOut.profileIndex*self.__nTxs
1079 1076
1080 1077 else:
1081 1078 raise ValueError("nTxs should be greater than 0 and lower than 1, or use VoltageReader(..., getblock=True)")
1082 1079
1083 1080 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1084 1081
1085 1082 dataOut.heightList = numpy.arange(dataOut.nHeights/self.__nTxs) * deltaHeight + dataOut.heightList[0]
1086 1083
1087 1084 dataOut.nProfiles = int(dataOut.nProfiles*self.__nTxs)
1088 1085
1089 1086 dataOut.profileIndex = profileIndex
1090 1087
1091 1088 dataOut.ippSeconds /= self.__nTxs
1092 1089
1093 1090 return dataOut
1094 @MPDecorator
1091
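# A small sketch (toy dimensions) of the block reshape done by Reshaper: with
# nTxs = 2 a block of 4 profiles x 8 heights becomes 8 profiles x 4 heights,
# and ippSeconds is divided by nTxs accordingly.
#
#     import numpy
#     block = numpy.arange(1*4*8).reshape(1, 4, 8)
#     nTxs = 2
#     reshaped = block.reshape(1, int(4*nTxs), int(8/nTxs))
#     # reshaped.shape -> (1, 8, 4)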
1095 1092 class SplitProfiles(Operation):
1096 1093
1097 1094 def __init__(self):#, **kwargs):
1098 1095
1099 1096 Operation.__init__(self)#, **kwargs)
1100 1097
1101 1098 def run(self, dataOut, n):
1102 1099
1103 1100 dataOut.flagNoData = True
1104 1101 profileIndex = None
1105 1102
1106 1103 if dataOut.flagDataAsBlock:
1107 1104
1108 1105 #nchannels, nprofiles, nsamples
1109 1106 shape = dataOut.data.shape
1110 1107
1111 1108 if shape[2] % n != 0:
1112 1109                 raise ValueError("Could not split the data, the number of heights (%d) has to be a multiple of n=%d" %(shape[2], n))
1113 1110
1114 1111 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1115 1112
1116 1113 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1117 1114 dataOut.flagNoData = False
1118 1115
1119 1116 profileIndex = int(dataOut.nProfiles/n) - 1
1120 1117
1121 1118 else:
1122 1119
1123 1120             raise ValueError("Could not split the data when it is read profile by profile. Use VoltageReader(..., getblock=True)")
1124 1121
1125 1122 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1126 1123
1127 1124 dataOut.heightList = numpy.arange(dataOut.nHeights/n) * deltaHeight + dataOut.heightList[0]
1128 1125
1129 1126 dataOut.nProfiles = int(dataOut.nProfiles*n)
1130 1127
1131 1128 dataOut.profileIndex = profileIndex
1132 1129
1133 1130 dataOut.ippSeconds /= n
1134 1131
1135 1132 return dataOut
1136 @MPDecorator
1133
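# Illustrative sketch of the split (dimensions invented): every profile is cut
# into n shorter profiles, so nProfiles grows by n, nHeights shrinks by n and
# ippSeconds is divided by n.
#
#     import numpy
#     block = numpy.arange(1*2*6).reshape(1, 2, 6)   # 2 profiles, 6 heights
#     n = 3
#     split = block.reshape(1, 2*n, 6//n)
#     # split.shape -> (1, 6, 2)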
1137 1134 class CombineProfiles(Operation):
1138 1135 def __init__(self):#, **kwargs):
1139 1136
1140 1137 Operation.__init__(self)#, **kwargs)
1141 1138
1142 1139 self.__remData = None
1143 1140 self.__profileIndex = 0
1144 1141
1145 1142 def run(self, dataOut, n):
1146 1143
1147 1144 dataOut.flagNoData = True
1148 1145 profileIndex = None
1149 1146
1150 1147 if dataOut.flagDataAsBlock:
1151 1148
1152 1149 #nchannels, nprofiles, nsamples
1153 1150 shape = dataOut.data.shape
1154 1151             new_shape = shape[0], int(shape[1]/n), shape[2]*n
1155 1152
1156 1153 if shape[1] % n != 0:
1157 1154                 raise ValueError("Could not combine the data, the number of profiles (%d) has to be a multiple of n=%d" %(shape[1], n))
1158 1155
1159 1156 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1160 1157 dataOut.flagNoData = False
1161 1158
1162 1159 profileIndex = int(dataOut.nProfiles*n) - 1
1163 1160
1164 1161 else:
1165 1162
1166 1163 #nchannels, nsamples
1167 1164 if self.__remData is None:
1168 1165 newData = dataOut.data
1169 1166 else:
1170 1167 newData = numpy.concatenate((self.__remData, dataOut.data), axis=1)
1171 1168
1172 1169 self.__profileIndex += 1
1173 1170
1174 1171 if self.__profileIndex < n:
1175 1172 self.__remData = newData
1176 1173 #continue
1177 1174 return
1178 1175
1179 1176 self.__profileIndex = 0
1180 1177 self.__remData = None
1181 1178
1182 1179 dataOut.data = newData
1183 1180 dataOut.flagNoData = False
1184 1181
1185 1182             profileIndex = int(dataOut.profileIndex/n)
1186 1183
1187 1184
1188 1185 deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1189 1186
1190 1187 dataOut.heightList = numpy.arange(dataOut.nHeights*n) * deltaHeight + dataOut.heightList[0]
1191 1188
1192 1189 dataOut.nProfiles = int(dataOut.nProfiles/n)
1193 1190
1194 1191 dataOut.profileIndex = profileIndex
1195 1192
1196 1193 dataOut.ippSeconds *= n
1197 1194
1198 1195 return dataOut
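# Hedged sketch of the profile-by-profile combination (toy arrays): n
# consecutive profiles are concatenated along the height axis and only every
# n-th call produces output, so ippSeconds is multiplied by n.
#
#     import numpy
#     n, combined = 2, None
#     for k in range(n):
#         profile = numpy.full((1, 3), k)             # k-th incoming profile
#         combined = profile if combined is None else \
#             numpy.concatenate((combined, profile), axis=1)
#     # combined -> [[0 0 0 1 1 1]]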
1199 1196 # import collections
1200 1197 # from scipy.stats import mode
1201 1198 #
1202 1199 # class Synchronize(Operation):
1203 1200 #
1204 1201 # isConfig = False
1205 1202 # __profIndex = 0
1206 1203 #
1207 1204 # def __init__(self, **kwargs):
1208 1205 #
1209 1206 # Operation.__init__(self, **kwargs)
1210 1207 # # self.isConfig = False
1211 1208 # self.__powBuffer = None
1212 1209 # self.__startIndex = 0
1213 1210 # self.__pulseFound = False
1214 1211 #
1215 1212 # def __findTxPulse(self, dataOut, channel=0, pulse_with = None):
1216 1213 #
1217 1214 # #Read data
1218 1215 #
1219 1216 # powerdB = dataOut.getPower(channel = channel)
1220 1217 # noisedB = dataOut.getNoise(channel = channel)[0]
1221 1218 #
1222 1219 # self.__powBuffer.extend(powerdB.flatten())
1223 1220 #
1224 1221 # dataArray = numpy.array(self.__powBuffer)
1225 1222 #
1226 1223 # filteredPower = numpy.correlate(dataArray, dataArray[0:self.__nSamples], "same")
1227 1224 #
1228 1225 # maxValue = numpy.nanmax(filteredPower)
1229 1226 #
1230 1227 # if maxValue < noisedB + 10:
1231 1228 #                 # No transmission pulse was found
1232 1229 # return None
1233 1230 #
1234 1231 # maxValuesIndex = numpy.where(filteredPower > maxValue - 0.1*abs(maxValue))[0]
1235 1232 #
1236 1233 # if len(maxValuesIndex) < 2:
1237 1234 #                 # Only a single one-baud transmission pulse was found, waiting for the next TX
1238 1235 # return None
1239 1236 #
1240 1237 # phasedMaxValuesIndex = maxValuesIndex - self.__nSamples
1241 1238 #
1242 1239 #             # Keep only values spaced nSamples apart
1243 1240 # pulseIndex = numpy.intersect1d(maxValuesIndex, phasedMaxValuesIndex)
1244 1241 #
1245 1242 # if len(pulseIndex) < 2:
1246 1243 #                 # Only one transmission pulse wider than one baud was found
1247 1244 # return None
1248 1245 #
1249 1246 # spacing = pulseIndex[1:] - pulseIndex[:-1]
1250 1247 #
1251 1248 #             # remove signals spaced less than 10 units (samples) apart
1252 1249 #             # (no IPP should be shorter than 10 units)
1253 1250 #
1254 1251 # realIndex = numpy.where(spacing > 10 )[0]
1255 1252 #
1256 1253 # if len(realIndex) < 2:
1257 1254 #                 # Only one transmission pulse wider than one baud was found
1258 1255 # return None
1259 1256 #
1260 1257 #             # Remove wide pulses (keep only the spacing between IPPs)
1261 1258 # realPulseIndex = pulseIndex[realIndex]
1262 1259 #
1263 1260 # period = mode(realPulseIndex[1:] - realPulseIndex[:-1])[0][0]
1264 1261 #
1265 1262 #             print("IPP = %d samples" %period)
1266 1263 #
1267 1264 # self.__newNSamples = dataOut.nHeights #int(period)
1268 1265 # self.__startIndex = int(realPulseIndex[0])
1269 1266 #
1270 1267 # return 1
1271 1268 #
1272 1269 #
1273 1270 # def setup(self, nSamples, nChannels, buffer_size = 4):
1274 1271 #
1275 1272 # self.__powBuffer = collections.deque(numpy.zeros( buffer_size*nSamples,dtype=numpy.float),
1276 1273 # maxlen = buffer_size*nSamples)
1277 1274 #
1278 1275 # bufferList = []
1279 1276 #
1280 1277 # for i in range(nChannels):
1281 1278 # bufferByChannel = collections.deque(numpy.zeros( buffer_size*nSamples, dtype=numpy.complex) + numpy.NAN,
1282 1279 # maxlen = buffer_size*nSamples)
1283 1280 #
1284 1281 # bufferList.append(bufferByChannel)
1285 1282 #
1286 1283 # self.__nSamples = nSamples
1287 1284 # self.__nChannels = nChannels
1288 1285 # self.__bufferList = bufferList
1289 1286 #
1290 1287 # def run(self, dataOut, channel = 0):
1291 1288 #
1292 1289 # if not self.isConfig:
1293 1290 # nSamples = dataOut.nHeights
1294 1291 # nChannels = dataOut.nChannels
1295 1292 # self.setup(nSamples, nChannels)
1296 1293 # self.isConfig = True
1297 1294 #
1298 1295 # #Append new data to internal buffer
1299 1296 # for thisChannel in range(self.__nChannels):
1300 1297 # bufferByChannel = self.__bufferList[thisChannel]
1301 1298 # bufferByChannel.extend(dataOut.data[thisChannel])
1302 1299 #
1303 1300 # if self.__pulseFound:
1304 1301 # self.__startIndex -= self.__nSamples
1305 1302 #
1306 1303 # #Finding Tx Pulse
1307 1304 # if not self.__pulseFound:
1308 1305 # indexFound = self.__findTxPulse(dataOut, channel)
1309 1306 #
1310 1307 # if indexFound == None:
1311 1308 # dataOut.flagNoData = True
1312 1309 # return
1313 1310 #
1314 1311 # self.__arrayBuffer = numpy.zeros((self.__nChannels, self.__newNSamples), dtype = numpy.complex)
1315 1312 # self.__pulseFound = True
1316 1313 # self.__startIndex = indexFound
1317 1314 #
1318 1315 # #If pulse was found ...
1319 1316 # for thisChannel in range(self.__nChannels):
1320 1317 # bufferByChannel = self.__bufferList[thisChannel]
1321 1318 # #print self.__startIndex
1322 1319 # x = numpy.array(bufferByChannel)
1323 1320 # self.__arrayBuffer[thisChannel] = x[self.__startIndex:self.__startIndex+self.__newNSamples]
1324 1321 #
1325 1322 # deltaHeight = dataOut.heightList[1] - dataOut.heightList[0]
1326 1323 # dataOut.heightList = numpy.arange(self.__newNSamples)*deltaHeight
1327 1324 # # dataOut.ippSeconds = (self.__newNSamples / deltaHeight)/1e6
1328 1325 #
1329 1326 # dataOut.data = self.__arrayBuffer
1330 1327 #
1331 1328 # self.__startIndex += self.__newNSamples
1332 1329 #
1333 1330 # return