diff --git a/schainpy/controller.py b/schainpy/controller.py index 6d6e1a8..82a8787 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -217,7 +217,6 @@ class ParameterConf(): self.id = str(new_id) def setup(self, id, name, value, format='str'): - self.id = str(id) self.name = name if format == 'obj': @@ -757,26 +756,26 @@ class ReadUnitConf(ProcUnitConf): return self.ELEMENTNAME - def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs): + def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="", + endTime="", parentId=None, queue=None, server=None, **kwargs): #Compatible with old signal chain version if datatype==None and name==None: raise ValueError, "datatype or name should be defined" - + if name==None: if 'Reader' in datatype: name = datatype else: name = '%sReader' %(datatype) - if datatype==None: datatype = name.replace('Reader','') self.id = id self.name = name self.datatype = datatype - - self.path = os.path.abspath(path) + if path != '': + self.path = os.path.abspath(path) self.startDate = startDate self.endDate = endDate self.startTime = startTime @@ -785,6 +784,7 @@ class ReadUnitConf(ProcUnitConf): self.inputId = '0' self.parentId = parentId self.queue = queue + self.server = server self.addRunOperation(**kwargs) def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs): @@ -826,16 +826,19 @@ class ReadUnitConf(ProcUnitConf): opObj = self.addOperation(name = 'run', optype = 'self') - opObj.addParameter(name='datatype' , value=self.datatype, format='str') - opObj.addParameter(name='path' , value=self.path, format='str') - opObj.addParameter(name='startDate' , value=self.startDate, format='date') - opObj.addParameter(name='endDate' , value=self.endDate, format='date') - opObj.addParameter(name='startTime' , value=self.startTime, format='time') - opObj.addParameter(name='endTime' , value=self.endTime, format='time') - opObj.addParameter(name='queue' , value=self.queue, format='obj') - - for key, value in kwargs.items(): - opObj.addParameter(name=key, value=value, format=type(value).__name__) + if self.server is None: + opObj.addParameter(name='datatype' , value=self.datatype, format='str') + opObj.addParameter(name='path' , value=self.path, format='str') + opObj.addParameter(name='startDate' , value=self.startDate, format='date') + opObj.addParameter(name='endDate' , value=self.endDate, format='date') + opObj.addParameter(name='startTime' , value=self.startTime, format='time') + opObj.addParameter(name='endTime' , value=self.endTime, format='time') + opObj.addParameter(name='queue' , value=self.queue, format='obj') + for key, value in kwargs.items(): + opObj.addParameter(name=key, value=value, format=type(value).__name__) + else: + opObj.addParameter(name='server' , value=self.server, format='str') + return opObj diff --git a/schainpy/model/data/jroheaderIO.py b/schainpy/model/data/jroheaderIO.py index c03853d..4e88ab1 100644 --- a/schainpy/model/data/jroheaderIO.py +++ b/schainpy/model/data/jroheaderIO.py @@ -115,7 +115,6 @@ class BasicHeader(Header): dstFlag = None errorCount = None datatime = None - __LOCALTIME = None def __init__(self, useLocalTime=True): @@ -134,8 +133,12 @@ class BasicHeader(Header): def read(self, fp): try: - header = numpy.fromfile(fp, BASIC_STRUCTURE,1) - + if hasattr(fp, 'read'): + print 'fromfile' + header = numpy.fromfile(fp, BASIC_STRUCTURE,1) + else: + print 'fromstring' + header = numpy.fromstring(fp, BASIC_STRUCTURE,1) except Exception, e: print "BasicHeader: " print e diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 77d08a0..47f0b08 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -12,6 +12,7 @@ import fnmatch import inspect import time, datetime import traceback +import zmq try: from gevent import sleep @@ -993,12 +994,13 @@ class JRODataReader(JRODataIO): self.__isFirstTimeOnline = 0 def __setNewBlock(self): - - if self.fp == None: - return 0 + if self.server is None: + if self.fp == None: + return 0 # if self.online: # self.__jumpToLastBlock() + print 'xxxx' if self.flagIsNewFile: self.lastUTTime = self.basicHeaderObj.utc @@ -1010,21 +1012,24 @@ class JRODataReader(JRODataIO): return 0 else: return 1 - - currentSize = self.fileSize - self.fp.tell() - neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize - - if (currentSize >= neededSize): - self.basicHeaderObj.read(self.fp) + print 'xxxx' + if self.server is None: + currentSize = self.fileSize - self.fp.tell() + neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize + if (currentSize >= neededSize): + self.basicHeaderObj.read(self.fp) + self.lastUTTime = self.basicHeaderObj.utc + return 1 + else: + self.basicHeaderObj.read(self.zHeader) self.lastUTTime = self.basicHeaderObj.utc return 1 - if self.__waitNewBlock(): self.lastUTTime = self.basicHeaderObj.utc return 1 - - if not(self.setNextFile()): - return 0 + if self.server is None: + if not(self.setNextFile()): + return 0 deltaTime = self.basicHeaderObj.utc - self.lastUTTime # self.lastUTTime = self.basicHeaderObj.utc @@ -1040,9 +1045,11 @@ class JRODataReader(JRODataIO): #Skip block out of startTime and endTime while True: + print 'cxxxx' if not(self.__setNewBlock()): + print 'returning' return 0 - + print 'dxxx' if not(self.readBlock()): return 0 @@ -1275,20 +1282,19 @@ class JRODataReader(JRODataIO): warnings=True, verbose=True, server=None): - if server is not None: - server = kwargs.get('server', 'zmq.pipe') if 'tcp://' in server: address = server else: address = 'ipc:///tmp/%s' % server - self.address = address + self.server = address self.context = zmq.Context() self.receiver = self.context.socket(zmq.PULL) - self.receiver.bind(self.address) + self.receiver.bind(self.server) time.sleep(0.5) - print '[Starting] ReceiverData from {}'.format(self.address) - else: + print '[Starting] ReceiverData from {}'.format(self.server) + else: + self.server = None if path == None: raise ValueError, "[Reading] The path is not valid" @@ -1494,13 +1500,11 @@ class JRODataReader(JRODataIO): skip=skip, cursor=cursor, warnings=warnings, - server=None, + server=server, verbose=verbose) self.isConfig = True - if self.server is None: - self.getData() - else: - self.getFromZMQ() + print 'hola' + self.getData() class JRODataWriter(JRODataIO): diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index a6fa073..8362cb7 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -189,6 +189,7 @@ class VoltageReader(JRODataReader, ProcessingUnit): pts2read = self.processingHeaderObj.profilesPerBlock * self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels self.blocksize = pts2read + def readBlock(self): """ @@ -213,11 +214,23 @@ class VoltageReader(JRODataReader, ProcessingUnit): Exceptions: Si un bloque leido no es un bloque valido """ - current_pointer_location = self.fp.tell() - junk = numpy.fromfile( self.fp, self.dtype, self.blocksize ) + + print 'READ BLOCK' + if self.server is not None: + self.zBlock = self.receiver.recv() + self.zHeader = self.zBlock[:24] + self.zDataBlock = self.zBlock[24:] + junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real',' + \ No newline at end of file