diff --git a/schainpy/controller.py b/schainpy/controller.py index 4b80dda..61809b2 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -286,7 +286,7 @@ class ReadUnitConf(ProcUnitConf): self.parameters = {} def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='', - startTime='', endTime='', server=None, **kwargs): + startTime='', endTime='', server=None, topic='', **kwargs): if datatype == None and name == None: raise ValueError('datatype or name should be defined') @@ -314,6 +314,8 @@ class ReadUnitConf(ProcUnitConf): self.addParameter(name='endDate', value=endDate) self.addParameter(name='startTime', value=startTime) self.addParameter(name='endTime', value=endTime) + self.addParameter(name='server', value=server) + self.addParameter(name='topic', value=topic) for key, value in kwargs.items(): self.addParameter(name=key, value=value) diff --git a/schainpy/model/graphics/jroplot_base.py b/schainpy/model/graphics/jroplot_base.py index 106fb1a..6d5db22 100644 --- a/schainpy/model/graphics/jroplot_base.py +++ b/schainpy/model/graphics/jroplot_base.py @@ -554,7 +554,7 @@ class Plot(Operation): self.sender_time = last_time - attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax'] + attrs = ['titles', 'zmin', 'zmax', 'tag', 'ymin', 'ymax', 'zlimits'] for attr in attrs: value = getattr(self, attr) if value: diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 087187c..2f12ceb 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -781,6 +781,7 @@ class JRODataReader(Reader): firstHeaderSize = 0 basicHeaderSize = 24 __isFirstTimeOnline = 1 + topic = '' filefmt = "*%Y%j***" folderfmt = "*%Y%j" __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'online', 'delay', 'walk'] @@ -1152,13 +1153,14 @@ class JRODataReader(Reader): if self.server is not None: if 'tcp://' in self.server: - address = server + address = self.server else: address = 'ipc:///tmp/%s' % self.server self.server = address self.context = zmq.Context() - self.receiver = self.context.socket(zmq.PULL) + self.receiver = self.context.socket(zmq.SUB) self.receiver.connect(self.server) + self.receiver.setsockopt(zmq.SUBSCRIBE, str.encode(str(self.topic))) time.sleep(0.5) print('[Starting] ReceiverData from {}'.format(self.server)) else: @@ -1287,7 +1289,11 @@ class JRODataReader(Reader): if self.server is None: self.getData() else: - self.getFromServer() + try: + self.getFromServer() + except Exception as e: + log.warning('Invalid block...') + self.dataOut.flagNoData = True class JRODataWriter(Reader): diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index 8f7c11d..ea77b0b 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -309,7 +309,11 @@ class VoltageReader(JRODataReader, ProcessingUnit): self.readFirstHeaderFromServer() timestamp = self.basicHeaderObj.get_datatime() - print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp)) + print('[Receiving] - Block {} - {}'.format(self.nTotalBlocks, timestamp)) + if self.nTotalBlocks == self.processingHeaderObj.dataBlocksPerFile: + self.nTotalBlocks = 0 + self.nReadBlocks = 0 + print('Receiving the next stream...') current_pointer_location = self.blockPointer junk = numpy.fromstring( block[self.blockPointer:], self.dtype, self.blocksize) @@ -623,10 +627,13 @@ class VoltageWriter(JRODataWriter, Operation): if self.profileIndex == 0: self.setBasicHeader() - self.datablock[:, self.profileIndex, :] = self.dataOut.data - - self.profileIndex += 1 - + if not self.dataOut.flagDataAsBlock: + self.datablock[:, self.profileIndex, :] = self.dataOut.data + self.profileIndex += 1 + else: + self.datablock[:, :, :] = self.dataOut.data + self.profileIndex = self.processingHeaderObj.profilesPerBlock + if self.hasAllDataInBuffer(): # if self.flagIsNewFile: self.writeNextBlock()