diff --git a/.gitignore b/.gitignore index c31424d..d63b165 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,9 @@ ENV/ # eclipse .project .pydevproject + +# vscode + +.vscode + +schainpy/scripts/ \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9b1c9b9 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "python.linting.pylintEnabled": true, + "git.ignoreLimitWarning": true +} \ No newline at end of file diff --git a/schain.xml b/schain.xml index b67ac71..a18a91f 100644 --- a/schain.xml +++ b/schain.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/schainpy/admin.py b/schainpy/admin.py index c22cf9e..aaa93d3 100644 --- a/schainpy/admin.py +++ b/schainpy/admin.py @@ -15,7 +15,7 @@ from email.mime.text import MIMEText from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart -class SchainConfigure(): +class SchainConfigure(): __DEFAULT_ADMINISTRATOR_EMAIL = "" __DEFAULT_EMAIL_SERVER = "jro-zimbra.igp.gob.pe" diff --git a/schainpy/controller.py b/schainpy/controller.py index 6d6e1a8..82a8787 100644 --- a/schainpy/controller.py +++ b/schainpy/controller.py @@ -217,7 +217,6 @@ class ParameterConf(): self.id = str(new_id) def setup(self, id, name, value, format='str'): - self.id = str(id) self.name = name if format == 'obj': @@ -757,26 +756,26 @@ class ReadUnitConf(ProcUnitConf): return self.ELEMENTNAME - def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs): + def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="", + endTime="", parentId=None, queue=None, server=None, **kwargs): #Compatible with old signal chain version if datatype==None and name==None: raise ValueError, "datatype or name should be defined" - + if name==None: if 'Reader' in datatype: name = datatype else: name = '%sReader' %(datatype) - if datatype==None: datatype = name.replace('Reader','') self.id = id self.name = name self.datatype = datatype - - self.path = os.path.abspath(path) + if path != '': + self.path = os.path.abspath(path) self.startDate = startDate self.endDate = endDate self.startTime = startTime @@ -785,6 +784,7 @@ class ReadUnitConf(ProcUnitConf): self.inputId = '0' self.parentId = parentId self.queue = queue + self.server = server self.addRunOperation(**kwargs) def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs): @@ -826,16 +826,19 @@ class ReadUnitConf(ProcUnitConf): opObj = self.addOperation(name = 'run', optype = 'self') - opObj.addParameter(name='datatype' , value=self.datatype, format='str') - opObj.addParameter(name='path' , value=self.path, format='str') - opObj.addParameter(name='startDate' , value=self.startDate, format='date') - opObj.addParameter(name='endDate' , value=self.endDate, format='date') - opObj.addParameter(name='startTime' , value=self.startTime, format='time') - opObj.addParameter(name='endTime' , value=self.endTime, format='time') - opObj.addParameter(name='queue' , value=self.queue, format='obj') - - for key, value in kwargs.items(): - opObj.addParameter(name=key, value=value, format=type(value).__name__) + if self.server is None: + opObj.addParameter(name='datatype' , value=self.datatype, format='str') + opObj.addParameter(name='path' , value=self.path, format='str') + opObj.addParameter(name='startDate' , value=self.startDate, format='date') + opObj.addParameter(name='endDate' , value=self.endDate, format='date') + opObj.addParameter(name='startTime' , value=self.startTime, format='time') + opObj.addParameter(name='endTime' , value=self.endTime, format='time') + opObj.addParameter(name='queue' , value=self.queue, format='obj') + for key, value in kwargs.items(): + opObj.addParameter(name=key, value=value, format=type(value).__name__) + else: + opObj.addParameter(name='server' , value=self.server, format='str') + return opObj diff --git a/schainpy/model/data/jroheaderIO.py b/schainpy/model/data/jroheaderIO.py index 38f2d06..a869f36 100644 --- a/schainpy/model/data/jroheaderIO.py +++ b/schainpy/model/data/jroheaderIO.py @@ -115,7 +115,6 @@ class BasicHeader(Header): dstFlag = None errorCount = None datatime = None - __LOCALTIME = None def __init__(self, useLocalTime=True): @@ -133,14 +132,17 @@ class BasicHeader(Header): def read(self, fp): + self.length = 0 try: - header = numpy.fromfile(fp, BASIC_STRUCTURE,1) - + if hasattr(fp, 'read'): + header = numpy.fromfile(fp, BASIC_STRUCTURE,1) + else: + header = numpy.fromstring(fp, BASIC_STRUCTURE,1) except Exception, e: print "BasicHeader: " print e return 0 - + self.size = int(header['nSize'][0]) self.version = int(header['nVersion'][0]) self.dataBlock = int(header['nDataBlockId'][0]) @@ -152,7 +154,8 @@ class BasicHeader(Header): if self.size < 24: return 0 - + + self.length = header.nbytes return 1 def write(self, fp): @@ -197,13 +200,20 @@ class SystemHeader(Header): self.pciDioBusWidth = pciDioBusWith def read(self, fp): - - startFp = fp.tell() - + self.length = 0 + try: + startFp = fp.tell() + except Exception, e: + startFp = None + pass + try: - header = numpy.fromfile(fp,SYSTEM_STRUCTURE,1) + if hasattr(fp, 'read'): + header = numpy.fromfile(fp, SYSTEM_STRUCTURE,1) + else: + header = numpy.fromstring(fp, SYSTEM_STRUCTURE,1) except Exception, e: - print "System Header: " + e + print "System Header: " + str(e) return 0 self.size = header['nSize'][0] @@ -213,16 +223,19 @@ class SystemHeader(Header): self.adcResolution = header['nADCResolution'][0] self.pciDioBusWidth = header['nPCDIOBusWidth'][0] - endFp = self.size + startFp - if fp.tell() > endFp: - sys.stderr.write("Warning %s: Size value read from System Header is lower than it has to be\n" %fp.name) - return 0 + if startFp is not None: + endFp = self.size + startFp - if fp.tell() < endFp: - sys.stderr.write("Warning %s: Size value read from System Header size is greater than it has to be\n" %fp.name) - return 0 + if fp.tell() > endFp: + sys.stderr.write("Warning %s: Size value read from System Header is lower than it has to be\n" %fp.name) + return 0 + + if fp.tell() < endFp: + sys.stderr.write("Warning %s: Size value read from System Header size is greater than it has to be\n" %fp.name) + return 0 + self.length = header.nbytes return 1 def write(self, fp): @@ -299,13 +312,21 @@ class RadarControllerHeader(Header): self.fClock = 0.15/(deltaHeight*1e-6) #0.15Km / (height * 1u) def read(self, fp): - - - startFp = fp.tell() + self.length = 0 + try: + startFp = fp.tell() + except Exception, e: + startFp = None + pass + try: - header = numpy.fromfile(fp,RADAR_STRUCTURE,1) + if hasattr(fp, 'read'): + header = numpy.fromfile(fp, RADAR_STRUCTURE,1) + else: + header = numpy.fromstring(fp, RADAR_STRUCTURE,1) + self.length += header.nbytes except Exception, e: - print "RadarControllerHeader: " + e + print "RadarControllerHeader: " + str(e) return 0 size = int(header['nSize'][0]) @@ -326,23 +347,64 @@ class RadarControllerHeader(Header): self.rangeTxA = header['sRangeTxA'][0] self.rangeTxB = header['sRangeTxB'][0] - samplingWindow = numpy.fromfile(fp,SAMPLING_STRUCTURE,self.nWindows) - + try: + if hasattr(fp, 'read'): + samplingWindow = numpy.fromfile(fp, SAMPLING_STRUCTURE, self.nWindows) + else: + samplingWindow = numpy.fromstring(fp[self.length:], SAMPLING_STRUCTURE, self.nWindows) + self.length += samplingWindow.nbytes + except Exception, e: + print "RadarControllerHeader: " + str(e) + return 0 self.nHeights = int(numpy.sum(samplingWindow['nsa'])) self.firstHeight = samplingWindow['h0'] self.deltaHeight = samplingWindow['dh'] self.samplesWin = samplingWindow['nsa'] + + + + try: + if hasattr(fp, 'read'): + self.Taus = numpy.fromfile(fp, ' endFp: - sys.stderr.write("Warning %s: Size value read from Radar Controller header is lower than it has to be\n" %fp.name) -# return 0 + if startFp is not None: + endFp = size + startFp - if fp.tell() < endFp: - sys.stderr.write("Warning %s: Size value read from Radar Controller header is greater than it has to be\n" %fp.name) + if fp.tell() != endFp: + # fp.seek(endFp) + print "%s: Radar Controller Header size is not consistent: from data [%d] != from header field [%d]" %(fp.name, fp.tell()-startFp, size) + # return 0 + + if fp.tell() > endFp: + sys.stderr.write("Warning %s: Size value read from Radar Controller header is lower than it has to be\n" %fp.name) + # return 0 + + if fp.tell() < endFp: + sys.stderr.write("Warning %s: Size value read from Radar Controller header is greater than it has to be\n" %fp.name) + return 1 def write(self, fp): @@ -508,15 +571,23 @@ class ProcessingHeader(Header): self.flag_cspc = False self.flag_decode = False self.flag_deflip = False - + self.length = 0 def read(self, fp): - - startFp = fp.tell() + self.length = 0 + try: + startFp = fp.tell() + except Exception, e: + startFp = None + pass try: - header = numpy.fromfile(fp,PROCESSING_STRUCTURE,1) + if hasattr(fp, 'read'): + header = numpy.fromfile(fp, PROCESSING_STRUCTURE, 1) + else: + header = numpy.fromstring(fp, PROCESSING_STRUCTURE, 1) + self.length += header.nbytes except Exception, e: - print "ProcessingHeader: " + e + print "ProcessingHeader: " + str(e) return 0 size = int(header['nSize'][0]) @@ -530,14 +601,31 @@ class ProcessingHeader(Header): self.nIncohInt = int(header['nIncoherentIntegrations'][0]) self.totalSpectra = int(header['nTotalSpectra'][0]) - samplingWindow = numpy.fromfile(fp,SAMPLING_STRUCTURE,self.nWindows) + try: + if hasattr(fp, 'read'): + samplingWindow = numpy.fromfile(fp, SAMPLING_STRUCTURE, self.nWindows) + else: + samplingWindow = numpy.fromstring(fp[self.length:], SAMPLING_STRUCTURE, self.nWindows) + self.length += samplingWindow.nbytes + except Exception, e: + print "ProcessingHeader: " + str(e) + return 0 self.nHeights = int(numpy.sum(samplingWindow['nsa'])) self.firstHeight = float(samplingWindow['h0'][0]) self.deltaHeight = float(samplingWindow['dh'][0]) self.samplesWin = samplingWindow['nsa'][0] - self.spectraComb = numpy.fromfile(fp,'u1',2*self.totalSpectra) + + try: + if hasattr(fp, 'read'): + self.spectraComb = numpy.fromfile(fp, 'u1', 2*self.totalSpectra) + else: + self.spectraComb = numpy.fromstring(fp[self.length:], 'u1', 2*self.totalSpectra) + self.length += self.spectraComb.nbytes + except Exception, e: + print "ProcessingHeader: " + str(e) + return 0 if ((self.processFlags & PROCFLAG.DEFINE_PROCESS_CODE) == PROCFLAG.DEFINE_PROCESS_CODE): self.nCode = int(numpy.fromfile(fp,' 0: self.flag_cspc = True - endFp = size + startFp - if fp.tell() > endFp: - sys.stderr.write("Warning: Processing header size is lower than it has to be") - return 0 - - if fp.tell() < endFp: - sys.stderr.write("Warning: Processing header size is greater than it is considered") + + if startFp is not None: + endFp = size + startFp + if fp.tell() > endFp: + sys.stderr.write("Warning: Processing header size is lower than it has to be") + return 0 + + if fp.tell() < endFp: + sys.stderr.write("Warning: Processing header size is greater than it is considered") return 1 diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py index 30a6f83..1ebc6b2 100644 --- a/schainpy/model/io/jroIO_base.py +++ b/schainpy/model/io/jroIO_base.py @@ -11,8 +11,8 @@ import numpy import fnmatch import inspect import time, datetime -#import h5py import traceback +import zmq try: from gevent import sleep @@ -994,12 +994,13 @@ class JRODataReader(JRODataIO): self.__isFirstTimeOnline = 0 def __setNewBlock(self): - + #if self.server is None: if self.fp == None: return 0 # if self.online: # self.__jumpToLastBlock() + print 'xxxx' if self.flagIsNewFile: self.lastUTTime = self.basicHeaderObj.utc @@ -1011,19 +1012,22 @@ class JRODataReader(JRODataIO): return 0 else: return 1 - + print 'xxxx' + #if self.server is None: currentSize = self.fileSize - self.fp.tell() neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize - if (currentSize >= neededSize): self.basicHeaderObj.read(self.fp) self.lastUTTime = self.basicHeaderObj.utc return 1 - + # else: + # self.basicHeaderObj.read(self.zHeader) + # self.lastUTTime = self.basicHeaderObj.utc + # return 1 if self.__waitNewBlock(): self.lastUTTime = self.basicHeaderObj.utc return 1 - + #if self.server is None: if not(self.setNextFile()): return 0 @@ -1041,9 +1045,11 @@ class JRODataReader(JRODataIO): #Skip block out of startTime and endTime while True: + print 'cxxxx' if not(self.__setNewBlock()): + print 'returning' return 0 - + print 'dxxx' if not(self.readBlock()): return 0 @@ -1274,99 +1280,111 @@ class JRODataReader(JRODataIO): skip=None, cursor=None, warnings=True, - verbose=True): - - if path == None: - raise ValueError, "[Reading] The path is not valid" - - if ext == None: - ext = self.ext - - if online: - print "[Reading] Searching files in online mode..." - - for nTries in range( self.nTries ): - fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(path=path, expLabel=expLabel, ext=ext, walk=walk, set=set) - - if fullpath: - break - - print '[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries+1) - sleep( self.delay ) - - if not(fullpath): - print "[Reading] There 'isn't any valid file in %s" % path - return - - self.year = year - self.doy = doy - self.set = set - 1 - self.path = path - self.foldercounter = foldercounter - last_set = None + verbose=True, + server=None): + if server is not None: + if 'tcp://' in server: + address = server + else: + address = 'ipc:///tmp/%s' % server + self.server = address + self.context = zmq.Context() + self.receiver = self.context.socket(zmq.PULL) + self.receiver.connect(self.server) + time.sleep(0.5) + print '[Starting] ReceiverData from {}'.format(self.server) + else: + self.server = None + if path == None: + raise ValueError, "[Reading] The path is not valid" + + if ext == None: + ext = self.ext + + if online: + print "[Reading] Searching files in online mode..." + + for nTries in range( self.nTries ): + fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(path=path, expLabel=expLabel, ext=ext, walk=walk, set=set) + + if fullpath: + break + + print '[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries+1) + sleep( self.delay ) + + if not(fullpath): + print "[Reading] There 'isn't any valid file in %s" % path + return + + self.year = year + self.doy = doy + self.set = set - 1 + self.path = path + self.foldercounter = foldercounter + last_set = None + else: + print "[Reading] Searching files in offline mode ..." + pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate, + startTime=startTime, endTime=endTime, + set=set, expLabel=expLabel, ext=ext, + walk=walk, cursor=cursor, + skip=skip, queue=queue) - else: - print "[Reading] Searching files in offline mode ..." - pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate, - startTime=startTime, endTime=endTime, - set=set, expLabel=expLabel, ext=ext, - walk=walk, cursor=cursor, - skip=skip, queue=queue) + if not(pathList): + # print "[Reading] No *%s files in %s (%s - %s)"%(ext, path, + # datetime.datetime.combine(startDate,startTime).ctime(), + # datetime.datetime.combine(endDate,endTime).ctime()) - if not(pathList): -# print "[Reading] No *%s files in %s (%s - %s)"%(ext, path, -# datetime.datetime.combine(startDate,startTime).ctime(), -# datetime.datetime.combine(endDate,endTime).ctime()) + # sys.exit(-1) -# sys.exit(-1) + self.fileIndex = -1 + self.pathList = [] + self.filenameList = [] + return self.fileIndex = -1 - self.pathList = [] - self.filenameList = [] - return - - self.fileIndex = -1 - self.pathList = pathList - self.filenameList = filenameList - file_name = os.path.basename(filenameList[-1]) - basename, ext = os.path.splitext(file_name) - last_set = int(basename[-3:]) - - self.online = online - self.realtime = realtime - self.delay = delay - ext = ext.lower() - self.ext = ext - self.getByBlock = getblock - self.nTxs = nTxs - self.startTime = startTime - self.endTime = endTime - - #Added----------------- - self.selBlocksize = blocksize - self.selBlocktime = blocktime - - # Verbose----------- - self.verbose = verbose - self.warnings = warnings + self.pathList = pathList + self.filenameList = filenameList + file_name = os.path.basename(filenameList[-1]) + basename, ext = os.path.splitext(file_name) + last_set = int(basename[-3:]) + + self.online = online + self.realtime = realtime + self.delay = delay + ext = ext.lower() + self.ext = ext + self.getByBlock = getblock + self.nTxs = nTxs + self.startTime = startTime + self.endTime = endTime + + #Added----------------- + self.selBlocksize = blocksize + self.selBlocktime = blocktime + + # Verbose----------- + self.verbose = verbose + self.warnings = warnings - if not(self.setNextFile()): - if (startDate!=None) and (endDate!=None): - print "[Reading] No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()) - elif startDate != None: - print "[Reading] No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime()) - else: - print "[Reading] No files" + if not(self.setNextFile()): + if (startDate!=None) and (endDate!=None): + print "[Reading] No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()) + elif startDate != None: + print "[Reading] No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime()) + else: + print "[Reading] No files" - self.fileIndex = -1 - self.pathList = [] - self.filenameList = [] - return + self.fileIndex = -1 + self.pathList = [] + self.filenameList = [] + return -# self.getBasicHeader() + # self.getBasicHeader() - if last_set != None: - self.dataOut.last_block = last_set * self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock + if last_set != None: + self.dataOut.last_block = last_set * self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock return def getBasicHeader(self): @@ -1457,6 +1475,7 @@ class JRODataReader(JRODataIO): skip=None, cursor=None, warnings=True, + server=None, verbose=True, **kwargs): if not(self.isConfig): @@ -1481,10 +1500,13 @@ class JRODataReader(JRODataIO): skip=skip, cursor=cursor, warnings=warnings, + server=server, verbose=verbose) self.isConfig = True - - self.getData() + if server is None: + self.getData() + else: + self.getFromServer() class JRODataWriter(JRODataIO): diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py index 1ad1910..9d1dec2 100644 --- a/schainpy/model/io/jroIO_voltage.py +++ b/schainpy/model/io/jroIO_voltage.py @@ -10,6 +10,9 @@ from jroIO_base import LOCALTIME, JRODataReader, JRODataWriter from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader from schainpy.model.data.jrodata import Voltage +import zmq +import tempfile +from StringIO import StringIO # from _sha import blocksize class VoltageReader(JRODataReader, ProcessingUnit): @@ -188,6 +191,7 @@ class VoltageReader(JRODataReader, ProcessingUnit): pts2read = self.processingHeaderObj.profilesPerBlock * self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels self.blocksize = pts2read + def readBlock(self): """ @@ -212,11 +216,23 @@ class VoltageReader(JRODataReader, ProcessingUnit): Exceptions: Si un bloque leido no es un bloque valido """ + + print 'READ BLOCK' + # if self.server is not None: + # self.zBlock = self.receiver.recv() + # self.zHeader = self.zBlock[:24] + # self.zDataBlock = self.zBlock[24:] + # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real',' \ No newline at end of file + diff --git a/schainpy/trash b/schainpy/trash new file mode 100644 index 0000000..384299d --- /dev/null +++ b/schainpy/trash @@ -0,0 +1 @@ +You should install "digital_rf_hdf5" module if you want to read USRP data