@@ -286,7 +286,7 class ReadUnitConf(ProcUnitConf): | |||
|
286 | 286 | self.parameters = {} |
|
287 | 287 | |
|
288 | 288 | def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', |
|
289 | startTime='', endTime='', server=None, **kwargs): | |
|
289 | startTime='', endTime='', server=None, topic='', **kwargs): | |
|
290 | 290 | |
|
291 | 291 | if datatype == None and name == None: |
|
292 | 292 | raise ValueError('datatype or name should be defined') |
@@ -314,6 +314,8 class ReadUnitConf(ProcUnitConf): | |||
|
314 | 314 | self.addParameter(name='endDate', value=endDate) |
|
315 | 315 | self.addParameter(name='startTime', value=startTime) |
|
316 | 316 | self.addParameter(name='endTime', value=endTime) |
|
317 | self.addParameter(name='server', value=server) | |
|
318 | self.addParameter(name='topic', value=topic) | |
|
317 | 319 | |
|
318 | 320 | for key, value in kwargs.items(): |
|
319 | 321 | self.addParameter(name=key, value=value) |
@@ -554,7 +554,7 class Plot(Operation): | |||
|
554 | 554 | |
|
555 | 555 | self.sender_time = last_time |
|
556 | 556 | |
|
557 | attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax'] | |
|
557 | attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax', 'zlimits'] | |
|
558 | 558 | for attr in attrs: |
|
559 | 559 | value = getattr(self, attr) |
|
560 | 560 | if value: |
@@ -781,6 +781,7 class JRODataReader(Reader): | |||
|
781 | 781 | firstHeaderSize = 0 |
|
782 | 782 | basicHeaderSize = 24 |
|
783 | 783 | __isFirstTimeOnline = 1 |
|
784 | topic = '' | |
|
784 | 785 | filefmt = "*%Y%j***" |
|
785 | 786 | folderfmt = "*%Y%j" |
|
786 | 787 | __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk'] |
@@ -1152,13 +1153,14 class JRODataReader(Reader): | |||
|
1152 | 1153 | |
|
1153 | 1154 | if self.server is not None: |
|
1154 | 1155 | if 'tcp://' in self.server: |
|
1155 | address = server | |
|
1156 | address = self.server | |
|
1156 | 1157 | else: |
|
1157 | 1158 | address = 'ipc:///tmp/%s' % self.server |
|
1158 | 1159 | self.server = address |
|
1159 | 1160 | self.context = zmq.Context() |
|
1160 |
self.receiver = self.context.socket(zmq. |
|
|
1161 | self.receiver = self.context.socket(zmq.SUB) | |
|
1161 | 1162 | self.receiver.connect(self.server) |
|
1163 | self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic))) | |
|
1162 | 1164 | time.sleep(0.5) |
|
1163 | 1165 | print('[Starting] ReceiverData from {}'.format(self.server)) |
|
1164 | 1166 | else: |
@@ -1287,7 +1289,11 class JRODataReader(Reader): | |||
|
1287 | 1289 | if self.server is None: |
|
1288 | 1290 | self.getData() |
|
1289 | 1291 | else: |
|
1290 | self.getFromServer() | |
|
1292 | try: | |
|
1293 | self.getFromServer() | |
|
1294 | except Exception as e: | |
|
1295 | log.warning('Invalid block...') | |
|
1296 | self.dataOut.flagNoData = True | |
|
1291 | 1297 | |
|
1292 | 1298 | |
|
1293 | 1299 | class JRODataWriter(Reader): |
@@ -309,7 +309,11 class VoltageReader(JRODataReader, ProcessingUnit): | |||
|
309 | 309 | self.readFirstHeaderFromServer() |
|
310 | 310 | |
|
311 | 311 | timestamp = self.basicHeaderObj.get_datatime() |
|
312 |
print('[Re |
|
|
312 | print('[Receiving] - Block {} - {}'.format(self.nTotalBlocks, timestamp)) | |
|
313 | if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile: | |
|
314 | self.nTotalBlocks = 0 | |
|
315 | self.nReadBlocks = 0 | |
|
316 | print('Receiving the next stream...') | |
|
313 | 317 | current_pointer_location = self.blockPointer |
|
314 | 318 | junk = numpy.fromstring( |
|
315 | 319 | block[self.blockPointer:], self.dtype, self.blocksize) |
@@ -623,10 +627,13 class VoltageWriter(JRODataWriter, Operation): | |||
|
623 | 627 | if self.profileIndex == 0: |
|
624 | 628 | self.setBasicHeader() |
|
625 | 629 | |
|
626 | self.datablock[:, self.profileIndex, :] = self.dataOut.data | |
|
627 | ||
|
628 | self.profileIndex += 1 | |
|
629 | ||
|
630 | if not self.dataOut.flagDataAsBlock: | |
|
631 | self.datablock[:, self.profileIndex, :] = self.dataOut.data | |
|
632 | self.profileIndex += 1 | |
|
633 | else: | |
|
634 | self.datablock[:, :, :] = self.dataOut.data | |
|
635 | self.profileIndex = self.processingHeaderObj.profilesPerBlock | |
|
636 | ||
|
630 | 637 | if self.hasAllDataInBuffer(): |
|
631 | 638 | # if self.flagIsNewFile: |
|
632 | 639 | self.writeNextBlock() |
General Comments 0
You need to be logged in to leave comments.
Login now