diff --git a/.gitignore b/.gitignore
index c31424d..d63b165 100644
--- a/.gitignore
+++ b/.gitignore
@@ -100,3 +100,9 @@ ENV/
# eclipse
.project
.pydevproject
+
+# vscode
+
+.vscode
+
+schainpy/scripts/
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..9b1c9b9
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,4 @@
+{
+ "python.linting.pylintEnabled": true,
+ "git.ignoreLimitWarning": true
+}
\ No newline at end of file
diff --git a/schain.xml b/schain.xml
index b67ac71..a18a91f 100644
--- a/schain.xml
+++ b/schain.xml
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/schainpy/admin.py b/schainpy/admin.py
index c22cf9e..aaa93d3 100644
--- a/schainpy/admin.py
+++ b/schainpy/admin.py
@@ -15,7 +15,7 @@ from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
-class SchainConfigure():
+class SchainConfigure():
__DEFAULT_ADMINISTRATOR_EMAIL = ""
__DEFAULT_EMAIL_SERVER = "jro-zimbra.igp.gob.pe"
diff --git a/schainpy/controller.py b/schainpy/controller.py
index 6d6e1a8..82a8787 100644
--- a/schainpy/controller.py
+++ b/schainpy/controller.py
@@ -217,7 +217,6 @@ class ParameterConf():
self.id = str(new_id)
def setup(self, id, name, value, format='str'):
-
self.id = str(id)
self.name = name
if format == 'obj':
@@ -757,26 +756,26 @@ class ReadUnitConf(ProcUnitConf):
return self.ELEMENTNAME
- def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
+ def setup(self, id, name, datatype, path='', startDate="", endDate="", startTime="",
+ endTime="", parentId=None, queue=None, server=None, **kwargs):
#Compatible with old signal chain version
if datatype==None and name==None:
raise ValueError, "datatype or name should be defined"
-
+
if name==None:
if 'Reader' in datatype:
name = datatype
else:
name = '%sReader' %(datatype)
-
if datatype==None:
datatype = name.replace('Reader','')
self.id = id
self.name = name
self.datatype = datatype
-
- self.path = os.path.abspath(path)
+ if path != '':
+ self.path = os.path.abspath(path)
self.startDate = startDate
self.endDate = endDate
self.startTime = startTime
@@ -785,6 +784,7 @@ class ReadUnitConf(ProcUnitConf):
self.inputId = '0'
self.parentId = parentId
self.queue = queue
+ self.server = server
self.addRunOperation(**kwargs)
def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
@@ -826,16 +826,19 @@ class ReadUnitConf(ProcUnitConf):
opObj = self.addOperation(name = 'run', optype = 'self')
- opObj.addParameter(name='datatype' , value=self.datatype, format='str')
- opObj.addParameter(name='path' , value=self.path, format='str')
- opObj.addParameter(name='startDate' , value=self.startDate, format='date')
- opObj.addParameter(name='endDate' , value=self.endDate, format='date')
- opObj.addParameter(name='startTime' , value=self.startTime, format='time')
- opObj.addParameter(name='endTime' , value=self.endTime, format='time')
- opObj.addParameter(name='queue' , value=self.queue, format='obj')
-
- for key, value in kwargs.items():
- opObj.addParameter(name=key, value=value, format=type(value).__name__)
+ if self.server is None:
+ opObj.addParameter(name='datatype' , value=self.datatype, format='str')
+ opObj.addParameter(name='path' , value=self.path, format='str')
+ opObj.addParameter(name='startDate' , value=self.startDate, format='date')
+ opObj.addParameter(name='endDate' , value=self.endDate, format='date')
+ opObj.addParameter(name='startTime' , value=self.startTime, format='time')
+ opObj.addParameter(name='endTime' , value=self.endTime, format='time')
+ opObj.addParameter(name='queue' , value=self.queue, format='obj')
+ for key, value in kwargs.items():
+ opObj.addParameter(name=key, value=value, format=type(value).__name__)
+ else:
+ opObj.addParameter(name='server' , value=self.server, format='str')
+
return opObj
diff --git a/schainpy/model/data/jroheaderIO.py b/schainpy/model/data/jroheaderIO.py
index 38f2d06..a869f36 100644
--- a/schainpy/model/data/jroheaderIO.py
+++ b/schainpy/model/data/jroheaderIO.py
@@ -115,7 +115,6 @@ class BasicHeader(Header):
dstFlag = None
errorCount = None
datatime = None
-
__LOCALTIME = None
def __init__(self, useLocalTime=True):
@@ -133,14 +132,17 @@ class BasicHeader(Header):
def read(self, fp):
+ self.length = 0
try:
- header = numpy.fromfile(fp, BASIC_STRUCTURE,1)
-
+ if hasattr(fp, 'read'):
+ header = numpy.fromfile(fp, BASIC_STRUCTURE,1)
+ else:
+ header = numpy.fromstring(fp, BASIC_STRUCTURE,1)
except Exception, e:
print "BasicHeader: "
print e
return 0
-
+
self.size = int(header['nSize'][0])
self.version = int(header['nVersion'][0])
self.dataBlock = int(header['nDataBlockId'][0])
@@ -152,7 +154,8 @@ class BasicHeader(Header):
if self.size < 24:
return 0
-
+
+ self.length = header.nbytes
return 1
def write(self, fp):
@@ -197,13 +200,20 @@ class SystemHeader(Header):
self.pciDioBusWidth = pciDioBusWith
def read(self, fp):
-
- startFp = fp.tell()
-
+ self.length = 0
+ try:
+ startFp = fp.tell()
+ except Exception, e:
+ startFp = None
+ pass
+
try:
- header = numpy.fromfile(fp,SYSTEM_STRUCTURE,1)
+ if hasattr(fp, 'read'):
+ header = numpy.fromfile(fp, SYSTEM_STRUCTURE,1)
+ else:
+ header = numpy.fromstring(fp, SYSTEM_STRUCTURE,1)
except Exception, e:
- print "System Header: " + e
+ print "System Header: " + str(e)
return 0
self.size = header['nSize'][0]
@@ -213,16 +223,19 @@ class SystemHeader(Header):
self.adcResolution = header['nADCResolution'][0]
self.pciDioBusWidth = header['nPCDIOBusWidth'][0]
- endFp = self.size + startFp
- if fp.tell() > endFp:
- sys.stderr.write("Warning %s: Size value read from System Header is lower than it has to be\n" %fp.name)
- return 0
+ if startFp is not None:
+ endFp = self.size + startFp
- if fp.tell() < endFp:
- sys.stderr.write("Warning %s: Size value read from System Header size is greater than it has to be\n" %fp.name)
- return 0
+ if fp.tell() > endFp:
+ sys.stderr.write("Warning %s: Size value read from System Header is lower than it has to be\n" %fp.name)
+ return 0
+
+ if fp.tell() < endFp:
+ sys.stderr.write("Warning %s: Size value read from System Header size is greater than it has to be\n" %fp.name)
+ return 0
+ self.length = header.nbytes
return 1
def write(self, fp):
@@ -299,13 +312,21 @@ class RadarControllerHeader(Header):
self.fClock = 0.15/(deltaHeight*1e-6) #0.15Km / (height * 1u)
def read(self, fp):
-
-
- startFp = fp.tell()
+ self.length = 0
+ try:
+ startFp = fp.tell()
+ except Exception, e:
+ startFp = None
+ pass
+
try:
- header = numpy.fromfile(fp,RADAR_STRUCTURE,1)
+ if hasattr(fp, 'read'):
+ header = numpy.fromfile(fp, RADAR_STRUCTURE,1)
+ else:
+ header = numpy.fromstring(fp, RADAR_STRUCTURE,1)
+ self.length += header.nbytes
except Exception, e:
- print "RadarControllerHeader: " + e
+ print "RadarControllerHeader: " + str(e)
return 0
size = int(header['nSize'][0])
@@ -326,23 +347,64 @@ class RadarControllerHeader(Header):
self.rangeTxA = header['sRangeTxA'][0]
self.rangeTxB = header['sRangeTxB'][0]
- samplingWindow = numpy.fromfile(fp,SAMPLING_STRUCTURE,self.nWindows)
-
+ try:
+ if hasattr(fp, 'read'):
+ samplingWindow = numpy.fromfile(fp, SAMPLING_STRUCTURE, self.nWindows)
+ else:
+ samplingWindow = numpy.fromstring(fp[self.length:], SAMPLING_STRUCTURE, self.nWindows)
+ self.length += samplingWindow.nbytes
+ except Exception, e:
+ print "RadarControllerHeader: " + str(e)
+ return 0
self.nHeights = int(numpy.sum(samplingWindow['nsa']))
self.firstHeight = samplingWindow['h0']
self.deltaHeight = samplingWindow['dh']
self.samplesWin = samplingWindow['nsa']
+
+
+
+ try:
+ if hasattr(fp, 'read'):
+ self.Taus = numpy.fromfile(fp, ' endFp:
- sys.stderr.write("Warning %s: Size value read from Radar Controller header is lower than it has to be\n" %fp.name)
-# return 0
+ if startFp is not None:
+ endFp = size + startFp
- if fp.tell() < endFp:
- sys.stderr.write("Warning %s: Size value read from Radar Controller header is greater than it has to be\n" %fp.name)
+ if fp.tell() != endFp:
+ # fp.seek(endFp)
+ print "%s: Radar Controller Header size is not consistent: from data [%d] != from header field [%d]" %(fp.name, fp.tell()-startFp, size)
+ # return 0
+
+ if fp.tell() > endFp:
+ sys.stderr.write("Warning %s: Size value read from Radar Controller header is lower than it has to be\n" %fp.name)
+ # return 0
+
+ if fp.tell() < endFp:
+ sys.stderr.write("Warning %s: Size value read from Radar Controller header is greater than it has to be\n" %fp.name)
+
return 1
def write(self, fp):
@@ -508,15 +571,23 @@ class ProcessingHeader(Header):
self.flag_cspc = False
self.flag_decode = False
self.flag_deflip = False
-
+ self.length = 0
def read(self, fp):
-
- startFp = fp.tell()
+ self.length = 0
+ try:
+ startFp = fp.tell()
+ except Exception, e:
+ startFp = None
+ pass
try:
- header = numpy.fromfile(fp,PROCESSING_STRUCTURE,1)
+ if hasattr(fp, 'read'):
+ header = numpy.fromfile(fp, PROCESSING_STRUCTURE, 1)
+ else:
+ header = numpy.fromstring(fp, PROCESSING_STRUCTURE, 1)
+ self.length += header.nbytes
except Exception, e:
- print "ProcessingHeader: " + e
+ print "ProcessingHeader: " + str(e)
return 0
size = int(header['nSize'][0])
@@ -530,14 +601,31 @@ class ProcessingHeader(Header):
self.nIncohInt = int(header['nIncoherentIntegrations'][0])
self.totalSpectra = int(header['nTotalSpectra'][0])
- samplingWindow = numpy.fromfile(fp,SAMPLING_STRUCTURE,self.nWindows)
+ try:
+ if hasattr(fp, 'read'):
+ samplingWindow = numpy.fromfile(fp, SAMPLING_STRUCTURE, self.nWindows)
+ else:
+ samplingWindow = numpy.fromstring(fp[self.length:], SAMPLING_STRUCTURE, self.nWindows)
+ self.length += samplingWindow.nbytes
+ except Exception, e:
+ print "ProcessingHeader: " + str(e)
+ return 0
self.nHeights = int(numpy.sum(samplingWindow['nsa']))
self.firstHeight = float(samplingWindow['h0'][0])
self.deltaHeight = float(samplingWindow['dh'][0])
self.samplesWin = samplingWindow['nsa'][0]
- self.spectraComb = numpy.fromfile(fp,'u1',2*self.totalSpectra)
+
+ try:
+ if hasattr(fp, 'read'):
+ self.spectraComb = numpy.fromfile(fp, 'u1', 2*self.totalSpectra)
+ else:
+ self.spectraComb = numpy.fromstring(fp[self.length:], 'u1', 2*self.totalSpectra)
+ self.length += self.spectraComb.nbytes
+ except Exception, e:
+ print "ProcessingHeader: " + str(e)
+ return 0
if ((self.processFlags & PROCFLAG.DEFINE_PROCESS_CODE) == PROCFLAG.DEFINE_PROCESS_CODE):
self.nCode = int(numpy.fromfile(fp,' 0:
self.flag_cspc = True
- endFp = size + startFp
- if fp.tell() > endFp:
- sys.stderr.write("Warning: Processing header size is lower than it has to be")
- return 0
-
- if fp.tell() < endFp:
- sys.stderr.write("Warning: Processing header size is greater than it is considered")
+
+ if startFp is not None:
+ endFp = size + startFp
+ if fp.tell() > endFp:
+ sys.stderr.write("Warning: Processing header size is lower than it has to be")
+ return 0
+
+ if fp.tell() < endFp:
+ sys.stderr.write("Warning: Processing header size is greater than it is considered")
return 1
diff --git a/schainpy/model/io/jroIO_base.py b/schainpy/model/io/jroIO_base.py
index 30a6f83..1ebc6b2 100644
--- a/schainpy/model/io/jroIO_base.py
+++ b/schainpy/model/io/jroIO_base.py
@@ -11,8 +11,8 @@ import numpy
import fnmatch
import inspect
import time, datetime
-#import h5py
import traceback
+import zmq
try:
from gevent import sleep
@@ -994,12 +994,13 @@ class JRODataReader(JRODataIO):
self.__isFirstTimeOnline = 0
def __setNewBlock(self):
-
+ #if self.server is None:
if self.fp == None:
return 0
# if self.online:
# self.__jumpToLastBlock()
+ print 'xxxx'
if self.flagIsNewFile:
self.lastUTTime = self.basicHeaderObj.utc
@@ -1011,19 +1012,22 @@ class JRODataReader(JRODataIO):
return 0
else:
return 1
-
+ print 'xxxx'
+ #if self.server is None:
currentSize = self.fileSize - self.fp.tell()
neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
-
if (currentSize >= neededSize):
self.basicHeaderObj.read(self.fp)
self.lastUTTime = self.basicHeaderObj.utc
return 1
-
+ # else:
+ # self.basicHeaderObj.read(self.zHeader)
+ # self.lastUTTime = self.basicHeaderObj.utc
+ # return 1
if self.__waitNewBlock():
self.lastUTTime = self.basicHeaderObj.utc
return 1
-
+ #if self.server is None:
if not(self.setNextFile()):
return 0
@@ -1041,9 +1045,11 @@ class JRODataReader(JRODataIO):
#Skip block out of startTime and endTime
while True:
+ print 'cxxxx'
if not(self.__setNewBlock()):
+ print 'returning'
return 0
-
+ print 'dxxx'
if not(self.readBlock()):
return 0
@@ -1274,99 +1280,111 @@ class JRODataReader(JRODataIO):
skip=None,
cursor=None,
warnings=True,
- verbose=True):
-
- if path == None:
- raise ValueError, "[Reading] The path is not valid"
-
- if ext == None:
- ext = self.ext
-
- if online:
- print "[Reading] Searching files in online mode..."
-
- for nTries in range( self.nTries ):
- fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)
-
- if fullpath:
- break
-
- print '[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries+1)
- sleep( self.delay )
-
- if not(fullpath):
- print "[Reading] There 'isn't any valid file in %s" % path
- return
-
- self.year = year
- self.doy = doy
- self.set = set - 1
- self.path = path
- self.foldercounter = foldercounter
- last_set = None
+ verbose=True,
+ server=None):
+ if server is not None:
+ if 'tcp://' in server:
+ address = server
+ else:
+ address = 'ipc:///tmp/%s' % server
+ self.server = address
+ self.context = zmq.Context()
+ self.receiver = self.context.socket(zmq.PULL)
+ self.receiver.connect(self.server)
+ time.sleep(0.5)
+ print '[Starting] ReceiverData from {}'.format(self.server)
+ else:
+ self.server = None
+ if path == None:
+ raise ValueError, "[Reading] The path is not valid"
+
+ if ext == None:
+ ext = self.ext
+
+ if online:
+ print "[Reading] Searching files in online mode..."
+
+ for nTries in range( self.nTries ):
+ fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)
+
+ if fullpath:
+ break
+
+ print '[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries+1)
+ sleep( self.delay )
+
+ if not(fullpath):
+ print "[Reading] There 'isn't any valid file in %s" % path
+ return
+
+ self.year = year
+ self.doy = doy
+ self.set = set - 1
+ self.path = path
+ self.foldercounter = foldercounter
+ last_set = None
+ else:
+ print "[Reading] Searching files in offline mode ..."
+ pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate,
+ startTime=startTime, endTime=endTime,
+ set=set, expLabel=expLabel, ext=ext,
+ walk=walk, cursor=cursor,
+ skip=skip, queue=queue)
- else:
- print "[Reading] Searching files in offline mode ..."
- pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate,
- startTime=startTime, endTime=endTime,
- set=set, expLabel=expLabel, ext=ext,
- walk=walk, cursor=cursor,
- skip=skip, queue=queue)
+ if not(pathList):
+ # print "[Reading] No *%s files in %s (%s - %s)"%(ext, path,
+ # datetime.datetime.combine(startDate,startTime).ctime(),
+ # datetime.datetime.combine(endDate,endTime).ctime())
- if not(pathList):
-# print "[Reading] No *%s files in %s (%s - %s)"%(ext, path,
-# datetime.datetime.combine(startDate,startTime).ctime(),
-# datetime.datetime.combine(endDate,endTime).ctime())
+ # sys.exit(-1)
-# sys.exit(-1)
+ self.fileIndex = -1
+ self.pathList = []
+ self.filenameList = []
+ return
self.fileIndex = -1
- self.pathList = []
- self.filenameList = []
- return
-
- self.fileIndex = -1
- self.pathList = pathList
- self.filenameList = filenameList
- file_name = os.path.basename(filenameList[-1])
- basename, ext = os.path.splitext(file_name)
- last_set = int(basename[-3:])
-
- self.online = online
- self.realtime = realtime
- self.delay = delay
- ext = ext.lower()
- self.ext = ext
- self.getByBlock = getblock
- self.nTxs = nTxs
- self.startTime = startTime
- self.endTime = endTime
-
- #Added-----------------
- self.selBlocksize = blocksize
- self.selBlocktime = blocktime
-
- # Verbose-----------
- self.verbose = verbose
- self.warnings = warnings
+ self.pathList = pathList
+ self.filenameList = filenameList
+ file_name = os.path.basename(filenameList[-1])
+ basename, ext = os.path.splitext(file_name)
+ last_set = int(basename[-3:])
+
+ self.online = online
+ self.realtime = realtime
+ self.delay = delay
+ ext = ext.lower()
+ self.ext = ext
+ self.getByBlock = getblock
+ self.nTxs = nTxs
+ self.startTime = startTime
+ self.endTime = endTime
+
+ #Added-----------------
+ self.selBlocksize = blocksize
+ self.selBlocktime = blocktime
+
+ # Verbose-----------
+ self.verbose = verbose
+ self.warnings = warnings
- if not(self.setNextFile()):
- if (startDate!=None) and (endDate!=None):
- print "[Reading] No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime())
- elif startDate != None:
- print "[Reading] No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime())
- else:
- print "[Reading] No files"
+ if not(self.setNextFile()):
+ if (startDate!=None) and (endDate!=None):
+ print "[Reading] No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime())
+ elif startDate != None:
+ print "[Reading] No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime())
+ else:
+ print "[Reading] No files"
- self.fileIndex = -1
- self.pathList = []
- self.filenameList = []
- return
+ self.fileIndex = -1
+ self.pathList = []
+ self.filenameList = []
+ return
-# self.getBasicHeader()
+ # self.getBasicHeader()
- if last_set != None:
- self.dataOut.last_block = last_set * self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
+ if last_set != None:
+ self.dataOut.last_block = last_set * self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
return
def getBasicHeader(self):
@@ -1457,6 +1475,7 @@ class JRODataReader(JRODataIO):
skip=None,
cursor=None,
warnings=True,
+ server=None,
verbose=True, **kwargs):
if not(self.isConfig):
@@ -1481,10 +1500,13 @@ class JRODataReader(JRODataIO):
skip=skip,
cursor=cursor,
warnings=warnings,
+ server=server,
verbose=verbose)
self.isConfig = True
-
- self.getData()
+ if server is None:
+ self.getData()
+ else:
+ self.getFromServer()
class JRODataWriter(JRODataIO):
diff --git a/schainpy/model/io/jroIO_voltage.py b/schainpy/model/io/jroIO_voltage.py
index 1ad1910..9d1dec2 100644
--- a/schainpy/model/io/jroIO_voltage.py
+++ b/schainpy/model/io/jroIO_voltage.py
@@ -10,6 +10,9 @@ from jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
from schainpy.model.data.jrodata import Voltage
+import zmq
+import tempfile
+from StringIO import StringIO
# from _sha import blocksize
class VoltageReader(JRODataReader, ProcessingUnit):
@@ -188,6 +191,7 @@ class VoltageReader(JRODataReader, ProcessingUnit):
pts2read = self.processingHeaderObj.profilesPerBlock * self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels
self.blocksize = pts2read
+
def readBlock(self):
"""
@@ -212,11 +216,23 @@ class VoltageReader(JRODataReader, ProcessingUnit):
Exceptions:
Si un bloque leido no es un bloque valido
"""
+
+ print 'READ BLOCK'
+ # if self.server is not None:
+ # self.zBlock = self.receiver.recv()
+ # self.zHeader = self.zBlock[:24]
+ # self.zDataBlock = self.zBlock[24:]
+ # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real','
\ No newline at end of file
+
diff --git a/schainpy/trash b/schainpy/trash
new file mode 100644
index 0000000..384299d
--- /dev/null
+++ b/schainpy/trash
@@ -0,0 +1 @@
+You should install "digital_rf_hdf5" module if you want to read USRP data