##// END OF EJS Templates
Bug fixed in zerorpc test module
Miguel Valdez -
r800:0ed26d7e3402
parent child
Show More
@@ -1,587 +1,587
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9 9
10 10 try:
11 11 from gevent import sleep
12 12 except:
13 13 from time import sleep
14 14
15 15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 16 from schainpy.model.data.jrodata import Voltage
17 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
18 18
19 19 try:
20 20 import digital_rf_hdf5
21 21 except:
22 22 print 'You should install "digital_rf_hdf5" module if you want to read USRP data'
23 23
24 24 class USRPReader(ProcessingUnit):
25 25 '''
26 26 classdocs
27 27 '''
28 28
29 29 def __init__(self):
30 30 '''
31 31 Constructor
32 32 '''
33 33
34 34 ProcessingUnit.__init__(self)
35 35
36 36 self.dataOut = Voltage()
37 37 self.__printInfo = True
38 38 self.__flagDiscontinuousBlock = False
39 39 self.__bufferIndex = 9999999
40 40
41 41 self.__ippKm = None
42 42 self.__codeType = 0
43 43 self.__nCode = None
44 44 self.__nBaud = None
45 45 self.__code = None
46 46
47 47 def __getCurrentSecond(self):
48 48
49 49 return self.__thisUnixSample/self.__sample_rate
50 50
51 51 thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
52 52
53 53 def __setFileHeader(self):
54 54 '''
55 55 In this method will be initialized every parameter of dataOut object (header, no data)
56 56 '''
57 57 nProfiles = self.__sample_rate #Number of profiles by second
58 58
59 59 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
60 60 txA=0,
61 61 txB=0,
62 62 nWindows=1,
63 63 nHeights=self.__nSamples,
64 64 firstHeight=self.__firstHeigth,
65 65 deltaHeight=self.__deltaHeigth,
66 66 codeType=self.__codeType,
67 67 nCode=self.__nCode, nBaud=self.__nBaud,
68 68 code = self.__code)
69 69
70 70 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
71 71 nProfiles=nProfiles,
72 72 nChannels=len(self.__channelList),
73 73 adcResolution=14)
74 74
75 75 self.dataOut.type = "Voltage"
76 76
77 77 self.dataOut.data = None
78 78
79 79 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
80 80
81 81 # self.dataOut.nChannels = 0
82 82
83 83 # self.dataOut.nHeights = 0
84 84
85 85 self.dataOut.nProfiles = nProfiles
86 86
87 87 self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
88 88
89 89 self.dataOut.channelList = self.__channelList
90 90
91 91 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
92 92
93 93 # self.dataOut.channelIndexList = None
94 94
95 95 self.dataOut.flagNoData = True
96 96
97 97 #Set to TRUE if the data is discontinuous
98 98 self.dataOut.flagDiscontinuousBlock = False
99 99
100 100 self.dataOut.utctime = None
101 101
102 102 self.dataOut.timeZone = self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
103 103
104 104 self.dataOut.dstFlag = 0
105 105
106 106 self.dataOut.errorCount = 0
107 107
108 108 self.dataOut.nCohInt = 1
109 109
110 110 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
111 111
112 112 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
113 113
114 114 self.dataOut.flagShiftFFT = False
115 115
116 116 self.dataOut.ippSeconds = 1.0*self.__nSamples/self.__sample_rate
117 117
118 118 #Time interval between profiles
119 119 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
120 120
121 121 self.dataOut.frequency = self.__frequency
122 122
123 123 self.dataOut.realtime = self.__online
124 124
125 125 def findDatafiles(self, path, startDate=None, endDate=None):
126 126
127 127 if not os.path.isdir(path):
128 128 return []
129 129
130 130 try:
131 131 digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
132 132 except:
133 133 digitalReadObj = digital_rf_hdf5.read_hdf5(path)
134 134
135 135 channelNameList = digitalReadObj.get_channels()
136 136
137 137 if not channelNameList:
138 138 return []
139 139
140 140 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
141 141
142 142 sample_rate = metadata_dict['sample_rate'][0]
143 143
144 144 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
145 145
146 146 try:
147 147 timezone = this_metadata_file['timezone'].value
148 148 except:
149 149 timezone = 0
150 150
151 151 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(channelNameList[0])/sample_rate - timezone
152 152
153 153 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
154 154 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
155 155
156 156 if not startDate:
157 157 startDate = startDatetime.date()
158 158
159 159 if not endDate:
160 160 endDate = endDatatime.date()
161 161
162 162 dateList = []
163 163
164 164 thisDatetime = startDatetime
165 165
166 166 while(thisDatetime<=endDatatime):
167 167
168 168 thisDate = thisDatetime.date()
169 169
170 170 if thisDate < startDate:
171 171 continue
172 172
173 173 if thisDate > endDate:
174 174 break
175 175
176 176 dateList.append(thisDate)
177 177 thisDatetime += datetime.timedelta(1)
178 178
179 179 return dateList
180 180
181 181 def setup(self, path = None,
182 182 startDate = None,
183 183 endDate = None,
184 184 startTime = datetime.time(0,0,0),
185 185 endTime = datetime.time(23,59,59),
186 186 channelList = None,
187 187 nSamples = None,
188 188 ippKm = 60,
189 189 online = False,
190 190 delay = 60,
191 191 buffer_size = 1024,
192 192 **kwargs):
193 193 '''
194 194 In this method we should set all initial parameters.
195 195
196 196 Inputs:
197 197 path
198 198 startDate
199 199 endDate
200 200 startTime
201 201 endTime
202 202 set
203 203 expLabel
204 204 ext
205 205 online
206 206 delay
207 207 '''
208 208
209 209 if not os.path.isdir(path):
210 210 raise ValueError, "[Reading] Directory %s does not exist" %path
211 211
212 212 try:
213 213 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
214 214 except:
215 215 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path)
216 216
217 217 channelNameList = self.digitalReadObj.get_channels()
218 218
219 219 if not channelNameList:
220 220 raise ValueError, "[Reading] Directory %s does not have any files" %path
221 221
222 222 if not channelList:
223 223 channelList = range(len(channelNameList))
224 224
225 225 ########## Reading metadata ######################
226 226
227 227 metadata_dict = self.digitalReadObj.get_rf_file_metadata(channelNameList[channelList[0]])
228 228
229 229 self.__sample_rate = metadata_dict['sample_rate'][0]
230 230 self.__samples_per_file = metadata_dict['samples_per_file'][0]
231 231 self.__deltaHeigth = 1e6*0.15/self.__sample_rate
232 232
233 233 this_metadata_file = self.digitalReadObj.get_metadata(channelNameList[channelList[0]])
234 234
235 235 self.__frequency = this_metadata_file['center_frequencies'].value
236 236 try:
237 237 self.__timezone = this_metadata_file['timezone'].value
238 238 except:
239 239 self.__timezone = 0
240 240
241 241 self.__firstHeigth = 0
242 242
243 243 try:
244 244 codeType = this_metadata_file['codeType'].value
245 245 except:
246 246 codeType = 0
247 247
248 248 nCode = 1
249 249 nBaud = 1
250 250 code = numpy.ones((nCode, nBaud), dtype=numpy.int)
251 251
252 252 if codeType:
253 253 nCode = this_metadata_file['nCode'].value
254 254 nBaud = this_metadata_file['nBaud'].value
255 255 code = this_metadata_file['code'].value
256 256
257 257 if not ippKm:
258 258 try:
259 259 #seconds to km
260 260 ippKm = 1e6*0.15*this_metadata_file['ipp'].value
261 261 except:
262 262 ippKm = None
263 263
264 264 ####################################################
265 265 startUTCSecond = None
266 266 endUTCSecond = None
267 267
268 268 if startDate:
269 269 startDatetime = datetime.datetime.combine(startDate, startTime)
270 270 startUTCSecond = (startDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
271 271
272 272 if endDate:
273 273 endDatetime = datetime.datetime.combine(endDate, endTime)
274 274 endUTCSecond = (endDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
275 275
276 276 start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
277 277
278 278 if not startUTCSecond:
279 279 startUTCSecond = start_index/self.__sample_rate
280 280
281 281 if start_index > startUTCSecond*self.__sample_rate:
282 282 startUTCSecond = start_index/self.__sample_rate
283 283
284 284 if not endUTCSecond:
285 285 endUTCSecond = end_index/self.__sample_rate
286 286
287 287 if end_index < endUTCSecond*self.__sample_rate:
288 288 endUTCSecond = end_index/self.__sample_rate
289 289
290 290 if not nSamples:
291 291 if not ippKm:
292 292 raise ValueError, "[Reading] nSamples or ippKm should be defined"
293 293
294 nSamples = ippKm / (1e6*0.15/self.__sample_rate)
294 nSamples = int(ippKm / (1e6*0.15/self.__sample_rate))
295 295
296 296 channelBoundList = []
297 297 channelNameListFiltered = []
298 298
299 299 for thisIndexChannel in channelList:
300 300 thisChannelName = channelNameList[thisIndexChannel]
301 301 start_index, end_index = self.digitalReadObj.get_bounds(thisChannelName)
302 302 channelBoundList.append((start_index, end_index))
303 303 channelNameListFiltered.append(thisChannelName)
304 304
305 305 self.profileIndex = 0
306 306
307 307 self.__delay = delay
308 308 self.__ippKm = ippKm
309 309 self.__codeType = codeType
310 310 self.__nCode = nCode
311 311 self.__nBaud = nBaud
312 312 self.__code = code
313 313
314 314 self.__datapath = path
315 315 self.__online = online
316 316 self.__channelList = channelList
317 317 self.__channelNameList = channelNameListFiltered
318 318 self.__channelBoundList = channelBoundList
319 319 self.__nSamples = nSamples
320 self.__samples_to_read = buffer_size*nSamples
320 self.__samples_to_read = int(buffer_size*nSamples)
321 321 self.__nChannels = len(self.__channelList)
322 322
323 323 self.__startUTCSecond = startUTCSecond
324 324 self.__endUTCSecond = endUTCSecond
325 325
326 326 self.__timeInterval = 1.0 * self.__samples_to_read/self.__sample_rate #Time interval
327 327
328 328 if online:
329 329 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
330 330 startUTCSecond = numpy.floor(endUTCSecond)
331 331
332 332 self.__thisUnixSample = int(startUTCSecond*self.__sample_rate) - self.__samples_to_read
333 333
334 334 self.__data_buffer = numpy.zeros((self.__nChannels, self.__samples_to_read), dtype = numpy.complex)
335 335
336 336 self.__setFileHeader()
337 337 self.isConfig = True
338 338
339 339 print "[Reading] USRP Data was found from %s to %s " %(
340 340 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
341 341 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
342 342 )
343 343
344 344 print "[Reading] Starting process from %s to %s" %(datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
345 345 datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)
346 346 )
347 347
348 348 def __reload(self):
349 349
350 350 if not self.__online:
351 351 return
352 352
353 353 # print
354 354 # print "%s not in range [%s, %s]" %(
355 355 # datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
356 356 # datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
357 357 # datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
358 358 # )
359 359 print "[Reading] reloading metadata ..."
360 360
361 361 try:
362 362 self.digitalReadObj.reload(complete_update=True)
363 363 except:
364 364 self.digitalReadObj.reload()
365 365
366 366 start_index, end_index = self.digitalReadObj.get_bounds(self.__channelNameList[self.__channelList[0]])
367 367
368 368 if start_index > self.__startUTCSecond*self.__sample_rate:
369 369 self.__startUTCSecond = 1.0*start_index/self.__sample_rate
370 370
371 371 if end_index > self.__endUTCSecond*self.__sample_rate:
372 372 self.__endUTCSecond = 1.0*end_index/self.__sample_rate
373 373 print
374 374 print "[Reading] New timerange found [%s, %s] " %(
375 375 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
376 376 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
377 377 )
378 378
379 379 return True
380 380
381 381 return False
382 382
383 383 def __readNextBlock(self, seconds=30, volt_scale = 218776):
384 384 '''
385 385 '''
386 386
387 387 #Set the next data
388 388 self.__flagDiscontinuousBlock = False
389 389 self.__thisUnixSample += self.__samples_to_read
390 390
391 391 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
392 print "[Reading] There are no more data into selected timerange"
392 print "[Reading] There are no more data into selected time-range"
393 393
394 394 self.__reload()
395 395
396 396 if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
397 397 self.__thisUnixSample -= self.__samples_to_read
398 398 return False
399 399
400 400 indexChannel = 0
401 401
402 402 dataOk = False
403 403
404 404 for thisChannelName in self.__channelNameList:
405 405
406 406 try:
407 407 result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
408 408 self.__samples_to_read,
409 409 thisChannelName)
410 410
411 411 except IOError, e:
412 412 #read next profile
413 413 self.__flagDiscontinuousBlock = True
414 414 print "[Reading] %s" %datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e
415 415 break
416 416
417 417 if result.shape[0] != self.__samples_to_read:
418 418 self.__flagDiscontinuousBlock = True
419 419 print "[Reading] %s: Too few samples were found, just %d/%d samples" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
420 420 result.shape[0],
421 421 self.__samples_to_read)
422 422 break
423 423
424 424 self.__data_buffer[indexChannel,:] = result*volt_scale
425 425
426 426 indexChannel += 1
427 427
428 428 dataOk = True
429 429
430 430 self.__utctime = self.__thisUnixSample/self.__sample_rate
431 431
432 432 if not dataOk:
433 433 return False
434 434
435 435 print "[Reading] %s: %d samples <> %f sec" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
436 436 self.__samples_to_read,
437 437 self.__timeInterval)
438 438
439 439 self.__bufferIndex = 0
440 440
441 441 return True
442 442
443 443 def __isBufferEmpty(self):
444 444
445 445 if self.__bufferIndex <= self.__samples_to_read - self.__nSamples:
446 446 return False
447 447
448 448 return True
449 449
450 450 def getData(self, seconds=30, nTries=5):
451 451
452 452 '''
453 453 This method gets the data from files and put the data into the dataOut object
454 454
455 455 In addition, increase el the buffer counter in one.
456 456
457 457 Return:
458 458 data : retorna un perfil de voltages (alturas * canales) copiados desde el
459 459 buffer. Si no hay mas archivos a leer retorna None.
460 460
461 461 Affected:
462 462 self.dataOut
463 463 self.profileIndex
464 464 self.flagDiscontinuousBlock
465 465 self.flagIsNewBlock
466 466 '''
467 467
468 468 err_counter = 0
469 469 self.dataOut.flagNoData = True
470 470
471 471 if self.__isBufferEmpty():
472 472
473 473 self.__flagDiscontinuousBlock = False
474 474
475 475 while True:
476 476 if self.__readNextBlock():
477 477 break
478 478
479 479 if self.__thisUnixSample > self.__endUTCSecond*self.__sample_rate:
480 480 return False
481 481
482 482 if self.__flagDiscontinuousBlock:
483 483 print '[Reading] discontinuous block found ... continue with the next block'
484 484 continue
485 485
486 486 if not self.__online:
487 487 return False
488 488
489 489 err_counter += 1
490 490 if err_counter > nTries:
491 491 return False
492 492
493 493 print '[Reading] waiting %d seconds to read a new block' %seconds
494 494 sleep(seconds)
495 495
496 496 self.dataOut.data = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex+self.__nSamples]
497 497 self.dataOut.utctime = (self.__thisUnixSample + self.__bufferIndex)/self.__sample_rate
498 498 self.dataOut.flagNoData = False
499 499 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
500 500
501 501 self.__bufferIndex += self.__nSamples
502 502 self.profileIndex += 1
503 503
504 504 return True
505 505
506 506 def printInfo(self):
507 507 '''
508 508 '''
509 509 if self.__printInfo == False:
510 510 return
511 511
512 512 # self.systemHeaderObj.printInfo()
513 513 # self.radarControllerHeaderObj.printInfo()
514 514
515 515 self.__printInfo = False
516 516
517 517 def printNumberOfBlock(self):
518 518 '''
519 519 '''
520 520
521 521 print self.profileIndex
522 522
523 523 def run(self, **kwargs):
524 524 '''
525 525 This method will be called many times so here you should put all your code
526 526 '''
527 527
528 528 if not self.isConfig:
529 529 self.setup(**kwargs)
530 530
531 531 self.getData(seconds=self.__delay)
532 532
533 533 return
534 534
535 535 class USRPWriter(Operation):
536 536 '''
537 537 classdocs
538 538 '''
539 539
540 540 def __init__(self):
541 541 '''
542 542 Constructor
543 543 '''
544 544 self.dataOut = None
545 545
546 546 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
547 547 '''
548 548 In this method we should set all initial parameters.
549 549
550 550 Input:
551 551 dataIn : Input data will also be outputa data
552 552
553 553 '''
554 554 self.dataOut = dataIn
555 555
556 556
557 557
558 558
559 559
560 560 self.isConfig = True
561 561
562 562 return
563 563
564 564 def run(self, dataIn, **kwargs):
565 565 '''
566 566 This method will be called many times so here you should put all your code
567 567
568 568 Inputs:
569 569
570 570 dataIn : object with the data
571 571
572 572 '''
573 573
574 574 if not self.isConfig:
575 575 self.setup(dataIn, **kwargs)
576 576
577 577
578 578 if __name__ == '__main__':
579 579
580 580 readObj = USRPReader()
581 581
582 582 while True:
583 583 readObj.run(path='/Volumes/DATA/haystack/passive_radar/')
584 584 # readObj.printInfo()
585 585 readObj.printNumberOfBlock()
586 586
587 587 No newline at end of file
@@ -1,135 +1,139
1 1 '''
2 2 Created on Jul 15, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import time
7 7 import threading
8 8 import cPickle
9 9
10 10 # try:
11 11 # from gevent import sleep
12 12 # except:
13 13 from time import sleep
14 14
15 15 SERIALIZER = cPickle
16 16
17 17 # from schainpy.serializer import DynamicSerializer
18 18 from schainpy.model.io.jroIO_usrp import USRPReader
19 19 from schainpy.model.serializer.data import obj2Serial
20 20
21 21 class USRPReaderAPI(USRPReader, threading.Thread):
22 22
23 23 # __isBufferEmpty = True
24 24
25 25 __DATAKEYLIST = ['data','utctime','flagNoData']
26 26
27 27 def __init__(self, serializer='msgpack'):
28 28
29 29 threading.Thread.__init__(self)
30 30 USRPReader.__init__(self)
31 31
32 32 # self.__serializerObj = DynamicSerializer.DynamicSerializer('msgpack')
33 33 self.__mySerial = None
34 34 self.__isBufferEmpty = True
35 35
36 36 self.setSerializer(serializer)
37 37
38 38 def setSerializer(self, serializer):
39 39
40 40 self.__serializer = serializer
41 41
42 42 def getSerializer(self):
43 43
44 44 return self.__serializer
45 45
46 46 def getProfileIndex(self):
47 47
48 48 return self.profileIndex
49 49
50 50 def getSerialMetaData(self):
51 51
52 52 if self.__isBufferEmpty:
53 53 ini = time.time()
54 54
55 55 while True:
56 56
57 57 if not self.__isBufferEmpty:
58 58 break
59 59
60 60 if time.time() - ini > 20:
61 61 break
62 62
63 63 sleep(1e-12)
64 64
65 65
66 66 # if not self.getData():
67 67 # self.__isBufferEmpty = False
68 68 # return None
69 69
70 70 if self.dataOut.flagNoData:
71 71 return None
72 72
73 73 myMetadataSerial = obj2Serial(self.dataOut,
74 74 serializer = self.__serializer)
75 75
76 76 return myMetadataSerial
77 77
78 78 def getSerialData(self):
79 79
80 80 if self.__isBufferEmpty:
81 81 ini = time.time()
82 82
83 83 while True:
84 84
85 85 if not self.__isBufferEmpty:
86 86 break
87 87
88 88 if time.time() - ini > 20:
89 89 break
90 90
91 91 sleep(1e-12)
92 92
93 93
94 94 # if not self.getData():
95 95 # self.__isBufferEmpty = False
96 96 # return None
97 97
98 98 if self.dataOut.flagNoData:
99 99 return None
100 100
101 101 self.__isBufferEmpty = True
102 102
103 103 return self.__mySerial
104 104
105 105 def run(self):
106 106
107 107 '''
108 108 This method will be called once when start() is called
109 109 '''
110 110
111 111 if not self.isConfig:
112 112 raise RuntimeError, 'setup() method has to be called before start()'
113
113
114 print "Running ..."
115
114 116 while True:
115 117
116 118 if not self.__isBufferEmpty:
117 119 sleep(1e-12)
118 120 continue
119 121
120 122 if not self.getData():
121 123 break
122 124
123 125 print ".",
124 126
125 127 self.__mySerial = obj2Serial(self.dataOut,
126 128 keyList = self.__DATAKEYLIST,
127 129 serializer = self.__serializer)
128 130 self.__isBufferEmpty = False
129 131
130 132 # print self.profileIndex
131 133 # print 'wait 1 second'
132 134
133 135 # sleep(0.1)
134
136
137 print "Closing thread"
138
135 139 return No newline at end of file
@@ -1,108 +1,108
1 1 '''
2 2 Created on Jul 17, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 DEFAULT_SERIALIZER = None
8 8
9 9 try:
10 10 import cPickle
11 11 DEFAULT_SERIALIZER = 'cPickle'
12 12 except:
13 13 pass
14 14
15 15 try:
16 16 import msgpack_numpy
17 17 DEFAULT_SERIALIZER = 'msgpack'
18 18 except:
19 19 pass
20 20
21 21 # import jsonpickle
22 22 # import yaml
23 23
24 24 class Serializer(object):
25 25
26 26 def __init__(self):
27 27
28 28 self.serializer = None
29 29
30 30 def dumps(self, obj, **kwargs):
31 31
32 32 return self.serializer.dumps(obj, **kwargs)
33 33
34 34 def loads(self, obj, **kwargs):
35 35 return self.serializer.loads(obj, **kwargs)
36 36
37 37 class cPickleSerializer(Serializer):
38 38
39 39 def __init__(self):
40 40 self.serializer = cPickle
41 41
42 42 def dumps(self, obj, **kwargs):
43 43 return self.serializer.dumps(obj, 2)
44 44
45 45 def loads(self, obj, **kwargs):
46 46 return self.serializer.loads(obj)
47 47
48 48 class msgpackSerializer(Serializer):
49 49
50 50 def __init__(self):
51 51
52 52 self.serializer = msgpack_numpy
53 53
54 54 def dumps(self, obj, **kwargs):
55 55 return self.serializer.packb(obj)
56 56
57 57 def loads(self, obj, **kwargs):
58 58 return self.serializer.unpackb(obj)
59 59
60 60 # class jsonpickleSerializer(Serializer):
61 61 #
62 62 # def __init__(self):
63 63 #
64 64 # self.serializer = jsonpickle
65 65 #
66 66 # def dumps(self, obj, **kwargs):
67 67 # return self.serializer.encode(obj, **kwargs)
68 68 #
69 69 # def loads(self, obj, **kwargs):
70 70 # return self.serializer.decode(obj, **kwargs)
71 71 #
72 72 # class yamlSerializer(Serializer):
73 73 #
74 74 # def __init__(self):
75 75 #
76 76 # self.serializer = yaml
77 77 #
78 78 # def dumps(self, obj, **kwargs):
79 79 # return self.serializer.dump(obj, **kwargs)
80 80 #
81 81 # def loads(self, obj, **kwargs):
82 82 # return self.serializer.load(obj, **kwargs)
83 83
84 84 class DynamicSerializer(Serializer):
85 85
86 86 def __init__(self, module = None):
87 87
88 88 if not DEFAULT_SERIALIZER:
89 89 raise ImportError, "Install a python serializer like cPickle or msgpack"
90 90
91 if not mode:
92 mode == DEFAULT_SERIALIZER
91 if not module:
92 module == DEFAULT_SERIALIZER
93 93
94 if mode == 'cPickle':
94 if module == 'cPickle':
95 95 self.serializer = cPickleSerializer()
96 96 #
97 # if mode == 'jsonpickle':
97 # if module == 'jsonpickle':
98 98 # self.serializer = jsonpickleSerializer()
99 99 #
100 # if mode == 'yaml':
100 # if module == 'yaml':
101 101 # self.serializer = yamlSerializer()
102 102
103 if mode == 'msgpack':
103 if module == 'msgpack':
104 104 self.serializer = msgpackSerializer()
105 105
106 106
107 107 if __name__ == '__main__':
108 108 pass No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now