##// END OF EJS Templates
jroIO_usrp.py: nbuffer argument was eliminated
Miguel Valdez -
r727:79a1d6033326
parent child
Show More
@@ -1,591 +1,587
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9 9
10 10 try:
11 11 from gevent import sleep
12 12 except:
13 13 from time import sleep
14 14
15 15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 16 from schainpy.model.data.jrodata import Voltage
17 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
18 18
19 19 try:
20 20 import digital_rf_hdf5
21 21 except:
22 22 print 'You should install "digital_rf_hdf5" module if you want to read USRP data'
23 23
24 24 class USRPReader(ProcessingUnit):
25 25 '''
26 26 classdocs
27 27 '''
28 28
29 29 def __init__(self):
30 30 '''
31 31 Constructor
32 32 '''
33 33
34 34 ProcessingUnit.__init__(self)
35 35
36 36 self.dataOut = Voltage()
37 37 self.__printInfo = True
38 38 self.__flagDiscontinuousBlock = False
39 39 self.__bufferIndex = 9999999
40 40
41 41 self.__ippKm = None
42 42 self.__codeType = 0
43 43 self.__nCode = None
44 44 self.__nBaud = None
45 45 self.__code = None
46 46
47 47 def __getCurrentSecond(self):
48 48
49 49 return self.__thisUnixSample/self.__sample_rate
50 50
51 51 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
52 52
53 53 def __setFileHeader(self):
54 54 '''
55 55 In this method will be initialized every parameter of dataOut object (header, no data)
56 56 '''
57 57 nProfiles = self.__sample_rate #Number of profiles by second
58 58
59 59 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
60 60 txA=0,
61 61 txB=0,
62 62 nWindows=1,
63 63 nHeights=self.__nSamples,
64 64 firstHeight=self.__firstHeigth,
65 65 deltaHeight=self.__deltaHeigth,
66 66 codeType=self.__codeType,
67 67 nCode=self.__nCode, nBaud=self.__nBaud,
68 68 code = self.__code)
69 69
70 70 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
71 71 nProfiles=nProfiles,
72 72 nChannels=len(self.__channelList),
73 73 adcResolution=14)
74 74
75 75 self.dataOut.type = "Voltage"
76 76
77 77 self.dataOut.data = None
78 78
79 79 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
80 80
81 81 # self.dataOut.nChannels = 0
82 82
83 83 # self.dataOut.nHeights = 0
84 84
85 85 self.dataOut.nProfiles = nProfiles
86 86
87 87 self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
88 88
89 89 self.dataOut.channelList = self.__channelList
90 90
91 91 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
92 92
93 93 # self.dataOut.channelIndexList = None
94 94
95 95 self.dataOut.flagNoData = True
96 96
97 97 #Set to TRUE if the data is discontinuous
98 98 self.dataOut.flagDiscontinuousBlock = False
99 99
100 100 self.dataOut.utctime = None
101 101
102 102 self.dataOut.timeZone = self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
103 103
104 104 self.dataOut.dstFlag = 0
105 105
106 106 self.dataOut.errorCount = 0
107 107
108 108 self.dataOut.nCohInt = 1
109 109
110 110 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
111 111
112 112 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
113 113
114 114 self.dataOut.flagShiftFFT = False
115 115
116 116 self.dataOut.ippSeconds = 1.0*self.__nSamples/self.__sample_rate
117 117
118 118 #Time interval between profiles
119 119 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
120 120
121 121 self.dataOut.frequency = self.__frequency
122 122
123 123 self.dataOut.realtime = self.__online
124 124
125 125 def findDatafiles(self, path, startDate=None, endDate=None):
126 126
127 127 if not os.path.isdir(path):
128 128 return []
129 129
130 130 try:
131 131 digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
132 132 except:
133 133 digitalReadObj = digital_rf_hdf5.read_hdf5(path)
134 134
135 135 channelNameList = digitalReadObj.get_channels()
136 136
137 137 if not channelNameList:
138 138 return []
139 139
140 140 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
141 141
142 142 sample_rate = metadata_dict['sample_rate'][0]
143 143
144 144 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
145 145
146 146 try:
147 147 timezone = this_metadata_file['timezone'].value
148 148 except:
149 149 timezone = 0
150 150
151 151 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(channelNameList[0])/sample_rate - timezone
152 152
153 153 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
154 154 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
155 155
156 156 if not startDate:
157 157 startDate = startDatetime.date()
158 158
159 159 if not endDate:
160 160 endDate = endDatatime.date()
161 161
162 162 dateList = []
163 163
164 164 thisDatetime = startDatetime
165 165
166 166 while(thisDatetime<=endDatatime):
167 167
168 168 thisDate = thisDatetime.date()
169 169
170 170 if thisDate < startDate:
171 171 continue
172 172
173 173 if thisDate > endDate:
174 174 break
175 175
176 176 dateList.append(thisDate)
177 177 thisDatetime += datetime.timedelta(1)
178 178
179 179 return dateList
180 180
181 181 def setup(self, path = None,
182 182 startDate = None,
183 183 endDate = None,
184 184 startTime = datetime.time(0,0,0),
185 185 endTime = datetime.time(23,59,59),
186 186 channelList = None,
187 187 nSamples = None,
188 188 ippKm = 60,
189 189 online = False,
190 190 delay = 60,
191 buffer_size = None,
192 nbuffer = 1024,
191 buffer_size = 1024,
193 192 **kwargs):
194 193 '''
195 194 In this method we should set all initial parameters.
196 195
197 196 Inputs:
198 197 path
199 198 startDate
200 199 endDate
201 200 startTime
202 201 endTime
203 202 set
204 203 expLabel
205 204 ext
206 205 online
207 206 delay
208 207 '''
209 208
210 if not buffer_size:
211 buffer_size = nbuffer
212
213 209 if not os.path.isdir(path):
214 210 raise ValueError, "[Reading] Directory %s does not exist" %path
215 211
216 212 try:
217 213 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
218 214 except:
219 215 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path)
220 216
221 217 channelNameList = self.digitalReadObj.get_channels()
222 218
223 219 if not channelNameList:
224 220 raise ValueError, "[Reading] Directory %s does not have any files" %path
225 221
226 222 if not channelList:
227 223 channelList = range(len(channelNameList))
228 224
229 225 ########## Reading metadata ######################
230 226
231 227 metadata_dict = self.digitalReadObj.get_rf_file_metadata(channelNameList[channelList[0]])
232 228
233 229 self.__sample_rate = metadata_dict['sample_rate'][0]
234 230 self.__samples_per_file = metadata_dict['samples_per_file'][0]
235 231 self.__deltaHeigth = 1e6*0.15/self.__sample_rate
236 232
237 233 this_metadata_file = self.digitalReadObj.get_metadata(channelNameList[channelList[0]])
238 234
239 235 self.__frequency = this_metadata_file['center_frequencies'].value
240 236 try:
241 237 self.__timezone = this_metadata_file['timezone'].value
242 238 except:
243 239 self.__timezone = 0
244 240
245 241 self.__firstHeigth = 0
246 242
247 243 try:
248 244 codeType = this_metadata_file['codeType'].value
249 245 except:
250 246 codeType = 0
251 247
252 248 nCode = 1
253 249 nBaud = 1
254 250 code = numpy.ones((nCode, nBaud), dtype=numpy.int)
255 251
256 252 if codeType:
257 253 nCode = this_metadata_file['nCode'].value
258 254 nBaud = this_metadata_file['nBaud'].value
259 255 code = this_metadata_file['code'].value
260 256
261 257 if not ippKm:
262 258 try:
263 259 #seconds to km
264 260 ippKm = 1e6*0.15*this_metadata_file['ipp'].value
265 261 except:
266 262 ippKm = None
267 263
268 264 ####################################################
269 265 startUTCSecond = None
270 266 endUTCSecond = None
271 267
272 268 if startDate:
273 269 startDatetime = datetime.datetime.combine(startDate, startTime)
274 270 startUTCSecond = (startDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
275 271
276 272 if endDate:
277 273 endDatetime = datetime.datetime.combine(endDate, endTime)
278 274 endUTCSecond = (endDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
279 275
280 276 start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
281 277
282 278 if not startUTCSecond:
283 279 startUTCSecond = start_index/self.__sample_rate
284 280
285 281 if start_index > startUTCSecond*self.__sample_rate:
286 282 startUTCSecond = start_index/self.__sample_rate
287 283
288 284 if not endUTCSecond:
289 285 endUTCSecond = end_index/self.__sample_rate
290 286
291 287 if end_index < endUTCSecond*self.__sample_rate:
292 288 endUTCSecond = end_index/self.__sample_rate
293 289
294 290 if not nSamples:
295 291 if not ippKm:
296 292 raise ValueError, "[Reading] nSamples or ippKm should be defined"
297 293
298 294 nSamples = ippKm / (1e6*0.15/self.__sample_rate)
299 295
300 296 channelBoundList = []
301 297 channelNameListFiltered = []
302 298
303 299 for thisIndexChannel in channelList:
304 300 thisChannelName = channelNameList[thisIndexChannel]
305 301 start_index, end_index = self.digitalReadObj.get_bounds(thisChannelName)
306 302 channelBoundList.append((start_index, end_index))
307 303 channelNameListFiltered.append(thisChannelName)
308 304
309 305 self.profileIndex = 0
310 306
311 307 self.__delay = delay
312 308 self.__ippKm = ippKm
313 309 self.__codeType = codeType
314 310 self.__nCode = nCode
315 311 self.__nBaud = nBaud
316 312 self.__code = code
317 313
318 314 self.__datapath = path
319 315 self.__online = online
320 316 self.__channelList = channelList
321 317 self.__channelNameList = channelNameListFiltered
322 318 self.__channelBoundList = channelBoundList
323 319 self.__nSamples = nSamples
324 320 self.__samples_to_read = buffer_size*nSamples
325 321 self.__nChannels = len(self.__channelList)
326 322
327 323 self.__startUTCSecond = startUTCSecond
328 324 self.__endUTCSecond = endUTCSecond
329 325
330 326 self.__timeInterval = 1.0 * self.__samples_to_read/self.__sample_rate #Time interval
331 327
332 328 if online:
333 329 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
334 330 startUTCSecond = numpy.floor(endUTCSecond)
335 331
336 332 self.__thisUnixSample = int(startUTCSecond*self.__sample_rate) - self.__samples_to_read
337 333
338 334 self.__data_buffer = numpy.zeros((self.__nChannels, self.__samples_to_read), dtype = numpy.complex)
339 335
340 336 self.__setFileHeader()
341 337 self.isConfig = True
342 338
343 339 print "[Reading] USRP Data was found from %s to %s " %(
344 340 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
345 341 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
346 342 )
347 343
348 344 print "[Reading] Starting process from %s to %s" %(datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
349 345 datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)
350 346 )
351 347
352 348 def __reload(self):
353 349
354 350 if not self.__online:
355 351 return
356 352
357 353 # print
358 354 # print "%s not in range [%s, %s]" %(
359 355 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
360 356 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
361 357 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
362 358 # )
363 359 print "[Reading] reloading metadata ..."
364 360
365 361 try:
366 362 self.digitalReadObj.reload(complete_update=True)
367 363 except:
368 364 self.digitalReadObj.reload()
369 365
370 366 start_index, end_index = self.digitalReadObj.get_bounds(self.__channelNameList[self.__channelList[0]])
371 367
372 368 if start_index > self.__startUTCSecond*self.__sample_rate:
373 369 self.__startUTCSecond = 1.0*start_index/self.__sample_rate
374 370
375 371 if end_index > self.__endUTCSecond*self.__sample_rate:
376 372 self.__endUTCSecond = 1.0*end_index/self.__sample_rate
377 373 print
378 374 print "[Reading] New timerange found [%s, %s] " %(
379 375 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
380 376 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
381 377 )
382 378
383 379 return True
384 380
385 381 return False
386 382
387 383 def __readNextBlock(self, seconds=30, volt_scale = 218776):
388 384 '''
389 385 '''
390 386
391 387 #Set the next data
392 388 self.__flagDiscontinuousBlock = False
393 389 self.__thisUnixSample += self.__samples_to_read
394 390
395 391 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
396 392 print "[Reading] There are no more data into selected timerange"
397 393
398 394 self.__reload()
399 395
400 396 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
401 397 self.__thisUnixSample -= self.__samples_to_read
402 398 return False
403 399
404 400 indexChannel = 0
405 401
406 402 dataOk = False
407 403
408 404 for thisChannelName in self.__channelNameList:
409 405
410 406 try:
411 407 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
412 408 self.__samples_to_read,
413 409 thisChannelName)
414 410
415 411 except IOError, e:
416 412 #read next profile
417 413 self.__flagDiscontinuousBlock = True
418 414 print "[Reading] %s" %datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e
419 415 break
420 416
421 417 if result.shape[0] != self.__samples_to_read:
422 418 self.__flagDiscontinuousBlock = True
423 419 print "[Reading] %s: Too few samples were found, just %d/%d samples" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
424 420 result.shape[0],
425 421 self.__samples_to_read)
426 422 break
427 423
428 424 self.__data_buffer[indexChannel,:] = result*volt_scale
429 425
430 426 indexChannel += 1
431 427
432 428 dataOk = True
433 429
434 430 self.__utctime = self.__thisUnixSample/self.__sample_rate
435 431
436 432 if not dataOk:
437 433 return False
438 434
439 435 print "[Reading] %s: %d samples <> %f sec" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
440 436 self.__samples_to_read,
441 437 self.__timeInterval)
442 438
443 439 self.__bufferIndex = 0
444 440
445 441 return True
446 442
447 443 def __isBufferEmpty(self):
448 444
449 445 if self.__bufferIndex <= self.__samples_to_read - self.__nSamples:
450 446 return False
451 447
452 448 return True
453 449
454 450 def getData(self, seconds=30, nTries=5):
455 451
456 452 '''
457 453 This method gets the data from files and put the data into the dataOut object
458 454
459 455 In addition, increase el the buffer counter in one.
460 456
461 457 Return:
462 458 data : retorna un perfil de voltages (alturas * canales) copiados desde el
463 459 buffer. Si no hay mas archivos a leer retorna None.
464 460
465 461 Affected:
466 462 self.dataOut
467 463 self.profileIndex
468 464 self.flagDiscontinuousBlock
469 465 self.flagIsNewBlock
470 466 '''
471 467
472 468 err_counter = 0
473 469 self.dataOut.flagNoData = True
474 470
475 471 if self.__isBufferEmpty():
476 472
477 473 self.__flagDiscontinuousBlock = False
478 474
479 475 while True:
480 476 if self.__readNextBlock():
481 477 break
482 478
483 479 if self.__thisUnixSample > self.__endUTCSecond*self.__sample_rate:
484 480 return False
485 481
486 482 if self.__flagDiscontinuousBlock:
487 483 print '[Reading] discontinuous block found ... continue with the next block'
488 484 continue
489 485
490 486 if not self.__online:
491 487 return False
492 488
493 489 err_counter += 1
494 490 if err_counter > nTries:
495 491 return False
496 492
497 493 print '[Reading] waiting %d seconds to read a new block' %seconds
498 494 sleep(seconds)
499 495
500 496 self.dataOut.data = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex+self.__nSamples]
501 497 self.dataOut.utctime = (self.__thisUnixSample + self.__bufferIndex)/self.__sample_rate
502 498 self.dataOut.flagNoData = False
503 499 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
504 500
505 501 self.__bufferIndex += self.__nSamples
506 502 self.profileIndex += 1
507 503
508 504 return True
509 505
510 506 def printInfo(self):
511 507 '''
512 508 '''
513 509 if self.__printInfo == False:
514 510 return
515 511
516 512 # self.systemHeaderObj.printInfo()
517 513 # self.radarControllerHeaderObj.printInfo()
518 514
519 515 self.__printInfo = False
520 516
521 517 def printNumberOfBlock(self):
522 518 '''
523 519 '''
524 520
525 521 print self.profileIndex
526 522
527 523 def run(self, **kwargs):
528 524 '''
529 525 This method will be called many times so here you should put all your code
530 526 '''
531 527
532 528 if not self.isConfig:
533 529 self.setup(**kwargs)
534 530
535 531 self.getData(seconds=self.__delay)
536 532
537 533 return
538 534
539 535 class USRPWriter(Operation):
540 536 '''
541 537 classdocs
542 538 '''
543 539
544 540 def __init__(self):
545 541 '''
546 542 Constructor
547 543 '''
548 544 self.dataOut = None
549 545
550 546 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
551 547 '''
552 548 In this method we should set all initial parameters.
553 549
554 550 Input:
555 551 dataIn : Input data will also be outputa data
556 552
557 553 '''
558 554 self.dataOut = dataIn
559 555
560 556
561 557
562 558
563 559
564 560 self.isConfig = True
565 561
566 562 return
567 563
568 564 def run(self, dataIn, **kwargs):
569 565 '''
570 566 This method will be called many times so here you should put all your code
571 567
572 568 Inputs:
573 569
574 570 dataIn : object with the data
575 571
576 572 '''
577 573
578 574 if not self.isConfig:
579 575 self.setup(dataIn, **kwargs)
580 576
581 577
582 578 if __name__ == '__main__':
583 579
584 580 readObj = USRPReader()
585 581
586 582 while True:
587 583 readObj.run(path='/Volumes/DATA/haystack/passive_radar/')
588 584 # readObj.printInfo()
589 585 readObj.printNumberOfBlock()
590 586
591 587 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now