##// END OF EJS Templates
Test Read by block in DigitalRF
jespinoza -
r1437:bdaf7e9a5472
parent child
Show More
@@ -1,837 +1,834
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 # SUBCHANNELS EN VEZ DE CHANNELS
7 7 # BENCHMARKS -> PROBLEMAS CON ARCHIVOS GRANDES -> INCONSTANTE EN EL TIEMPO
8 8 # ACTUALIZACION DE VERSION
9 9 # HEADERS
10 10 # MODULO DE ESCRITURA
11 11 # METADATA
12 12
13 13 import os
14 14 import time
15 15 import datetime
16 16 import numpy
17 17 import timeit
18 18 from fractions import Fraction
19 19 from time import time
20 20 from time import sleep
21 21
22 22 import schainpy.admin
23 23 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
24 24 from schainpy.model.data.jrodata import Voltage
25 25 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
26 26
27 27 import pickle
28 28 try:
29 29 import digital_rf
30 30 except:
31 31 pass
32 32
33 33
34 34 class DigitalRFReader(ProcessingUnit):
35 35 '''
36 36 classdocs
37 37 '''
38 38
39 39 def __init__(self):
40 40 '''
41 41 Constructor
42 42 '''
43 43
44 44 ProcessingUnit.__init__(self)
45 45
46 46 self.dataOut = Voltage()
47 47 self.__printInfo = True
48 48 self.__flagDiscontinuousBlock = False
49 49 self.__bufferIndex = 9999999
50 50 self.__codeType = 0
51 51 self.__ippKm = None
52 52 self.__nCode = None
53 53 self.__nBaud = None
54 54 self.__code = None
55 55 self.dtype = None
56 56 self.oldAverage = None
57 57 self.path = None
58 58
59 59 def close(self):
60 60 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
61 61 return
62 62
63 63 def __getCurrentSecond(self):
64 64
65 65 return self.__thisUnixSample / self.__sample_rate
66 66
67 67 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
68 68
69 69 def __setFileHeader(self):
70 70 '''
71 71 In this method will be initialized every parameter of dataOut object (header, no data)
72 72 '''
73 73 ippSeconds = 1.0 * self.__nSamples / self.__sample_rate
74 74 if not self.getByBlock:
75 75 nProfiles = 1.0 / ippSeconds # Number of profiles in one second
76 76 else:
77 77 nProfiles = self.nProfileBlocks # Number of profiles in one block
78 78
79 79 try:
80 80 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
81 81 self.__radarControllerHeader)
82 82 except:
83 83 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
84 84 txA=0,
85 85 txB=0,
86 86 nWindows=1,
87 87 nHeights=self.__nSamples,
88 88 firstHeight=self.__firstHeigth,
89 89 deltaHeight=self.__deltaHeigth,
90 90 codeType=self.__codeType,
91 91 nCode=self.__nCode, nBaud=self.__nBaud,
92 92 code=self.__code)
93 93
94 94 try:
95 95 self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
96 96 except:
97 97 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
98 98 nProfiles=nProfiles,
99 99 nChannels=len(
100 100 self.__channelList),
101 101 adcResolution=14)
102 102 self.dataOut.type = "Voltage"
103 103
104 104 self.dataOut.data = None
105 105
106 106 self.dataOut.dtype = self.dtype
107 107
108 108 # self.dataOut.nChannels = 0
109 109
110 110 # self.dataOut.nHeights = 0
111 111
112 112 self.dataOut.nProfiles = int(nProfiles)
113 113
114 114 self.dataOut.heightList = self.__firstHeigth + \
115 115 numpy.arange(self.__nSamples, dtype=numpy.float) * \
116 116 self.__deltaHeigth
117 117
118 118 #self.dataOut.channelList = list(range(self.__num_subchannels))
119 119 self.dataOut.channelList = list(range(len(self.__channelList)))
120 120 if not self.getByBlock:
121 121
122 122 self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights
123 123 else:
124 124 self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights*self.nProfileBlocks
125 125
126 126 # self.dataOut.channelIndexList = None
127 127
128 128 self.dataOut.flagNoData = True
129 129 if not self.getByBlock:
130 130 self.dataOut.flagDataAsBlock = False
131 131 else:
132 132 self.dataOut.flagDataAsBlock = True
133 133 # Set to TRUE if the data is discontinuous
134 134 self.dataOut.flagDiscontinuousBlock = False
135 135
136 136 self.dataOut.utctime = None
137 137
138 138 # timezone like jroheader, difference in minutes between UTC and localtime
139 139 self.dataOut.timeZone = self.__timezone / 60
140 140
141 141 self.dataOut.dstFlag = 0
142 142
143 143 self.dataOut.errorCount = 0
144 144
145 145 try:
146 146 self.dataOut.nCohInt = self.fixed_metadata_dict.get(
147 147 'nCohInt', self.nCohInt)
148 148
149 149 # asumo que la data esta decodificada
150 150 self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
151 151 'flagDecodeData', self.flagDecodeData)
152 152
153 153 # asumo que la data esta sin flip
154 154 self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']
155 155
156 156 self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']
157 157
158 158 self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
159 159 except:
160 160 pass
161 161
162 162 self.dataOut.ippSeconds = ippSeconds
163 163
164 164 # Time interval between profiles
165 165 # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
166 166
167 167 self.dataOut.frequency = self.__frequency
168 168
169 169 self.dataOut.realtime = self.__online
170 170
171 171 def findDatafiles(self, path, startDate=None, endDate=None):
172 172
173 173 if not os.path.isdir(path):
174 174 return []
175 175
176 176 try:
177 177 digitalReadObj = digital_rf.DigitalRFReader(
178 178 path, load_all_metadata=True)
179 179 except:
180 180 digitalReadObj = digital_rf.DigitalRFReader(path)
181 181
182 182 channelNameList = digitalReadObj.get_channels()
183 183
184 184 if not channelNameList:
185 185 return []
186 186
187 187 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
188 188
189 189 sample_rate = metadata_dict['sample_rate'][0]
190 190
191 191 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
192 192
193 193 try:
194 194 timezone = this_metadata_file['timezone'].value
195 195 except:
196 196 timezone = 0
197 197
198 198 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(
199 199 channelNameList[0]) / sample_rate - timezone
200 200
201 201 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
202 202 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
203 203
204 204 if not startDate:
205 205 startDate = startDatetime.date()
206 206
207 207 if not endDate:
208 208 endDate = endDatatime.date()
209 209
210 210 dateList = []
211 211
212 212 thisDatetime = startDatetime
213 213
214 214 while(thisDatetime <= endDatatime):
215 215
216 216 thisDate = thisDatetime.date()
217 217
218 218 if thisDate < startDate:
219 219 continue
220 220
221 221 if thisDate > endDate:
222 222 break
223 223
224 224 dateList.append(thisDate)
225 225 thisDatetime += datetime.timedelta(1)
226 226
227 227 return dateList
228 228
229 229 def setup(self, path=None,
230 230 startDate=None,
231 231 endDate=None,
232 232 startTime=datetime.time(0, 0, 0),
233 233 endTime=datetime.time(23, 59, 59),
234 234 channelList=None,
235 235 nSamples=None,
236 236 online=False,
237 237 delay=60,
238 238 buffer_size=1024,
239 239 ippKm=None,
240 240 nCohInt=1,
241 241 nCode=1,
242 242 nBaud=1,
243 243 flagDecodeData=False,
244 244 code=numpy.ones((1, 1), dtype=numpy.int),
245 245 getByBlock=0,
246 246 nProfileBlocks=1,
247 247 **kwargs):
248 248 '''
249 249 In this method we should set all initial parameters.
250 250
251 251 Inputs:
252 252 path
253 253 startDate
254 254 endDate
255 255 startTime
256 256 endTime
257 257 set
258 258 expLabel
259 259 ext
260 260 online
261 261 delay
262 262 '''
263 263 self.path = path
264 264 self.nCohInt = nCohInt
265 265 self.flagDecodeData = flagDecodeData
266 266 self.i = 0
267 267
268 268 self.getByBlock = getByBlock
269 269 self.nProfileBlocks = nProfileBlocks
270 270 if not os.path.isdir(path):
271 271 raise ValueError("[Reading] Directory %s does not exist" % path)
272 272
273 273 try:
274 274 self.digitalReadObj = digital_rf.DigitalRFReader(
275 275 path, load_all_metadata=True)
276 276 except:
277 277 self.digitalReadObj = digital_rf.DigitalRFReader(path)
278 278
279 279 channelNameList = self.digitalReadObj.get_channels()
280 280
281 281 if not channelNameList:
282 282 raise ValueError("[Reading] Directory %s does not have any files" % path)
283 283
284 284 if not channelList:
285 285 channelList = list(range(len(channelNameList)))
286 286
287 287 ########## Reading metadata ######################
288 288
289 289 top_properties = self.digitalReadObj.get_properties(
290 290 channelNameList[channelList[0]])
291 291
292 292 self.__num_subchannels = top_properties['num_subchannels']
293 293 self.__sample_rate = 1.0 * \
294 294 top_properties['sample_rate_numerator'] / \
295 295 top_properties['sample_rate_denominator']
296 296 # self.__samples_per_file = top_properties['samples_per_file'][0]
297 297 self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate # why 0.15?
298 298
299 299 this_metadata_file = self.digitalReadObj.get_digital_metadata(
300 300 channelNameList[channelList[0]])
301 301 metadata_bounds = this_metadata_file.get_bounds()
302 302 self.fixed_metadata_dict = this_metadata_file.read(
303 303 metadata_bounds[0])[metadata_bounds[0]] # GET FIRST HEADER
304 304
305 305 try:
306 306 self.__processingHeader = self.fixed_metadata_dict['processingHeader']
307 307 self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
308 308 self.__systemHeader = self.fixed_metadata_dict['systemHeader']
309 309 self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
310 310 except:
311 311 pass
312 312
313 313 self.__frequency = None
314 314
315 315 self.__frequency = self.fixed_metadata_dict.get('frequency', 1)
316 316
317 317 self.__timezone = self.fixed_metadata_dict.get('timezone', 18000)
318 318
319 319 try:
320 320 nSamples = self.fixed_metadata_dict['nSamples']
321 321 except:
322 322 nSamples = None
323 323
324 324 self.__firstHeigth = 0
325 325
326 326 try:
327 327 codeType = self.__radarControllerHeader['codeType']
328 328 except:
329 329 codeType = 0
330 330
331 331 try:
332 332 if codeType:
333 333 nCode = self.__radarControllerHeader['nCode']
334 334 nBaud = self.__radarControllerHeader['nBaud']
335 335 code = self.__radarControllerHeader['code']
336 336 except:
337 337 pass
338 338
339 339 if not ippKm:
340 340 try:
341 341 # seconds to km
342 342 ippKm = self.__radarControllerHeader['ipp']
343 343 except:
344 344 ippKm = None
345 345 ####################################################
346 346 self.__ippKm = ippKm
347 347 startUTCSecond = None
348 348 endUTCSecond = None
349 349
350 350 if startDate:
351 351 startDatetime = datetime.datetime.combine(startDate, startTime)
352 352 startUTCSecond = (
353 353 startDatetime - datetime.datetime(1970, 1, 1)).total_seconds() + self.__timezone
354 354
355 355 if endDate:
356 356 endDatetime = datetime.datetime.combine(endDate, endTime)
357 357 endUTCSecond = (endDatetime - datetime.datetime(1970,
358 358 1, 1)).total_seconds() + self.__timezone
359 359
360 360
361 361 print(startUTCSecond,endUTCSecond)
362 362 start_index, end_index = self.digitalReadObj.get_bounds(
363 363 channelNameList[channelList[0]])
364 364
365 365 ##print("*****",start_index,end_index)
366 366 if not startUTCSecond:
367 367 startUTCSecond = start_index / self.__sample_rate
368 368
369 369 if start_index > startUTCSecond * self.__sample_rate:
370 370 startUTCSecond = start_index / self.__sample_rate
371 371
372 372 if not endUTCSecond:
373 373 endUTCSecond = end_index / self.__sample_rate
374 374
375 375 if end_index < endUTCSecond * self.__sample_rate:
376 376 endUTCSecond = end_index / self.__sample_rate
377 377 if not nSamples:
378 378 if not ippKm:
379 379 raise ValueError("[Reading] nSamples or ippKm should be defined")
380 380 nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))
381 381
382 382 channelBoundList = []
383 383 channelNameListFiltered = []
384 384
385 385 for thisIndexChannel in channelList:
386 386 thisChannelName = channelNameList[thisIndexChannel]
387 387 start_index, end_index = self.digitalReadObj.get_bounds(
388 388 thisChannelName)
389 389 channelBoundList.append((start_index, end_index))
390 390 channelNameListFiltered.append(thisChannelName)
391 391
392 392 self.profileIndex = 0
393 393 self.i = 0
394 394 self.__delay = delay
395 395
396 396 self.__codeType = codeType
397 397 self.__nCode = nCode
398 398 self.__nBaud = nBaud
399 399 self.__code = code
400 400
401 401 self.__datapath = path
402 402 self.__online = online
403 403 self.__channelList = channelList
404 404 self.__channelNameList = channelNameListFiltered
405 405 self.__channelBoundList = channelBoundList
406 406 self.__nSamples = nSamples
407 407 if self.getByBlock:
408 408 nSamples = nSamples*nProfileBlocks
409 print('nProfileBlocks',nProfileBlocks)
410 print('nSamples',nSamples)
411 print("self.__nSample",self.__nSamples)
412 409
413 410
414 411 self.__samples_to_read = int(nSamples) # FIJO: AHORA 40
415 412 self.__nChannels = len(self.__channelList)
416 413 #print("------------------------------------------")
417 414 #print("self.__samples_to_read",self.__samples_to_read)
418 415 #print("self.__nSamples",self.__nSamples)
419 416 # son iguales y el buffer_index da 0
420 417 self.__startUTCSecond = startUTCSecond
421 418 self.__endUTCSecond = endUTCSecond
422 419
423 420 self.__timeInterval = 1.0 * self.__samples_to_read / \
424 421 self.__sample_rate # Time interval
425 422
426 423 if online:
427 424 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
428 425 startUTCSecond = numpy.floor(endUTCSecond)
429 426
430 427 # por que en el otro metodo lo primero q se hace es sumar samplestoread
431 428 self.__thisUnixSample = int(startUTCSecond * self.__sample_rate) - self.__samples_to_read
432 429
433 430 #self.__data_buffer = numpy.zeros(
434 431 # (self.__num_subchannels, self.__samples_to_read), dtype=numpy.complex)
435 432 self.__data_buffer = numpy.zeros((int(len(channelList)), self.__samples_to_read), dtype=numpy.complex)
436 433
437 434
438 435 self.__setFileHeader()
439 436 self.isConfig = True
440 437
441 438 print("[Reading] Digital RF Data was found from %s to %s " % (
442 439 datetime.datetime.utcfromtimestamp(
443 440 self.__startUTCSecond - self.__timezone),
444 441 datetime.datetime.utcfromtimestamp(
445 442 self.__endUTCSecond - self.__timezone)
446 443 ))
447 444
448 445 print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
449 446 datetime.datetime.utcfromtimestamp(
450 447 endUTCSecond - self.__timezone)
451 448 ))
452 449 self.oldAverage = None
453 450 self.count = 0
454 451 self.executionTime = 0
455 452
456 453 def __reload(self):
457 454 # print
458 455 # print "%s not in range [%s, %s]" %(
459 456 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
460 457 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
461 458 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
462 459 # )
463 460 print("[Reading] reloading metadata ...")
464 461
465 462 try:
466 463 self.digitalReadObj.reload(complete_update=True)
467 464 except:
468 465 self.digitalReadObj = digital_rf.DigitalRFReader(self.path)
469 466
470 467 start_index, end_index = self.digitalReadObj.get_bounds(
471 468 self.__channelNameList[self.__channelList[0]])
472 469
473 470 if start_index > self.__startUTCSecond * self.__sample_rate:
474 471 self.__startUTCSecond = 1.0 * start_index / self.__sample_rate
475 472
476 473 if end_index > self.__endUTCSecond * self.__sample_rate:
477 474 self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
478 475 print()
479 476 print("[Reading] New timerange found [%s, %s] " % (
480 477 datetime.datetime.utcfromtimestamp(
481 478 self.__startUTCSecond - self.__timezone),
482 479 datetime.datetime.utcfromtimestamp(
483 480 self.__endUTCSecond - self.__timezone)
484 481 ))
485 482
486 483 return True
487 484
488 485 return False
489 486
490 487 def timeit(self, toExecute):
491 488 t0 = time.time()
492 489 toExecute()
493 490 self.executionTime = time.time() - t0
494 491 if self.oldAverage is None:
495 492 self.oldAverage = self.executionTime
496 493 self.oldAverage = (self.executionTime + self.count *
497 494 self.oldAverage) / (self.count + 1.0)
498 495 self.count = self.count + 1.0
499 496 return
500 497
501 498 def __readNextBlock(self, seconds=30, volt_scale=1):
502 499 '''
503 500 '''
504 501
505 502 # Set the next data
506 503 self.__flagDiscontinuousBlock = False
507 504 self.__thisUnixSample += self.__samples_to_read
508 505
509 506 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
510 507 print ("[Reading] There are no more data into selected time-range")
511 508 if self.__online:
512 509 sleep(3)
513 510 self.__reload()
514 511 else:
515 512 return False
516 513
517 514 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
518 515 return False
519 516 self.__thisUnixSample -= self.__samples_to_read
520 517
521 518 indexChannel = 0
522 519
523 520 dataOk = False
524 521
525 522 for thisChannelName in self.__channelNameList: # TODO VARIOS CHANNELS?
526 523 for indexSubchannel in range(self.__num_subchannels):
527 524 try:
528 525 t0 = time()
529 526 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
530 527 self.__samples_to_read,
531 528 thisChannelName, sub_channel=indexSubchannel)
532 529 self.executionTime = time() - t0
533 530 if self.oldAverage is None:
534 531 self.oldAverage = self.executionTime
535 532 self.oldAverage = (
536 533 self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
537 534 self.count = self.count + 1.0
538 535
539 536 except IOError as e:
540 537 # read next profile
541 538 self.__flagDiscontinuousBlock = True
542 539 print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
543 540 break
544 541
545 542 if result.shape[0] != self.__samples_to_read:
546 543 self.__flagDiscontinuousBlock = True
547 544 print("[Reading] %s: Too few samples were found, just %d/%d samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
548 545 result.shape[0],
549 546 self.__samples_to_read))
550 547 break
551 548
552 549 self.__data_buffer[indexChannel, :] = result * volt_scale
553 550 indexChannel+=1
554 551
555 552 dataOk = True
556 553
557 554 self.__utctime = self.__thisUnixSample / self.__sample_rate
558 555
559 556 if not dataOk:
560 557 return False
561 558
562 559 print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
563 560 self.__samples_to_read,
564 561 self.__timeInterval))
565 562
566 563 self.__bufferIndex = 0
567 564
568 565 return True
569 566
570 567 def __isBufferEmpty(self):
568
571 569 return self.__bufferIndex > self.__samples_to_read - self.__nSamples # 40960 - 40
572 570
573 571 def getData(self, seconds=30, nTries=5):
574 572 '''
575 573 This method gets the data from files and put the data into the dataOut object
576 574
577 575 In addition, increase el the buffer counter in one.
578 576
579 577 Return:
580 578 data : retorna un perfil de voltages (alturas * canales) copiados desde el
581 579 buffer. Si no hay mas archivos a leer retorna None.
582 580
583 581 Affected:
584 582 self.dataOut
585 583 self.profileIndex
586 584 self.flagDiscontinuousBlock
587 585 self.flagIsNewBlock
588 586 '''
589 587 #print("getdata")
590 588 err_counter = 0
591 589 self.dataOut.flagNoData = True
592 590
593 591
594 592 if self.__isBufferEmpty():
595 593 #print("hi")
596 594 self.__flagDiscontinuousBlock = False
597 595
598 596 while True:
599 #print ("q ha pasado")
600 597 if self.__readNextBlock():
601 598 break
602 599 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
603 600 raise schainpy.admin.SchainError('Error')
604 601 return
605 602
606 603 if self.__flagDiscontinuousBlock:
607 604 raise schainpy.admin.SchainError('discontinuous block found')
608 605 return
609 606
610 607 if not self.__online:
611 608 raise schainpy.admin.SchainError('Online?')
612 609 return
613 610
614 611 err_counter += 1
615 612 if err_counter > nTries:
616 613 raise schainpy.admin.SchainError('Max retrys reach')
617 614 return
618 615
619 616 print('[Reading] waiting %d seconds to read a new block' % seconds)
620 617 sleep(seconds)
621 618
622 619
623 620 if not self.getByBlock:
624 621
625 622 #print("self.__bufferIndex",self.__bufferIndex)# este valor siempre es cero aparentemente
626 self.dataOut.data = self.__data_buffer[:, self.__bufferIndex:self.__bufferIndex + self.__nSamples]
627 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
628 self.dataOut.flagNoData = False
623 self.dataOut.data = self.__data_buffer[:, self.__bufferIndex:self.__bufferIndex + self.__nSamples]
624 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
625 self.dataOut.flagNoData = False
629 626 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
630 self.dataOut.profileIndex = self.profileIndex
627 self.dataOut.profileIndex = self.profileIndex
631 628
632 629 self.__bufferIndex += self.__nSamples
633 630 self.profileIndex += 1
634 631
635 632 if self.profileIndex == self.dataOut.nProfiles:
636 633 self.profileIndex = 0
637 634 else:
638 635 # ojo debo anadir el readNextBLock y el __isBufferEmpty(
639 636 self.dataOut.flagNoData = False
640 print('Lectura por bloques')
641 print("self.__nSamples",self.__nSamples)
642 print("self.__bufferIndex",self.__bufferIndex)
643 637 buffer = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex + self.__samples_to_read]
644 print('shape',buffer.shape)
645 buffer = buffer.reshape((self.__nChannels,self.nProfileBlocks,int(self.__samples_to_read/self.nProfileBlocks)))
646 print('shape',buffer.shape)
638 buffer = buffer.reshape((self.__nChannels, self.nProfileBlocks, int(self.__samples_to_read/self.nProfileBlocks)))
639 self.dataOut.data = buffer
640 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
641 self.profileIndex += self.__samples_to_read
642 self.__bufferIndex += self.__samples_to_read
643 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
647 644 return True
648 645
649 646
650 647 def printInfo(self):
651 648 '''
652 649 '''
653 650 if self.__printInfo == False:
654 651 return
655 652
656 653 # self.systemHeaderObj.printInfo()
657 654 # self.radarControllerHeaderObj.printInfo()
658 655
659 656 self.__printInfo = False
660 657
661 658 def printNumberOfBlock(self):
662 659 '''
663 660 '''
664 661 return
665 662 # print self.profileIndex
666 663
667 664 def run(self, **kwargs):
668 665 '''
669 666 This method will be called many times so here you should put all your code
670 667 '''
671 668
672 669 if not self.isConfig:
673 670 self.setup(**kwargs)
674 #self.i = self.i+1
671
675 672 self.getData(seconds=self.__delay)
676 673
677 674 return
678 675
679 676 @MPDecorator
680 677 class DigitalRFWriter(Operation):
681 678 '''
682 679 classdocs
683 680 '''
684 681
685 682 def __init__(self, **kwargs):
686 683 '''
687 684 Constructor
688 685 '''
689 686 Operation.__init__(self, **kwargs)
690 687 self.metadata_dict = {}
691 688 self.dataOut = None
692 689 self.dtype = None
693 690 self.oldAverage = 0
694 691
695 692 def setHeader(self):
696 693
697 694 self.metadata_dict['frequency'] = self.dataOut.frequency
698 695 self.metadata_dict['timezone'] = self.dataOut.timeZone
699 696 self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
700 697 self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
701 698 self.metadata_dict['heightList'] = self.dataOut.heightList
702 699 self.metadata_dict['channelList'] = self.dataOut.channelList
703 700 self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
704 701 self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
705 702 self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
706 703 self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
707 704 self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
708 705 self.metadata_dict['type'] = self.dataOut.type
709 706 self.metadata_dict['flagDataAsBlock']= getattr(
710 707 self.dataOut, 'flagDataAsBlock', None) # chequear
711 708
712 709 def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
713 710 '''
714 711 In this method we should set all initial parameters.
715 712 Input:
716 713 dataOut: Input data will also be outputa data
717 714 '''
718 715 self.setHeader()
719 716 self.__ippSeconds = dataOut.ippSeconds
720 717 self.__deltaH = dataOut.getDeltaH()
721 718 self.__sample_rate = 1e6 * 0.15 / self.__deltaH
722 719 self.__dtype = dataOut.dtype
723 720 if len(dataOut.dtype) == 2:
724 721 self.__dtype = dataOut.dtype[0]
725 722 self.__nSamples = dataOut.systemHeaderObj.nSamples
726 723 self.__nProfiles = dataOut.nProfiles
727 724
728 725 if self.dataOut.type != 'Voltage':
729 726 raise 'Digital RF cannot be used with this data type'
730 727 self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
731 728 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
732 729 else:
733 730 self.arr_data = numpy.ones((self.__nSamples, len(
734 731 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
735 732
736 733 file_cadence_millisecs = 1000
737 734
738 735 sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
739 736 sample_rate_numerator = int(sample_rate_fraction.numerator)
740 737 sample_rate_denominator = int(sample_rate_fraction.denominator)
741 738 start_global_index = dataOut.utctime * self.__sample_rate
742 739
743 740 uuid = 'prueba'
744 741 compression_level = 0
745 742 checksum = False
746 743 is_complex = True
747 744 num_subchannels = len(dataOut.channelList)
748 745 is_continuous = True
749 746 marching_periods = False
750 747
751 748 self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
752 749 fileCadence, start_global_index,
753 750 sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
754 751 is_complex, num_subchannels, is_continuous, marching_periods)
755 752 metadata_dir = os.path.join(path, 'metadata')
756 753 os.system('mkdir %s' % (metadata_dir))
757 754 self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, # 236, file_cadence_millisecs / 1000
758 755 sample_rate_numerator, sample_rate_denominator,
759 756 metadataFile)
760 757 self.isConfig = True
761 758 self.currentSample = 0
762 759 self.oldAverage = 0
763 760 self.count = 0
764 761 return
765 762
766 763 def writeMetadata(self):
767 764 start_idx = self.__sample_rate * self.dataOut.utctime
768 765
769 766 self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
770 767 )
771 768 self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
772 769 )
773 770 self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
774 771 )
775 772 self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
776 773 return
777 774
778 775 def timeit(self, toExecute):
779 776 t0 = time()
780 777 toExecute()
781 778 self.executionTime = time() - t0
782 779 if self.oldAverage is None:
783 780 self.oldAverage = self.executionTime
784 781 self.oldAverage = (self.executionTime + self.count *
785 782 self.oldAverage) / (self.count + 1.0)
786 783 self.count = self.count + 1.0
787 784 return
788 785
789 786 def writeData(self):
790 787 if self.dataOut.type != 'Voltage':
791 788 raise 'Digital RF cannot be used with this data type'
792 789 for channel in self.dataOut.channelList:
793 790 for i in range(self.dataOut.nFFTPoints):
794 791 self.arr_data[1][channel * self.dataOut.nFFTPoints +
795 792 i]['r'] = self.dataOut.data[channel][i].real
796 793 self.arr_data[1][channel * self.dataOut.nFFTPoints +
797 794 i]['i'] = self.dataOut.data[channel][i].imag
798 795 else:
799 796 for i in range(self.dataOut.systemHeaderObj.nSamples):
800 797 for channel in self.dataOut.channelList:
801 798 self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
802 799 self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
803 800
804 801 def f(): return self.digitalWriteObj.rf_write(self.arr_data)
805 802 self.timeit(f)
806 803
807 804 return
808 805
809 806 def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
810 807 '''
811 808 This method will be called many times so here you should put all your code
812 809 Inputs:
813 810 dataOut: object with the data
814 811 '''
815 812 # print dataOut.__dict__
816 813 self.dataOut = dataOut
817 814 if not self.isConfig:
818 815 self.setup(dataOut, path, frequency, fileCadence,
819 816 dirCadence, metadataCadence, **kwargs)
820 817 self.writeMetadata()
821 818
822 819 self.writeData()
823 820
824 821 ## self.currentSample += 1
825 822 # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
826 823 # self.writeMetadata()
827 824 ## if self.currentSample == self.__nProfiles: self.currentSample = 0
828 825
829 826 return dataOut# en la version 2.7 no aparece este return
830 827
831 828 def close(self):
832 829 print('[Writing] - Closing files ')
833 830 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
834 831 try:
835 832 self.digitalWriteObj.close()
836 833 except:
837 834 pass
General Comments 0
You need to be logged in to leave comments. Login now