##// END OF EJS Templates
Fix support for ZMQ in VoltageReader
Juan C. Espinoza -
r1564:d1f6bedb3696
parent child
Show More
@@ -286,7 +286,7 class ReadUnitConf(ProcUnitConf):
286 286 self.parameters = {}
287 287
288 288 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
289 startTime='', endTime='', server=None, **kwargs):
289 startTime='', endTime='', server=None, topic='', **kwargs):
290 290
291 291 if datatype == None and name == None:
292 292 raise ValueError('datatype or name should be defined')
@@ -314,6 +314,8 class ReadUnitConf(ProcUnitConf):
314 314 self.addParameter(name='endDate', value=endDate)
315 315 self.addParameter(name='startTime', value=startTime)
316 316 self.addParameter(name='endTime', value=endTime)
317 self.addParameter(name='server', value=server)
318 self.addParameter(name='topic', value=topic)
317 319
318 320 for key, value in kwargs.items():
319 321 self.addParameter(name=key, value=value)
@@ -554,7 +554,7 class Plot(Operation):
554 554
555 555 self.sender_time = last_time
556 556
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax']
557 attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax', 'zlimits']
558 558 for attr in attrs:
559 559 value = getattr(self, attr)
560 560 if value:
@@ -781,6 +781,7 class JRODataReader(Reader):
781 781 firstHeaderSize = 0
782 782 basicHeaderSize = 24
783 783 __isFirstTimeOnline = 1
784 topic = ''
784 785 filefmt = "*%Y%j***"
785 786 folderfmt = "*%Y%j"
786 787 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk']
@@ -1152,13 +1153,14 class JRODataReader(Reader):
1152 1153
1153 1154 if self.server is not None:
1154 1155 if 'tcp://' in self.server:
1155 address = server
1156 address = self.server
1156 1157 else:
1157 1158 address = 'ipc:///tmp/%s' % self.server
1158 1159 self.server = address
1159 1160 self.context = zmq.Context()
1160 self.receiver = self.context.socket(zmq.PULL)
1161 self.receiver = self.context.socket(zmq.SUB)
1161 1162 self.receiver.connect(self.server)
1163 self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic)))
1162 1164 time.sleep(0.5)
1163 1165 print('[Starting] ReceiverData from {}'.format(self.server))
1164 1166 else:
@@ -1287,7 +1289,11 class JRODataReader(Reader):
1287 1289 if self.server is None:
1288 1290 self.getData()
1289 1291 else:
1292 try:
1290 1293 self.getFromServer()
1294 except Exception as e:
1295 log.warning('Invalid block...')
1296 self.dataOut.flagNoData = True
1291 1297
1292 1298
1293 1299 class JRODataWriter(Reader):
@@ -309,7 +309,11 class VoltageReader(JRODataReader, ProcessingUnit):
309 309 self.readFirstHeaderFromServer()
310 310
311 311 timestamp = self.basicHeaderObj.get_datatime()
312 print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
312 print('[Receiving] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
313 if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile:
314 self.nTotalBlocks = 0
315 self.nReadBlocks = 0
316 print('Receiving the next stream...')
313 317 current_pointer_location = self.blockPointer
314 318 junk = numpy.fromstring(
315 319 block[self.blockPointer:], self.dtype, self.blocksize)
@@ -623,9 +627,12 class VoltageWriter(JRODataWriter, Operation):
623 627 if self.profileIndex == 0:
624 628 self.setBasicHeader()
625 629
630 if not self.dataOut.flagDataAsBlock:
626 631 self.datablock[:, self.profileIndex, :] = self.dataOut.data
627
628 632 self.profileIndex += 1
633 else:
634 self.datablock[:, :, :] = self.dataOut.data
635 self.profileIndex = self.processingHeaderObj.profilesPerBlock
629 636
630 637 if self.hasAllDataInBuffer():
631 638 # if self.flagIsNewFile:
General Comments 0
You need to be logged in to leave comments. Login now