##// END OF EJS Templates
Fix digitalRF
jespinoza -
r1725:73fbb5dc9ea6
parent child
Show More
@@ -1,863 +1,863
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 # SUBCHANNELS EN VEZ DE CHANNELS
7 7 # BENCHMARKS -> PROBLEMAS CON ARCHIVOS GRANDES -> INCONSTANTE EN EL TIEMPO
8 8 # ACTUALIZACION DE VERSION
9 9 # HEADERS
10 10 # MODULO DE ESCRITURA
11 11 # METADATA
12 12
13 13 import os
14 14 import time
15 15 import datetime
16 16 import numpy
17 17 import timeit
18 18 from fractions import Fraction
19 19 from time import time
20 20 from time import sleep
21 21
22 22 import schainpy.admin
23 23 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
24 24 from schainpy.model.data.jrodata import Voltage
25 25 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
26 26
27 27 import pickle
28 28 try:
29 29 os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
30 30 import digital_rf
31 31 except:
32 32 pass
33 33
34 34
35 35 class DigitalRFReader(ProcessingUnit):
36 36 '''
37 37 classdocs
38 38 '''
39 39
40 40 def __init__(self):
41 41 '''
42 42 Constructor
43 43 '''
44 44
45 45 ProcessingUnit.__init__(self)
46 46
47 47 self.dataOut = Voltage()
48 48 self.__printInfo = True
49 49 self.__flagDiscontinuousBlock = False
50 50 self.__bufferIndex = 9999999
51 51 self.__codeType = 0
52 52 self.__ippKm = None
53 53 self.__nCode = None
54 54 self.__nBaud = None
55 55 self.__code = None
56 56 self.dtype = None
57 57 self.oldAverage = None
58 58 self.path = None
59 59
60 60 def close(self):
61 61 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
62 62 return
63 63
64 64 def __getCurrentSecond(self):
65 65
66 66 return self.__thisUnixSample / self.__sample_rate
67 67
68 68 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
69 69
70 70 def __setFileHeader(self):
71 71 '''
72 72 In this method will be initialized every parameter of dataOut object (header, no data)
73 73 '''
74 74 ippSeconds = 1.0 * self.__nSamples / self.__sample_rate
75 75 if not self.getByBlock:
76 76 nProfiles = 1.0 / ippSeconds # Number of profiles in one second
77 77 else:
78 78 nProfiles = self.nProfileBlocks # Number of profiles in one block
79 79
80 80 try:
81 81 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
82 82 self.__radarControllerHeader)
83 83 except:
84 84 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
85 85 txA=0,
86 86 txB=0,
87 87 nWindows=1,
88 88 nHeights=self.__nSamples,
89 89 firstHeight=self.__firstHeigth,
90 90 deltaHeight=self.__deltaHeigth,
91 91 codeType=self.__codeType,
92 92 nCode=self.__nCode, nBaud=self.__nBaud,
93 93 code=self.__code)
94 94
95 95 try:
96 96 self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
97 97 except:
98 98 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
99 99 nProfiles=nProfiles,
100 100 nChannels=len(
101 101 self.__channelList),
102 102 adcResolution=14)
103 103 self.dataOut.type = "Voltage"
104 104
105 105 self.dataOut.data = None
106 106
107 107 self.dataOut.dtype = self.dtype
108 108
109 109 # self.dataOut.nChannels = 0
110 110
111 111 # self.dataOut.nHeights = 0
112 112
113 113 self.dataOut.nProfiles = int(nProfiles)
114 114
115 115 self.dataOut.heightList = self.__firstHeigth + \
116 numpy.arange(self.__nSamples, dtype=numpy.float) * \
116 numpy.arange(self.__nSamples, dtype=numpy.float_) * \
117 117 self.__deltaHeigth
118 118
119 119 #self.dataOut.channelList = list(range(self.__num_subchannels))
120 120 self.dataOut.channelList = list(range(len(self.__channelList)))
121 121 if not self.getByBlock:
122 122
123 123 self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights
124 124 else:
125 125 self.dataOut.blocksize = self.dataOut.nChannels * self.dataOut.nHeights*self.nProfileBlocks
126 126
127 127 # self.dataOut.channelIndexList = None
128 128
129 129 self.dataOut.flagNoData = True
130 130 if not self.getByBlock:
131 131 self.dataOut.flagDataAsBlock = False
132 132 else:
133 133 self.dataOut.flagDataAsBlock = True
134 134 # Set to TRUE if the data is discontinuous
135 135 self.dataOut.flagDiscontinuousBlock = False
136 136
137 137 self.dataOut.utctime = None
138 138
139 139 # timezone like jroheader, difference in minutes between UTC and localtime
140 140 self.dataOut.timeZone = self.__timezone / 60
141 141
142 142 self.dataOut.dstFlag = 0
143 143
144 144 self.dataOut.errorCount = 0
145 145
146 146 try:
147 147 self.dataOut.nCohInt = self.fixed_metadata_dict.get(
148 148 'nCohInt', self.nCohInt)
149 149
150 150 # asumo que la data esta decodificada
151 151 self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
152 152 'flagDecodeData', self.flagDecodeData)
153 153
154 154 # asumo que la data esta sin flip
155 155 self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']
156 156
157 157 self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']
158 158
159 159 self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
160 160 except:
161 161 pass
162 162
163 163 self.dataOut.ippSeconds = ippSeconds
164 164
165 165 # Time interval between profiles
166 166 # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
167 167
168 168 self.dataOut.frequency = self.__frequency
169 169
170 170 self.dataOut.realtime = self.__online
171 171
172 172 def findDatafiles(self, path, startDate=None, endDate=None):
173 173
174 174 if not os.path.isdir(path):
175 175 return []
176 176
177 177 try:
178 178 digitalReadObj = digital_rf.DigitalRFReader(
179 179 path, load_all_metadata=True)
180 180 except:
181 181 digitalReadObj = digital_rf.DigitalRFReader(path)
182 182
183 183 channelNameList = digitalReadObj.get_channels()
184 184
185 185 if not channelNameList:
186 186 return []
187 187
188 188 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
189 189
190 190 sample_rate = metadata_dict['sample_rate'][0]
191 191
192 192 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
193 193
194 194 try:
195 195 timezone = this_metadata_file['timezone'].value
196 196 except:
197 197 timezone = 0
198 198
199 199 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(
200 200 channelNameList[0]) / sample_rate - timezone
201 201
202 202 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
203 203 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
204 204
205 205 if not startDate:
206 206 startDate = startDatetime.date()
207 207
208 208 if not endDate:
209 209 endDate = endDatatime.date()
210 210
211 211 dateList = []
212 212
213 213 thisDatetime = startDatetime
214 214
215 215 while(thisDatetime <= endDatatime):
216 216
217 217 thisDate = thisDatetime.date()
218 218
219 219 if thisDate < startDate:
220 220 continue
221 221
222 222 if thisDate > endDate:
223 223 break
224 224
225 225 dateList.append(thisDate)
226 226 thisDatetime += datetime.timedelta(1)
227 227
228 228 return dateList
229 229
230 230 def setup(self, path=None,
231 231 startDate=None,
232 232 endDate=None,
233 233 startTime=datetime.time(0, 0, 0),
234 234 endTime=datetime.time(23, 59, 59),
235 235 channelList=None,
236 236 nSamples=None,
237 237 online=False,
238 238 delay=60,
239 239 buffer_size=1024,
240 240 ippKm=None,
241 241 nCohInt=1,
242 242 nCode=1,
243 243 nBaud=1,
244 244 flagDecodeData=False,
245 code=numpy.ones((1, 1), dtype=numpy.int),
245 code=numpy.ones((1, 1), dtype=int),
246 246 getByBlock=0,
247 247 nProfileBlocks=1,
248 248 **kwargs):
249 249 '''
250 250 In this method we should set all initial parameters.
251 251
252 252 Inputs:
253 253 path
254 254 startDate
255 255 endDate
256 256 startTime
257 257 endTime
258 258 set
259 259 expLabel
260 260 ext
261 261 online
262 262 delay
263 263 '''
264 264 self.path = path
265 265 self.nCohInt = nCohInt
266 266 self.flagDecodeData = flagDecodeData
267 267 self.i = 0
268 268
269 269 self.getByBlock = getByBlock
270 270 self.nProfileBlocks = nProfileBlocks
271 271 if online:
272 272 print('Waiting for RF data..')
273 273 sleep(40)
274 274
275 275 if not os.path.isdir(path):
276 276 raise ValueError("[Reading] Directory %s does not exist" % path)
277 277
278 278 #print("path",path)
279 279 try:
280 280 self.digitalReadObj = digital_rf.DigitalRFReader(
281 281 path, load_all_metadata=True)
282 282 except:
283 283 self.digitalReadObj = digital_rf.DigitalRFReader(path)
284 284
285 285 channelNameList = self.digitalReadObj.get_channels()
286 286
287 287 if not channelNameList:
288 288 raise ValueError("[Reading] Directory %s does not have any files" % path)
289 289
290 290 if not channelList:
291 291 channelList = list(range(len(channelNameList)))
292 292
293 293 ########## Reading metadata ######################
294 294
295 295 top_properties = self.digitalReadObj.get_properties(
296 296 channelNameList[channelList[0]])
297 297
298 298 self.__num_subchannels = top_properties['num_subchannels']
299 299 self.__sample_rate = 1.0 * \
300 300 top_properties['sample_rate_numerator'] / \
301 301 top_properties['sample_rate_denominator']
302 302 # self.__samples_per_file = top_properties['samples_per_file'][0]
303 303 self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate # why 0.15?
304 304
305 305 this_metadata_file = self.digitalReadObj.get_digital_metadata(
306 306 channelNameList[channelList[0]])
307 307 metadata_bounds = this_metadata_file.get_bounds()
308 308 self.fixed_metadata_dict = this_metadata_file.read(
309 309 metadata_bounds[0])[metadata_bounds[0]] # GET FIRST HEADER
310 310
311 311 try:
312 312 self.__processingHeader = self.fixed_metadata_dict['processingHeader']
313 313 self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
314 314 self.__systemHeader = self.fixed_metadata_dict['systemHeader']
315 315 self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
316 316 except:
317 317 pass
318 318
319 319 self.__frequency = None
320 320
321 321 self.__frequency = self.fixed_metadata_dict.get('frequency', 1)
322 322
323 323 self.__timezone = self.fixed_metadata_dict.get('timezone', 18000)
324 324
325 325 try:
326 326 nSamples = self.fixed_metadata_dict['nSamples']
327 327 except:
328 328 nSamples = None
329 329
330 330 self.__firstHeigth = 0
331 331
332 332 try:
333 333 codeType = self.__radarControllerHeader['codeType']
334 334 except:
335 335 codeType = 0
336 336
337 337 try:
338 338 if codeType:
339 339 nCode = self.__radarControllerHeader['nCode']
340 340 nBaud = self.__radarControllerHeader['nBaud']
341 341 code = self.__radarControllerHeader['code']
342 342 except:
343 343 pass
344 344
345 345 if not ippKm:
346 346 try:
347 347 # seconds to km
348 348 ippKm = self.__radarControllerHeader['ipp']
349 349 except:
350 350 ippKm = None
351 351 ####################################################
352 352 self.__ippKm = ippKm
353 353 startUTCSecond = None
354 354 endUTCSecond = None
355 355
356 356 if startDate:
357 357 startDatetime = datetime.datetime.combine(startDate, startTime)
358 358 startUTCSecond = (
359 359 startDatetime - datetime.datetime(1970, 1, 1)).total_seconds()# + self.__timezone
360 360
361 361 if endDate:
362 362 endDatetime = datetime.datetime.combine(endDate, endTime)
363 363 endUTCSecond = (endDatetime - datetime.datetime(1970,
364 364 1, 1)).total_seconds()# + self.__timezone
365 365 start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
366 366 if start_index==None or end_index==None:
367 367 print("Check error No data, start_index: ",start_index,",end_index: ",end_index)
368 368 #return 0
369 369 if not startUTCSecond:
370 370 startUTCSecond = start_index / self.__sample_rate
371 371 if start_index > startUTCSecond * self.__sample_rate:
372 372 startUTCSecond = start_index / self.__sample_rate
373 373
374 374 if not endUTCSecond:
375 375 endUTCSecond = end_index / self.__sample_rate
376 376
377 377 if end_index < endUTCSecond * self.__sample_rate:
378 378 endUTCSecond = end_index / self.__sample_rate #Check UTC and LT time
379 379
380 380 if not nSamples:
381 381 if not ippKm:
382 382 raise ValueError("[Reading] nSamples or ippKm should be defined")
383 383 nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))
384 384
385 385 channelBoundList = []
386 386 channelNameListFiltered = []
387 387
388 388 for thisIndexChannel in channelList:
389 389 thisChannelName = channelNameList[thisIndexChannel]
390 390 start_index, end_index = self.digitalReadObj.get_bounds(
391 391 thisChannelName)
392 392 channelBoundList.append((start_index, end_index))
393 393 channelNameListFiltered.append(thisChannelName)
394 394
395 395 self.profileIndex = 0
396 396 self.i = 0
397 397 self.__delay = delay
398 398
399 399 self.__codeType = codeType
400 400 self.__nCode = nCode
401 401 self.__nBaud = nBaud
402 402 self.__code = code
403 403
404 404 self.__datapath = path
405 405 self.__online = online
406 406 self.__channelList = channelList
407 407 self.__channelNameList = channelNameListFiltered
408 408 self.__channelBoundList = channelBoundList
409 409 self.__nSamples = nSamples
410 410 if self.getByBlock:
411 411 nSamples = nSamples*nProfileBlocks
412 412
413 413
414 414 self.__samples_to_read = int(nSamples) # FIJO: AHORA 40
415 415 self.__nChannels = len(self.__channelList)
416 416 #print("------------------------------------------")
417 417 #print("self.__samples_to_read",self.__samples_to_read)
418 418 #print("self.__nSamples",self.__nSamples)
419 419 # son iguales y el buffer_index da 0
420 420 self.__startUTCSecond = startUTCSecond
421 421 self.__endUTCSecond = endUTCSecond
422 422
423 423 self.__timeInterval = 1.0 * self.__samples_to_read / \
424 424 self.__sample_rate # Time interval
425 425
426 426 if online:
427 427 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
428 428 startUTCSecond = numpy.floor(endUTCSecond)
429 429
430 430 # por que en el otro metodo lo primero q se hace es sumar samplestoread
431 431 self.__thisUnixSample = int(startUTCSecond * self.__sample_rate) - self.__samples_to_read
432 432
433 433 #self.__data_buffer = numpy.zeros(
434 434 # (self.__num_subchannels, self.__samples_to_read), dtype=numpy.complex)
435 435 print("samplestoread",self.__samples_to_read)
436 self.__data_buffer = numpy.zeros((int(len(channelList)), self.__samples_to_read), dtype=numpy.complex)
436 self.__data_buffer = numpy.zeros((int(len(channelList)), self.__samples_to_read), dtype=numpy.complex_)
437 437
438 438
439 439 self.__setFileHeader()
440 440 self.isConfig = True
441 441
442 442 print("[Reading] Digital RF Data was found from %s to %s " % (
443 443 datetime.datetime.utcfromtimestamp(
444 444 self.__startUTCSecond - self.__timezone),
445 445 datetime.datetime.utcfromtimestamp(
446 446 self.__endUTCSecond - self.__timezone)
447 447 ))
448 448
449 449 print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
450 450 datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)))
451 451 self.oldAverage = None
452 452 self.count = 0
453 453 self.executionTime = 0
454 454
455 455 def __reload(self):
456 456 # print
457 457 # print "%s not in range [%s, %s]" %(
458 458 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
459 459 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
460 460 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
461 461 # )
462 462 print("[Reading] reloading metadata ...")
463 463
464 464 try:
465 465 self.digitalReadObj.reload(complete_update=True)
466 466 except:
467 467 self.digitalReadObj = digital_rf.DigitalRFReader(self.path)
468 468
469 469 start_index, end_index = self.digitalReadObj.get_bounds(
470 470 self.__channelNameList[self.__channelList[0]])
471 471
472 472 if start_index > self.__startUTCSecond * self.__sample_rate:
473 473 self.__startUTCSecond = 1.0 * start_index / self.__sample_rate
474 474
475 475 if end_index > self.__endUTCSecond * self.__sample_rate:
476 476 self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
477 477 print()
478 478 print("[Reading] New timerange found [%s, %s] " % (
479 479 datetime.datetime.utcfromtimestamp(
480 480 self.__startUTCSecond - self.__timezone),
481 481 datetime.datetime.utcfromtimestamp(
482 482 self.__endUTCSecond - self.__timezone)
483 483 ))
484 484
485 485 return True
486 486
487 487 return False
488 488
489 489 def timeit(self, toExecute):
490 490 t0 = time.time()
491 491 toExecute()
492 492 self.executionTime = time.time() - t0
493 493 if self.oldAverage is None:
494 494 self.oldAverage = self.executionTime
495 495 self.oldAverage = (self.executionTime + self.count *
496 496 self.oldAverage) / (self.count + 1.0)
497 497 self.count = self.count + 1.0
498 498 return
499 499
500 500 def __readNextBlock(self, seconds=30, volt_scale=1/20000.0):
501 501 '''
502 502 NOTA: APLICACION RADAR METEOROLOGICO
503 503 VALORES OBTENIDOS CON LA USRP, volt_scale = 1,conexion directa al Ch Rx.
504 504
505 505 MAXIMO
506 506 9886 -> 0.980 Voltiospp
507 507 4939 -> 0.480 Voltiospp
508 508 14825 -> 1.440 Voltiospp
509 509 18129 -> 1.940 Voltiospp
510 510 Para llevar al valor correspondiente de Voltaje, debemos dividir por 20000
511 511 y obtenemos la Amplitud correspondiente de entrada IQ.
512 512 volt_scale = (1/20000.0)
513 513 '''
514 514 # Set the next data
515 515 self.__flagDiscontinuousBlock = False
516 516 self.__thisUnixSample += self.__samples_to_read
517 517
518 518 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
519 519 print ("[Reading] There are no more data into selected time-range")
520 520 if self.__online:
521 521 sleep(3)
522 522 self.__reload()
523 523 else:
524 524 return False
525 525
526 526 if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
527 527 return False
528 528 self.__thisUnixSample -= self.__samples_to_read
529 529
530 530 indexChannel = 0
531 531
532 532 dataOk = False
533 533
534 534 for thisChannelName in self.__channelNameList: # TODO VARIOS CHANNELS?
535 535 for indexSubchannel in range(self.__num_subchannels):
536 536 try:
537 537 t0 = time()
538 538 #print("thisUNixSample",self.__thisUnixSample)
539 539 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
540 540 self.__samples_to_read,
541 541 thisChannelName, sub_channel=indexSubchannel)
542 542 #print("result--------------",result)
543 543 self.executionTime = time() - t0
544 544 if self.oldAverage is None:
545 545 self.oldAverage = self.executionTime
546 546 self.oldAverage = (
547 547 self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
548 548 self.count = self.count + 1.0
549 549
550 550 except IOError as e:
551 551 # read next profile
552 552 self.__flagDiscontinuousBlock = True
553 553 print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
554 554 bot = 0
555 555 while(self.__flagDiscontinuousBlock):
556 556 bot +=1
557 self.__thisUnixSample += self.__sample_rate
557 self.__thisUnixSample += self.__samples_to_read
558 558 try:
559 559 result = result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,self.__samples_to_read,thisChannelName, sub_channel=indexSubchannel)
560 560 self.__flagDiscontinuousBlock=False
561 561 print("Searching.. NΒ°: ",bot,"Success",self.__thisUnixSample)
562 562 except:
563 563 print("Searching...NΒ°: ",bot,"Fail", self.__thisUnixSample)
564 564 if self.__flagDiscontinuousBlock==True:
565 565 break
566 566 else:
567 567 print("New data index found...",self.__thisUnixSample)
568 568 #break
569 569
570 570 if result.shape[0] != self.__samples_to_read:
571 571 self.__flagDiscontinuousBlock = True
572 572 print("[Reading] %s: Too few samples were found, just %d/%d samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
573 573 result.shape[0],
574 574 self.__samples_to_read))
575 575 break
576 576
577 577 self.__data_buffer[indexChannel, :] = result * volt_scale
578 578 indexChannel+=1
579 579
580 580 dataOk = True
581 581
582 582 self.__utctime = self.__thisUnixSample / self.__sample_rate
583 583
584 584 if not dataOk:
585 585 return False
586 586
587 587 print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
588 588 self.__samples_to_read,
589 589 self.__timeInterval))
590 590
591 591 self.__bufferIndex = 0
592 592
593 593 return True
594 594
595 595 def __isBufferEmpty(self):
596 596
597 597 return self.__bufferIndex > self.__samples_to_read - self.__nSamples # 40960 - 40
598 598
599 599 def getData(self, seconds=30, nTries=5):
600 600 '''
601 601 This method gets the data from files and put the data into the dataOut object
602 602
603 603 In addition, increase el the buffer counter in one.
604 604
605 605 Return:
606 606 data : retorna un perfil de voltages (alturas * canales) copiados desde el
607 607 buffer. Si no hay mas archivos a leer retorna None.
608 608
609 609 Affected:
610 610 self.dataOut
611 611 self.profileIndex
612 612 self.flagDiscontinuousBlock
613 613 self.flagIsNewBlock
614 614 '''
615 615 #print("getdata")
616 616 err_counter = 0
617 617 self.dataOut.flagNoData = True
618 618
619 619
620 620 if self.__isBufferEmpty():
621 621 #print("hi")
622 622 self.__flagDiscontinuousBlock = False
623 623
624 624 while True:
625 625 if self.__readNextBlock():
626 626 break
627 627 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
628 628 raise schainpy.admin.SchainError('Error')
629 629 return
630 630
631 631 if self.__flagDiscontinuousBlock:
632 632 raise schainpy.admin.SchainError('discontinuous block found')
633 633 return
634 634
635 635 if not self.__online:
636 636 raise schainpy.admin.SchainError('Online?')
637 637 return
638 638
639 639 err_counter += 1
640 640 if err_counter > nTries:
641 641 raise schainpy.admin.SchainError('Max retrys reach')
642 642 return
643 643
644 644 print('[Reading] waiting %d seconds to read a new block' % seconds)
645 645 sleep(seconds)
646 646
647 647
648 648 if not self.getByBlock:
649 649
650 650 #print("self.__bufferIndex",self.__bufferIndex)# este valor siempre es cero aparentemente
651 651 self.dataOut.data = self.__data_buffer[:, self.__bufferIndex:self.__bufferIndex + self.__nSamples]
652 652 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
653 653 self.dataOut.flagNoData = False
654 654 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
655 655 self.dataOut.profileIndex = self.profileIndex
656 656
657 657 self.__bufferIndex += self.__nSamples
658 658 self.profileIndex += 1
659 659
660 660 if self.profileIndex == self.dataOut.nProfiles:
661 661 self.profileIndex = 0
662 662 else:
663 663 # ojo debo anadir el readNextBLock y el __isBufferEmpty(
664 664 self.dataOut.flagNoData = False
665 665 buffer = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex + self.__samples_to_read]
666 666 buffer = buffer.reshape((self.__nChannels, self.nProfileBlocks, int(self.__samples_to_read/self.nProfileBlocks)))
667 667 self.dataOut.nProfileBlocks = self.nProfileBlocks
668 668 self.dataOut.data = buffer
669 669 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
670 670 self.profileIndex += self.__samples_to_read
671 671 self.__bufferIndex += self.__samples_to_read
672 672 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
673 673 return True
674 674
675 675
676 676 def printInfo(self):
677 677 '''
678 678 '''
679 679 if self.__printInfo == False:
680 680 return
681 681
682 682 # self.systemHeaderObj.printInfo()
683 683 # self.radarControllerHeaderObj.printInfo()
684 684
685 685 self.__printInfo = False
686 686
687 687 def printNumberOfBlock(self):
688 688 '''
689 689 '''
690 690 return
691 691 # print self.profileIndex
692 692
693 693 def run(self, **kwargs):
694 694 '''
695 695 This method will be called many times so here you should put all your code
696 696 '''
697 697
698 698 if not self.isConfig:
699 699 self.setup(**kwargs)
700 700
701 701 self.getData(seconds=self.__delay)
702 702
703 703 return
704 704
705 705 @MPDecorator
706 706 class DigitalRFWriter(Operation):
707 707 '''
708 708 classdocs
709 709 '''
710 710
711 711 def __init__(self, **kwargs):
712 712 '''
713 713 Constructor
714 714 '''
715 715 Operation.__init__(self, **kwargs)
716 716 self.metadata_dict = {}
717 717 self.dataOut = None
718 718 self.dtype = None
719 719 self.oldAverage = 0
720 720
721 721 def setHeader(self):
722 722
723 723 self.metadata_dict['frequency'] = self.dataOut.frequency
724 724 self.metadata_dict['timezone'] = self.dataOut.timeZone
725 725 self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
726 726 self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
727 727 self.metadata_dict['heightList'] = self.dataOut.heightList
728 728 self.metadata_dict['channelList'] = self.dataOut.channelList
729 729 self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
730 730 self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
731 731 self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
732 732 self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
733 733 self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
734 734 self.metadata_dict['type'] = self.dataOut.type
735 735 self.metadata_dict['flagDataAsBlock']= getattr(
736 736 self.dataOut, 'flagDataAsBlock', None) # chequear
737 737
738 738 def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
739 739 '''
740 740 In this method we should set all initial parameters.
741 741 Input:
742 742 dataOut: Input data will also be outputa data
743 743 '''
744 744 self.setHeader()
745 745 self.__ippSeconds = dataOut.ippSeconds
746 746 self.__deltaH = dataOut.getDeltaH()
747 747 self.__sample_rate = 1e6 * 0.15 / self.__deltaH
748 748 self.__dtype = dataOut.dtype
749 749 if len(dataOut.dtype) == 2:
750 750 self.__dtype = dataOut.dtype[0]
751 751 self.__nSamples = dataOut.systemHeaderObj.nSamples
752 752 self.__nProfiles = dataOut.nProfiles
753 753
754 754 if self.dataOut.type != 'Voltage':
755 755 raise 'Digital RF cannot be used with this data type'
756 756 self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
757 757 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
758 758 else:
759 759 self.arr_data = numpy.ones((self.__nSamples, len(
760 760 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
761 761
762 762 file_cadence_millisecs = 1000
763 763
764 764 sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
765 765 sample_rate_numerator = int(sample_rate_fraction.numerator)
766 766 sample_rate_denominator = int(sample_rate_fraction.denominator)
767 767 start_global_index = dataOut.utctime * self.__sample_rate
768 768
769 769 uuid = 'prueba'
770 770 compression_level = 0
771 771 checksum = False
772 772 is_complex = True
773 773 num_subchannels = len(dataOut.channelList)
774 774 is_continuous = True
775 775 marching_periods = False
776 776
777 777 self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
778 778 fileCadence, start_global_index,
779 779 sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
780 780 is_complex, num_subchannels, is_continuous, marching_periods)
781 781 metadata_dir = os.path.join(path, 'metadata')
782 782 os.system('mkdir %s' % (metadata_dir))
783 783 self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, # 236, file_cadence_millisecs / 1000
784 784 sample_rate_numerator, sample_rate_denominator,
785 785 metadataFile)
786 786 self.isConfig = True
787 787 self.currentSample = 0
788 788 self.oldAverage = 0
789 789 self.count = 0
790 790 return
791 791
792 792 def writeMetadata(self):
793 793 start_idx = self.__sample_rate * self.dataOut.utctime
794 794
795 795 self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
796 796 )
797 797 self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
798 798 )
799 799 self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
800 800 )
801 801 self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
802 802 return
803 803
804 804 def timeit(self, toExecute):
805 805 t0 = time()
806 806 toExecute()
807 807 self.executionTime = time() - t0
808 808 if self.oldAverage is None:
809 809 self.oldAverage = self.executionTime
810 810 self.oldAverage = (self.executionTime + self.count *
811 811 self.oldAverage) / (self.count + 1.0)
812 812 self.count = self.count + 1.0
813 813 return
814 814
815 815 def writeData(self):
816 816 if self.dataOut.type != 'Voltage':
817 817 raise 'Digital RF cannot be used with this data type'
818 818 for channel in self.dataOut.channelList:
819 819 for i in range(self.dataOut.nFFTPoints):
820 820 self.arr_data[1][channel * self.dataOut.nFFTPoints +
821 821 i]['r'] = self.dataOut.data[channel][i].real
822 822 self.arr_data[1][channel * self.dataOut.nFFTPoints +
823 823 i]['i'] = self.dataOut.data[channel][i].imag
824 824 else:
825 825 for i in range(self.dataOut.systemHeaderObj.nSamples):
826 826 for channel in self.dataOut.channelList:
827 827 self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
828 828 self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
829 829
830 830 def f(): return self.digitalWriteObj.rf_write(self.arr_data)
831 831 self.timeit(f)
832 832
833 833 return
834 834
835 835 def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
836 836 '''
837 837 This method will be called many times so here you should put all your code
838 838 Inputs:
839 839 dataOut: object with the data
840 840 '''
841 841 # print dataOut.__dict__
842 842 self.dataOut = dataOut
843 843 if not self.isConfig:
844 844 self.setup(dataOut, path, frequency, fileCadence,
845 845 dirCadence, metadataCadence, **kwargs)
846 846 self.writeMetadata()
847 847
848 848 self.writeData()
849 849
850 850 ## self.currentSample += 1
851 851 # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
852 852 # self.writeMetadata()
853 853 ## if self.currentSample == self.__nProfiles: self.currentSample = 0
854 854
855 855 return dataOut# en la version 2.7 no aparece este return
856 856
857 857 def close(self):
858 858 print('[Writing] - Closing files ')
859 859 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
860 860 try:
861 861 self.digitalWriteObj.close()
862 862 except:
863 863 pass
General Comments 0
You need to be logged in to leave comments. Login now