##// END OF EJS Templates
Bug fixed in jroIO_usrp.py: Code should be a numpy.array
Miguel Valdez -
r686:70c29a3edf06
parent child
Show More
@@ -1,591 +1,591
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9 9
10 10 try:
11 11 from gevent import sleep
12 12 except:
13 13 from time import sleep
14 14
15 15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 16 from schainpy.model.data.jrodata import Voltage
17 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
18 18
19 19 try:
20 20 import digital_rf_hdf5
21 21 except:
22 22 print 'You should install "digital_rf_hdf5" module if you want to read USRP data'
23 23
24 24 class USRPReader(ProcessingUnit):
25 25 '''
26 26 classdocs
27 27 '''
28 28
29 29 def __init__(self):
30 30 '''
31 31 Constructor
32 32 '''
33 33
34 34 ProcessingUnit.__init__(self)
35 35
36 36 self.dataOut = Voltage()
37 37 self.__printInfo = True
38 38 self.__flagDiscontinuousBlock = False
39 39 self.__bufferIndex = 9999999
40 40
41 41 self.__ippKm = None
42 42 self.__codeType = 0
43 43 self.__nCode = None
44 44 self.__nBaud = None
45 45 self.__code = None
46 46
47 47 def __getCurrentSecond(self):
48 48
49 49 return self.__thisUnixSample/self.__sample_rate
50 50
51 51 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
52 52
53 53 def __setFileHeader(self):
54 54 '''
55 55 In this method will be initialized every parameter of dataOut object (header, no data)
56 56 '''
57 57 nProfiles = self.__sample_rate #Number of profiles by second
58 58
59 59 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
60 60 txA=0,
61 61 txB=0,
62 62 nWindows=1,
63 63 nHeights=self.__nSamples,
64 64 firstHeight=self.__firstHeigth,
65 65 deltaHeight=self.__deltaHeigth,
66 66 codeType=self.__codeType,
67 67 nCode=self.__nCode, nBaud=self.__nBaud,
68 68 code = self.__code)
69 69
70 70 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
71 71 nProfiles=nProfiles,
72 72 nChannels=len(self.__channelList),
73 73 adcResolution=14)
74 74
75 75 self.dataOut.type = "Voltage"
76 76
77 77 self.dataOut.data = None
78 78
79 79 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
80 80
81 81 # self.dataOut.nChannels = 0
82 82
83 83 # self.dataOut.nHeights = 0
84 84
85 85 self.dataOut.nProfiles = nProfiles
86 86
87 87 self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
88 88
89 89 self.dataOut.channelList = self.__channelList
90 90
91 91 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
92 92
93 93 # self.dataOut.channelIndexList = None
94 94
95 95 self.dataOut.flagNoData = True
96 96
97 97 #Set to TRUE if the data is discontinuous
98 98 self.dataOut.flagDiscontinuousBlock = False
99 99
100 100 self.dataOut.utctime = None
101 101
102 102 self.dataOut.timeZone = self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
103 103
104 104 self.dataOut.dstFlag = 0
105 105
106 106 self.dataOut.errorCount = 0
107 107
108 108 self.dataOut.nCohInt = 1
109 109
110 110 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
111 111
112 112 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
113 113
114 114 self.dataOut.flagShiftFFT = False
115 115
116 116 self.dataOut.ippSeconds = 1.0*self.__nSamples/self.__sample_rate
117 117
118 118 #Time interval between profiles
119 119 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
120 120
121 121 self.dataOut.frequency = self.__frequency
122 122
123 123 self.dataOut.realtime = self.__online
124 124
125 125 def findDatafiles(self, path, startDate=None, endDate=None):
126 126
127 127 if not os.path.isdir(path):
128 128 return []
129 129
130 130 try:
131 131 digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
132 132 except:
133 133 digitalReadObj = digital_rf_hdf5.read_hdf5(path)
134 134
135 135 channelNameList = digitalReadObj.get_channels()
136 136
137 137 if not channelNameList:
138 138 return []
139 139
140 140 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
141 141
142 142 sample_rate = metadata_dict['sample_rate'][0]
143 143
144 144 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
145 145
146 146 try:
147 147 timezone = this_metadata_file['timezone'].value
148 148 except:
149 149 timezone = 0
150 150
151 151 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(channelNameList[0])/sample_rate - timezone
152 152
153 153 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
154 154 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
155 155
156 156 if not startDate:
157 157 startDate = startDatetime.date()
158 158
159 159 if not endDate:
160 160 endDate = endDatatime.date()
161 161
162 162 dateList = []
163 163
164 164 thisDatetime = startDatetime
165 165
166 166 while(thisDatetime<=endDatatime):
167 167
168 168 thisDate = thisDatetime.date()
169 169
170 170 if thisDate < startDate:
171 171 continue
172 172
173 173 if thisDate > endDate:
174 174 break
175 175
176 176 dateList.append(thisDate)
177 177 thisDatetime += datetime.timedelta(1)
178 178
179 179 return dateList
180 180
181 181 def setup(self, path = None,
182 182 startDate = None,
183 183 endDate = None,
184 184 startTime = datetime.time(0,0,0),
185 185 endTime = datetime.time(23,59,59),
186 186 channelList = None,
187 187 nSamples = None,
188 188 ippKm = 60,
189 189 online = False,
190 190 delay = 60,
191 191 buffer_size = None,
192 192 nbuffer = 1024,
193 193 **kwargs):
194 194 '''
195 195 In this method we should set all initial parameters.
196 196
197 197 Inputs:
198 198 path
199 199 startDate
200 200 endDate
201 201 startTime
202 202 endTime
203 203 set
204 204 expLabel
205 205 ext
206 206 online
207 207 delay
208 208 '''
209 209
210 210 if not buffer_size:
211 211 buffer_size = nbuffer
212 212
213 213 if not os.path.isdir(path):
214 214 raise ValueError, "[Reading] Directory %s does not exist" %path
215 215
216 216 try:
217 217 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
218 218 except:
219 219 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path)
220 220
221 221 channelNameList = self.digitalReadObj.get_channels()
222 222
223 223 if not channelNameList:
224 224 raise ValueError, "[Reading] Directory %s does not have any files" %path
225 225
226 226 if not channelList:
227 227 channelList = range(len(channelNameList))
228 228
229 229 ########## Reading metadata ######################
230 230
231 231 metadata_dict = self.digitalReadObj.get_rf_file_metadata(channelNameList[channelList[0]])
232 232
233 233 self.__sample_rate = metadata_dict['sample_rate'][0]
234 234 self.__samples_per_file = metadata_dict['samples_per_file'][0]
235 235 self.__deltaHeigth = 1e6*0.15/self.__sample_rate
236 236
237 237 this_metadata_file = self.digitalReadObj.get_metadata(channelNameList[channelList[0]])
238 238
239 239 self.__frequency = this_metadata_file['center_frequencies'].value
240 240 try:
241 241 self.__timezone = this_metadata_file['timezone'].value
242 242 except:
243 243 self.__timezone = 0
244 244
245 245 self.__firstHeigth = 0
246 246
247 247 try:
248 248 codeType = this_metadata_file['codeType'].value
249 249 except:
250 250 codeType = 0
251 251
252 252 nCode = 1
253 253 nBaud = 1
254 code = numpy.zeros((nCode, nBaud))
254 code = numpy.ones((nCode, nBaud), dtype=numpy.int)
255 255
256 256 if codeType:
257 257 nCode = this_metadata_file['nCode'].value
258 258 nBaud = this_metadata_file['nBaud'].value
259 259 code = this_metadata_file['code'].value
260 260
261 261 if not ippKm:
262 262 try:
263 263 #seconds to km
264 264 ippKm = 1e6*0.15*this_metadata_file['ipp'].value
265 265 except:
266 266 ippKm = None
267 267
268 268 ####################################################
269 269 startUTCSecond = None
270 270 endUTCSecond = None
271 271
272 272 if startDate:
273 273 startDatetime = datetime.datetime.combine(startDate, startTime)
274 274 startUTCSecond = (startDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
275 275
276 276 if endDate:
277 277 endDatetime = datetime.datetime.combine(endDate, endTime)
278 278 endUTCSecond = (endDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
279 279
280 280 start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
281 281
282 282 if not startUTCSecond:
283 283 startUTCSecond = start_index/self.__sample_rate
284 284
285 285 if start_index > startUTCSecond*self.__sample_rate:
286 286 startUTCSecond = start_index/self.__sample_rate
287 287
288 288 if not endUTCSecond:
289 289 endUTCSecond = end_index/self.__sample_rate
290 290
291 291 if end_index < endUTCSecond*self.__sample_rate:
292 292 endUTCSecond = end_index/self.__sample_rate
293 293
294 294 if not nSamples:
295 295 if not ippKm:
296 296 raise ValueError, "[Reading] nSamples or ippKm should be defined"
297 297
298 298 nSamples = ippKm / (1e6*0.15/self.__sample_rate)
299 299
300 300 channelBoundList = []
301 301 channelNameListFiltered = []
302 302
303 303 for thisIndexChannel in channelList:
304 304 thisChannelName = channelNameList[thisIndexChannel]
305 305 start_index, end_index = self.digitalReadObj.get_bounds(thisChannelName)
306 306 channelBoundList.append((start_index, end_index))
307 307 channelNameListFiltered.append(thisChannelName)
308 308
309 309 self.profileIndex = 0
310 310
311 311 self.__delay = delay
312 312 self.__ippKm = ippKm
313 313 self.__codeType = codeType
314 314 self.__nCode = nCode
315 315 self.__nBaud = nBaud
316 316 self.__code = code
317 317
318 318 self.__datapath = path
319 319 self.__online = online
320 320 self.__channelList = channelList
321 321 self.__channelNameList = channelNameListFiltered
322 322 self.__channelBoundList = channelBoundList
323 323 self.__nSamples = nSamples
324 324 self.__samples_to_read = buffer_size*nSamples
325 325 self.__nChannels = len(self.__channelList)
326 326
327 327 self.__startUTCSecond = startUTCSecond
328 328 self.__endUTCSecond = endUTCSecond
329 329
330 330 self.__timeInterval = 1.0 * self.__samples_to_read/self.__sample_rate #Time interval
331 331
332 332 if online:
333 333 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
334 334 startUTCSecond = numpy.floor(endUTCSecond)
335 335
336 336 self.__thisUnixSample = int(startUTCSecond*self.__sample_rate) - self.__samples_to_read
337 337
338 338 self.__data_buffer = numpy.zeros((self.__nChannels, self.__samples_to_read), dtype = numpy.complex)
339 339
340 340 self.__setFileHeader()
341 341 self.isConfig = True
342 342
343 343 print "[Reading] USRP Data was found from %s to %s " %(
344 344 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
345 345 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
346 346 )
347 347
348 348 print "[Reading] Starting process from %s to %s" %(datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
349 349 datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)
350 350 )
351 351
352 352 def __reload(self):
353 353
354 354 if not self.__online:
355 355 return
356 356
357 357 # print
358 358 # print "%s not in range [%s, %s]" %(
359 359 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
360 360 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
361 361 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
362 362 # )
363 363 print "[Reading] reloading metadata ..."
364 364
365 365 try:
366 366 self.digitalReadObj.reload(complete_update=True)
367 367 except:
368 368 self.digitalReadObj.reload()
369 369
370 370 start_index, end_index = self.digitalReadObj.get_bounds(self.__channelNameList[self.__channelList[0]])
371 371
372 372 if start_index > self.__startUTCSecond*self.__sample_rate:
373 373 self.__startUTCSecond = 1.0*start_index/self.__sample_rate
374 374
375 375 if end_index > self.__endUTCSecond*self.__sample_rate:
376 376 self.__endUTCSecond = 1.0*end_index/self.__sample_rate
377 377 print
378 378 print "[Reading] New timerange found [%s, %s] " %(
379 379 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
380 380 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
381 381 )
382 382
383 383 return True
384 384
385 385 return False
386 386
387 387 def __readNextBlock(self, seconds=30, volt_scale = 218776):
388 388 '''
389 389 '''
390 390
391 391 #Set the next data
392 392 self.__flagDiscontinuousBlock = False
393 393 self.__thisUnixSample += self.__samples_to_read
394 394
395 395 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
396 396 print "[Reading] There are no more data into selected timerange"
397 397
398 398 self.__reload()
399 399
400 400 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
401 401 self.__thisUnixSample -= self.__samples_to_read
402 402 return False
403 403
404 404 indexChannel = 0
405 405
406 406 dataOk = False
407 407
408 408 for thisChannelName in self.__channelNameList:
409 409
410 410 try:
411 411 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
412 412 self.__samples_to_read,
413 413 thisChannelName)
414 414
415 415 except IOError, e:
416 416 #read next profile
417 417 self.__flagDiscontinuousBlock = True
418 418 print "[Reading] %s" %datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e
419 419 break
420 420
421 421 if result.shape[0] != self.__samples_to_read:
422 422 self.__flagDiscontinuousBlock = True
423 423 print "[Reading] %s: Too few samples were found, just %d/%d samples" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
424 424 result.shape[0],
425 425 self.__samples_to_read)
426 426 break
427 427
428 428 self.__data_buffer[indexChannel,:] = result*volt_scale
429 429
430 430 indexChannel += 1
431 431
432 432 dataOk = True
433 433
434 434 self.__utctime = self.__thisUnixSample/self.__sample_rate
435 435
436 436 if not dataOk:
437 437 return False
438 438
439 439 print "[Reading] %s: %d samples <> %f sec" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
440 440 self.__samples_to_read,
441 441 self.__timeInterval)
442 442
443 443 self.__bufferIndex = 0
444 444
445 445 return True
446 446
447 447 def __isBufferEmpty(self):
448 448
449 449 if self.__bufferIndex <= self.__samples_to_read - self.__nSamples:
450 450 return False
451 451
452 452 return True
453 453
454 454 def getData(self, seconds=30, nTries=5):
455 455
456 456 '''
457 457 This method gets the data from files and put the data into the dataOut object
458 458
459 459 In addition, increase el the buffer counter in one.
460 460
461 461 Return:
462 462 data : retorna un perfil de voltages (alturas * canales) copiados desde el
463 463 buffer. Si no hay mas archivos a leer retorna None.
464 464
465 465 Affected:
466 466 self.dataOut
467 467 self.profileIndex
468 468 self.flagDiscontinuousBlock
469 469 self.flagIsNewBlock
470 470 '''
471 471
472 472 err_counter = 0
473 473 self.dataOut.flagNoData = True
474 474
475 475 if self.__isBufferEmpty():
476 476
477 477 self.__flagDiscontinuousBlock = False
478 478
479 479 while True:
480 480 if self.__readNextBlock():
481 481 break
482 482
483 483 if self.__thisUnixSample > self.__endUTCSecond*self.__sample_rate:
484 484 return False
485 485
486 486 if self.__flagDiscontinuousBlock:
487 487 print '[Reading] discontinuous block found ... continue with the next block'
488 488 continue
489 489
490 490 if not self.__online:
491 491 return False
492 492
493 493 err_counter += 1
494 494 if err_counter > nTries:
495 495 return False
496 496
497 497 print '[Reading] waiting %d seconds to read a new block' %seconds
498 498 sleep(seconds)
499 499
500 500 self.dataOut.data = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex+self.__nSamples]
501 501 self.dataOut.utctime = (self.__thisUnixSample + self.__bufferIndex)/self.__sample_rate
502 502 self.dataOut.flagNoData = False
503 503 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
504 504
505 505 self.__bufferIndex += self.__nSamples
506 506 self.profileIndex += 1
507 507
508 508 return True
509 509
510 510 def printInfo(self):
511 511 '''
512 512 '''
513 513 if self.__printInfo == False:
514 514 return
515 515
516 516 # self.systemHeaderObj.printInfo()
517 517 # self.radarControllerHeaderObj.printInfo()
518 518
519 519 self.__printInfo = False
520 520
521 521 def printNumberOfBlock(self):
522 522 '''
523 523 '''
524 524
525 525 print self.profileIndex
526 526
527 527 def run(self, **kwargs):
528 528 '''
529 529 This method will be called many times so here you should put all your code
530 530 '''
531 531
532 532 if not self.isConfig:
533 533 self.setup(**kwargs)
534 534
535 535 self.getData(seconds=self.__delay)
536 536
537 537 return
538 538
539 539 class USRPWriter(Operation):
540 540 '''
541 541 classdocs
542 542 '''
543 543
544 544 def __init__(self):
545 545 '''
546 546 Constructor
547 547 '''
548 548 self.dataOut = None
549 549
550 550 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
551 551 '''
552 552 In this method we should set all initial parameters.
553 553
554 554 Input:
555 555 dataIn : Input data will also be outputa data
556 556
557 557 '''
558 558 self.dataOut = dataIn
559 559
560 560
561 561
562 562
563 563
564 564 self.isConfig = True
565 565
566 566 return
567 567
568 568 def run(self, dataIn, **kwargs):
569 569 '''
570 570 This method will be called many times so here you should put all your code
571 571
572 572 Inputs:
573 573
574 574 dataIn : object with the data
575 575
576 576 '''
577 577
578 578 if not self.isConfig:
579 579 self.setup(dataIn, **kwargs)
580 580
581 581
582 582 if __name__ == '__main__':
583 583
584 584 readObj = USRPReader()
585 585
586 586 while True:
587 587 readObj.run(path='/Volumes/DATA/haystack/passive_radar/')
588 588 # readObj.printInfo()
589 589 readObj.printNumberOfBlock()
590 590
591 591 No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now