##// END OF EJS Templates
Fix support for ZMQ in VoltageReader
Juan C. Espinoza -
r1564:d1f6bedb3696
parent child
Show More
@@ -286,7 +286,7 class ReadUnitConf(ProcUnitConf):
286 self.parameters = {}
286 self.parameters = {}
287
287
288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
289 startTime='', endTime='', server=None, **kwargs):
289 startTime='', endTime='', server=None, topic='', **kwargs):
290
290
291 if datatype == None and name == None:
291 if datatype == None and name == None:
292 raise ValueError('datatype or name should be defined')
292 raise ValueError('datatype or name should be defined')
@@ -314,6 +314,8 class ReadUnitConf(ProcUnitConf):
314 self.addParameter(name='endDate', value=endDate)
314 self.addParameter(name='endDate', value=endDate)
315 self.addParameter(name='startTime', value=startTime)
315 self.addParameter(name='startTime', value=startTime)
316 self.addParameter(name='endTime', value=endTime)
316 self.addParameter(name='endTime', value=endTime)
317 self.addParameter(name='server', value=server)
318 self.addParameter(name='topic', value=topic)
317
319
318 for key, value in kwargs.items():
320 for key, value in kwargs.items():
319 self.addParameter(name=key, value=value)
321 self.addParameter(name=key, value=value)
@@ -554,7 +554,7 class Plot(Operation):
554
554
555 self.sender_time = last_time
555 self.sender_time = last_time
556
556
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax']
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax', 'zlimits']
558 for attr in attrs:
558 for attr in attrs:
559 value = getattr(self, attr)
559 value = getattr(self, attr)
560 if value:
560 if value:
@@ -781,6 +781,7 class JRODataReader(Reader):
781 firstHeaderSize = 0
781 firstHeaderSize = 0
782 basicHeaderSize = 24
782 basicHeaderSize = 24
783 __isFirstTimeOnline = 1
783 __isFirstTimeOnline = 1
784 topic = ''
784 filefmt = "*%Y%j***"
785 filefmt = "*%Y%j***"
785 folderfmt = "*%Y%j"
786 folderfmt = "*%Y%j"
786 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
787 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
@@ -1152,13 +1153,14 class JRODataReader(Reader):
1152
1153
1153 if self.server is not None:
1154 if self.server is not None:
1154 if 'tcp://' in self.server:
1155 if 'tcp://' in self.server:
1155 address = server
1156 address = self.server
1156 else:
1157 else:
1157 address = 'ipc:///tmp/%s' % self.server
1158 address = 'ipc:///tmp/%s' % self.server
1158 self.server = address
1159 self.server = address
1159 self.context = zmq.Context()
1160 self.context = zmq.Context()
1160 self.receiver = self.context.socket(zmq.PULL)
1161 self.receiver = self.context.socket(zmq.SUB)
1161 self.receiver.connect(self.server)
1162 self.receiver.connect(self.server)
1163 self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic)))
1162 time.sleep(0.5)
1164 time.sleep(0.5)
1163 print('[Starting] ReceiverData from {}'.format(self.server))
1165 print('[Starting] ReceiverData from {}'.format(self.server))
1164 else:
1166 else:
@@ -1287,7 +1289,11 class JRODataReader(Reader):
1287 if self.server is None:
1289 if self.server is None:
1288 self.getData()
1290 self.getData()
1289 else:
1291 else:
1290 self.getFromServer()
1292 try:
1293 self.getFromServer()
1294 except Exception as e:
1295 log.warning('Invalid block...')
1296 self.dataOut.flagNoData = True
1291
1297
1292
1298
1293 class JRODataWriter(Reader):
1299 class JRODataWriter(Reader):
@@ -309,7 +309,11 class VoltageReader(JRODataReader, ProcessingUnit):
309 self.readFirstHeaderFromServer()
309 self.readFirstHeaderFromServer()
310
310
311 timestamp = self.basicHeaderObj.get_datatime()
311 timestamp = self.basicHeaderObj.get_datatime()
312 print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
312 print('[Receiving] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
313 if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile:
314 self.nTotalBlocks = 0
315 self.nReadBlocks = 0
316 print('Receiving the next stream...')
313 current_pointer_location = self.blockPointer
317 current_pointer_location = self.blockPointer
314 junk = numpy.fromstring(
318 junk = numpy.fromstring(
315 block[self.blockPointer:], self.dtype, self.blocksize)
319 block[self.blockPointer:], self.dtype, self.blocksize)
@@ -623,10 +627,13 class VoltageWriter(JRODataWriter, Operation):
623 if self.profileIndex == 0:
627 if self.profileIndex == 0:
624 self.setBasicHeader()
628 self.setBasicHeader()
625
629
626 self.datablock[:, self.profileIndex, :] = self.dataOut.data
630 if not self.dataOut.flagDataAsBlock:
627
631 self.datablock[:, self.profileIndex, :] = self.dataOut.data
628 self.profileIndex += 1
632 self.profileIndex += 1
629
633 else:
634 self.datablock[:, :, :] = self.dataOut.data
635 self.profileIndex = self.processingHeaderObj.profilesPerBlock
636
630 if self.hasAllDataInBuffer():
637 if self.hasAllDataInBuffer():
631 # if self.flagIsNewFile:
638 # if self.flagIsNewFile:
632 self.writeNextBlock()
639 self.writeNextBlock()
General Comments 0
You need to be logged in to leave comments. Login now