Simulator and library tests
avaldez -
r1290:a2308e7c2e9e
@@ -0,0 +1,485
1 import numpy,math,random,time
2 import zmq
3 import tempfile
4 from io import StringIO
5 ########## 1 Inherit from JRODataReader
6 from schainpy.model.io.jroIO_base import *
7 ########## 2 Inherit the properties of ProcessingUnit
8 from schainpy.model.proc.jroproc_base import ProcessingUnit,Operation,MPDecorator
9 ########## 3 Import the BasicHeader, SystemHeader, RadarControllerHeader and ProcessingHeader classes
10 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader,SystemHeader,RadarControllerHeader, ProcessingHeader
11 ########## 4 Import the Voltage object
12 from schainpy.model.data.jrodata import Voltage
13
14 @MPDecorator
15 class SimulatorReader(JRODataReader, ProcessingUnit):
16 incIntFactor = 1
17 nFFTPoints = 0
18 FixPP_IncInt = 1
19 FixRCP_IPP = 1000
20 FixPP_CohInt = 1
21 Tau_0 = 250
22 AcqH0_0 = 70
23 H0 = AcqH0_0
24 AcqDH_0 = 1.25
25 DH0 = AcqDH_0
26 Bauds = 32
27 BaudWidth = None
28 FixRCP_TXA = 40
29 FixRCP_TXB = 70
30 fAngle = 2.0*math.pi*(1/16)
31 DC_level = 500
32 stdev = 8
33 Num_Codes = 2
34 #code0 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,1,1,1,0,1,1,0,1,0,0,0,1,1,1,0,1])
35 #code1 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,0,0,0,1,0,0,1,0,1,1,1,0,0,0,1,0])
36 #Dyn_snCode = numpy.array([Num_Codes,Bauds])
37 Dyn_snCode = None
38 Samples = 200
39 channels = 5
40 pulses = None
41 Reference = None
42 pulse_size = None
43 prof_gen = None
44 Fdoppler = 100
45 Hdoppler = 36
46 def __init__(self):
47 """
48 Inicializador de la clases SimulatorReader para
49 generar datos de voltage simulados.
50 Input:
51 dataOut: Objeto de la clase Voltage.
52 Este Objeto sera utilizado apra almacenar
53 un perfil de datos cada vez qe se haga psiversho
54 un requerimiento (getData)
55 """
56 ProcessingUnit.__init__(self)
57 print(" [ START ] init - Metodo Simulator Reader")
58
59 self.isConfig = False
60 self.basicHeaderObj = BasicHeader(LOCALTIME)
61 self.systemHeaderObj = SystemHeader()
62 self.radarControllerHeaderObj = RadarControllerHeader()
63 self.processingHeaderObj = ProcessingHeader()
64 self.profileIndex = 2**32-1
65 self.dataOut = Voltage()
66 #code0 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,1,1,1,0,1,1,0,1,0,0,0,1,1,1,0,1])
67 code0 = numpy.array([1,1,1,-1,1,1,-1,1,1,1,1,-1,-1,-1,1,-1,1,1,1,-1,1,1,-1,1,-1,-1,-1,1,1,1,-1,1])
68 #code1 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,0,0,0,1,0,0,1,0,1,1,1,0,0,0,1,0])
69 code1 = numpy.array([1,1,1,-1,1,1,-1,1,1,1,1,-1,-1,-1,1,-1,-1,-1,-1,1,-1,-1,1,-1,1,1,1,-1,-1,-1,1,-1])
70 #self.Dyn_snCode = numpy.array([code0,code1])
71 self.Dyn_snCode = None
72 print(" [ END ] init - Metodo simulator Reader" )
73
74
75 def __hasNotDataInBuffer(self):
76
77 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock* self.nTxs:
78 if self.nReadBlocks>0:
79 tmp = self.dataOut.utctime
80 tmp_utc = int(self.dataOut.utctime)
81 tmp_milisecond = int((tmp-tmp_utc)*1000)
82 self.basicHeaderObj.utc = tmp_utc
83 self.basicHeaderObj.miliSecond= tmp_milisecond
84 return 1
85 return 0
86
87
88 def setNextFile(self):
89 """Set the next file to be readed open it and parse de file header"""
90
91 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
92 print('------------------- [Opening file] ------------------------------')
93 self.nReadBlocks = 0
94
95 def __setNewBlock(self):
96
97 self.setNextFile()
98 if self.flagIsNewFile:
99 return 1
100
101 def readNextBlock(self):
102 while True:
103 self.__setNewBlock()
104 if not(self.readBlock()):
105 return 0
106 self.getBasicHeader()
107 break
108 if self.verbose:
109 print("[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
110 self.processingHeaderObj.dataBlocksPerFile,
111 self.dataOut.datatime.ctime()) )
112 return 1
113
114 def getFirstHeader(self):
115 self.getBasicHeader()
116 self.dataOut.processingHeaderObj = self.processingHeaderObj.copy()
117 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
118 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
119 #ADD NEW
120 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock
121 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights) * self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
122 self.dataOut.channelList = list(range(self.systemHeaderObj.nChannels))
123 self.dataOut.nCohInt = self.processingHeaderObj.nCohInt
124 # assume the data has not been decoded
125 self.dataOut.flagDecodeData = self.processingHeaderObj.flag_decode
126 # assume the data has not been deflipped
127 self.dataOut.flagDeflipData = self.processingHeaderObj.flag_deflip
128 self.dataOut.flagShiftFFT = self.processingHeaderObj.shif_fft
129
130 def getBasicHeader(self):
131
132 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
133 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
134
135 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
136
137 self.dataOut.timeZone = self.basicHeaderObj.timeZone
138
139 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
140
141 self.dataOut.errorCount = self.basicHeaderObj.errorCount
142
143 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
144
145 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
146
147 def reshapeData(self):
148 if self.nTxs==1:
149 return
150
151 def readBlock(self):
152
153 self.jro_GenerateBlockOfData(Samples= self.samples,DC_level=self.DC_level,
154 stdev=self.stdev,Reference= self.Reference,
155 pulses = self.pulses,Num_Codes=self.Num_Codes,
156 pulse_size=self.pulse_size,prof_gen=self.profiles,
157 H0=self.H0,DH0=self.DH0)
158
159 self.profileIndex = 0
160 self.flagIsNewFile = 0
161 self.flagIsNewBlock = 1
162 self.nTotalBlocks += 1
163 self.nReadBlocks += 1
164
165 return 1
166
167
168 def getData(self): ### method adapted from VoltageReader
169
170 if self.flagNoMoreFiles:
171 self.dataOut.flagNoData = True
172 self.flagDiscontinuousBlock = 0
173 self.flagIsNewBlock = 0
174 if self.__hasNotDataInBuffer(): # true on the first call
175 if not(self.readNextBlock()): # readNextBlock returns 1 here, so the 'if not' does not bail out
176 return 0
177 self.getFirstHeader() # fill the header attributes of dataOut
178 self.reshapeData() # returns immediately when nTxs == 1
179
180 if not self.getByBlock:
181 self.dataOut.flagDataAsBlock = False
182 self.dataOut.data = self.datablock[:, self.profileIndex, :]
183 self.dataOut.profileIndex = self.profileIndex
184 self.profileIndex += 1
185 else:
186 pass
187 self.dataOut.flagNoData = False
188 self.getBasicHeader()
189 self.dataOut.realtime = self.online
190 return self.dataOut.data
191
192 def set_kwargs(self, **kwargs):
193 for key, value in kwargs.items():
194 setattr(self, key, value)
195
196 def set_RCH(self, expType=2, nTx=1,ipp=None, txA=0, txB=0,
197 nWindows=None, nHeights=None, firstHeight=None, deltaHeight=None,
198 numTaus=0, line6Function=0, line5Function=0, fClock=None,
199 prePulseBefore=0, prePulseAfter=0,
200 codeType=0, nCode=0, nBaud=0, code=None,
201 flip1=0, flip2=0):
202
203 self.radarControllerHeaderObj.expType = expType
204 self.radarControllerHeaderObj.nTx = nTx
205 self.radarControllerHeaderObj.ipp = float(ipp)
206 self.radarControllerHeaderObj.txA = float(txA)
207 self.radarControllerHeaderObj.txB = float(txB)
208 self.radarControllerHeaderObj.rangeIPP = ipp
209 self.radarControllerHeaderObj.rangeTxA = txA
210 self.radarControllerHeaderObj.rangeTxB = txB
211
212 self.radarControllerHeaderObj.nHeights = int(nHeights)
213 self.radarControllerHeaderObj.firstHeight = numpy.array([firstHeight])
214 self.radarControllerHeaderObj.deltaHeight = numpy.array([deltaHeight])
215 self.radarControllerHeaderObj.samplesWin = numpy.array([nHeights])
216
217
218 self.radarControllerHeaderObj.nWindows = nWindows
219 self.radarControllerHeaderObj.numTaus = numTaus
220 self.radarControllerHeaderObj.codeType = codeType
221 self.radarControllerHeaderObj.line6Function = line6Function
222 self.radarControllerHeaderObj.line5Function = line5Function
223 self.radarControllerHeaderObj.fclock = fClock
224 self.radarControllerHeaderObj.prePulseBefore= prePulseBefore
225 self.radarControllerHeaderObj.prePulseAfter = prePulseAfter
226
227 self.radarControllerHeaderObj.nCode = nCode
228 self.radarControllerHeaderObj.nBaud = nBaud
229 self.radarControllerHeaderObj.code = code
230 self.radarControllerHeaderObj.flip1 = flip1
231 self.radarControllerHeaderObj.flip2 = flip2
232
233 self.radarControllerHeaderObj.code_size = int(numpy.ceil(nBaud / 32.)) * nCode * 4
234
235 if fClock is None and deltaHeight is not None:
236 self.fClock = 0.15 / (deltaHeight * 1e-6)
237
238 def set_PH(self, dtype=0, blockSize=0, profilesPerBlock=0,
239 dataBlocksPerFile=0, nWindows=0, processFlags=0, nCohInt=0,
240 nIncohInt=0, totalSpectra=0, nHeights=0, firstHeight=0,
241 deltaHeight=0, samplesWin=0, spectraComb=0, nCode=0,
242 code=0, nBaud=None, shif_fft=False, flag_dc=False,
243 flag_cspc=False, flag_decode=False, flag_deflip=False):
244
245 self.processingHeaderObj.profilesPerBlock = profilesPerBlock
246 self.processingHeaderObj.dataBlocksPerFile = dataBlocksPerFile
247 self.processingHeaderObj.nWindows = nWindows
248 self.processingHeaderObj.nCohInt = nCohInt
249 self.processingHeaderObj.nIncohInt = nIncohInt
250 self.processingHeaderObj.totalSpectra = totalSpectra
251 self.processingHeaderObj.nHeights = int(nHeights)
252 self.processingHeaderObj.firstHeight = firstHeight
253 self.processingHeaderObj.deltaHeight = deltaHeight
254 self.processingHeaderObj.samplesWin = nHeights
255
256 def set_BH(self, utc = 0, miliSecond = 0, timeZone = 0):
257 self.basicHeaderObj.utc = utc
258 self.basicHeaderObj.miliSecond = miliSecond
259 self.basicHeaderObj.timeZone = timeZone
260
261 def set_SH(self, nSamples=0, nProfiles=0, nChannels=0, adcResolution=14, pciDioBusWidth=0):
262 self.systemHeaderObj.nSamples = nSamples
263 self.systemHeaderObj.nProfiles = nProfiles
264 self.systemHeaderObj.nChannels = nChannels
265 self.systemHeaderObj.adcResolution = adcResolution
266 self.systemHeaderObj.pciDioBusWidth = pciDioBusWidth
267
268 def setup(self,incIntFactor= 1, nFFTPoints = 0, FixPP_IncInt=1,FixRCP_IPP=1000,
269 FixPP_CohInt= 1,Tau_0= 250,AcqH0_0 = 70 ,AcqDH_0=1.25, Bauds= 32,
270 FixRCP_TXA = 40, FixRCP_TXB = 50, fAngle = 2.0*math.pi*(1/16),DC_level= 500,
271 stdev= 8,Num_Codes = 1 , Dyn_snCode = None, samples=200,channels=1,Fdoppler=20,Hdoppler=36,
272 **kwargs):
273
274 self.set_kwargs(**kwargs)
275 self.nReadBlocks = 0
276 tmp = time.time()
277 tmp_utc = int(tmp)
278 tmp_milisecond = int((tmp-tmp_utc)*1000)
279 print(" SETUP -basicHeaderObj.utc",datetime.datetime.utcfromtimestamp(tmp))
280 if Dyn_snCode is None:
281 Num_Codes=1
282 Bauds =1
283
284
285
286 self.set_BH(utc= tmp_utc,miliSecond= tmp_milisecond,timeZone=300 )
287
288 self.set_RCH( expType=0, nTx=150,ipp=FixRCP_IPP, txA=FixRCP_TXA, txB= FixRCP_TXB,
289 nWindows=1 , nHeights=samples, firstHeight=AcqH0_0, deltaHeight=AcqDH_0,
290 numTaus=1, line6Function=0, line5Function=0, fClock=None,
291 prePulseBefore=0, prePulseAfter=0,
292 codeType=14, nCode=Num_Codes, nBaud=32, code=Dyn_snCode,
293 flip1=0, flip2=0)
294
295 self.set_PH(dtype=0, blockSize=0, profilesPerBlock=300,
296 dataBlocksPerFile=120, nWindows=1, processFlags=0, nCohInt=1,
297 nIncohInt=1, totalSpectra=0, nHeights=samples, firstHeight=AcqH0_0,
298 deltaHeight=AcqDH_0, samplesWin=samples, spectraComb=0, nCode=0,
299 code=0, nBaud=None, shif_fft=False, flag_dc=False,
300 flag_cspc=False, flag_decode=False, flag_deflip=False)
301
302 self.set_SH(nSamples=samples, nProfiles=300, nChannels=channels)
303
304 self.incIntFactor = incIntFactor
305 self.nFFTPoints = nFFTPoints
306 self.FixPP_IncInt = FixPP_IncInt
307 self.FixRCP_IPP = FixRCP_IPP
308 self.FixPP_CohInt = FixPP_CohInt
309 self.Tau_0 = Tau_0
310 self.AcqH0_0 = AcqH0_0
311 self.H0 = AcqH0_0
312 self.AcqDH_0 = AcqDH_0
313 self.DH0 = AcqDH_0
314 self.Bauds = Bauds
315 self.FixRCP_TXA = FixRCP_TXA
316 self.FixRCP_TXB = FixRCP_TXB
317 self.fAngle = fAngle
318 self.DC_level = DC_level
319 self.stdev = stdev
320 self.Num_Codes = Num_Codes
321 self.Dyn_snCode = Dyn_snCode
322 self.samples = samples
323 self.channels = channels
324 self.profiles = None
325 self.m_nReference = None
326 self.Baudwidth = None
327 self.Fdoppler = Fdoppler
328 self.Hdoppler = Hdoppler
329
330 print("IPP ", self.FixRCP_IPP)
331 print("Tau_0 ",self.Tau_0)
332 print("AcqH0_0",self.AcqH0_0)
333 print("samples,window ",self.samples)
334 print("AcqDH_0",AcqDH_0)
335 print("FixRCP_TXA",self.FixRCP_TXA)
336 print("FixRCP_TXB",self.FixRCP_TXB)
337 print("Dyn_snCode",Dyn_snCode)
338 print("Fdoppler", Fdoppler)
339 print("Hdoppler",Hdoppler)
340
341 self.init_acquisition()
342 self.pulses,self.pulse_size=self.init_pulse(Num_Codes=self.Num_Codes,Bauds=self.Bauds,BaudWidth=self.BaudWidth,Dyn_snCode=Dyn_snCode)
343 print(" [ END ] - SETUP metodo")
344 return
345
346 def run(self,**kwargs): # own method
347 if not(self.isConfig):
348 self.setup(**kwargs)
349 self.isConfig = True
350 self.getData()
351
352 ##################################################################
353 ###### Simulator-specific classes and methods start here
354 ##################################################################
355
356 #############################################
357 ############## INIT_ACQUISITION##############
358 #############################################
359 def init_acquisition(self):
360
361 if self.nFFTPoints != 0:
362 self.incIntFactor = self.processingHeaderObj.profilesPerBlock/self.nFFTPoints
363 if (self.FixPP_IncInt > self.incIntFactor):
364 self.incIntFactor = self.FixPP_IncInt/ self.incIntFactor
365 elif(self.FixPP_IncInt< self.incIntFactor):
366 print("False alert...")
367
368 ProfilesperBlock = self.processingHeaderObj.profilesPerBlock
369
370 self.timeperblock =int(((self.FixRCP_IPP
371 *ProfilesperBlock
372 *self.FixPP_CohInt
373 *self.incIntFactor)
374 /150.0)
375 *0.9
376 +0.5)
377 # for each channel
378 self.profiles = ProfilesperBlock*self.FixPP_CohInt
379 self.profiles = ProfilesperBlock
380 self.Reference = int((self.Tau_0-self.AcqH0_0)/(self.AcqDH_0)+0.5)
381 self.BaudWidth = int((self.FixRCP_TXA/self.AcqDH_0)/self.Bauds + 0.5 )
382
383 if (self.BaudWidth==0):
384 self.BaudWidth=1
385 #################################################################
386 ####################### init_pulse ##############################
387 ################################################################
388
389 def init_pulse(self,Num_Codes=Num_Codes,Bauds=Bauds,BaudWidth=BaudWidth,Dyn_snCode=Dyn_snCode):
390
391 Num_Codes = Num_Codes
392 Bauds = Bauds
393 BaudWidth = BaudWidth
394 Dyn_snCode = Dyn_snCode
395
396 if Dyn_snCode:
397 print("EXISTE")
398 else:
399 print("No existe")
400
401 if Dyn_snCode: # if Bauds:
402 pulses = list(range(0,Num_Codes))
403 num_codes = Num_Codes
404 for i in range(num_codes):
405 pulse_size = Bauds*BaudWidth
406 pulses[i] = numpy.zeros(pulse_size)
407 for j in range(Bauds):
408 for k in range(BaudWidth):
409 pulses[i][j*BaudWidth+k] = int(Dyn_snCode[i][j]*600)
410 else:
411 print("sin code")
412 pulses = list(range(1))
413 pulse_size = int(self.FixRCP_TXB/0.15+0.5)
414 pulses[0] = numpy.ones(pulse_size)
415 pulses = 600*pulses[0]
416
417 return pulses,pulse_size
418
419 #################################################################
420 ##################### Generate block data
421 ################################################################
422
423 def jro_GenerateBlockOfData(self,Samples=Samples,DC_level= DC_level,stdev=stdev,
424 Reference= Reference,pulses= pulses,
425 Num_Codes= Num_Codes,pulse_size=pulse_size,
426 prof_gen= prof_gen,H0 = H0,DH0=DH0,Fdoppler= Fdoppler,Hdoppler=Hdoppler):
427 Samples = Samples
428 DC_level = DC_level
429 stdev = stdev
430 m_nR = Reference
431 pulses = pulses
432 num_codes = Num_Codes
433 ps = pulse_size
434 prof_gen = prof_gen
435 channels = self.channels
436 H0 = H0
437 DH0 = DH0
438 ippSec = self.radarControllerHeaderObj.ippSeconds
439 Fdoppler = self.Fdoppler
440 Hdoppler = self.Hdoppler
441
442 self.datablock = numpy.zeros([channels,prof_gen,Samples],dtype= numpy.complex64)
443 for i in range(channels):
444 for k in range(prof_gen):
445 #·······················NOISE··············
446 Noise_r = numpy.random.normal(DC_level,stdev,Samples)
447 Noise_i = numpy.random.normal(DC_level,stdev,Samples)
448 Noise = numpy.zeros(Samples,dtype=complex)
449 Noise.real = Noise_r
450 Noise.imag = Noise_i
451 #·······················PULSES··············
452 Pulso = numpy.zeros(pulse_size,dtype=complex)
453 Pulso.real = pulses[k%num_codes]
454 Pulso.imag = pulses[k%num_codes]
455 #····················· PULSES+NOISE··········
456 InBuffer = numpy.zeros(Samples,dtype=complex)
457 InBuffer[m_nR:m_nR+ps] = Pulso
458 InBuffer = Noise+ InBuffer
459 #····················· ANGLE ·······························
460
461
462
463
464 InBuffer.imag[m_nR:m_nR+ps] = InBuffer.imag[m_nR:m_nR+ps]*(math.sin( self.fAngle)*5)
465 InBuffer=InBuffer
466 self.datablock[i][k]= InBuffer
467 #plot_cts(InBuffer,H0=H0,DH0=DH0
468
469
470 #wave_fft(x=InBuffer,plot_show=True)
471 #time.sleep(1)
472 #················DOPPLER SIGNAL...............................................
473 time_vec = numpy.linspace(0,(prof_gen-1)*ippSec,int(prof_gen))+self.nReadBlocks*ippSec*prof_gen
474 fd = Fdoppler #+(600.0/120)*self.nReadBlocks
475 d_signal = 650*numpy.array(numpy.exp(1.0j*2.0*math.pi*fd*time_vec),dtype=numpy.complex64)
476 #·················· DATABLOCK + DOPPLER····························
477 HD=int(Hdoppler/self.AcqDH_0)
478 self.datablock[0,:,HD]=self.datablock[0,:,HD]+ d_signal # RESULT
479 '''
480 a= numpy.zeros(10)
481 for i in range(10):
482 a[i]=i+self.nReadBlocks+20
483 for i in a:
484 self.datablock[0,:,int(i)]=self.datablock[0,:,int(i)]+ d_signal # RESULT
485 '''
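
A minimal usage sketch, not part of this commit: one plausible way to drive the new SimulatorReader through the Project API whose changes appear further down in this diff (the server='simulate' branch added to ReadUnitConf.addRunOperation). The project id, parameter values and the 'VoltageProc' unit are illustrative assumptions.

# Hypothetical usage sketch; every value here is illustrative.
from schainpy.controller import Project

project = Project()
project.setup(id='191', name='sim_test', description='SimulatorReader smoke test')

# server='simulate' selects the new branch in addRunOperation, so no path or
# date range is required; the remaining kwargs are attached to the reader's
# 'run' operation.
sim_unit = project.addReadUnit(datatype='Simulator',
                               server='simulate',
                               FixRCP_IPP=1000, Tau_0=250,
                               AcqH0_0=70, AcqDH_0=1.25,
                               samples=200, channels=5,
                               Fdoppler=100, Hdoppler=36)

# Feed the simulated voltage profiles into a regular processing unit.
proc_unit = project.addProcUnit(datatype='VoltageProc', inputId=sim_unit.getId())

project.start()
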
@@ -0,0 +1,336
1 import numpy,math,random
2 import zmq
3 import tempfile
4 from io import StringIO
5 ########## 1 Inherit from JRODataReader
6 from schainpy.model.io.jroIO_base import *
7 ########## 2 Inherit the properties of ProcessingUnit
8 from schainpy.model.proc.jroproc_base import ProcessingUnit,Operation,MPDecorator
9 ########## 3 Import the BasicHeader, SystemHeader, RadarControllerHeader and ProcessingHeader classes
10 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader,SystemHeader,RadarControllerHeader, ProcessingHeader
11 ########## 4 Import the Voltage object
12 from schainpy.model.data.jrodata import Voltage
13
14 @MPDecorator
15 class SimulatorReader(JRODataReader, ProcessingUnit):
16 incIntFactor = 1
17 nFFTPoints = 0
18 FixPP_IncInt = 1
19 FixRCP_IPP = 1000
20 FixPP_CohInt = 1
21 Tau_0 = 250
22 AcqH0_0 = 70
23 H0 = AcqH0_0
24 AcqDH_0 = 1.25
25 DH0 = AcqDH_0
26 Bauds = 32
27 FixRCP_TXA = 40
28
29 fAngle = 2.0*math.pi*(1/16)
30 DC_level = 500
31 stdev = 8
32 Num_codes = 2
33 code0 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,1,1,1,0,1,1,0,1,0,0,0,1,1,1,0,1])
34 code1 = numpy.array([1,1,1,0,1,1,0,1,1,1,1,0,0,0,1,0,0,0,0,1,0,0,1,0,1,1,1,0,0,0,1,0])
35 Dyn_sncode = numpy.array([Num_codes,Bauds])
36 samples = 200
37 channels = 1
38
39
40 def __init__(self):
41 """
42 Inicializador de la clases SimulatorReader para
43 generar datos de voltage simulados.
44 Input:
45 dataOut: Objeto de la clase Voltage.
46 Este Objeto sera utilizado apra almacenar
47 un perfil de datos cada vez qe se haga psiversho
48 un requerimiento (getData)
49 """
50 ProcessingUnit.__init__(self)
51 print(" [ START ] init - Metodo Simulator Reader")
52 self.isConfig = False
53 self.basicHeaderObj = BasicHeader(LOCALTIME)
54 self.systemHeaderObj = SystemHeader()
55 self.radarControllerHeaderObj = RadarControllerHeader()
56 self.processingHeaderObj = ProcessingHeader()
57 self.profileIndex = 2**32-1
58 self.dataOut = Voltage()
59 #self.server = "simulate"
60 print(" [ END ] init - Metodo simulator Reader" )
61
62
63 def __hasNotDataInBuffer(self):
64 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock* self.nTxs:
65 return 1
66 return 0
67
68 def __setNewBlock(self):
69 if self.flagIsNewFile:
70 #self.lastUTTime = self.basicHeaderObj.utc
71 return 1
72
73 def readNextBlock(self):
74 while True:
75 self.__setNewBlock()
76 print (" [ START ] readNexBlock")
77 if not(self.readBlock()):
78 return 0
79 self.getBasicHeader()
80 break
81 if self.verbose:
82 print("[Reading] Block No. %d/%d -> %s" %(self.nReadBlock,
83 self.processingHeaderObj.dataBlocksPerfile,
84 self.dataOut.datatime.ctime()) )
85 return 1
86
87 def getFirstHeader(self):
88 self.getBasicHeader()
89 self.dataOut.processingHeaderObj= self.processingHeaderObj.copy()
90
91 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
92
93 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
94
95 def getBasicHeader(self):
96 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
97 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
98
99 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
100
101 self.dataOut.timeZone = self.basicHeaderObj.timeZone
102
103 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
104
105 self.dataOut.errorCount = self.basicHeaderObj.errorCount
106
107 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
108
109 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
110
111 def reshapeData(self):
112 if self.nTxs==1:
113 return
114
115 def readBlock(self):
116 self.init_acquisition()
117 pulses, num_codes, pulse_size = self.init_pulse()
118 self.jro_GenerateBlockOfData()
119 self.profileIndex = 0
120
121 self.flagIsNewFile = 0
122 self.flagIsNewBlock = 1
123
124 self.nTotalBlocks += 1
125 self.nReadBlocks += 1
126
127 return 1
128
129
130 def getData(self): ### method adapted from VoltageReader
131
132 if self.flagNoMoreFiles:
133 self.dataOut.flagNoData = True
134 self.flagDiscontinuousBlock=0
135 self.flagIsNewBlock = 0
136 if self.__hasNotDataInBuffer(): # true on the first call
137 if not(self.readNextBlock()): # readNextBlock returns 1 here, so the 'if not' does not bail out
138 return 0
139 self.getFirstHeader() # fill the header attributes of dataOut
140 self.reshapeData() # returns immediately when nTxs == 1
141
142 if not self.getByBlock:
143 self.dataOut.flagDataAsBlock = False
144 self.dataOut.data = self.datablock[:, self.profileIndex, :]
145 self.dataOut.profileIndex = self.profileIndex
146
147 self.profileIndex += 1
148 else:
149 pass
150 self.getBasicHeader()
151 self.dataOut.realtime = self.online
152
153 return self.dataOut.data
154
155
156 def set_kwargs(self, **kwargs):
157
158 for key, value in kwargs.items():
159 print(" set_kwargs ",key,value)
160 setattr(self, key, value)
161
162
163 def setup(self,incIntFactor= 1, nFFTPoints = 0, FixPP_IncInt=1,FixRCP_IPP=100,
164 FixPP_CohInt= 1,Tau_0= 250,AcqH0_0 = 70 ,AcqDH_0=1.25, Bauds= 32,
165 FixRCP_TXA = 40, fAngle = 2.0*math.pi*(1/16),DC_level= 500, stdev= 8,
166 Num_codes = None , Dyn_snCode = None, samples=200,channels=1,
167 **kwargs):
168 print(" [ START ] - SETUP metodo")
169 self.set_kwargs(**kwargs)
170 self.processingHeaderObj.profilesPerBlock = 100
171 self.incIntFactor = incIntFactor
172 self.nFFTPoints = nFFTPoints
173 self.FixPP_IncInt = FixPP_IncInt
174 self.FixRCP_IPP = FixRCP_IPP
175 self.FixPP_CohInt = FixPP_CohInt
176 self.Tau_0 = Tau_0
177 self.AcqH0_0 = AcqH0_0
178 self.H0 = AcqH0_0
179 self.AcqDH_0 = AcqDH_0
180 self.DH0 = AcqDH_0
181 self.Bauds = Bauds
182 self.FixRCP_TXA = FixRCP_TXA
183
184 self.fAngle = fAngle
185 self.DC_level = DC_level
186 self.stdev = stdev
187 self.Num_codes = Num_codes
188 #self.code0 = code0
189 #self.code1 = code1
190 self.Dyn_snCode = Dyn_snCode
191 self.samples = samples
192 self.channels = channels
193
194 print(" [ END ] - SETUP metodo")
195 return
196
197 def run(self,**kwargs): # own method
198
199 print(" [ START ] Metodo RUN: ", self.server)
200 if not(self.isConfig):
201 self.setup(**kwargs)
202 self.isConfig = True
203 import time
204 time.sleep(3)
205 #if self.server is None:
206 self.getData()
207 #else:
208 # self.getFromServer()
209 ##################################################################
210 ###### Simulator-specific classes and methods start here
211 ##################################################################
212
213 #############################################
214 ############## INIT_ACQUISITION##############
215 #############################################
216 def init_acquisition(self):
217
218 if self.nFFTPoints != 0:
219 self.incIntFactor = self.processingHeaderObj.profilesPerBlock/self.nFFTPoints
220 if (self.FixPP_IncInt > self.incIntFactor):
221 self.incIntFactor = self.FixPP_IncInt/ self.incIntFactor
222 elif(self.FixPP_IncInt< self.incIntFactor):
223 print("False alert...")
224
225 ProfilesperBlock = self.processingHeaderObj.profilesPerBlock
226
227 self.timeperblock =int(((self.FixRCP_IPP
228 *ProfilesperBlock
229 *self.FixPP_CohInt
230 *self.incIntFactor)
231 /150.0)
232 *0.9
233 +0.5)
234 # for each channel
235 prof_gen = ProfilesperBlock*self.FixPP_CohInt
236 prof_gen = ProfilesperBlock
237
238
239 m_nReference = int((self.Tau_0-self.AcqH0_0)/(self.AcqDH_0)+0.5)
240 print(m_nReference)
241 BaudWidth = int((self.FixRCP_TXA/self.AcqDH_0)/self.Bauds + 0.5 )
242 print(BaudWidth)
243 if (BaudWidth==0):
244 BaudWidth=1
245
246
247 #################################################################
248 ##################### init_pulse
249 ################################################################
250
251 def init_pulse(self,m_nNum_Codes,m_nBauds,BaudWidth,Dyn_snCode):
252 fAngle = 2.0*math.pi*(1/16)
253 DC_level = 500
254 stdev = 8
255 m_nNum_Codes= m_nNum_Codes
256 m_nBauds = m_nBauds
257 BaudWidth = BaudWidth
258 Dyn_snCode = Dyn_snCode
259
260 if m_nBauds:
261 pulses = list(range(0,m_nNum_Codes))
262 num_codes = m_nNum_Codes
263 for i in range(num_codes):
264 pulse_size = m_nBauds*BaudWidth
265 pulses[i] = numpy.zeros(pulse_size)
266 for j in range(m_nBauds):
267 for k in range(BaudWidth):
268 pulses[i][j*BaudWidth+k] = int(Dyn_snCode[i][j]*600)
269 else:
270 pulses = list(range(1))
271 pulse_size = int(FixRCP_m_fTXB/0.15+0.5)
272 pulses[0] = numpy.ones(pulse_size)
273 pulses = 600*pulses[0]
274 return pulses,num_codes,pulse_size
275
276 #################################################################
277 ##################### Generate block data
278 ################################################################
279 # m_nChannels
280 # prof_gen
281 # fAngle = 2.0*math.pi*(1/16)
282 # DC_level = 500
283 # stdev
284 # num_codes
285 #fAngle = 2.0*math.pi*(1/16)
286 #num_codes = 8
287
288
289 def jro_GenerateBlockOfData(self,m_nSamples,DC_level,stdev,m_nReference,pulses,num_codes,pulse_size,prof_gen,H0,DH0):
290 m_nSamples = m_nSamples
291 DC_level = DC_level
292 stdev = stdev
293 m_nR = m_nReference
294 pulses = pulses
295 num_codes = num_codes
296 ps = pulse_size
297 prof_gen = prof_gen
298 H0 = H0
299 DH0 = DH0
300 fAngle = 2.0*math.pi*(1/16)
301
302 # NOISE
303 Seed_r=random.seed(2)
304 Noise_r = numpy.random.normal(DC_level,stdev,m_nSamples)
305 Seed_i=random.seed(3)
306 Noise_i = numpy.random.normal(DC_level,stdev,m_nSamples)
307 Noise = numpy.zeros(m_nSamples,dtype=complex)
308 Noise.real = Noise_r
309 Noise.imag = Noise_i
310 Pulso = numpy.zeros(pulse_size,dtype=complex)
311
312 #DOPPLER
313 x = m_nSamples
314 time_space = (DH0*numpy.linspace(0, x-1,num=x) +H0)
315 time_vec = time_space*(1.0e-3/150.0)
316 fd = 10
317 d_signal = numpy.array(numpy.exp(1.0j*2.0*math.pi*fd*time_vec),dtype=numpy.complex64)
318
319
320
321 for i in range(self.channels):
322 for k in range(prof_gen):
323 Pulso.real = pulses[k%num_codes]
324 Pulso.imag = pulses[k%num_codes]
325 InBuffer = numpy.zeros(m_nSamples,dtype=complex)
326 InBuffer[m_nR:m_nR+ps] = Pulso
327 InBuffer = Noise+ InBuffer
328 InBuffer.real[m_nR:m_nR+ps] = InBuffer.real[m_nR:m_nR+ps]*(math.cos( fAngle)*5)
329 InBuffer.imag[m_nR:m_nR+ps] = InBuffer.imag[m_nR:m_nR+ps]*(math.sin( fAngle)*5)
330 InBuffer=InBuffer
331 #print(InBuffer[:10])
332 #print(InBuffer.shape)
333 #plot_cts(InBuffer,H0=H0,DH0=DH0)
334 #wave_fft(x=InBuffer,plot_show=True)
335 #time.sleep(1)
336
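
For orientation, a standalone rewrite of the Doppler construction used in the draft above: it builds a complex exponential along the height axis (the refined reader in the first file instead advances it along the profile/time axis). The values are the draft's defaults and are illustrative only.

import numpy, math

H0, DH0, m_nSamples, fd = 70, 1.25, 200, 10                              # draft defaults (illustrative)
time_space = DH0*numpy.linspace(0, m_nSamples-1, num=m_nSamples) + H0    # range in km
time_vec = time_space*(1.0e-3/150.0)                                     # round-trip delay in seconds
d_signal = numpy.exp(1.0j*2.0*math.pi*fd*time_vec).astype(numpy.complex64)
print(d_signal.shape)                                                    # (200,)
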
@@ -1,1290 +1,1300
1 1 '''
2 2 Updated on January , 2018, for multiprocessing purposes
3 3 Author: Sergio Cortez
4 4 Created on September , 2012
5 5 '''
6 6 from platform import python_version
7 7 import sys
8 8 import ast
9 9 import datetime
10 10 import traceback
11 11 import math
12 12 import time
13 13 import zmq
14 14 from multiprocessing import Process, Queue, Event, Value, cpu_count
15 15 from threading import Thread
16 16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 17 from xml.dom import minidom
18 18
19 19
20 20 from schainpy.admin import Alarm, SchainWarning
21 21 from schainpy.model import *
22 22 from schainpy.utils import log
23 23
24 24
25 25 DTYPES = {
26 26 'Voltage': '.r',
27 27 'Spectra': '.pdata'
28 28 }
29 29
30 30
31 31 def MPProject(project, n=cpu_count()):
32 32 '''
33 33 Project wrapper to run schain in n processes
34 34 '''
35 35
36 36 rconf = project.getReadUnitObj()
37 37 op = rconf.getOperationObj('run')
38 38 dt1 = op.getParameterValue('startDate')
39 39 dt2 = op.getParameterValue('endDate')
40 40 tm1 = op.getParameterValue('startTime')
41 41 tm2 = op.getParameterValue('endTime')
42 42 days = (dt2 - dt1).days
43 43
44 44 for day in range(days + 1):
45 45 skip = 0
46 46 cursor = 0
47 47 processes = []
48 48 dt = dt1 + datetime.timedelta(day)
49 49 dt_str = dt.strftime('%Y/%m/%d')
50 50 reader = JRODataReader()
51 51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 52 startDate=dt,
53 53 endDate=dt,
54 54 startTime=tm1,
55 55 endTime=tm2,
56 56 ext=DTYPES[rconf.datatype])
57 57 nFiles = len(files)
58 58 if nFiles == 0:
59 59 continue
60 skip = int(math.ceil(nFiles / n))
60 skip = int(math.ceil(nFiles / n))
61 61 while nFiles > cursor * skip:
62 62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 63 skip=skip)
64 64 p = project.clone()
65 65 p.start()
66 66 processes.append(p)
67 67 cursor += 1
68 68
69 69 def beforeExit(exctype, value, trace):
70 70 for process in processes:
71 71 process.terminate()
72 72 process.join()
73 73 print(traceback.print_tb(trace))
74 74
75 75 sys.excepthook = beforeExit
76 76
77 77 for process in processes:
78 78 process.join()
79 79 process.terminate()
80 80
81 81 time.sleep(3)
82 82
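As a brief illustration (not from the source), MPProject is called with an already configured Project object; n defaults to the CPU count.

# Hypothetical call: split the project's date range over 4 cloned processes,
# one chunk of each day's files per clone.
MPProject(project, n=4)
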
83 83 def wait(context):
84
84
85 85 time.sleep(1)
86 86 c = zmq.Context()
87 87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 90 msg = receiver.recv_multipart()[1]
91 91 context.terminate()
92 92
93 93 class ParameterConf():
94 94
95 95 id = None
96 96 name = None
97 97 value = None
98 98 format = None
99 99
100 100 __formated_value = None
101 101
102 102 ELEMENTNAME = 'Parameter'
103 103
104 104 def __init__(self):
105 105
106 106 self.format = 'str'
107 107
108 108 def getElementName(self):
109 109
110 110 return self.ELEMENTNAME
111 111
112 112 def getValue(self):
113 113
114 114 value = self.value
115 115 format = self.format
116 116
117 117 if self.__formated_value != None:
118 118
119 119 return self.__formated_value
120 120
121 121 if format == 'obj':
122 122 return value
123 123
124 124 if format == 'str':
125 125 self.__formated_value = str(value)
126 126 return self.__formated_value
127 127
128 128 if value == '':
129 129 raise ValueError('%s: This parameter value is empty' % self.name)
130 130
131 131 if format == 'list':
132 132 strList = [s.strip() for s in value.split(',')]
133 133 self.__formated_value = strList
134 134
135 135 return self.__formated_value
136 136
137 137 if format == 'intlist':
138 138 '''
139 139 Example:
140 140 value = (0,1,2)
141 141 '''
142 142
143 143 new_value = ast.literal_eval(value)
144 144
145 145 if type(new_value) not in (tuple, list):
146 146 new_value = [int(new_value)]
147 147
148 148 self.__formated_value = new_value
149 149
150 150 return self.__formated_value
151 151
152 152 if format == 'floatlist':
153 153 '''
154 154 Example:
155 155 value = (0.5, 1.4, 2.7)
156 156 '''
157 157
158 158 new_value = ast.literal_eval(value)
159 159
160 160 if type(new_value) not in (tuple, list):
161 161 new_value = [float(new_value)]
162 162
163 163 self.__formated_value = new_value
164 164
165 165 return self.__formated_value
166 166
167 167 if format == 'date':
168 168 strList = value.split('/')
169 169 intList = [int(x) for x in strList]
170 170 date = datetime.date(intList[0], intList[1], intList[2])
171 171
172 172 self.__formated_value = date
173 173
174 174 return self.__formated_value
175 175
176 176 if format == 'time':
177 177 strList = value.split(':')
178 178 intList = [int(x) for x in strList]
179 179 time = datetime.time(intList[0], intList[1], intList[2])
180 180
181 181 self.__formated_value = time
182 182
183 183 return self.__formated_value
184 184
185 185 if format == 'pairslist':
186 186 '''
187 187 Example:
188 188 value = (0,1),(1,2)
189 189 '''
190 190
191 191 new_value = ast.literal_eval(value)
192 192
193 193 if type(new_value) not in (tuple, list):
194 194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195 195
196 196 if type(new_value[0]) not in (tuple, list):
197 197 if len(new_value) != 2:
198 198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 199 new_value = [new_value]
200 200
201 201 for thisPair in new_value:
202 202 if len(thisPair) != 2:
203 203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204 204
205 205 self.__formated_value = new_value
206 206
207 207 return self.__formated_value
208 208
209 209 if format == 'multilist':
210 210 '''
211 211 Example:
212 212 value = (0,1,2),(3,4,5)
213 213 '''
214 214 multiList = ast.literal_eval(value)
215 215
216 216 if type(multiList[0]) == int:
217 217 multiList = ast.literal_eval('(' + value + ')')
218 218
219 219 self.__formated_value = multiList
220 220
221 221 return self.__formated_value
222 222
223 223 if format == 'bool':
224 224 value = int(value)
225 225
226 226 if format == 'int':
227 227 value = float(value)
228 228
229 229 format_func = eval(format)
230 230
231 231 self.__formated_value = format_func(value)
232 232
233 233 return self.__formated_value
234 234
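A short sketch, not from the source, of how a few of the formats above are converted, using the setup() method defined just below; the parameter names and ids are arbitrary.

# Illustrative only: exercise some of the formats handled by getValue().
p = ParameterConf()
p.setup(id=1, name='startDate', value='2018/01/15', format='date')
print(p.getValue())          # datetime.date(2018, 1, 15)

q = ParameterConf()
q.setup(id=2, name='channelList', value='(0,1,2)', format='intlist')
print(q.getValue())          # (0, 1, 2) -- a tuple or list is kept as-is

r = ParameterConf()
r.setup(id=3, name='pairsList', value='(0,1),(1,2)', format='pairslist')
print(r.getValue())          # ((0, 1), (1, 2))
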
235 235 def updateId(self, new_id):
236 236
237 237 self.id = str(new_id)
238 238
239 239 def setup(self, id, name, value, format='str'):
240 240 self.id = str(id)
241 241 self.name = name
242 242 if format == 'obj':
243 243 self.value = value
244 244 else:
245 245 self.value = str(value)
246 246 self.format = str.lower(format)
247 247
248 248 self.getValue()
249 249
250 250 return 1
251 251
252 252 def update(self, name, value, format='str'):
253 253
254 254 self.name = name
255 255 self.value = str(value)
256 256 self.format = format
257 257
258 258 def makeXml(self, opElement):
259 259 if self.name not in ('queue',):
260 260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 261 parmElement.set('id', str(self.id))
262 262 parmElement.set('name', self.name)
263 263 parmElement.set('value', self.value)
264 264 parmElement.set('format', self.format)
265
265
266 266 def readXml(self, parmElement):
267 267
268 268 self.id = parmElement.get('id')
269 269 self.name = parmElement.get('name')
270 270 self.value = parmElement.get('value')
271 271 self.format = str.lower(parmElement.get('format'))
272 272
273 273 # Compatible with old signal chain version
274 274 if self.format == 'int' and self.name == 'idfigure':
275 275 self.name = 'id'
276 276
277 277 def printattr(self):
278 278
279 279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280 280
281 281 class OperationConf():
282 282
283 283 ELEMENTNAME = 'Operation'
284 284
285 285 def __init__(self):
286 286
287 287 self.id = '0'
288 288 self.name = None
289 289 self.priority = None
290 290 self.topic = None
291 291
292 292 def __getNewId(self):
293 293
294 294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295 295
296 296 def getId(self):
297 297 return self.id
298 298
299 299 def updateId(self, new_id):
300 300
301 301 self.id = str(new_id)
302 302
303 303 n = 1
304 304 for parmObj in self.parmConfObjList:
305 305
306 306 idParm = str(int(new_id) * 10 + n)
307 307 parmObj.updateId(idParm)
308 308
309 309 n += 1
310 310
311 311 def getElementName(self):
312 312
313 313 return self.ELEMENTNAME
314 314
315 315 def getParameterObjList(self):
316 316
317 317 return self.parmConfObjList
318 318
319 319 def getParameterObj(self, parameterName):
320 320
321 321 for parmConfObj in self.parmConfObjList:
322 322
323 323 if parmConfObj.name != parameterName:
324 324 continue
325 325
326 326 return parmConfObj
327 327
328 328 return None
329 329
330 330 def getParameterObjfromValue(self, parameterValue):
331 331
332 332 for parmConfObj in self.parmConfObjList:
333 333
334 334 if parmConfObj.getValue() != parameterValue:
335 335 continue
336 336
337 337 return parmConfObj.getValue()
338 338
339 339 return None
340 340
341 341 def getParameterValue(self, parameterName):
342 342
343 343 parameterObj = self.getParameterObj(parameterName)
344 344
345 345 # if not parameterObj:
346 346 # return None
347 347
348 348 value = parameterObj.getValue()
349 349
350 350 return value
351 351
352 352 def getKwargs(self):
353 353
354 354 kwargs = {}
355 355
356 356 for parmConfObj in self.parmConfObjList:
357 357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 358 continue
359 359
360 360 kwargs[parmConfObj.name] = parmConfObj.getValue()
361 361
362 362 return kwargs
363 363
364 364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365 365
366 366 self.id = str(id)
367 367 self.project_id = project_id
368 368 self.name = name
369 369 self.type = type
370 370 self.priority = priority
371 371 self.err_queue = err_queue
372 372 self.lock = lock
373 373 self.parmConfObjList = []
374 374
375 375 def removeParameters(self):
376 376
377 377 for obj in self.parmConfObjList:
378 378 del obj
379 379
380 380 self.parmConfObjList = []
381 381
382 382 def addParameter(self, name, value, format='str'):
383 383
384 384 if value is None:
385 385 return None
386 386 id = self.__getNewId()
387 387
388 388 parmConfObj = ParameterConf()
389 389 if not parmConfObj.setup(id, name, value, format):
390 390 return None
391 391
392 392 self.parmConfObjList.append(parmConfObj)
393 393
394 394 return parmConfObj
395 395
396 396 def changeParameter(self, name, value, format='str'):
397 397
398 398 parmConfObj = self.getParameterObj(name)
399 399 parmConfObj.update(name, value, format)
400 400
401 401 return parmConfObj
402 402
403 403 def makeXml(self, procUnitElement):
404 404
405 405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
406 406 opElement.set('id', str(self.id))
407 407 opElement.set('name', self.name)
408 408 opElement.set('type', self.type)
409 409 opElement.set('priority', str(self.priority))
410 410
411 411 for parmConfObj in self.parmConfObjList:
412 412 parmConfObj.makeXml(opElement)
413 413
414 414 def readXml(self, opElement, project_id):
415 415
416 416 self.id = opElement.get('id')
417 417 self.name = opElement.get('name')
418 418 self.type = opElement.get('type')
419 419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
420 self.project_id = str(project_id)
421 421
422 422 # Compatible with old signal chain version
423 423 # Use of 'run' method instead of 'init'
424 424 if self.type == 'self' and self.name == 'init':
425 425 self.name = 'run'
426 426
427 427 self.parmConfObjList = []
428 428
429 429 parmElementList = opElement.iter(ParameterConf().getElementName())
430 430
431 431 for parmElement in parmElementList:
432 432 parmConfObj = ParameterConf()
433 433 parmConfObj.readXml(parmElement)
434 434
435 435 # Compatible with old signal chain version
436 436 # If a 'Plot' OPERATION is found, replace the operation name with the value of its 'type' PARAMETER
437 437 if self.type != 'self' and self.name == 'Plot':
438 438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
439 439 self.name = parmConfObj.value
440 440 continue
441 441
442 442 self.parmConfObjList.append(parmConfObj)
443 443
444 444 def printattr(self):
445 445
446 446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
447 447 self.id,
448 448 self.name,
449 449 self.type,
450 450 self.priority,
451 451 self.project_id))
452 452
453 453 for parmConfObj in self.parmConfObjList:
454 454 parmConfObj.printattr()
455 455
456 456 def createObject(self):
457 457
458 458 className = eval(self.name)
459 459
460 460 if self.type == 'other':
461 461 opObj = className()
462 462 elif self.type == 'external':
463 463 kwargs = self.getKwargs()
464 464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
465 465 opObj.start()
466 466 self.opObj = opObj
467 467
468 468 return opObj
469 469
470 470 class ProcUnitConf():
471 471
472 472 ELEMENTNAME = 'ProcUnit'
473 473
474 474 def __init__(self):
475 475
476 476 self.id = None
477 477 self.datatype = None
478 478 self.name = None
479 self.inputId = None
479 self.inputId = None
480 480 self.opConfObjList = []
481 481 self.procUnitObj = None
482 482 self.opObjDict = {}
483 483
484 484 def __getPriority(self):
485 485
486 486 return len(self.opConfObjList) + 1
487 487
488 488 def __getNewId(self):
489 489
490 490 return int(self.id) * 10 + len(self.opConfObjList) + 1
491 491
492 492 def getElementName(self):
493 493
494 494 return self.ELEMENTNAME
495 495
496 496 def getId(self):
497 497
498 498 return self.id
499 499
500 def updateId(self, new_id):
500 def updateId(self, new_id):
501 501 '''
502 502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504 504
505 505 # If this proc unit has not inputs
506 506 #if self.inputId == '0':
507 507 #new_inputId = 0
508 508
509 509 n = 1
510 510 for opConfObj in self.opConfObjList:
511 511
512 512 idOp = str(int(new_id) * 10 + n)
513 513 opConfObj.updateId(idOp)
514 514
515 515 n += 1
516 516
517 517 self.parentId = str(parentId)
518 518 self.id = str(new_id)
519 519 #self.inputId = str(new_inputId)
520 520 '''
521 521 n = 1
522 522
523 523 def getInputId(self):
524 524
525 525 return self.inputId
526 526
527 527 def getOperationObjList(self):
528 528
529 529 return self.opConfObjList
530 530
531 531 def getOperationObj(self, name=None):
532 532
533 533 for opConfObj in self.opConfObjList:
534 534
535 535 if opConfObj.name != name:
536 536 continue
537 537
538 538 return opConfObj
539 539
540 540 return None
541 541
542 542 def getOpObjfromParamValue(self, value=None):
543 543
544 544 for opConfObj in self.opConfObjList:
545 545 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 546 continue
547 547 return opConfObj
548 548 return None
549 549
550 550 def getProcUnitObj(self):
551 551
552 552 return self.procUnitObj
553 553
554 554 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
555 555 '''
556 556 id will be the topic to publish to
557 557 inputId will be the topic to subscribe to
558 558 '''
559
559
560 560 # Compatible with old signal chain version
561 561 if datatype == None and name == None:
562 562 raise ValueError('datatype or name should be defined')
563 563
564 564 # Define a condition for inputId when it is 0
565 565
566 566 if name == None:
567 567 if 'Proc' in datatype:
568 568 name = datatype
569 569 else:
570 570 name = '%sProc' % (datatype)
571 571
572 572 if datatype == None:
573 573 datatype = name.replace('Proc', '')
574 574
575 575 self.id = str(id)
576 576 self.project_id = project_id
577 577 self.name = name
578 578 self.datatype = datatype
579 579 self.inputId = inputId
580 580 self.err_queue = err_queue
581 581 self.lock = lock
582 582 self.opConfObjList = []
583 583
584 self.addOperation(name='run', optype='self')
584 self.addOperation(name='run', optype='self')
585 585
586 586 def removeOperations(self):
587 587
588 588 for obj in self.opConfObjList:
589 589 del obj
590 590
591 591 self.opConfObjList = []
592 592 self.addOperation(name='run')
593 593
594 594 def addParameter(self, **kwargs):
595 595 '''
596 596 Add parameters to 'run' operation
597 597 '''
598 598 opObj = self.opConfObjList[0]
599 599
600 600 opObj.addParameter(**kwargs)
601 601
602 602 return opObj
603 603
604 604 def addOperation(self, name, optype='self'):
605 605 '''
606 606 Update -> communication process
607 607 For optype='self', remove it. Define IPC communication -> Topic
608 608 and define the socket type or ipc communication
609 609
610 610 '''
611 611
612 612 id = self.__getNewId()
613 613 priority = self.__getPriority() # Not very meaningful, but it can be used
614 614 opConfObj = OperationConf()
615 615 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.lock)
616 616 self.opConfObjList.append(opConfObj)
617 617
618 618 return opConfObj
619 619
620 620 def makeXml(self, projectElement):
621 621
622 622 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
623 623 procUnitElement.set('id', str(self.id))
624 624 procUnitElement.set('name', self.name)
625 625 procUnitElement.set('datatype', self.datatype)
626 626 procUnitElement.set('inputId', str(self.inputId))
627 627
628 628 for opConfObj in self.opConfObjList:
629 629 opConfObj.makeXml(procUnitElement)
630 630
631 631 def readXml(self, upElement, project_id):
632 632
633 633 self.id = upElement.get('id')
634 634 self.name = upElement.get('name')
635 635 self.datatype = upElement.get('datatype')
636 636 self.inputId = upElement.get('inputId')
637 637 self.project_id = str(project_id)
638 638
639 639 if self.ELEMENTNAME == 'ReadUnit':
640 640 self.datatype = self.datatype.replace('Reader', '')
641 641
642 642 if self.ELEMENTNAME == 'ProcUnit':
643 643 self.datatype = self.datatype.replace('Proc', '')
644 644
645 645 if self.inputId == 'None':
646 646 self.inputId = '0'
647 647
648 648 self.opConfObjList = []
649 649
650 650 opElementList = upElement.iter(OperationConf().getElementName())
651 651
652 652 for opElement in opElementList:
653 653 opConfObj = OperationConf()
654 654 opConfObj.readXml(opElement, project_id)
655 655 self.opConfObjList.append(opConfObj)
656 656
657 657 def printattr(self):
658 658
659 659 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 660 self.id,
661 661 self.name,
662 662 self.datatype,
663 663 self.inputId,
664 664 self.project_id))
665 665
666 666 for opConfObj in self.opConfObjList:
667 667 opConfObj.printattr()
668 668
669 669 def getKwargs(self):
670 670
671 671 opObj = self.opConfObjList[0]
672 672 kwargs = opObj.getKwargs()
673 673
674 674 return kwargs
675 675
676 676 def createObjects(self):
677 677 '''
678 678 Instantiates the processing units.
679 679 '''
680
680 #print(" [ CREATE OBJ ] :",self.name)
681 681 className = eval(self.name)
682
682 683 kwargs = self.getKwargs()
684 #print (" [ kwargs ] : ", kwargs)
683 685 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
684 686 log.success('creating process...', self.name)
685 687
686 688 for opConfObj in self.opConfObjList:
687
689
688 690 if opConfObj.type == 'self' and opConfObj.name == 'run':
689 691 continue
690 692 elif opConfObj.type == 'self':
691 693 opObj = getattr(procUnitObj, opConfObj.name)
692 694 else:
693 695 opObj = opConfObj.createObject()
694
696
695 697 log.success('adding operation: {}, type:{}'.format(
696 698 opConfObj.name,
697 699 opConfObj.type), self.name)
698
700
699 701 procUnitObj.addOperation(opConfObj, opObj)
700
702
701 703 procUnitObj.start()
702 704 self.procUnitObj = procUnitObj
703
705
704 706 def close(self):
705 707
706 708 for opConfObj in self.opConfObjList:
707 709 if opConfObj.type == 'self':
708 710 continue
709 711
710 712 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 713 opObj.close()
712 714
713 715 self.procUnitObj.close()
714 716
715 717 return
716 718
717 719
718 720 class ReadUnitConf(ProcUnitConf):
719 721
720 722 ELEMENTNAME = 'ReadUnit'
721 723
722 724 def __init__(self):
723 725
724 726 self.id = None
725 727 self.datatype = None
726 728 self.name = None
727 729 self.inputId = None
728 730 self.opConfObjList = []
729 731 self.lock = Event()
730 732 self.lock.set()
731 733 self.lock.n = Value('d', 0)
732 734
733 735 def getElementName(self):
734 736
735 return self.ELEMENTNAME
736
737 return self.ELEMENTNAME
738
737 739 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 740 startTime='', endTime='', server=None, **kwargs):
739 741
740 742
741 743 '''
742 744 ***** the process id will be the Topic
743 745
744 746 Addition of {topic}; if it is not present -> error
745 747 kwargs must be passed on at instantiation
746 748
747 749 '''
748
750
749 751 # Compatible with old signal chain version
752 #print (" [INSIDE] : setup ReadUnit", kwargs)
750 753 if datatype == None and name == None:
751 754 raise ValueError('datatype or name should be defined')
752 755 if name == None:
753 756 if 'Reader' in datatype:
754 757 name = datatype
755 758 datatype = name.replace('Reader','')
756 759 else:
757 760 name = '{}Reader'.format(datatype)
758 761 if datatype == None:
759 762 if 'Reader' in name:
760 763 datatype = name.replace('Reader','')
761 764 else:
762 765 datatype = name
763 766 name = '{}Reader'.format(name)
764 767
765 768 self.id = id
766 769 self.project_id = project_id
767 770 self.name = name
768 771 self.datatype = datatype
769 772 if path != '':
770 773 self.path = os.path.abspath(path)
771 774 self.startDate = startDate
772 775 self.endDate = endDate
773 776 self.startTime = startTime
774 777 self.endTime = endTime
775 778 self.server = server
776 self.err_queue = err_queue
779 self.err_queue = err_queue
777 780 self.addRunOperation(**kwargs)
778 781
779 782 def update(self, **kwargs):
780 783
781 784 if 'datatype' in kwargs:
782 785 datatype = kwargs.pop('datatype')
783 786 if 'Reader' in datatype:
784 787 self.name = datatype
785 788 else:
786 789 self.name = '%sReader' % (datatype)
787 790 self.datatype = self.name.replace('Reader', '')
788 791
789 792 attrs = ('path', 'startDate', 'endDate',
790 793 'startTime', 'endTime')
791 794
792 795 for attr in attrs:
793 796 if attr in kwargs:
794 797 setattr(self, attr, kwargs.pop(attr))
795 798
796 799 self.updateRunOperation(**kwargs)
797 800
798 801 def removeOperations(self):
799 802
800 803 for obj in self.opConfObjList:
801 804 del obj
802 805
803 806 self.opConfObjList = []
804 807
805 808 def addRunOperation(self, **kwargs):
806 809
807 opObj = self.addOperation(name='run', optype='self')
810 opObj = self.addOperation(name='run', optype='self')
808 811
809 812 if self.server is None:
810 813 opObj.addParameter(
811 814 name='datatype', value=self.datatype, format='str')
812 815 opObj.addParameter(name='path', value=self.path, format='str')
813 816 opObj.addParameter(
814 817 name='startDate', value=self.startDate, format='date')
815 818 opObj.addParameter(
816 819 name='endDate', value=self.endDate, format='date')
817 820 opObj.addParameter(
818 821 name='startTime', value=self.startTime, format='time')
819 822 opObj.addParameter(
820 823 name='endTime', value=self.endTime, format='time')
821 824
822 825 for key, value in list(kwargs.items()):
823 826 opObj.addParameter(name=key, value=value,
824 827 format=type(value).__name__)
828 elif self.server== "simulate":
829 #print(" [ INSIDE ] : AROperation simulate -True simulate")
830 opObj.addParameter(
831 name='datatype', value=self.datatype, format='str')
832 for key, value in list(kwargs.items()):
833 opObj.addParameter(name=key, value=value,
834 format=type(value).__name__)
825 835 else:
826 836 opObj.addParameter(name='server', value=self.server, format='str')
827 837
828 838 return opObj
829 839
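A note on the branch above: with server='simulate' the 'run' operation carries only 'datatype' plus the caller's keyword arguments, so a simulated reader needs no path or date range. A hypothetical check of the resulting kwargs, with illustrative ids and values:

readUnitConfObj = ReadUnitConf()
readUnitConfObj.setup(project_id='191', id='1911', name=None, datatype='Simulator',
                      err_queue=None, server='simulate', samples=200, channels=5)
print(readUnitConfObj.getKwargs())   # -> {'samples': 200, 'channels': 5} ('datatype' is skipped by getKwargs)
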
830 840 def updateRunOperation(self, **kwargs):
831 841
832 842 opObj = self.getOperationObj(name='run')
833 843 opObj.removeParameters()
834 844
835 845 opObj.addParameter(name='datatype', value=self.datatype, format='str')
836 846 opObj.addParameter(name='path', value=self.path, format='str')
837 847 opObj.addParameter(
838 848 name='startDate', value=self.startDate, format='date')
839 849 opObj.addParameter(name='endDate', value=self.endDate, format='date')
840 850 opObj.addParameter(
841 851 name='startTime', value=self.startTime, format='time')
842 852 opObj.addParameter(name='endTime', value=self.endTime, format='time')
843 853
844 854 for key, value in list(kwargs.items()):
845 855 opObj.addParameter(name=key, value=value,
846 856 format=type(value).__name__)
847 857
848 858 return opObj
849 859
850 860 def readXml(self, upElement, project_id):
851 861
852 862 self.id = upElement.get('id')
853 863 self.name = upElement.get('name')
854 864 self.datatype = upElement.get('datatype')
855 865 self.project_id = str(project_id) #yong
856 866
857 867 if self.ELEMENTNAME == 'ReadUnit':
858 868 self.datatype = self.datatype.replace('Reader', '')
859 869
860 870 self.opConfObjList = []
861 871
862 872 opElementList = upElement.iter(OperationConf().getElementName())
863 873
864 874 for opElement in opElementList:
865 875 opConfObj = OperationConf()
866 876 opConfObj.readXml(opElement, project_id)
867 877 self.opConfObjList.append(opConfObj)
868 878
869 879 if opConfObj.name == 'run':
870 880 self.path = opConfObj.getParameterValue('path')
871 881 self.startDate = opConfObj.getParameterValue('startDate')
872 882 self.endDate = opConfObj.getParameterValue('endDate')
873 883 self.startTime = opConfObj.getParameterValue('startTime')
874 884 self.endTime = opConfObj.getParameterValue('endTime')
875 885
876 886
877 887 class Project(Process):
878 888
879 889 ELEMENTNAME = 'Project'
880 890
881 891 def __init__(self):
882 892
883 893 Process.__init__(self)
884 894 self.id = None
885 895 self.filename = None
886 896 self.description = None
887 897 self.email = None
888 898 self.alarm = None
889 899 self.procUnitConfObjDict = {}
890 900 self.err_queue = Queue()
891 901
892 902 def __getNewId(self):
893 903
894 904 idList = list(self.procUnitConfObjDict.keys())
895 905 id = int(self.id) * 10
896 906
897 907 while True:
898 908 id += 1
899 909
900 910 if str(id) in idList:
901 911 continue
902 912
903 913 break
904 914
905 915 return str(id)
906 916
907 917 def getElementName(self):
908 918
909 919 return self.ELEMENTNAME
910 920
911 921 def getId(self):
912 922
913 923 return self.id
914 924
915 925 def updateId(self, new_id):
916 926
917 927 self.id = str(new_id)
918 928
919 929 keyList = list(self.procUnitConfObjDict.keys())
920 930 keyList.sort()
921 931
922 932 n = 1
923 933 newProcUnitConfObjDict = {}
924 934
925 935 for procKey in keyList:
926 936
927 937 procUnitConfObj = self.procUnitConfObjDict[procKey]
928 938 idProcUnit = str(int(self.id) * 10 + n)
929 939 procUnitConfObj.updateId(idProcUnit)
930 940 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
931 941 n += 1
932 942
933 943 self.procUnitConfObjDict = newProcUnitConfObjDict
934 944
935 945 def setup(self, id=1, name='', description='', email=None, alarm=[]):
936 946
937 947 print(' ')
938 948 print('*' * 60)
939 949 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
940 950 print('*' * 60)
941 951 print("* Python " + python_version() + " *")
942 952 print('*' * 19)
943 953 print(' ')
944 954 self.id = str(id)
945 self.description = description
955 self.description = description
946 956 self.email = email
947 957 self.alarm = alarm
948 958 if name:
949 959 self.name = '{} ({})'.format(Process.__name__, name)
950 960
951 961 def update(self, **kwargs):
952 962
953 963 for key, value in list(kwargs.items()):
954 964 setattr(self, key, value)
955 965
956 966 def clone(self):
957 967
958 968 p = Project()
959 969 p.procUnitConfObjDict = self.procUnitConfObjDict
960 970 return p
961 971
962 972 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
963 973
964 974 '''
965 975 Update:
966 976 A new argument was added: topic - related to how the simultaneous processes communicate
967 977
968 978 * The process id will be the topic that the procUnits must subscribe to in order to receive the information (data)
969 979
970 980 '''
971 981
972 982 if id is None:
973 983 idReadUnit = self.__getNewId()
974 984 else:
975 985 idReadUnit = str(id)
976 986
977 987 readUnitConfObj = ReadUnitConf()
978 988 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 989 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
980
990
981 991 return readUnitConfObj
982 992
983 993 def addProcUnit(self, inputId='0', datatype=None, name=None):
984 994
985 995 '''
986 996         Update:
987 997         Two new arguments were added: topic_read (reads data from another procUnit) and topic_write (writes or sends data to another procUnit).
988 998         They should eventually replace "inputId".
989 999
990 1000         ** To keep inputId, it will be the representation of the topic to subscribe to. The instance's own id
991 1001         (process) will be the publication topic; everything is assigned dynamically.
992 1002
993 1003 '''
994 1004
995 1005 idProcUnit = self.__getNewId()
996 1006 procUnitConfObj = ProcUnitConf()
997 input_proc = self.procUnitConfObjDict[inputId]
1007 input_proc = self.procUnitConfObjDict[inputId]
998 1008 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 1009 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1010
1001 1011 return procUnitConfObj
1002 1012
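    # Usage sketch (illustrative, not part of this class): a script typically
    # assembles a Project by adding one read unit and chaining processing units
    # to it. The datatypes 'VoltageReader' and 'VoltageProc' are assumed
    # examples, and the read-unit kwargs mirror the 'run' parameters consumed
    # by ReadUnitConf.readXml (path, startDate, endDate, startTime, endTime).
    #
    #   project = Project()
    #   project.setup(id='100', name='test', description='offline test')
    #   reader = project.addReadUnit(datatype='VoltageReader',
    #                                path='/data', startDate='2019/11/01',
    #                                endDate='2019/11/01',
    #                                startTime='00:00:00', endTime='23:59:59')
    #   proc = project.addProcUnit(inputId=reader.getId(), datatype='VoltageProc')
    #   project.start()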
1003 1013 def removeProcUnit(self, id):
1004 1014
1005 1015 if id in list(self.procUnitConfObjDict.keys()):
1006 1016 self.procUnitConfObjDict.pop(id)
1007 1017
1008 1018 def getReadUnitId(self):
1009 1019
1010 1020 readUnitConfObj = self.getReadUnitObj()
1011 1021
1012 1022 return readUnitConfObj.id
1013 1023
1014 1024 def getReadUnitObj(self):
1015 1025
1016 1026 for obj in list(self.procUnitConfObjDict.values()):
1017 1027 if obj.getElementName() == 'ReadUnit':
1018 1028 return obj
1019 1029
1020 1030 return None
1021 1031
1022 1032 def getProcUnitObj(self, id=None, name=None):
1023 1033
1024 1034 if id != None:
1025 1035 return self.procUnitConfObjDict[id]
1026 1036
1027 1037 if name != None:
1028 1038 return self.getProcUnitObjByName(name)
1029 1039
1030 1040 return None
1031 1041
1032 1042 def getProcUnitObjByName(self, name):
1033 1043
1034 1044 for obj in list(self.procUnitConfObjDict.values()):
1035 1045 if obj.name == name:
1036 1046 return obj
1037 1047
1038 1048 return None
1039 1049
1040 1050 def procUnitItems(self):
1041 1051
1042 1052 return list(self.procUnitConfObjDict.items())
1043 1053
1044 1054 def makeXml(self):
1045 1055
1046 1056 projectElement = Element('Project')
1047 1057 projectElement.set('id', str(self.id))
1048 1058 projectElement.set('name', self.name)
1049 1059 projectElement.set('description', self.description)
1050 1060
1051 1061 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1052 1062 procUnitConfObj.makeXml(projectElement)
1053 1063
1054 1064 self.projectElement = projectElement
1055 1065
1056 1066 def writeXml(self, filename=None):
1057 1067
1058 1068 if filename == None:
1059 1069 if self.filename:
1060 1070 filename = self.filename
1061 1071 else:
1062 1072 filename = 'schain.xml'
1063 1073
1064 1074 if not filename:
1065 1075             print('The filename has not been defined. Use setFilename(filename) to set it.')
1066 1076 return 0
1067 1077
1068 1078 abs_file = os.path.abspath(filename)
1069 1079
1070 1080 if not os.access(os.path.dirname(abs_file), os.W_OK):
1071 1081 print('No write permission on %s' % os.path.dirname(abs_file))
1072 1082 return 0
1073 1083
1074 1084 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1075 1085             print('File %s already exists and it cannot be overwritten' % abs_file)
1076 1086 return 0
1077 1087
1078 1088 self.makeXml()
1079 1089
1080 1090 ElementTree(self.projectElement).write(abs_file, method='xml')
1081 1091
1082 1092 self.filename = abs_file
1083 1093
1084 1094 return 1
1085 1095
1086 1096 def readXml(self, filename=None):
1087 1097
1088 1098 if not filename:
1089 1099 print('filename is not defined')
1090 1100 return 0
1091 1101
1092 1102 abs_file = os.path.abspath(filename)
1093 1103
1094 1104 if not os.path.isfile(abs_file):
1095 1105 print('%s file does not exist' % abs_file)
1096 1106 return 0
1097 1107
1098 1108 self.projectElement = None
1099 1109 self.procUnitConfObjDict = {}
1100 1110
1101 1111 try:
1102 1112 self.projectElement = ElementTree().parse(abs_file)
1103 1113 except:
1104 1114 print('Error reading %s, verify file format' % filename)
1105 1115 return 0
1106 1116
1107 1117 self.project = self.projectElement.tag
1108 1118
1109 1119 self.id = self.projectElement.get('id')
1110 1120 self.name = self.projectElement.get('name')
1111 1121 self.description = self.projectElement.get('description')
1112 1122
1113 1123 readUnitElementList = self.projectElement.iter(
1114 1124 ReadUnitConf().getElementName())
1115 1125
1116 1126 for readUnitElement in readUnitElementList:
1117 1127 readUnitConfObj = ReadUnitConf()
1118 1128 readUnitConfObj.readXml(readUnitElement, self.id)
1119 1129 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1120 1130
1121 1131 procUnitElementList = self.projectElement.iter(
1122 1132 ProcUnitConf().getElementName())
1123 1133
1124 1134 for procUnitElement in procUnitElementList:
1125 1135 procUnitConfObj = ProcUnitConf()
1126 1136 procUnitConfObj.readXml(procUnitElement, self.id)
1127 1137 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1128 1138
1129 1139 self.filename = abs_file
1130 1140
1131 1141 return 1
1132 1142
1133 1143 def __str__(self):
1134 1144
1135 1145 print('Project: name = %s, description = %s, id = %s' % (
1136 1146 self.name,
1137 1147 self.description,
1138 1148 self.id))
1139 1149
1140 1150 for procUnitConfObj in self.procUnitConfObjDict.values():
1141 1151 print(procUnitConfObj)
1142 1152
1143 1153 def createObjects(self):
1144 1154
1145 1155
1146 1156 keys = list(self.procUnitConfObjDict.keys())
1147 1157 keys.sort()
1148 1158 for key in keys:
1149 1159 self.procUnitConfObjDict[key].createObjects()
1150 1160
1151 1161 def monitor(self):
1152 1162
1153 1163 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1154 1164 t.start()
1155
1165
1156 1166 def __monitor(self, queue, ctx):
1157 1167
1158 1168 import socket
1159
1169
1160 1170 procs = 0
1161 1171 err_msg = ''
1162
1172
1163 1173 while True:
1164 1174 msg = queue.get()
1165 1175 if '#_start_#' in msg:
1166 1176 procs += 1
1167 1177 elif '#_end_#' in msg:
1168 1178 procs -=1
1169 1179 else:
1170 1180 err_msg = msg
1171
1172 if procs == 0 or 'Traceback' in err_msg:
1181
1182 if procs == 0 or 'Traceback' in err_msg:
1173 1183 break
1174 1184 time.sleep(0.1)
1175
1185
1176 1186 if '|' in err_msg:
1177 1187 name, err = err_msg.split('|')
1178 1188 if 'SchainWarning' in err:
1179 1189 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1180 1190 elif 'SchainError' in err:
1181 1191 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1182 1192 else:
1183 1193 log.error(err, name)
1184 else:
1194 else:
1185 1195 name, err = self.name, err_msg
1186
1196
1187 1197 time.sleep(2)
1188 1198
1189 1199 for conf in self.procUnitConfObjDict.values():
1190 1200 for confop in conf.opConfObjList:
1191 1201 if confop.type == 'external':
1192 1202 confop.opObj.terminate()
1193 1203 conf.procUnitObj.terminate()
1194
1204
1195 1205 ctx.term()
1196 1206
1197 1207 message = ''.join(err)
1198 1208
1199 1209 if err_msg:
1200 1210 subject = 'SChain v%s: Error running %s\n' % (
1201 1211 schainpy.__version__, self.name)
1202 1212
1203 1213 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1204 1214 socket.gethostname())
1205 1215 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1206 1216 subtitle += 'Configuration file: %s\n' % self.filename
1207 1217 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1208 1218
1209 1219 readUnitConfObj = self.getReadUnitObj()
1210 1220 if readUnitConfObj:
1211 1221 subtitle += '\nInput parameters:\n'
1212 1222 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1213 1223 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1214 1224 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1215 1225 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1216 1226 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1217 1227 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218 1228
1219 1229 a = Alarm(
1220 modes=self.alarm,
1230 modes=self.alarm,
1221 1231 email=self.email,
1222 1232 message=message,
1223 1233 subject=subject,
1224 1234 subtitle=subtitle,
1225 1235 filename=self.filename
1226 1236 )
1227 1237
1228 1238 a.start()
1229 1239
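    # Monitoring protocol sketch (assumption inferred from __monitor above):
    # each running unit is expected to push '#_start_#' when it comes up,
    # '#_end_#' when it finishes, and a 'name|traceback' string on failure,
    # e.g. through the shared err_queue:
    #
    #   err_queue.put('#_start_#')                               # unit started
    #   err_queue.put('ProcUnit-11|SchainError: no data found')  # reported error
    #   err_queue.put('#_end_#')                                 # unit finished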
1230 1240 def isPaused(self):
1231 1241 return 0
1232 1242
1233 1243 def isStopped(self):
1234 1244 return 0
1235 1245
1236 1246 def runController(self):
1237 1247 '''
1238 1248 returns 0 when this process has been stopped, 1 otherwise
1239 1249 '''
1240 1250
1241 1251 if self.isPaused():
1242 1252 print('Process suspended')
1243 1253
1244 1254 while True:
1245 1255 time.sleep(0.1)
1246 1256
1247 1257 if not self.isPaused():
1248 1258 break
1249 1259
1250 1260 if self.isStopped():
1251 1261 break
1252 1262
1253 1263 print('Process reinitialized')
1254 1264
1255 1265 if self.isStopped():
1256 1266 print('Process stopped')
1257 1267 return 0
1258 1268
1259 1269 return 1
1260 1270
1261 1271 def setFilename(self, filename):
1262 1272
1263 1273 self.filename = filename
1264 1274
1265 1275 def setProxy(self):
1266 1276
1267 1277 if not os.path.exists('/tmp/schain'):
1268 1278 os.mkdir('/tmp/schain')
1269
1279
1270 1280 self.ctx = zmq.Context()
1271 1281 xpub = self.ctx.socket(zmq.XPUB)
1272 1282 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1273 1283 xsub = self.ctx.socket(zmq.XSUB)
1274 1284 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1275 1285 self.monitor()
1276 1286 try:
1277 1287 zmq.proxy(xpub, xsub)
1278 1288 except zmq.ContextTerminated:
1279 1289 xpub.close()
1280 1290 xsub.close()
1281 1291
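    # Broker usage sketch (assumption based on the endpoints bound in setProxy):
    # publishers would connect a PUB socket to the XSUB side and subscribers a
    # SUB socket to the XPUB side of this project's proxy; 'project_id' and
    # 'topic' are placeholders.
    #
    #   import zmq
    #   ctx = zmq.Context()
    #   pub = ctx.socket(zmq.PUB)
    #   pub.connect('ipc:///tmp/schain/{}_sub'.format(project_id))
    #   sub = ctx.socket(zmq.SUB)
    #   sub.connect('ipc:///tmp/schain/{}_pub'.format(project_id))
    #   sub.setsockopt_string(zmq.SUBSCRIBE, topic)  # topic = inputId of the unit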
1282 1292 def run(self):
1283 1293
1284 1294 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1285 self.start_time = time.time()
1286 self.createObjects()
1287 self.setProxy()
1295 self.start_time = time.time()
1296 self.createObjects()
1297 self.setProxy()
1288 1298 log.success('{} Done (Time: {}s)'.format(
1289 1299 self.name,
1290 1300 time.time()-self.start_time), '')
@@ -1,294 +1,295
1 1 '''
2 2 Created on Jul 9, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9 9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator #YONG
10 10 from schainpy.utils import log
11 11 from .figure import Figure
12 12
13 13
14 14 @MPDecorator
15 15 class Scope_(Figure):
16 16
17 17 isConfig = None
18 18
19 19 def __init__(self):#, **kwargs): #YONG
20 20 Figure.__init__(self)#, **kwargs)
21 21 self.isConfig = False
22 22 self.WIDTH = 300
23 23 self.HEIGHT = 200
24 24 self.counter_imagwr = 0
25 25
26 26 def getSubplots(self):
27 27
28 28 nrow = self.nplots
29 29 ncol = 3
30 30 return nrow, ncol
31 31
32 32 def setup(self, id, nplots, wintitle, show):
33 33
34 34 self.nplots = nplots
35 35
36 36 self.createFigure(id=id,
37 37 wintitle=wintitle,
38 38 show=show)
39 39
40 40 nrow,ncol = self.getSubplots()
41 41 colspan = 3
42 42 rowspan = 1
43 43
44 44 for i in range(nplots):
45 45 self.addAxes(nrow, ncol, i, 0, colspan, rowspan)
46 46
47 47 def plot_iq(self, x, y, id, channelIndexList, thisDatetime, wintitle, show, xmin, xmax, ymin, ymax):
48 48 yreal = y[channelIndexList,:].real
49 49 yimag = y[channelIndexList,:].imag
50 50
51 51 title = wintitle + " Scope: %s" %(thisDatetime.strftime("%d-%b-%Y %H:%M:%S"))
52 52 xlabel = "Range (Km)"
53 53 ylabel = "Intensity - IQ"
54 54
55 55 if not self.isConfig:
56 56 nplots = len(channelIndexList)
57 57
58 58 self.setup(id=id,
59 59 nplots=nplots,
60 60 wintitle='',
61 61 show=show)
62 62
63 63 if xmin == None: xmin = numpy.nanmin(x)
64 64 if xmax == None: xmax = numpy.nanmax(x)
65 65 if ymin == None: ymin = min(numpy.nanmin(yreal),numpy.nanmin(yimag))
66 66 if ymax == None: ymax = max(numpy.nanmax(yreal),numpy.nanmax(yimag))
67 67
68 68 self.isConfig = True
69 69
70 70 self.setWinTitle(title)
71 71
72 72 for i in range(len(self.axesList)):
73 73 title = "Channel %d" %(i)
74 74 axes = self.axesList[i]
75 75
76 76 axes.pline(x, yreal[i,:],
77 77 xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
78 78 xlabel=xlabel, ylabel=ylabel, title=title)
79 79
80 80 axes.addpline(x, yimag[i,:], idline=1, color="red", linestyle="solid", lw=2)
81 81
82 82 def plot_power(self, x, y, id, channelIndexList, thisDatetime, wintitle, show, xmin, xmax, ymin, ymax):
83 83 y = y[channelIndexList,:] * numpy.conjugate(y[channelIndexList,:])
84 84 yreal = y.real
85 85
86 86 title = wintitle + " Scope: %s" %(thisDatetime.strftime("%d-%b-%Y %H:%M:%S"))
87 87 xlabel = "Range (Km)"
88 88 ylabel = "Intensity"
89 89
90 90 if not self.isConfig:
91 91 nplots = len(channelIndexList)
92 92
93 93 self.setup(id=id,
94 94 nplots=nplots,
95 95 wintitle='',
96 96 show=show)
97 97
98 98 if xmin == None: xmin = numpy.nanmin(x)
99 99 if xmax == None: xmax = numpy.nanmax(x)
100 100 if ymin == None: ymin = numpy.nanmin(yreal)
101 101 if ymax == None: ymax = numpy.nanmax(yreal)
102 102
103 103 self.isConfig = True
104 104
105 105 self.setWinTitle(title)
106 106
107 107 for i in range(len(self.axesList)):
108 108 title = "Channel %d" %(i)
109 109 axes = self.axesList[i]
110 110 ychannel = yreal[i,:]
111 111 axes.pline(x, ychannel,
112 112 xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
113 113 xlabel=xlabel, ylabel=ylabel, title=title)
114 114
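    # Note (illustrative): plot_power above forms the per-range power of each
    # selected channel as v * numpy.conjugate(v) = a**2 + b**2 for a complex
    # sample v = a + jb, and then plots only the (purely real) result.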
115 115 def plot_weatherpower(self, x, y, id, channelIndexList, thisDatetime, wintitle, show, xmin, xmax, ymin, ymax):
116 116 y = y[channelIndexList,:]
117 117 yreal = y
118 118
119 119 title = wintitle + " Scope: %s" %(thisDatetime.strftime("%d-%b-%Y %H:%M:%S"))
120 120 xlabel = "Range (Km)"
121 121 ylabel = "Intensity"
122 122
123 123 if not self.isConfig:
124 124 nplots = len(channelIndexList)
125 125
126 126 self.setup(id=id,
127 127 nplots=nplots,
128 128 wintitle='',
129 129 show=show)
130 130
131 131 if xmin == None: xmin = numpy.nanmin(x)
132 132 if xmax == None: xmax = numpy.nanmax(x)
133 133 if ymin == None: ymin = numpy.nanmin(yreal)
134 134 if ymax == None: ymax = numpy.nanmax(yreal)
135 135
136 136 self.isConfig = True
137 137
138 138 self.setWinTitle(title)
139 139
140 140 for i in range(len(self.axesList)):
141 141 title = "Channel %d" %(i)
142 142 axes = self.axesList[i]
143 143 ychannel = yreal[i,:]
144 144 axes.pline(x, ychannel,
145 145 xmin=xmin, xmax=xmax, ymin=ymin, ymax=ymax,
146 146 xlabel=xlabel, ylabel=ylabel, title=title)
147 147
148 148
149 149
150 150 def run(self, dataOut, id, wintitle="", channelList=None,
151 151 xmin=None, xmax=None, ymin=None, ymax=None, save=False,
152 152 figpath='./', figfile=None, show=True, wr_period=1,
153 153 ftp=False, server=None, folder=None, username=None, password=None, type='power', **kwargs):
154 154
155 155 """
156 156
157 157 Input:
158 158             dataOut         :    input data object (Voltage)
159 159             id              :    figure id
160 160             wintitle        :    window title
161 161             channelList     :    list of channels to plot (default: all)
162 162 xmin : None,
163 163 xmax : None,
164 164 ymin : None,
165 165 ymax : None,
166 166 """
167 167 if dataOut.flagNoData:
168 168 return dataOut
169 169
170 170 if channelList == None:
171 171 channelIndexList = dataOut.channelIndexList
172 172 else:
173 173 channelIndexList = []
174 174 for channel in channelList:
175 175 if channel not in dataOut.channelList:
176 176                     raise ValueError("Channel %d is not in dataOut.channelList" % channel)
177 177 channelIndexList.append(dataOut.channelList.index(channel))
178 178
179 179 thisDatetime = datetime.datetime.utcfromtimestamp(dataOut.getTimeRange()[0])
180 ### print("***************** PLOTEO **************************")
181 ### print(dataOut.nProfiles)
182 ### print(dataOut.heightList.shape)
180 #print("***************** PLOTEO **************************")
181 #print(dataOut.nProfiles)
182 #print(dataOut.heightList.shape)
183 #print(dataOut.data.shape)
183 184 if dataOut.flagDataAsBlock:
184 185
185 186 for i in range(dataOut.nProfiles):
186 187
187 188 wintitle1 = wintitle + " [Profile = %d] " %i
188 189
189 190 if type == "power":
190 191 self.plot_power(dataOut.heightList,
191 192 dataOut.data[:,i,:],
192 193 id,
193 194 channelIndexList,
194 195 thisDatetime,
195 196 wintitle1,
196 197 show,
197 198 xmin,
198 199 xmax,
199 200 ymin,
200 201 ymax)
201 202
202 203 if type == "weatherpower":
203 204 self.plot_weatherpower(dataOut.heightList,
204 205 dataOut.data[:,i,:],
205 206 id,
206 207 channelIndexList,
207 208 thisDatetime,
208 209 wintitle1,
209 210 show,
210 211 xmin,
211 212 xmax,
212 213 ymin,
213 214 ymax)
214 215
215 216 if type == "weathervelocity":
216 217 self.plot_weatherpower(dataOut.heightList,
217 218 dataOut.data_velocity[:,i,:],
218 219 id,
219 220 channelIndexList,
220 221 thisDatetime,
221 222 wintitle1,
222 223 show,
223 224 xmin,
224 225 xmax,
225 226 ymin,
226 227 ymax)
227 228
228 229 if type == "iq":
229 230 self.plot_iq(dataOut.heightList,
230 231 dataOut.data[:,i,:],
231 232 id,
232 233 channelIndexList,
233 234 thisDatetime,
234 235 wintitle1,
235 236 show,
236 237 xmin,
237 238 xmax,
238 239 ymin,
239 240 ymax)
240 241
241 242 self.draw()
242 243
243 244 str_datetime = thisDatetime.strftime("%Y%m%d_%H%M%S")
244 245 figfile = self.getFilename(name = str_datetime) + "_" + str(i)
245 246
246 247 self.save(figpath=figpath,
247 248 figfile=figfile,
248 249 save=save,
249 250 ftp=ftp,
250 251 wr_period=wr_period,
251 252 thisDatetime=thisDatetime)
252 253
253 254 else:
254 255 wintitle += " [Profile = %d] " %dataOut.profileIndex
255 256
256 257 if type == "power":
257 258 self.plot_power(dataOut.heightList,
258 259 dataOut.data,
259 260 id,
260 261 channelIndexList,
261 262 thisDatetime,
262 263 wintitle,
263 264 show,
264 265 xmin,
265 266 xmax,
266 267 ymin,
267 268 ymax)
268 269
269 270 if type == "iq":
270 271 self.plot_iq(dataOut.heightList,
271 272 dataOut.data,
272 273 id,
273 274 channelIndexList,
274 275 thisDatetime,
275 276 wintitle,
276 277 show,
277 278 xmin,
278 279 xmax,
279 280 ymin,
280 281 ymax)
281 282
282 283 self.draw()
283 284
284 285 str_datetime = thisDatetime.strftime("%Y%m%d_%H%M%S") + "_" + str(dataOut.profileIndex)
285 286 figfile = self.getFilename(name = str_datetime)
286 287
287 288 self.save(figpath=figpath,
288 289 figfile=figfile,
289 290 save=save,
290 291 ftp=ftp,
291 292 wr_period=wr_period,
292 293 thisDatetime=thisDatetime)
293 294
294 295 return dataOut
@@ -1,23 +1,25
1 1 '''
2 2
3 3 $Author: murco $
4 4 $Id: JRODataIO.py 169 2012-11-19 21:57:03Z murco $
5 5 '''
6 6
7 7 from .jroIO_voltage import *
8 8 from .jroIO_spectra import *
9 9 from .jroIO_heispectra import *
10 10 from .jroIO_usrp import *
11 11 from .jroIO_digitalRF import *
12 12 from .jroIO_kamisr import *
13 13 from .jroIO_param import *
14 14 from .jroIO_hf import *
15 15
16 16 from .jroIO_madrigal import *
17 17
18 18 from .bltrIO_param import *
19 19 from .bltrIO_spectra import *
20 20 from .jroIO_mira35c import *
21 21 from .julIO_param import *
22 22
23        from .pxIO_param import *
\ No newline at end of file
23 from .pxIO_param import *
24
25 from .jroIO_simulator import *
@@ -1,1575 +1,1587
1 1 """
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 """
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import zmq
16 16
17 17 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
18 18 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
19 19 from schainpy.utils import log
20 20 import schainpy.admin
21 21
22 22 LOCALTIME = True
23 23 DT_DIRECTIVES = {
24 24 '%Y': 4,
25 25 '%y': 2,
26 26 '%m': 2,
27 27 '%d': 2,
28 28 '%j': 3,
29 29 '%H': 2,
30 30 '%M': 2,
31 31 '%S': 2,
32 32 '%f': 6
33 33 }
34 34
35 35
36 36 def isNumber(cad):
37 37 """
38 38     Checks whether the characters in a string can be converted to a number.
39 39
40 40     Exceptions:
41 41         If the string cannot be converted to a number
42 42     Input:
43 43         str, the string to analyze to determine whether it can be converted to a number
44 44
45 45     Return:
46 46         True  :  if the string is numeric
47 47         False :  if it is not a numeric string
48 48 """
49 49 try:
50 50 float(cad)
51 51 return True
52 52 except:
53 53 return False
54 54
55 55
56 56 def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
57 57 """
58 58     Determines whether a data file falls within the specified date range.
59 59
60 60     Inputs:
61 61         filename : full name of the data file in Jicamarca format (.r)
62 62
63 63         startUTSeconds : start of the selected range, given in
64 64                          seconds counted from 01/01/1970.
65 65         endUTSeconds : end of the selected range, given in
66 66                        seconds counted from 01/01/1970.
67 67
68 68     Return:
69 69         Boolean : True if the data file contains data within the specified
70 70                   date range, otherwise False.
71 71
72 72     Exceptions:
73 73         If the file does not exist or cannot be opened
74 74         If the header cannot be read.
75 75
76 76 """
77 77 basicHeaderObj = BasicHeader(LOCALTIME)
78 78
79 79 try:
80 80 fp = open(filename, 'rb')
81 81 except IOError:
82 82 print("The file %s can't be opened" % (filename))
83 83 return 0
84 84
85 85 sts = basicHeaderObj.read(fp)
86 86 fp.close()
87 87
88 88 if not(sts):
89 89         print("Skipping the file %s because it does not have a valid header" % (filename))
90 90 return 0
91 91
92 92 if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
93 93 return 0
94 94
95 95 return 1
96 96
97 97
98 98 def isTimeInRange(thisTime, startTime, endTime):
99 99 if endTime >= startTime:
100 100 if (thisTime < startTime) or (thisTime > endTime):
101 101 return 0
102 102 return 1
103 103 else:
104 104 if (thisTime < startTime) and (thisTime > endTime):
105 105 return 0
106 106 return 1
107 107
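# Worked example (illustrative): with startTime=23:00 and endTime=02:00 the
# range wraps past midnight, so samples at 23:30 or 01:00 are accepted while
# 12:00 is rejected.
#
#   isTimeInRange(datetime.time(23, 30), datetime.time(23, 0), datetime.time(2, 0))  # -> 1
#   isTimeInRange(datetime.time(12, 0),  datetime.time(23, 0), datetime.time(2, 0))  # -> 0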
108 108
109 109 def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
110 110 """
111 111     Returns 1 if the data file falls within the specified time range.
112 112
113 113     Inputs:
114 114         filename : full name of the data file in Jicamarca format (.r)
115 115
116 116         startDate : start date of the selected range as datetime.date
117 117
118 118         endDate : end date of the selected range as datetime.date
119 119
120 120         startTime : start time of the selected range as datetime.time
121 121
122 122         endTime : end time of the selected range as datetime.time
123 123
124 124     Return:
125 125         Boolean : True if the data file contains data within the specified
126 126                   range, otherwise False.
127 127
128 128     Exceptions:
129 129         If the file does not exist or cannot be opened
130 130         If the header cannot be read.
131 131
132 132 """
133 133
134 134 try:
135 135 fp = open(filename, 'rb')
136 136 except IOError:
137 137 print("The file %s can't be opened" % (filename))
138 138 return None
139 139
140 140 firstBasicHeaderObj = BasicHeader(LOCALTIME)
141 141 systemHeaderObj = SystemHeader()
142 142 radarControllerHeaderObj = RadarControllerHeader()
143 143 processingHeaderObj = ProcessingHeader()
144 144
145 145 lastBasicHeaderObj = BasicHeader(LOCALTIME)
146 146
147 147 sts = firstBasicHeaderObj.read(fp)
148 148
149 149 if not(sts):
150 150         print("[Reading] Skipping the file %s because it does not have a valid header" % (filename))
151 151 return None
152 152
153 153 if not systemHeaderObj.read(fp):
154 154 return None
155 155
156 156 if not radarControllerHeaderObj.read(fp):
157 157 return None
158 158
159 159 if not processingHeaderObj.read(fp):
160 160 return None
161 161
162 162 filesize = os.path.getsize(filename)
163 163
164 164 offset = processingHeaderObj.blockSize + 24 # header size
165 165
166 166 if filesize <= offset:
167 167         print("[Reading] %s: This file does not have enough data" % filename)
168 168 return None
169 169
170 170 fp.seek(-offset, 2)
171 171
172 172 sts = lastBasicHeaderObj.read(fp)
173 173
174 174 fp.close()
175 175
176 176 thisDatetime = lastBasicHeaderObj.datatime
177 177 thisTime_last_block = thisDatetime.time()
178 178
179 179 thisDatetime = firstBasicHeaderObj.datatime
180 180 thisDate = thisDatetime.date()
181 181 thisTime_first_block = thisDatetime.time()
182 182
183 183 # General case
184 184 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
185 185 #-----------o----------------------------o-----------
186 186 # startTime endTime
187 187
188 188 if endTime >= startTime:
189 189 if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
190 190 return None
191 191
192 192 return thisDatetime
193 193
194 194 # If endTime < startTime then endTime belongs to the next day
195 195
196 196 #<<<<<<<<<<<o o>>>>>>>>>>>
197 197 #-----------o----------------------------o-----------
198 198 # endTime startTime
199 199
200 200 if (thisDate == startDate) and (thisTime_last_block < startTime):
201 201 return None
202 202
203 203 if (thisDate == endDate) and (thisTime_first_block > endTime):
204 204 return None
205 205
206 206 if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
207 207 return None
208 208
209 209 return thisDatetime
210 210
211 211
212 212 def isFolderInDateRange(folder, startDate=None, endDate=None):
213 213 """
214 214     Returns 1 if the data folder falls within the specified date range.
215 215
216 216     Inputs:
217 217         folder : full name of the directory.
218 218                  Its format should be "/path_root/?YYYYDDD"
219 219
220 220                  where:
221 221                      YYYY : year (e.g. 2015)
222 222                      DDD : day of the year (e.g. 305)
223 223
224 224     startDate : start date of the selected range as datetime.date
225 225
226 226     endDate : end date of the selected range as datetime.date
227 227
228 228     Return:
229 229         Boolean : True if the folder contains data within the specified
230 230                   date range, otherwise False.
231 231     Exceptions:
232 232         If the directory does not have the expected format
233 233 """
234 234
235 235 basename = os.path.basename(folder)
236 236
237 237 if not isRadarFolder(basename):
238 238         print("The folder %s does not have the right format" % folder)
239 239 return 0
240 240
241 241 if startDate and endDate:
242 242 thisDate = getDateFromRadarFolder(basename)
243 243
244 244 if thisDate < startDate:
245 245 return 0
246 246
247 247 if thisDate > endDate:
248 248 return 0
249 249
250 250 return 1
251 251
252 252
253 253 def isFileInDateRange(filename, startDate=None, endDate=None):
254 254 """
255 255     Returns 1 if the data file falls within the specified date range.
256 256
257 257     Inputs:
258 258         filename : full name of the data file in Jicamarca format (.r)
259 259
260 260                    Its format should be "?YYYYDDDsss"
261 261
262 262                    where:
263 263                        YYYY : year (e.g. 2015)
264 264                        DDD : day of the year (e.g. 305)
265 265                        sss : set
266 266
267 267     startDate : start date of the selected range as datetime.date
268 268
269 269     endDate : end date of the selected range as datetime.date
270 270
271 271     Return:
272 272         Boolean : True if the data file contains data within the specified
273 273                   date range, otherwise False.
274 274     Exceptions:
275 275         If the file does not have the expected format
276 276 """
277 277
278 278 basename = os.path.basename(filename)
279 279
280 280 if not isRadarFile(basename):
281 281         print("The filename %s does not have the right format" % filename)
282 282 return 0
283 283
284 284 if startDate and endDate:
285 285 thisDate = getDateFromRadarFile(basename)
286 286
287 287 if thisDate < startDate:
288 288 return 0
289 289
290 290 if thisDate > endDate:
291 291 return 0
292 292
293 293 return 1
294 294
295 295
296 296 def getFileFromSet(path, ext, set):
297 297 validFilelist = []
298 298 fileList = os.listdir(path)
299 299
300 300 # 0 1234 567 89A BCDE
301 301 # H YYYY DDD SSS .ext
302 302
303 303 for thisFile in fileList:
304 304 try:
305 305 year = int(thisFile[1:5])
306 306 doy = int(thisFile[5:8])
307 307 except:
308 308 continue
309 309
310 310 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
311 311 continue
312 312
313 313 validFilelist.append(thisFile)
314 314
315 315 myfile = fnmatch.filter(
316 316 validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))
317 317
318 318 if len(myfile) != 0:
319 319 return myfile[0]
320 320 else:
321 321 filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
322 322         print('The filename %s does not exist' % filename)
323 323         print('...falling back to the last file:')
324 324
325 325 if validFilelist:
326 326 validFilelist = sorted(validFilelist, key=str.lower)
327 327 return validFilelist[-1]
328 328
329 329 return None
330 330
331 331
332 332 def getlastFileFromPath(path, ext):
333 333 """
334 334     Filters the file list, keeping only files that match the "PYYYYDDDSSS.ext" format,
335 335     and returns the last file of the remaining list.
336 336
337 337     Input:
338 338         fileList : list containing all files (without path) of a given folder
339 339         ext : extension of the files contained in the folder
340 340
341 341     Return:
342 342         The last file of the given folder, path not included.
343 343 """
344 344 validFilelist = []
345 345 fileList = os.listdir(path)
346 346
347 347 # 0 1234 567 89A BCDE
348 348 # H YYYY DDD SSS .ext
349 349
350 350 for thisFile in fileList:
351 351
352 352 year = thisFile[1:5]
353 353 if not isNumber(year):
354 354 continue
355 355
356 356 doy = thisFile[5:8]
357 357 if not isNumber(doy):
358 358 continue
359 359
360 360 year = int(year)
361 361 doy = int(doy)
362 362
363 363 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
364 364 continue
365 365
366 366 validFilelist.append(thisFile)
367 367
368 368 if validFilelist:
369 369 validFilelist = sorted(validFilelist, key=str.lower)
370 370 return validFilelist[-1]
371 371
372 372 return None
373 373
374 374
375 375 def isRadarFolder(folder):
376 376 try:
377 377 year = int(folder[1:5])
378 378 doy = int(folder[5:8])
379 379 except:
380 380 return 0
381 381
382 382 return 1
383 383
384 384
385 385 def isRadarFile(file):
386 try:
386 try:
387 387 year = int(file[1:5])
388 388 doy = int(file[5:8])
389 389 set = int(file[8:11])
390 390 except:
391 391 return 0
392 392
393 393 return 1
394 394
395 395
396 396 def getDateFromRadarFile(file):
397 try:
397 try:
398 398 year = int(file[1:5])
399 399 doy = int(file[5:8])
400 set = int(file[8:11])
400 set = int(file[8:11])
401 401 except:
402 402 return None
403 403
404 404 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
405 405 return thisDate
406 406
407 407
408 408 def getDateFromRadarFolder(folder):
409 409 try:
410 410 year = int(folder[1:5])
411 411 doy = int(folder[5:8])
412 412 except:
413 413 return None
414 414
415 415 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
416 416 return thisDate
417 417
418 418 def parse_format(s, fmt):
419
419
420 420 for i in range(fmt.count('%')):
421 421 x = fmt.index('%')
422 422 d = DT_DIRECTIVES[fmt[x:x+2]]
423 423 fmt = fmt.replace(fmt[x:x+2], s[x:x+d])
424 424 return fmt
425 425
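# Worked example (illustrative): parse_format() replaces each strftime
# directive in fmt with the slice of s found at the same position, using the
# widths in DT_DIRECTIVES, so the result can be parsed back with the original
# format string:
#
#   parse_format('d2019305', '*%Y%j')      # '%Y' -> '2019', '%j' -> '305'
#   # returns '*2019305', and
#   # datetime.datetime.strptime('*2019305', '*%Y%j').date() -> 2019-11-01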
426 426 class Reader(object):
427 427
428 428 c = 3E8
429 429 isConfig = False
430 430 dtype = None
431 431 pathList = []
432 432 filenameList = []
433 433 datetimeList = []
434 434 filename = None
435 435 ext = None
436 436 flagIsNewFile = 1
437 437 flagDiscontinuousBlock = 0
438 438 flagIsNewBlock = 0
439 439 flagNoMoreFiles = 0
440 440 fp = None
441 441 firstHeaderSize = 0
442 442 basicHeaderSize = 24
443 443 versionFile = 1103
444 444 fileSize = None
445 445 fileSizeByHeader = None
446 446 fileIndex = -1
447 447 profileIndex = None
448 448 blockIndex = 0
449 449 nTotalBlocks = 0
450 450 maxTimeStep = 30
451 451 lastUTTime = None
452 452 datablock = None
453 453 dataOut = None
454 454 getByBlock = False
455 455 path = None
456 456 startDate = None
457 457 endDate = None
458 458 startTime = datetime.time(0, 0, 0)
459 459 endTime = datetime.time(23, 59, 59)
460 460 set = None
461 461 expLabel = ""
462 462 online = False
463 463 delay = 60
464 464     nTries = 3  # number of tries
465 465     nFiles = 3  # number of files to search for
466 466 walk = True
467 467 getblock = False
468 468 nTxs = 1
469 469 realtime = False
470 470 blocksize = 0
471 471 blocktime = None
472 472 warnings = True
473 473 verbose = True
474 474 server = None
475 475 format = None
476 476 oneDDict = None
477 477 twoDDict = None
478 478 independentParam = None
479 479 filefmt = None
480 480 folderfmt = None
481 481 open_file = open
482 482 open_mode = 'rb'
483 483
484 484 def run(self):
485 485
486 raise NotImplementedError
486 raise NotImplementedError
487 487
488 488 def getAllowedArgs(self):
489 489 if hasattr(self, '__attrs__'):
490 490 return self.__attrs__
491 491 else:
492 492 return inspect.getargspec(self.run).args
493 493
494 494 def set_kwargs(self, **kwargs):
495 495
496 496 for key, value in kwargs.items():
497 497 setattr(self, key, value)
498
498
499 499 def find_folders(self, path, startDate, endDate, folderfmt, last=False):
500 500
501 folders = [x for f in path.split(',')
501 folders = [x for f in path.split(',')
502 502 for x in os.listdir(f) if os.path.isdir(os.path.join(f, x))]
503 503 folders.sort()
504 504
505 505 if last:
506 506 folders = [folders[-1]]
507 507
508 for folder in folders:
509 try:
510 dt = datetime.datetime.strptime(parse_format(folder, folderfmt), folderfmt).date()
508 for folder in folders:
509 try:
510 dt = datetime.datetime.strptime(parse_format(folder, folderfmt), folderfmt).date()
511 511 if dt >= startDate and dt <= endDate:
512 512 yield os.path.join(path, folder)
513 513 else:
514 514                 log.log('Skipping folder {}'.format(folder), self.name)
515 515             except Exception as e:
516 516                 log.log('Skipping folder {}'.format(folder), self.name)
517 517 continue
518 518 return
519
520 def find_files(self, folders, ext, filefmt, startDate=None, endDate=None,
519
520 def find_files(self, folders, ext, filefmt, startDate=None, endDate=None,
521 521 expLabel='', last=False):
522
523 for path in folders:
522
523 for path in folders:
524 524 files = glob.glob1(path, '*{}'.format(ext))
525 525 files.sort()
526 526 if last:
527 if files:
527 if files:
528 528 fo = files[-1]
529 try:
530 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
531 yield os.path.join(path, expLabel, fo)
532 except Exception as e:
529 try:
530 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
531 yield os.path.join(path, expLabel, fo)
532 except Exception as e:
533 533 pass
534 534 return
535 535 else:
536 536 return
537 537
538 538 for fo in files:
539 try:
540 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
539 try:
540 dt = datetime.datetime.strptime(parse_format(fo, filefmt), filefmt).date()
541 541 if dt >= startDate and dt <= endDate:
542 542 yield os.path.join(path, expLabel, fo)
543 543 else:
544 544                     log.log('Skipping file {}'.format(fo), self.name)
545 545                 except Exception as e:
546 546                     log.log('Skipping file {}'.format(fo), self.name)
547 continue
547 continue
548 548
549 549 def searchFilesOffLine(self, path, startDate, endDate,
550 expLabel, ext, walk,
550 expLabel, ext, walk,
551 551 filefmt, folderfmt):
552 552 """Search files in offline mode for the given arguments
553 553
554 554 Return:
555 555 Generator of files
556 556 """
557 557
558 558 if walk:
559 559 folders = self.find_folders(
560 560 path, startDate, endDate, folderfmt)
561 561 else:
562 562 folders = path.split(',')
563
563
564 564 return self.find_files(
565 folders, ext, filefmt, startDate, endDate, expLabel)
565 folders, ext, filefmt, startDate, endDate, expLabel)
566 566
567 567 def searchFilesOnLine(self, path, startDate, endDate,
568 expLabel, ext, walk,
568 expLabel, ext, walk,
569 569 filefmt, folderfmt):
570 570 """Search for the last file of the last folder
571 571
572 572 Arguments:
573 573             path : folder containing the data files
574 574             expLabel : name of the sub-experiment (subfolder)
575 575             ext : extension of the files
576 576             walk : if enabled, search for files inside the day subdirectories (doypath)
577 577
578 578 Return:
579 579 generator with the full path of last filename
580 580 """
581
581
582 582 if walk:
583 583 folders = self.find_folders(
584 584 path, startDate, endDate, folderfmt, last=True)
585 585 else:
586 586 folders = path.split(',')
587
587
588 588 return self.find_files(
589 589 folders, ext, filefmt, startDate, endDate, expLabel, last=True)
590 590
591 591 def setNextFile(self):
592 592         """Set the next file to be read, open it and parse the file header"""
593 593
594 594 while True:
595 595 if self.fp != None:
596 self.fp.close()
596 self.fp.close()
597 597
598 598 if self.online:
599 599 newFile = self.setNextFileOnline()
600 600 else:
601 601 newFile = self.setNextFileOffline()
602
602
603 603 if not(newFile):
604 604 if self.online:
605 605                     raise schainpy.admin.SchainError('Time to wait for new files reached')
606 606 else:
607 607 if self.fileIndex == -1:
608 608 raise schainpy.admin.SchainWarning('No files found in the given path')
609 609 else:
610 610 raise schainpy.admin.SchainWarning('No more files to read')
611
611
612 612 if self.verifyFile(self.filename):
613 613 break
614
614
615 615 log.log('Opening file: %s' % self.filename, self.name)
616 616
617 617 self.readFirstHeader()
618 618 self.nReadBlocks = 0
619 619
620 620 def setNextFileOnline(self):
621 621         """Check for the next file to be read in online mode.
622 622
623 623 Set:
624 624 self.filename
625 625 self.fp
626 626 self.filesize
627
627
628 628 Return:
629 629 boolean
630 630
631 631 """
632 632 nextFile = True
633 633 nextDay = False
634 634
635 for nFiles in range(self.nFiles+1):
635 for nFiles in range(self.nFiles+1):
636 636 for nTries in range(self.nTries):
637 637 fullfilename, filename = self.checkForRealPath(nextFile, nextDay)
638 638 if fullfilename is not None:
639 639 break
640 640 log.warning(
641 641 "Waiting %0.2f sec for the next file: \"%s\" , try %02d ..." % (self.delay, filename, nTries + 1),
642 642 self.name)
643 643 time.sleep(self.delay)
644 644 nextFile = False
645 continue
646
645 continue
646
647 647 if fullfilename is not None:
648 648 break
649
649
650 650 self.nTries = 1
651 nextFile = True
651 nextFile = True
652 652
653 653 if nFiles == (self.nFiles - 1):
654 654 log.log('Trying with next day...', self.name)
655 655 nextDay = True
656 self.nTries = 3
656 self.nTries = 3
657 657
658 658 if fullfilename:
659 659 self.fileSize = os.path.getsize(fullfilename)
660 660 self.filename = fullfilename
661 661 self.flagIsNewFile = 1
662 662 if self.fp != None:
663 663 self.fp.close()
664 664 self.fp = self.open_file(fullfilename, self.open_mode)
665 665 self.flagNoMoreFiles = 0
666 666 self.fileIndex += 1
667 667 return 1
668 else:
668 else:
669 669 return 0
670
670
671 671 def setNextFileOffline(self):
672 672         """Open the next file to be read in offline mode"""
673
673
674 674 try:
675 675 filename = next(self.filenameList)
676 676 self.fileIndex +=1
677 677 except StopIteration:
678 678 self.flagNoMoreFiles = 1
679 return 0
679 return 0
680 680
681 681 self.filename = filename
682 682 self.fileSize = os.path.getsize(filename)
683 683 self.fp = self.open_file(filename, self.open_mode)
684 684 self.flagIsNewFile = 1
685 685
686 686 return 1
687
687
688 688 @staticmethod
689 689 def isDateTimeInRange(dt, startDate, endDate, startTime, endTime):
690 690 """Check if the given datetime is in range"""
691
691
692 692 if startDate <= dt.date() <= endDate:
693 693 if startTime <= dt.time() <= endTime:
694 694 return True
695 695 return False
696
696
697 697 def verifyFile(self, filename):
698 698 """Check for a valid file
699
699
700 700 Arguments:
701 701 filename -- full path filename
702
702
703 703 Return:
704 704 boolean
705 705 """
706 706
707 707 return True
708 708
709 709 def checkForRealPath(self, nextFile, nextDay):
710 710         """Check if the next file to be read exists"""
711 711
712 712 raise NotImplementedError
713
713
714 714 def readFirstHeader(self):
715 715 """Parse the file header"""
716 716
717 717 pass
718 718
719 719 class JRODataReader(Reader):
720 720
721 721 utc = 0
722 722 nReadBlocks = 0
723 723 foldercounter = 0
724 724 firstHeaderSize = 0
725 725 basicHeaderSize = 24
726 726 __isFirstTimeOnline = 1
727 727 __printInfo = True
728 728 filefmt = "*%Y%j***"
729 729 folderfmt = "*%Y%j"
730 730
731 731 def getDtypeWidth(self):
732 732
733 733 dtype_index = get_dtype_index(self.dtype)
734 734 dtype_width = get_dtype_width(dtype_index)
735 735
736 736 return dtype_width
737 737
738 738 def checkForRealPath(self, nextFile, nextDay):
739 739         """Check if the next file to be read exists.
740 740
741 741         Example :
742 742             the correct file name is .../.../D2009307/P2009307367.ext
743 743
744 744             The function then tries the following combinations
745 745                 .../.../y2009307367.ext
746 746                 .../.../Y2009307367.ext
747 747                 .../.../x2009307/y2009307367.ext
748 748                 .../.../x2009307/Y2009307367.ext
749 749                 .../.../X2009307/y2009307367.ext
750 750                 .../.../X2009307/Y2009307367.ext
751 751             the last letter combination being, in this case, identical to the file searched for
752 752
753 753 Return:
754 754 str -- fullpath of the file
755 755 """
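        # Illustrative example (values assumed): with path='/data', year=2009,
        # doy=307, set=367 and ext='.r', the candidates tried include
        #   /data/d2009307367.r
        #   /data/d2009307/D2009307367.r
        #   /data/D2009307/d2009307367.r
        # and the first combination found on disk is returned.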
756
757
756
757
758
758 759 if nextFile:
759 760 self.set += 1
760 761 if nextDay:
761 762 self.set = 0
762 763 self.doy += 1
763 764 foldercounter = 0
764 765 prefixDirList = [None, 'd', 'D']
765 766 if self.ext.lower() == ".r": # voltage
766 767 prefixFileList = ['d', 'D']
767 768 elif self.ext.lower() == ".pdata": # spectra
768 769 prefixFileList = ['p', 'P']
769
770
770 771 # barrido por las combinaciones posibles
771 772 for prefixDir in prefixDirList:
772 773 thispath = self.path
773 774 if prefixDir != None:
774 775 # formo el nombre del directorio xYYYYDDD (x=d o x=D)
775 776 if foldercounter == 0:
776 777 thispath = os.path.join(self.path, "%s%04d%03d" %
777 778 (prefixDir, self.year, self.doy))
778 779 else:
779 780 thispath = os.path.join(self.path, "%s%04d%03d_%02d" % (
780 781 prefixDir, self.year, self.doy, foldercounter))
781 782 for prefixFile in prefixFileList: # barrido por las dos combinaciones posibles de "D"
782 783 # formo el nombre del file xYYYYDDDSSS.ext
783 784 filename = "%s%04d%03d%03d%s" % (prefixFile, self.year, self.doy, self.set, self.ext)
784 785 fullfilename = os.path.join(
785 786 thispath, filename)
786 787
787 788 if os.path.exists(fullfilename):
788 789 return fullfilename, filename
789
790 return None, filename
791
790
791 return None, filename
792
792 793 def __waitNewBlock(self):
793 794 """
794 795         Return 1 if a new data block was found, 0 otherwise.
795 796
796 797         If the reading mode is offline it always returns 0.
797 798 """
798 799 if not self.online:
799 800 return 0
800 801
801 802 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
802 803 return 0
803 804
804 805 currentPointer = self.fp.tell()
805 806
806 807 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
807 808
808 809 for nTries in range(self.nTries):
809 810
810 811 self.fp.close()
811 812 self.fp = open(self.filename, 'rb')
812 813 self.fp.seek(currentPointer)
813 814
814 815 self.fileSize = os.path.getsize(self.filename)
815 816 currentSize = self.fileSize - currentPointer
816 817
817 818 if (currentSize >= neededSize):
818 819 self.basicHeaderObj.read(self.fp)
819 820 return 1
820 821
821 822 if self.fileSize == self.fileSizeByHeader:
822 823 # self.flagEoF = True
823 824 return 0
824 825
825 826 print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
826 827 time.sleep(self.delay)
827 828
828 829 return 0
829 830
830 831 def waitDataBlock(self, pointer_location, blocksize=None):
831 832
832 833 currentPointer = pointer_location
833 834 if blocksize is None:
834 835 neededSize = self.processingHeaderObj.blockSize # + self.basicHeaderSize
835 836 else:
836 837 neededSize = blocksize
837 838
838 839 for nTries in range(self.nTries):
839 840 self.fp.close()
840 841 self.fp = open(self.filename, 'rb')
841 842 self.fp.seek(currentPointer)
842 843
843 844 self.fileSize = os.path.getsize(self.filename)
844 845 currentSize = self.fileSize - currentPointer
845 846
846 847 if (currentSize >= neededSize):
847 848 return 1
848 849
849 850 log.warning(
850 851 "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
851 852 self.name
852 853 )
853 854 time.sleep(self.delay)
854 855
855 856 return 0
856 857
857 858 def __setNewBlock(self):
858 859
859 860 if self.fp == None:
860 return 0
861
862 if self.flagIsNewFile:
861 return 0
862
863             #print("flagIsNewFile at start:", self.flagIsNewFile)
864             #print("nReadBlocks so far:", self.nReadBlocks)
865 if self.flagIsNewFile:
863 866 self.lastUTTime = self.basicHeaderObj.utc
864 867 return 1
865 868
866 869 if self.realtime:
867 870 self.flagDiscontinuousBlock = 1
868 871 if not(self.setNextFile()):
869 872 return 0
870 873 else:
871 874 return 1
872 875
873 876 currentSize = self.fileSize - self.fp.tell()
874 877 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
875
878
876 879 if (currentSize >= neededSize):
877 880 self.basicHeaderObj.read(self.fp)
878 881 self.lastUTTime = self.basicHeaderObj.utc
879 882 return 1
880
883
881 884 if self.__waitNewBlock():
882 885 self.lastUTTime = self.basicHeaderObj.utc
883 886 return 1
884 887
885 888 if not(self.setNextFile()):
886 889 return 0
887 890
888 891 deltaTime = self.basicHeaderObj.utc - self.lastUTTime
889 892 self.lastUTTime = self.basicHeaderObj.utc
890 893
891 894 self.flagDiscontinuousBlock = 0
892 895
893 896 if deltaTime > self.maxTimeStep:
894 897 self.flagDiscontinuousBlock = 1
895 898
896 899 return 1
897 900
898 901 def readNextBlock(self):
902 #print("nReadBlocks",self.nReadBlocks)
899 903
900 904 while True:
901 905 self.__setNewBlock()
902 906
903 907 if not(self.readBlock()):
904 908 return 0
905 909
906 910 self.getBasicHeader()
907 911
908 912 if not self.isDateTimeInRange(self.dataOut.datatime, self.startDate, self.endDate, self.startTime, self.endTime):
909 913 print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
910 914 self.processingHeaderObj.dataBlocksPerFile,
911 915 self.dataOut.datatime.ctime()))
912 916 continue
913 917
914 918 break
915 919
916 920 if self.verbose:
917 921 print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
918 922 self.processingHeaderObj.dataBlocksPerFile,
919 923 self.dataOut.datatime.ctime()))
924
920 925 return 1
921 926
922 927 def readFirstHeader(self):
923 928
924 929 self.basicHeaderObj.read(self.fp)
925 930 self.systemHeaderObj.read(self.fp)
926 931 self.radarControllerHeaderObj.read(self.fp)
927 932 self.processingHeaderObj.read(self.fp)
928 933 self.firstHeaderSize = self.basicHeaderObj.size
929 934
930 935 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
931 936 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
932 937 if datatype == 0:
933 938 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
934 939 elif datatype == 1:
935 940 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
936 941 elif datatype == 2:
937 942 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
938 943 elif datatype == 3:
939 944 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
940 945 elif datatype == 4:
941 946 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
942 947 elif datatype == 5:
943 948 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
944 949 else:
945 950 raise ValueError('Data type was not defined')
946 951
947 952 self.dtype = datatype_str
948 953 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
949 954 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
950 955 self.firstHeaderSize + self.basicHeaderSize * \
951 956 (self.processingHeaderObj.dataBlocksPerFile - 1)
952 957 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
953 958 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
954 959 self.getBlockDimension()
955 960
956 961 def verifyFile(self, filename, msgFlag=True):
957 962
958 963 msg = None
959 964
960 965 try:
961 966 fp = open(filename, 'rb')
962 967 except IOError:
963 968
964 969 if msgFlag:
965 970 print("[Reading] File %s can't be opened" % (filename))
966 971
967 972 return False
968
973
969 974 if self.waitDataBlock(0):
970 975 basicHeaderObj = BasicHeader(LOCALTIME)
971 976 systemHeaderObj = SystemHeader()
972 977 radarControllerHeaderObj = RadarControllerHeader()
973 978 processingHeaderObj = ProcessingHeader()
974 979
975 980 if not(basicHeaderObj.read(fp)):
976 981 fp.close()
977 982 return False
978 983
979 984 if not(systemHeaderObj.read(fp)):
980 985 fp.close()
981 986 return False
982 987
983 988 if not(radarControllerHeaderObj.read(fp)):
984 989 fp.close()
985 990 return False
986 991
987 992 if not(processingHeaderObj.read(fp)):
988 993 fp.close()
989 return False
994 return False
990 995
991 996 if not self.online:
992 dt1 = basicHeaderObj.datatime
997 dt1 = basicHeaderObj.datatime
993 998 fp.seek(self.fileSize-processingHeaderObj.blockSize-24)
994 999 if not(basicHeaderObj.read(fp)):
995 1000 fp.close()
996 1001 return False
997 1002 dt2 = basicHeaderObj.datatime
998 1003 if not self.isDateTimeInRange(dt1, self.startDate, self.endDate, self.startTime, self.endTime) and not \
999 1004 self.isDateTimeInRange(dt2, self.startDate, self.endDate, self.startTime, self.endTime):
1000 return False
1005 return False
1001 1006
1002 1007 fp.close()
1003
1008
1004 1009 return True
1005 1010
1006 1011 def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
1007 1012
1008 1013 path_empty = True
1009 1014
1010 1015 dateList = []
1011 1016 pathList = []
1012 1017
1013 1018 multi_path = path.split(',')
1014 1019
1015 1020 if not walk:
1016 1021
1017 1022 for single_path in multi_path:
1018 1023
1019 1024 if not os.path.isdir(single_path):
1020 1025 continue
1021 1026
1022 1027 fileList = glob.glob1(single_path, "*" + ext)
1023 1028
1024 1029 if not fileList:
1025 1030 continue
1026 1031
1027 1032 path_empty = False
1028 1033
1029 1034 fileList.sort()
1030 1035
1031 1036 for thisFile in fileList:
1032 1037
1033 1038 if not os.path.isfile(os.path.join(single_path, thisFile)):
1034 1039 continue
1035 1040
1036 1041 if not isRadarFile(thisFile):
1037 1042 continue
1038 1043
1039 1044 if not isFileInDateRange(thisFile, startDate, endDate):
1040 1045 continue
1041 1046
1042 1047 thisDate = getDateFromRadarFile(thisFile)
1043 1048
1044 1049 if thisDate in dateList or single_path in pathList:
1045 1050 continue
1046 1051
1047 1052 dateList.append(thisDate)
1048 1053 pathList.append(single_path)
1049 1054
1050 1055 else:
1051 1056 for single_path in multi_path:
1052 1057
1053 1058 if not os.path.isdir(single_path):
1054 1059 continue
1055 1060
1056 1061 dirList = []
1057 1062
1058 1063 for thisPath in os.listdir(single_path):
1059 1064
1060 1065 if not os.path.isdir(os.path.join(single_path, thisPath)):
1061 1066 continue
1062 1067
1063 1068 if not isRadarFolder(thisPath):
1064 1069 continue
1065 1070
1066 1071 if not isFolderInDateRange(thisPath, startDate, endDate):
1067 1072 continue
1068 1073
1069 1074 dirList.append(thisPath)
1070 1075
1071 1076 if not dirList:
1072 1077 continue
1073 1078
1074 1079 dirList.sort()
1075 1080
1076 1081 for thisDir in dirList:
1077 1082
1078 1083 datapath = os.path.join(single_path, thisDir, expLabel)
1079 1084 fileList = glob.glob1(datapath, "*" + ext)
1080 1085
1081 1086 if not fileList:
1082 1087 continue
1083 1088
1084 1089 path_empty = False
1085 1090
1086 1091 thisDate = getDateFromRadarFolder(thisDir)
1087 1092
1088 1093 pathList.append(datapath)
1089 1094 dateList.append(thisDate)
1090 1095
1091 1096 dateList.sort()
1092 1097
1093 1098 if walk:
1094 1099 pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
1095 1100 else:
1096 1101 pattern_path = multi_path[0]
1097 1102
1098 1103 if path_empty:
1099 1104 raise schainpy.admin.SchainError("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
1100 1105 else:
1101 1106 if not dateList:
1102 1107 raise schainpy.admin.SchainError("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))
1103 1108
1104 1109 if include_path:
1105 1110 return dateList, pathList
1106 1111
1107 1112 return dateList
1108 1113
1109 1114 def setup(self, **kwargs):
1110
1115
1111 1116 self.set_kwargs(**kwargs)
1112 1117 if not self.ext.startswith('.'):
1113 1118 self.ext = '.{}'.format(self.ext)
1114
1119
1115 1120 if self.server is not None:
1116 1121 if 'tcp://' in self.server:
1117 1122                 address = self.server
1118 1123 else:
1119 1124 address = 'ipc:///tmp/%s' % self.server
1120 1125 self.server = address
1121 1126 self.context = zmq.Context()
1122 1127 self.receiver = self.context.socket(zmq.PULL)
1123 1128 self.receiver.connect(self.server)
1124 1129 time.sleep(0.5)
1125 1130 print('[Starting] ReceiverData from {}'.format(self.server))
1126 1131 else:
1127 1132 self.server = None
1128 1133 if self.path == None:
1129 1134 raise ValueError("[Reading] The path is not valid")
1130 1135
1131 1136 if self.online:
1132 1137 log.log("[Reading] Searching files in online mode...", self.name)
1133 1138
1134 1139 for nTries in range(self.nTries):
1135 1140 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1136 self.endDate, self.expLabel, self.ext, self.walk,
1141 self.endDate, self.expLabel, self.ext, self.walk,
1137 1142 self.filefmt, self.folderfmt)
1138 1143
1139 1144 try:
1140 1145 fullpath = next(fullpath)
1141 1146 except:
1142 1147 fullpath = None
1143
1148
1144 1149 if fullpath:
1145 1150 break
1146 1151
1147 1152 log.warning(
1148 1153 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1149 self.delay, self.path, nTries + 1),
1154 self.delay, self.path, nTries + 1),
1150 1155 self.name)
1151 1156 time.sleep(self.delay)
1152 1157
1153 1158 if not(fullpath):
1154 1159 raise schainpy.admin.SchainError(
1155 'There isn\'t any valid file in {}'.format(self.path))
1160 'There isn\'t any valid file in {}'.format(self.path))
1156 1161
1157 1162 pathname, filename = os.path.split(fullpath)
1158 1163 self.year = int(filename[1:5])
1159 1164 self.doy = int(filename[5:8])
1160 self.set = int(filename[8:11]) - 1
1165 self.set = int(filename[8:11]) - 1
1161 1166 else:
1162 1167 log.log("Searching files in {}".format(self.path), self.name)
1163 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1168 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1164 1169 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1165
1170
1166 1171 self.setNextFile()
1167 1172
1168 1173 return
1169 1174
1170 1175 def getBasicHeader(self):
1171
1176 '''
1177 print("1",self.radarControllerHeaderObj.ippSeconds)
1178 print("2",self.profileIndex)
1179 print("3",self.basicHeaderObj.miliSecond)
1180 print("4",self.basicHeaderObj.utc)
1181 print("5",self.nTxs)
1182 '''
1172 1183 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1173 1184 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1185 #print(self.profileIndex,self.dataOut.utctime)
1174 1186
1175 1187 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1176 1188
1177 1189 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1178 1190
1179 1191 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1180 1192
1181 1193 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1182 1194
1183 1195 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1184 1196
1185 1197 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1186 1198
1187 1199 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1188 1200
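    # Worked example (illustrative numbers): in getBasicHeader above, with
    # utc=1500000000 s, miliSecond=250, profileIndex=10 and ippSeconds=0.0004,
    # the profile timestamp is 1500000000 + 0.250 + 10 * 0.0004 = 1500000000.254 s.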
1189 1201 def getFirstHeader(self):
1190 1202
1191 1203 raise NotImplementedError
1192 1204
1193 1205 def getData(self):
1194 1206
1195 1207 raise NotImplementedError
1196 1208
1197 1209 def hasNotDataInBuffer(self):
1198 1210
1199 1211 raise NotImplementedError
1200 1212
1201 1213 def readBlock(self):
1202 1214
1203 1215 raise NotImplementedError
1204 1216
1205 1217 def isEndProcess(self):
1206 1218
1207 1219 return self.flagNoMoreFiles
1208 1220
1209 1221 def printReadBlocks(self):
1210 1222
1211 1223 print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)
1212 1224
1213 1225 def printTotalBlocks(self):
1214 1226
1215 1227 print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1216 1228
1217 1229 def printNumberOfBlock(self):
1218 1230 'SPAM!'
1219 1231
1220 1232 # if self.flagIsNewBlock:
1221 1233 # print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1222 1234 # self.processingHeaderObj.dataBlocksPerFile,
1223 1235 # self.dataOut.datatime.ctime())
1224 1236
1225 1237 def printInfo(self):
1226 1238
1227 1239 if self.__printInfo == False:
1228 1240 return
1229 1241
1230 1242 self.basicHeaderObj.printInfo()
1231 1243 self.systemHeaderObj.printInfo()
1232 1244 self.radarControllerHeaderObj.printInfo()
1233 1245 self.processingHeaderObj.printInfo()
1234 1246
1235 1247 self.__printInfo = False
1236 1248
1237 1249 def run(self, **kwargs):
1238 1250 """
1239 1251
1240 1252 Arguments:
1241 path :
1242 startDate :
1253 path :
1254 startDate :
1243 1255 endDate :
1244 1256 startTime :
1245 1257 endTime :
1246 1258 set :
1247 1259 expLabel :
1248 1260 ext :
1249 1261 online :
1250 1262 delay :
1251 1263 walk :
1252 1264 getblock :
1253 1265 nTxs :
1254 1266 realtime :
1255 1267 blocksize :
1256 1268 blocktime :
1257 1269 skip :
1258 1270 cursor :
1259 1271 warnings :
1260 1272 server :
1261 1273 verbose :
1262 1274 format :
1263 1275 oneDDict :
1264 1276 twoDDict :
1265 1277 independentParam :
1266 1278 """
1267 1279
1268 1280 if not(self.isConfig):
1269 1281 self.setup(**kwargs)
1270 1282 self.isConfig = True
1271 1283 if self.server is None:
1272 1284 self.getData()
1273 1285 else:
1274 1286 self.getFromServer()
1275 1287
1276 1288
1277 1289 class JRODataWriter(Reader):
1278 1290
1279 1291 """
1280 1292 This class writes data to processed files (.r or .pdata). Data is always
1281 1293 written in blocks.
1282 1294 """
1283 1295
1284 1296 setFile = None
1285 1297 profilesPerBlock = None
1286 1298 blocksPerFile = None
1287 1299 nWriteBlocks = 0
1288 1300 fileDate = None
1289 1301
1290 1302 def __init__(self, dataOut=None):
1291 1303 raise NotImplementedError
1292 1304
1293 1305 def hasAllDataInBuffer(self):
1294 1306 raise NotImplementedError
1295 1307
1296 1308 def setBlockDimension(self):
1297 1309 raise NotImplementedError
1298 1310
1299 1311 def writeBlock(self):
1300 1312 raise NotImplementedError
1301 1313
1302 1314 def putData(self):
1303 1315 raise NotImplementedError
1304 1316
1305 1317 def getDtypeWidth(self):
1306 1318
1307 1319 dtype_index = get_dtype_index(self.dtype)
1308 1320 dtype_width = get_dtype_width(dtype_index)
1309 1321
1310 1322 return dtype_width
1311
1323
1312 1324 def getProcessFlags(self):
1313 1325
1314 1326 processFlags = 0
1315 1327
1316 1328 dtype_index = get_dtype_index(self.dtype)
1317 1329 procflag_dtype = get_procflag_dtype(dtype_index)
1318 1330
1319 1331 processFlags += procflag_dtype
1320 1332
1321 1333 if self.dataOut.flagDecodeData:
1322 1334 processFlags += PROCFLAG.DECODE_DATA
1323 1335
1324 1336 if self.dataOut.flagDeflipData:
1325 1337 processFlags += PROCFLAG.DEFLIP_DATA
1326 1338
1327 1339 if self.dataOut.code is not None:
1328 1340 processFlags += PROCFLAG.DEFINE_PROCESS_CODE
1329 1341
1330 1342 if self.dataOut.nCohInt > 1:
1331 1343 processFlags += PROCFLAG.COHERENT_INTEGRATION
1332 1344
1333 1345 if self.dataOut.type == "Spectra":
1334 1346 if self.dataOut.nIncohInt > 1:
1335 1347 processFlags += PROCFLAG.INCOHERENT_INTEGRATION
1336 1348
1337 1349 if self.dataOut.data_dc is not None:
1338 1350 processFlags += PROCFLAG.SAVE_CHANNELS_DC
1339 1351
1340 1352 if self.dataOut.flagShiftFFT:
1341 1353 processFlags += PROCFLAG.SHIFT_FFT_DATA
1342 1354
1343 1355 return processFlags
1344 1356
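getProcessFlags() accumulates PROCFLAG bit values by simple addition; since each flag is a distinct bit, the sum is equivalent to OR-ing the flags. A minimal sketch with hypothetical bit values (the real ones are defined in PROCFLAG):

DECODE_DATA = 0x0004               # hypothetical bit value
COHERENT_INTEGRATION = 0x0040      # hypothetical bit value
processFlags = 0
processFlags += DECODE_DATA
processFlags += COHERENT_INTEGRATION
assert processFlags == DECODE_DATA | COHERENT_INTEGRATION
print(hex(processFlags))           # 0x44
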
1345 1357 def setBasicHeader(self):
1346 1358
1347 1359 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1348 1360 self.basicHeaderObj.version = self.versionFile
1349 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1361 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1350 1362 utc = numpy.floor(self.dataOut.utctime)
1351 milisecond = (self.dataOut.utctime - utc) * 1000.0
1363 milisecond = (self.dataOut.utctime - utc) * 1000.0
1352 1364 self.basicHeaderObj.utc = utc
1353 1365 self.basicHeaderObj.miliSecond = milisecond
1354 1366 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1355 1367 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1356 1368 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1357 1369
1358 1370 def setFirstHeader(self):
1359 1371 """
1360 1372 Gets a copy of the First Header
1361 1373
1362 1374 Affected:
1363 1375
1364 1376 self.basicHeaderObj
1365 1377 self.systemHeaderObj
1366 1378 self.radarControllerHeaderObj
1367 1379 self.processingHeaderObj
1368 1380
1369 1381 Return:
1370 1382 None
1371 1383 """
1372 1384
1373 1385 raise NotImplementedError
1374 1386
1375 1387 def __writeFirstHeader(self):
1376 1388 """
1377 1389 Writes the first header of the file, i.e. the Basic header and the Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1378 1390
1379 1391 Affected:
1380 1392 __dataType
1381 1393
1382 1394 Return:
1383 1395 None
1384 1396 """
1385 1397
1386 1398 # COMPUTE PARAMETERS
1387 1399
1388 1400 sizeLongHeader = self.systemHeaderObj.size + \
1389 1401 self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1390 1402 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1391 1403
1392 1404 self.basicHeaderObj.write(self.fp)
1393 1405 self.systemHeaderObj.write(self.fp)
1394 1406 self.radarControllerHeaderObj.write(self.fp)
1395 1407 self.processingHeaderObj.write(self.fp)
1396 1408
1397 1409 def __setNewBlock(self):
1398 1410 """
1399 1411 If it is a new file it writes the First Header, otherwise it writes only the Basic Header
1400 1412
1401 1413 Return:
1402 1414 0 : if nothing could be written
1403 1415 1 : if the Basic or the First Header was written
1404 1416 """
1405 1417 if self.fp == None:
1406 1418 self.setNextFile()
1407 1419
1408 1420 if self.flagIsNewFile:
1409 1421 return 1
1410 1422
1411 1423 if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
1412 1424 self.basicHeaderObj.write(self.fp)
1413 1425 return 1
1414 1426
1415 1427 if not(self.setNextFile()):
1416 1428 return 0
1417 1429
1418 1430 return 1
1419 1431
1420 1432 def writeNextBlock(self):
1421 1433 """
1422 1434 Selects the next block of data and writes it to a file
1423 1435
1424 1436 Return:
1425 1437 0 : if the data block could not be written
1426 1438 1 : if the data block was written
1427 1439 """
1428 1440 if not(self.__setNewBlock()):
1429 1441 return 0
1430 1442
1431 1443 self.writeBlock()
1432 1444
1433 1445 print("[Writing] Block No. %d/%d" % (self.blockIndex,
1434 1446 self.processingHeaderObj.dataBlocksPerFile))
1435 1447
1436 1448 return 1
1437 1449
1438 def setNextFile(self):
1450 def setNextFile(self):
1439 1451 """Determina el siguiente file que sera escrito
1440 1452
1441 1453 Affected:
1442 1454 self.filename
1443 1455 self.subfolder
1444 1456 self.fp
1445 1457 self.setFile
1446 1458 self.flagIsNewFile
1447 1459
1448 1460 Return:
1449 1461 0 : if the file cannot be written
1450 1462 1 : if the file is ready to be written
1451 1463 """
1452 1464 ext = self.ext
1453 1465 path = self.path
1454 1466
1455 1467 if self.fp != None:
1456 1468 self.fp.close()
1457 1469
1458 1470 if not os.path.exists(path):
1459 1471 os.mkdir(path)
1460 1472
1461 1473 timeTuple = time.localtime(self.dataOut.utctime)
1462 1474 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)
1463 1475
1464 1476 fullpath = os.path.join(path, subfolder)
1465 1477 setFile = self.setFile
1466 1478
1467 1479 if not(os.path.exists(fullpath)):
1468 1480 os.mkdir(fullpath)
1469 1481 setFile = -1 # initialize the set counter
1470 1482 else:
1471 1483 filesList = os.listdir(fullpath)
1472 1484 if len(filesList) > 0:
1473 1485 filesList = sorted(filesList, key=str.lower)
1474 1486 filen = filesList[-1]
1475 1487 # the filename must have the following format
1476 1488 # 0 1234 567 89A BCDE (hex)
1477 1489 # x YYYY DDD SSS .ext
1478 1490 if isNumber(filen[8:11]):
1479 1491 # initialize the set counter from the set number of the last file
1480 1492 setFile = int(filen[8:11])
1481 1493 else:
1482 1494 setFile = -1
1483 1495 else:
1484 1496 setFile = -1 # initialize the set counter
1485 1497
1486 1498 setFile += 1
1487 1499
1488 1500 # If this is a new day it resets some values
1489 1501 if self.dataOut.datatime.date() > self.fileDate:
1490 1502 setFile = 0
1491 1503 self.nTotalBlocks = 0
1492
1504
1493 1505 filen = '{}{:04d}{:03d}{:03d}{}'.format(
1494 self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)
1506 self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)
1495 1507
1496 1508 filename = os.path.join(path, subfolder, filen)
1497 1509
1498 1510 fp = open(filename, 'wb')
1499 1511
1500 1512 self.blockIndex = 0
1501 1513 self.filename = filename
1502 1514 self.subfolder = subfolder
1503 1515 self.fp = fp
1504 1516 self.setFile = setFile
1505 1517 self.flagIsNewFile = 1
1506 1518 self.fileDate = self.dataOut.datatime.date()
1507 1519 self.setFirstHeader()
1508 1520
1509 1521 print('[Writing] Opening file: %s' % self.filename)
1510 1522
1511 1523 self.__writeFirstHeader()
1512 1524
1513 1525 return 1
1514 1526
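A minimal sketch of the 'x YYYY DDD SSS .ext' file naming built above, with sample values:

optchar, ext = 'D', '.r'
tm_year, tm_yday, setFile = 2010, 1, 0                     # illustrative values
subfolder = 'd%4.4d%3.3d' % (tm_year, tm_yday)
filen = '{}{:04d}{:03d}{:03d}{}'.format(optchar, tm_year, tm_yday, setFile, ext)
print(subfolder, filen)                                    # d2010001 D2010001000.r
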
1515 1527 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1516 1528 """
1517 1529 Sets the format in which the data will be saved and writes the First Header
1518 1530
1519 1531 Inputs:
1520 1532 path : directory where data will be saved
1521 1533 profilesPerBlock : number of profiles per block
1522 1534 set : initial file set
1523 1535 datatype : An integer number that defines data type:
1524 1536 0 : int8 (1 byte)
1525 1537 1 : int16 (2 bytes)
1526 1538 2 : int32 (4 bytes)
1527 1539 3 : int64 (8 bytes)
1528 1540 4 : float32 (4 bytes)
1529 1541 5 : double64 (8 bytes)
1530 1542
1531 1543 Return:
1532 1544 0 : if the setup was not successful
1533 1545 1 : if the setup was successful
1534 1546 """
1535 1547
1536 1548 if ext == None:
1537 1549 ext = self.ext
1538 1550
1539 1551 self.ext = ext.lower()
1540 1552
1541 1553 self.path = path
1542
1554
1543 1555 if set is None:
1544 1556 self.setFile = -1
1545 1557 else:
1546 self.setFile = set - 1
1558 self.setFile = set - 1
1547 1559
1548 1560 self.blocksPerFile = blocksPerFile
1549 1561 self.profilesPerBlock = profilesPerBlock
1550 1562 self.dataOut = dataOut
1551 1563 self.fileDate = self.dataOut.datatime.date()
1552 1564 self.dtype = self.dataOut.dtype
1553 1565
1554 1566 if datatype is not None:
1555 1567 self.dtype = get_numpy_dtype(datatype)
1556 1568
1557 1569 if not(self.setNextFile()):
1558 1570 print("[Writing] There isn't a next file")
1559 1571 return 0
1560 1572
1561 1573 self.setBlockDimension()
1562 1574
1563 1575 return 1
1564 1576
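A minimal sketch of the datatype codes documented in setup(); the actual resolution is done by get_numpy_dtype() in jroIO_base, so this mapping is illustrative only:

import numpy
DATATYPE_CODES = {0: numpy.int8, 1: numpy.int16, 2: numpy.int32,
                  3: numpy.int64, 4: numpy.float32, 5: numpy.float64}
print(DATATYPE_CODES[4])           # float32, the default (datatype=4)
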
1565 1577 def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
1566 1578
1567 1579 if not(self.isConfig):
1568 1580
1569 1581 self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
1570 1582 set=set, ext=ext, datatype=datatype, **kwargs)
1571 1583 self.isConfig = True
1572 1584
1573 1585 self.dataOut = dataOut
1574 1586 self.putData()
1575 1587 return self.dataOut
@@ -1,680 +1,680
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 import numpy
8 8
9 9 from .jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
10 10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 11 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
12 12 from schainpy.model.data.jrodata import Voltage
13 13 import zmq
14 14 import tempfile
15 15 from io import StringIO
16 16 # from _sha import blocksize
17 17
18 18 @MPDecorator
19 19 class VoltageReader(JRODataReader, ProcessingUnit):
20 20 """
21 21 This class reads voltage data from files in rawdata format (.r). Data is always
22 22 read in blocks. The data read (a 3-dimensional array:
23 23 profiles*heights*channels) is stored in the "buffer" variable.
24 24 
25 25 profiles * heights * channels
26 26 
27 27 This class holds instances (objects) of the BasicHeader, SystemHeader,
28 28 RadarControllerHeader and Voltage classes. The first three are used to store the data
29 29 header information (metadata), and the fourth (Voltage) to obtain and store one profile
30 30 of data from the "buffer" every time the "getData" method is executed.
31 31
32 32 Example:
33 33
34 34 dpath = "/home/myuser/data"
35 35
36 36 startTime = datetime.datetime(2010,1,20,0,0,0,0,0,0)
37 37
38 38 endTime = datetime.datetime(2010,1,21,23,59,59,0,0,0)
39 39
40 40 readerObj = VoltageReader()
41 41
42 42 readerObj.setup(dpath, startTime, endTime)
43 43
44 44 while(True):
45 45
46 46 #to get one profile
47 47 profile = readerObj.getData()
48 48
49 49 #print the profile
50 50 print(profile)
51 51
52 52 #If you want to see all datablock
53 53 print(readerObj.datablock)
54 54
55 55 if readerObj.flagNoMoreFiles:
56 56 break
57 57
58 58 """
59 59
60 60 def __init__(self):
61 61 """
62 62 Initializer of the VoltageReader class for reading voltage data.
63 63 
64 64 Input:
65 65 dataOut : Object of the Voltage class. This object will be used to
66 66 store one profile of data every time a request is made
67 67 (getData). The profile will be obtained from the data buffer;
68 if the buffer is empty, a new read of a
68 if the buffer is empty, a new read of a
69 69 data block will be performed.
70 70 If this parameter is not passed, one will be created internally.
71 71 
72 72 Variables affected:
73 73 self.dataOut
74 74
75 75 Return:
76 76 None
77 77 """
78 78
79 79 ProcessingUnit.__init__(self)
80 80
81 81 self.ext = ".r"
82 82 self.optchar = "D"
83 83 self.basicHeaderObj = BasicHeader(LOCALTIME)
84 84 self.systemHeaderObj = SystemHeader()
85 85 self.radarControllerHeaderObj = RadarControllerHeader()
86 86 self.processingHeaderObj = ProcessingHeader()
87 87 self.lastUTTime = 0
88 88 self.profileIndex = 2**32 - 1
89 89 self.dataOut = Voltage()
90 90 self.selBlocksize = None
91 91 self.selBlocktime = None
92 92
93 93 def createObjByDefault(self):
94 94
95 95 dataObj = Voltage()
96 96
97 97 return dataObj
98 98
99 99 def __hasNotDataInBuffer(self):
100 100
101 101 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock * self.nTxs:
102 102 return 1
103 103
104 104 return 0
105 105
106 106 def getBlockDimension(self):
107 107 """
108 108 Obtains the number of points to read for each data block
109 109
110 110 Affected:
111 111 self.blocksize
112 112
113 113 Return:
114 114 None
115 115 """
116 116 pts2read = self.processingHeaderObj.profilesPerBlock * \
117 117 self.processingHeaderObj.nHeights * self.systemHeaderObj.nChannels
118 118 self.blocksize = pts2read
119 119
120 120 def readBlock(self):
121 121 """
122 122 readBlock reads the data block from the current position of the file pointer
123 123 (self.fp) and updates all the parameters related to the data block
124 124 (metadata + data). The data read is stored in the buffer and the buffer counter
125 125 is reset to 0
126 126
127 127 Inputs:
128 128 None
129 129
130 130 Return:
131 131 None
132 132
133 133 Affected:
134 134 self.profileIndex
135 135 self.datablock
136 136 self.flagIsNewFile
137 137 self.flagIsNewBlock
138 138 self.nTotalBlocks
139 139
140 140 Exceptions:
141 141 If a block that was read is not a valid block
142 142 """
143 143
144 144 # if self.server is not None:
145 145 # self.zBlock = self.receiver.recv()
146 146 # self.zHeader = self.zBlock[:24]
147 147 # self.zDataBlock = self.zBlock[24:]
148 148 # junk = numpy.fromstring(self.zDataBlock, numpy.dtype([('real','<i4'),('imag','<i4')]))
149 149 # self.processingHeaderObj.profilesPerBlock = 240
150 150 # self.processingHeaderObj.nHeights = 248
151 151 # self.systemHeaderObj.nChannels
152 152 # else:
153 153 current_pointer_location = self.fp.tell()
154 154 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
155 155
156 156 try:
157 157 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
158 158 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
159 159 except:
160 160 # print "The read block (%3d) has not enough data" %self.nReadBlocks
161 161
162 162 if self.waitDataBlock(pointer_location=current_pointer_location):
163 163 junk = numpy.fromfile(self.fp, self.dtype, self.blocksize)
164 164 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
165 165 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
166 166 # return 0
167 167
168 168 # Dimensions : nChannels, nProfiles, nSamples
169 169
170 170 junk = numpy.transpose(junk, (2, 0, 1))
171 171 self.datablock = junk['real'] + junk['imag'] * 1j
172 172
173 173 self.profileIndex = 0
174 174
175 175 self.flagIsNewFile = 0
176 176 self.flagIsNewBlock = 1
177 177
178 178 self.nTotalBlocks += 1
179 179 self.nReadBlocks += 1
180 180
181 181 return 1
182 182
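A self-contained sketch of the reshape/transpose done in readBlock(), with made-up dimensions: interleaved real/imag samples become a (nChannels, nProfiles, nHeights) complex array.

import numpy
profilesPerBlock, nHeights, nChannels = 4, 8, 2            # illustrative sizes
dtype = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
junk = numpy.zeros(profilesPerBlock * nHeights * nChannels, dtype)   # stand-in for fromfile()
junk = junk.reshape((profilesPerBlock, nHeights, nChannels))
junk = numpy.transpose(junk, (2, 0, 1))
datablock = junk['real'] + junk['imag'] * 1j
print(datablock.shape)             # (2, 4, 8) -> nChannels, nProfiles, nHeights
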
183 183 def getFirstHeader(self):
184 184
185 185 self.getBasicHeader()
186 186
187 187 self.dataOut.processingHeaderObj = self.processingHeaderObj.copy()
188 188
189 189 self.dataOut.systemHeaderObj = self.systemHeaderObj.copy()
190 190
191 191 self.dataOut.radarControllerHeaderObj = self.radarControllerHeaderObj.copy()
192 192
193 193 if self.nTxs > 1:
194 194 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
195 195 # Time interval and code are properties of dataOut. Their values depend on radarControllerHeaderObj.
196 196
197 197 # self.dataOut.timeInterval = self.radarControllerHeaderObj.ippSeconds * self.processingHeaderObj.nCohInt
198 198 #
199 199 # if self.radarControllerHeaderObj.code is not None:
200 200 #
201 201 # self.dataOut.nCode = self.radarControllerHeaderObj.nCode
202 202 #
203 203 # self.dataOut.nBaud = self.radarControllerHeaderObj.nBaud
204 204 #
205 205 # self.dataOut.code = self.radarControllerHeaderObj.code
206 206
207 207 self.dataOut.dtype = self.dtype
208 208
209 209 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock
210 210
211 211 self.dataOut.heightList = numpy.arange(
212 212 self.processingHeaderObj.nHeights) * self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
213 213
214 214 self.dataOut.channelList = list(range(self.systemHeaderObj.nChannels))
215 215
216 216 self.dataOut.nCohInt = self.processingHeaderObj.nCohInt
217 217
218 218 # assume the data is not decoded
219 219 self.dataOut.flagDecodeData = self.processingHeaderObj.flag_decode
220 220
221 221 # assume the data has not been deflipped
222 222 self.dataOut.flagDeflipData = self.processingHeaderObj.flag_deflip
223 223
224 224 self.dataOut.flagShiftFFT = self.processingHeaderObj.shif_fft
225 225
226 226 def reshapeData(self):
227 227
228 228 if self.nTxs < 0:
229 229 return
230 230
231 231 if self.nTxs == 1:
232 232 return
233 233
234 234 if self.nTxs < 1 and self.processingHeaderObj.profilesPerBlock % (1. / self.nTxs) != 0:
235 235 raise ValueError("1./nTxs (=%f), should be a multiple of nProfiles (=%d)" % (
236 236 1. / self.nTxs, self.processingHeaderObj.profilesPerBlock))
237 237
238 238 if self.nTxs > 1 and self.processingHeaderObj.nHeights % self.nTxs != 0:
239 239 raise ValueError("nTxs (=%d), should be a multiple of nHeights (=%d)" % (
240 240 self.nTxs, self.processingHeaderObj.nHeights))
241 241
242 242 self.datablock = self.datablock.reshape(
243 243 (self.systemHeaderObj.nChannels, self.processingHeaderObj.profilesPerBlock * self.nTxs, int(self.processingHeaderObj.nHeights / self.nTxs)))
244 244
245 245 self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock * self.nTxs
246 246 self.dataOut.heightList = numpy.arange(self.processingHeaderObj.nHeights / self.nTxs) * \
247 247 self.processingHeaderObj.deltaHeight + self.processingHeaderObj.firstHeight
248 248 self.dataOut.radarControllerHeaderObj.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
249 249
250 250 return
251 251
252 252 def readFirstHeaderFromServer(self):
253 253
254 254 self.getFirstHeader()
255 255
256 256 self.firstHeaderSize = self.basicHeaderObj.size
257 257
258 258 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
259 259 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
260 260 if datatype == 0:
261 261 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
262 262 elif datatype == 1:
263 263 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
264 264 elif datatype == 2:
265 265 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
266 266 elif datatype == 3:
267 267 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
268 268 elif datatype == 4:
269 269 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
270 270 elif datatype == 5:
271 271 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
272 272 else:
273 273 raise ValueError('Data type was not defined')
274 274
275 275 self.dtype = datatype_str
276 276 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
277 277 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
278 278 self.firstHeaderSize + self.basicHeaderSize * \
279 279 (self.processingHeaderObj.dataBlocksPerFile - 1)
280 280 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
281 281 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
282 282 self.getBlockDimension()
283 283
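A minimal sketch of the log2 extraction above, using hypothetical PROCFLAG values (DATATYPE_CHAR as the lowest datatype bit): the bit offset of the flag that is set recovers the datatype code 0..5.

import numpy
DATATYPE_CHAR = 0x0010             # hypothetical value, for illustration only
DATATYPE_FLOAT = 0x0100            # hypothetical: four bits above DATATYPE_CHAR -> code 4
DATATYPE_MASK = 0x03F0             # hypothetical mask covering the six codes
processFlags = DATATYPE_FLOAT
datatype = int(numpy.log2(processFlags & DATATYPE_MASK) - numpy.log2(DATATYPE_CHAR))
print(datatype)                    # 4 -> ('real', '<f4'), ('imag', '<f4')
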
284 284 def getFromServer(self):
285 285 self.flagDiscontinuousBlock = 0
286 286 self.profileIndex = 0
287 287 self.flagIsNewBlock = 1
288 self.dataOut.flagNoData = False
288 self.dataOut.flagNoData = False
289 289 self.nTotalBlocks += 1
290 290 self.nReadBlocks += 1
291 291 self.blockPointer = 0
292 292
293 293 block = self.receiver.recv()
294 294
295 295 self.basicHeaderObj.read(block[self.blockPointer:])
296 296 self.blockPointer += self.basicHeaderObj.length
297 297 self.systemHeaderObj.read(block[self.blockPointer:])
298 298 self.blockPointer += self.systemHeaderObj.length
299 299 self.radarControllerHeaderObj.read(block[self.blockPointer:])
300 300 self.blockPointer += self.radarControllerHeaderObj.length
301 301 self.processingHeaderObj.read(block[self.blockPointer:])
302 302 self.blockPointer += self.processingHeaderObj.length
303 303 self.readFirstHeaderFromServer()
304 304
305 305 timestamp = self.basicHeaderObj.get_datatime()
306 306 print('[Reading] - Block {} - {}'.format(self.nTotalBlocks, timestamp))
307 307 current_pointer_location = self.blockPointer
308 308 junk = numpy.fromstring(
309 309 block[self.blockPointer:], self.dtype, self.blocksize)
310 310
311 311 try:
312 312 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
313 313 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
314 314 except:
315 315 # print "The read block (%3d) has not enough data" %self.nReadBlocks
316 316 if self.waitDataBlock(pointer_location=current_pointer_location):
317 317 junk = numpy.fromstring(
318 318 block[self.blockPointer:], self.dtype, self.blocksize)
319 319 junk = junk.reshape((self.processingHeaderObj.profilesPerBlock,
320 320 self.processingHeaderObj.nHeights, self.systemHeaderObj.nChannels))
321 321 # return 0
322 322
323 323 # Dimensions : nChannels, nProfiles, nSamples
324 324
325 325 junk = numpy.transpose(junk, (2, 0, 1))
326 326 self.datablock = junk['real'] + junk['imag'] * 1j
327 327 self.profileIndex = 0
328 328 if self.selBlocksize == None:
329 329 self.selBlocksize = self.dataOut.nProfiles
330 330 if self.selBlocktime != None:
331 331 if self.dataOut.nCohInt is not None:
332 332 nCohInt = self.dataOut.nCohInt
333 333 else:
334 334 nCohInt = 1
335 335 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
336 336 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
337 337 self.dataOut.data = self.datablock[:,
338 338 self.profileIndex:self.profileIndex + self.selBlocksize, :]
339 339 datasize = self.dataOut.data.shape[1]
340 340 if datasize < self.selBlocksize:
341 341 buffer = numpy.zeros(
342 342 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
343 343 buffer[:, :datasize, :] = self.dataOut.data
344 344 self.dataOut.data = buffer
345 345 self.profileIndex = blockIndex
346 346
347 347 self.dataOut.flagDataAsBlock = True
348 348 self.flagIsNewBlock = 1
349 349 self.dataOut.realtime = self.online
350 350
351 351 return self.dataOut.data
352 352
353 353 def getData(self):
354 354 """
355 355 getData obtains one data unit from the read buffer, one profile, and copies it to the self.dataOut
356 356 object of type "Voltage" with all the parameters associated with it (metadata). When there is no data
357 357 in the read buffer, a new read of the data blocks is needed, using
358 358 "readNextBlock"
359 359 
360 360 It also increments the buffer counter "self.profileIndex" by 1.
361 361
362 362 Return:
363 363
364 364 If the self.getByBlock flag has been set, the whole block is copied to self.dataOut and self.profileIndex
365 365 equals the total number of profiles read from the file.
366 366
367 367 If self.getByBlock == False:
368 368
369 369 self.dataOut.data = buffer[:, thisProfile, :]
370 370
371 371 shape = [nChannels, nHeis]
372 372
373 373 If self.getByBlock == True:
374 374
375 375 self.dataOut.data = buffer[:, :, :]
376 376
377 377 shape = [nChannels, nProfiles, nHeis]
378 378
379 379 Variables affected:
380 380 self.dataOut
381 381 self.profileIndex
382 382
383 383 Affected:
384 384 self.dataOut
385 385 self.profileIndex
386 386 self.flagDiscontinuousBlock
387 387 self.flagIsNewBlock
388 388 """
389 389 if self.flagNoMoreFiles:
390 390 self.dataOut.flagNoData = True
391 391 return 0
392 392 self.flagDiscontinuousBlock = 0
393 393 self.flagIsNewBlock = 0
394 394 if self.__hasNotDataInBuffer():
395 395 if not(self.readNextBlock()):
396 396 return 0
397 397
398 398 self.getFirstHeader()
399 399
400 400 self.reshapeData()
401 401 if self.datablock is None:
402 402 self.dataOut.flagNoData = True
403 403 return 0
404 404
405 405 if not self.getByBlock:
406 406
407 407 """
408 408 Return profile by profile
409 409
410 410 If nTxs > 1 then one profile is divided by nTxs and number of total
411 411 blocks is increased by nTxs (nProfiles *= nTxs)
412 412 """
413 413 self.dataOut.flagDataAsBlock = False
414 414 self.dataOut.data = self.datablock[:, self.profileIndex, :]
415 415 self.dataOut.profileIndex = self.profileIndex
416 416
417 417 self.profileIndex += 1
418 418
419 419 else:
420 420 """
421 421 Return a block
422 422 """
423 423 if self.selBlocksize == None:
424 424 self.selBlocksize = self.dataOut.nProfiles
425 425 if self.selBlocktime != None:
426 426 if self.dataOut.nCohInt is not None:
427 427 nCohInt = self.dataOut.nCohInt
428 428 else:
429 429 nCohInt = 1
430 430 self.selBlocksize = int(self.dataOut.nProfiles * round(self.selBlocktime / (
431 431 nCohInt * self.dataOut.ippSeconds * self.dataOut.nProfiles)))
432 432
433 433 self.dataOut.data = self.datablock[:,
434 434 self.profileIndex:self.profileIndex + self.selBlocksize, :]
435 435 self.profileIndex += self.selBlocksize
436 436 datasize = self.dataOut.data.shape[1]
437 437
438 438 if datasize < self.selBlocksize:
439 439 buffer = numpy.zeros(
440 440 (self.dataOut.data.shape[0], self.selBlocksize, self.dataOut.data.shape[2]), dtype='complex')
441 441 buffer[:, :datasize, :] = self.dataOut.data
442 442
443 443 while datasize < self.selBlocksize: # Not enough profiles to fill the block
444 444 if not(self.readNextBlock()):
445 445 return 0
446 446 self.getFirstHeader()
447 447 self.reshapeData()
448 448 if self.datablock is None:
449 449 self.dataOut.flagNoData = True
450 450 return 0
451 451 # stack data
452 452 blockIndex = self.selBlocksize - datasize
453 453 datablock1 = self.datablock[:, :blockIndex, :]
454 454
455 455 buffer[:, datasize:datasize +
456 456 datablock1.shape[1], :] = datablock1
457 457 datasize += datablock1.shape[1]
458 458
459 459 self.dataOut.data = buffer
460 460 self.profileIndex = blockIndex
461 461
462 462 self.dataOut.flagDataAsBlock = True
463 463 self.dataOut.nProfiles = self.dataOut.data.shape[1]
464 464
465 465 self.dataOut.flagNoData = False
466 466
467 467 self.getBasicHeader()
468 468
469 469 self.dataOut.realtime = self.online
470 470
471 471 return self.dataOut.data
472 472
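A minimal sketch of the two output shapes getData() can return, with toy dimensions:

import numpy
nChannels, nProfiles, nHeights = 2, 100, 500               # illustrative sizes
datablock = numpy.zeros((nChannels, nProfiles, nHeights), dtype='complex64')
profile = datablock[:, 0, :]       # getByBlock == False -> [nChannels, nHeis]
block = datablock[:, :, :]         # getByBlock == True  -> [nChannels, nProfiles, nHeis]
print(profile.shape, block.shape)  # (2, 500) (2, 100, 500)
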
473 473
474 474 @MPDecorator
475 475 class VoltageWriter(JRODataWriter, Operation):
476 476 """
477 477 This class writes voltage data to processed files (.r). Data is always
478 478 written in blocks.
479 479 """
480 480
481 481 ext = ".r"
482 482
483 483 optchar = "D"
484 484
485 485 shapeBuffer = None
486 486
487 487 def __init__(self):#, **kwargs):
488 488 """
489 489 Initializer of the VoltageWriter class for writing voltage data.
490 490
491 491 Affected:
492 492 self.dataOut
493 493
494 494 Return: None
495 495 """
496 496 Operation.__init__(self)#, **kwargs)
497 497
498 498 self.nTotalBlocks = 0
499 499
500 500 self.profileIndex = 0
501 501
502 502 self.isConfig = False
503 503
504 504 self.fp = None
505 505
506 506 self.flagIsNewFile = 1
507 507
508 508 self.blockIndex = 0
509 509
510 510 self.flagIsNewBlock = 0
511 511
512 512 self.setFile = None
513 513
514 514 self.dtype = None
515 515
516 516 self.path = None
517 517
518 518 self.filename = None
519 519
520 520 self.basicHeaderObj = BasicHeader(LOCALTIME)
521 521
522 522 self.systemHeaderObj = SystemHeader()
523 523
524 524 self.radarControllerHeaderObj = RadarControllerHeader()
525 525
526 526 self.processingHeaderObj = ProcessingHeader()
527 527
528 528 def hasAllDataInBuffer(self):
529 529 if self.profileIndex >= self.processingHeaderObj.profilesPerBlock:
530 530 return 1
531 531 return 0
532 532
533 533 def setBlockDimension(self):
534 534 """
535 535 Obtains the dimensional shapes of the data sub-blocks that make up a block
536 536
537 537 Affected:
538 538 self.shape_spc_Buffer
539 539 self.shape_cspc_Buffer
540 540 self.shape_dc_Buffer
541 541
542 542 Return: None
543 543 """
544 544 self.shapeBuffer = (self.processingHeaderObj.profilesPerBlock,
545 545 self.processingHeaderObj.nHeights,
546 546 self.systemHeaderObj.nChannels)
547 547
548 548 self.datablock = numpy.zeros((self.systemHeaderObj.nChannels,
549 549 self.processingHeaderObj.profilesPerBlock,
550 550 self.processingHeaderObj.nHeights),
551 551 dtype=numpy.dtype('complex64'))
552 552
553 553 def writeBlock(self):
554 554 """
555 555 Writes the buffer to the designated file
556 556
557 557 Affected:
558 558 self.profileIndex
559 559 self.flagIsNewFile
560 560 self.flagIsNewBlock
561 561 self.nTotalBlocks
562 562 self.blockIndex
563 563
564 564 Return: None
565 565 """
566 566 data = numpy.zeros(self.shapeBuffer, self.dtype)
567 567
568 568 junk = numpy.transpose(self.datablock, (1, 2, 0))
569 569
570 570 data['real'] = junk.real
571 571 data['imag'] = junk.imag
572 572
573 573 data = data.reshape((-1))
574 574
575 575 data.tofile(self.fp)
576 576
577 577 self.datablock.fill(0)
578 578
579 579 self.profileIndex = 0
580 580 self.flagIsNewFile = 0
581 581 self.flagIsNewBlock = 1
582 582
583 583 self.blockIndex += 1
584 584 self.nTotalBlocks += 1
585 585
586 586 # print "[Writing] Block = %04d" %self.blockIndex
587 587
588 588 def putData(self):
589 589 """
590 590 Fills a block of data and then writes it to a file
591 591
592 592 Affected:
593 593 self.flagIsNewBlock
594 594 self.profileIndex
595 595
596 596 Return:
597 597 0 : if there is no data or no more files can be written
598 598 1 : if a block of data was written to a file
599 599 """
600 600 if self.dataOut.flagNoData:
601 601 return 0
602 602
603 603 self.flagIsNewBlock = 0
604 604
605 605 if self.dataOut.flagDiscontinuousBlock:
606 606 self.datablock.fill(0)
607 607 self.profileIndex = 0
608 608 self.setNextFile()
609 609
610 610 if self.profileIndex == 0:
611 611 self.setBasicHeader()
612 612
613 613 self.datablock[:, self.profileIndex, :] = self.dataOut.data
614 614
615 615 self.profileIndex += 1
616 616
617 617 if self.hasAllDataInBuffer():
618 618 # if self.flagIsNewFile:
619 619 self.writeNextBlock()
620 620 # self.setFirstHeader()
621 621
622 622 return 1
623 623
624 624 def __getBlockSize(self):
625 625 '''
626 626 This method determines the number of bytes for a data block of type Voltage
627 627 '''
628 628
629 629 dtype_width = self.getDtypeWidth()
630 630
631 631 blocksize = int(self.dataOut.nHeights * self.dataOut.nChannels *
632 632 self.profilesPerBlock * dtype_width * 2)
633 633
634 634 return blocksize
635 635
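A quick arithmetic check of the block size computed above, using sample dimensions:

nHeights, nChannels, profilesPerBlock = 500, 2, 64          # illustrative sizes
dtype_width = 2                                             # e.g. int16 samples
blocksize = nHeights * nChannels * profilesPerBlock * dtype_width * 2   # x2 for real+imag
print(blocksize)                                            # 256000 bytes per block
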
636 636 def setFirstHeader(self):
637 637 """
638 638 Gets a copy of the First Header
639 639
640 640 Affected:
641 641 self.systemHeaderObj
642 642 self.radarControllerHeaderObj
643 643 self.dtype
644 644
645 645 Return:
646 646 None
647 647 """
648 648
649 649 self.systemHeaderObj = self.dataOut.systemHeaderObj.copy()
650 650 self.systemHeaderObj.nChannels = self.dataOut.nChannels
651 651 self.radarControllerHeaderObj = self.dataOut.radarControllerHeaderObj.copy()
652 652
653 653 self.processingHeaderObj.dtype = 0 # Voltage
654 654 self.processingHeaderObj.blockSize = self.__getBlockSize()
655 655 self.processingHeaderObj.profilesPerBlock = self.profilesPerBlock
656 656 self.processingHeaderObj.dataBlocksPerFile = self.blocksPerFile
657 657 # could be 1 or self.dataOut.processingHeaderObj.nWindows
658 658 self.processingHeaderObj.nWindows = 1
659 659 self.processingHeaderObj.nCohInt = self.dataOut.nCohInt
660 660 # when the source data is of type Voltage
661 661 self.processingHeaderObj.nIncohInt = 1
662 662 # when the source data is of type Voltage
663 663 self.processingHeaderObj.totalSpectra = 0
664 664
665 665 if self.dataOut.code is not None:
666 666 self.processingHeaderObj.code = self.dataOut.code
667 667 self.processingHeaderObj.nCode = self.dataOut.nCode
668 668 self.processingHeaderObj.nBaud = self.dataOut.nBaud
669 669
670 670 if self.processingHeaderObj.nWindows != 0:
671 671 self.processingHeaderObj.firstHeight = self.dataOut.heightList[0]
672 672 self.processingHeaderObj.deltaHeight = self.dataOut.heightList[1] - \
673 673 self.dataOut.heightList[0]
674 674 self.processingHeaderObj.nHeights = self.dataOut.nHeights
675 675 self.processingHeaderObj.samplesWin = self.dataOut.nHeights
676 676
677 677 self.processingHeaderObj.processFlags = self.getProcessFlags()
678 678
679 679 self.setBasicHeader()
680 No newline at end of file
680
@@ -1,429 +1,428
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import sys
17 17 import inspect
18 18 import zmq
19 19 import time
20 20 import pickle
21 21 import traceback
22 22 try:
23 23 from queue import Queue
24 24 except:
25 25 from Queue import Queue
26 26 from threading import Thread
27 27 from multiprocessing import Process
28 28
29 29 from schainpy.utils import log
30 30
31 31
32 32 class ProcessingUnit(object):
33 33
34 34 """
35 35 Update - Jan 2018 - MULTIPROCESSING
36 36 All the "call" methods present in the previous base were removed.
37 37 The majority of operations are independent processes, thus
38 38 the decorator is in charge of communicating the operation processes
39 39 with the processing unit via IPC.
40 40
41 41 The constructor does not receive any argument. The remaining methods
42 42 are related to the operations to execute.
43 43
44 44
45 45 """
46 46 proc_type = 'processing'
47 47 __attrs__ = []
48 48
49 49 def __init__(self):
50 50
51 51 self.dataIn = None
52 52 self.dataOut = None
53 53 self.isConfig = False
54 54 self.operations = []
55 55 self.plots = []
56 56
57 57 def getAllowedArgs(self):
58 58 if hasattr(self, '__attrs__'):
59 59 return self.__attrs__
60 60 else:
61 61 return inspect.getargspec(self.run).args
62 62
63 63 def addOperation(self, conf, operation):
64 64 """
65 65 This method is used in the controller, and updates the dictionary containing the operations to execute. The dict
66 66 holds the id of the operation process (for IPC purposes)
67 67
68 68 Adds an object of type "Operation" (opObj) to the list of objects "self.objectList" and returns the
69 69 identifier associated with this object.
70 70
71 71 Input:
72 72
73 73 object : object of the "Operation" class
74 74
75 75 Return:
76 76
77 77 objId : identifier of the object, needed to communicate with the master (procUnit)
78 78 """
79 79
80 80 self.operations.append(
81 81 (operation, conf.type, conf.id, conf.getKwargs()))
82 82
83 83 if 'plot' in self.name.lower():
84 84 self.plots.append(operation.CODE)
85 85
86 86 def getOperationObj(self, objId):
87 87
88 88 if objId not in list(self.operations.keys()):
89 89 return None
90 90
91 91 return self.operations[objId]
92 92
93 93 def operation(self, **kwargs):
94 94 """
95 95 Direct operation on the data (dataOut.data). The values of the attributes of the
96 96 dataOut object must be updated
97 97
98 98 Input:
99 99
100 100 **kwargs : dictionary of arguments of the function to execute
101 101 """
102 102
103 103 raise NotImplementedError
104 104
105 105 def setup(self):
106 106
107 107 raise NotImplementedError
108 108
109 109 def run(self):
110 110
111 111 raise NotImplementedError
112 112
113 113 def close(self):
114 114
115 115 return
116 116
117 117
118 118 class Operation(object):
119 119
120 120 """
121 121 Update - Jan 2018 - MULTIPROCESSING
122 122
123 123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
124 124 The constructor doe snot receive any argument, neither the baseclass.
125 125
126 126
127 127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
128 128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
129 129 acumulacion dentro de esta clase
130 130
131 131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
132 132
133 133 """
134 134 proc_type = 'operation'
135 135 __attrs__ = []
136 136
137 137 def __init__(self):
138 138
139 139 self.id = None
140 140 self.isConfig = False
141 141
142 142 if not hasattr(self, 'name'):
143 143 self.name = self.__class__.__name__
144 144
145 145 def getAllowedArgs(self):
146 146 if hasattr(self, '__attrs__'):
147 147 return self.__attrs__
148 148 else:
149 149 return inspect.getargspec(self.run).args
150 150
151 151 def setup(self):
152 152
153 153 self.isConfig = True
154 154
155 155 raise NotImplementedError
156 156
157 157 def run(self, dataIn, **kwargs):
158 158 """
159 159 Performs the required operations on dataIn.data and updates the
160 160 attributes of the dataIn object.
161 161
162 162 Input:
163 163
164 164 dataIn : object of type JROData
165 165
166 166 Return:
167 167
168 168 None
169 169
170 170 Affected:
171 171 __buffer : data reception buffer.
172 172
173 173 """
174 174 if not self.isConfig:
175 175 self.setup(**kwargs)
176 176
177 177 raise NotImplementedError
178 178
179 179 def close(self):
180 180
181 181 return
182 182
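A minimal sketch of the setup()/run() contract described above; Scale is a made-up operation, not part of the library:

class Scale(Operation):

    def setup(self, factor=1.0):
        self.factor = factor
        self.isConfig = True

    def run(self, dataOut, factor=1.0):
        if not self.isConfig:
            self.setup(factor=factor)
        dataOut.data = dataOut.data * self.factor   # update dataOut attributes in place
        return dataOut
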
183 183 class InputQueue(Thread):
184 184
185 185 '''
186 186 Class to hold input data for Processing Units and external Operations.
187 187 '''
188 188
189 189 def __init__(self, project_id, inputId, lock=None):
190 190
191 191 Thread.__init__(self)
192 192 self.queue = Queue()
193 193 self.project_id = project_id
194 194 self.inputId = inputId
195 195 self.lock = lock
196 196 self.islocked = False
197 197 self.size = 0
198 198
199 199 def run(self):
200 200
201 201 c = zmq.Context()
202 202 self.receiver = c.socket(zmq.SUB)
203 203 self.receiver.connect(
204 204 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
205 205 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
206 206
207 207 while True:
208 208 obj = self.receiver.recv_multipart()[1]
209 209 self.size += sys.getsizeof(obj)
210 210 self.queue.put(obj)
211 211
212 212 def get(self):
213 213
214 214 if not self.islocked and self.size/1000000 > 512:
215 215 self.lock.n.value += 1
216 216 self.islocked = True
217 217 self.lock.clear()
218 218 elif self.islocked and self.size/1000000 <= 512:
219 219 self.islocked = False
220 220 self.lock.n.value -= 1
221 221 if self.lock.n.value == 0:
222 222 self.lock.set()
223 223
224 224 obj = self.queue.get()
225 225 self.size -= sys.getsizeof(obj)
226 226 return pickle.loads(obj)
227 227
228 228
229 229 def MPDecorator(BaseClass):
230 230 """
231 231 Multiprocessing class decorator
232 232
233 233 This function add multiprocessing features to a BaseClass. Also, it handle
234 234 the communication beetween processes (readers, procUnits and operations).
235 235 """
236 236
237 237 class MPClass(BaseClass, Process):
238 238
239 239 def __init__(self, *args, **kwargs):
240 240 super(MPClass, self).__init__()
241 241 Process.__init__(self)
242 242 self.operationKwargs = {}
243 243 self.args = args
244 244 self.kwargs = kwargs
245 245 self.sender = None
246 246 self.receiver = None
247 247 self.i = 0
248 248 self.t = time.time()
249 249 self.name = BaseClass.__name__
250 250 self.__doc__ = BaseClass.__doc__
251 251
252 252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254 254
255 255 self.start_time = time.time()
256 256 self.id = args[0]
257 257 self.inputId = args[1]
258 258 self.project_id = args[2]
259 259 self.err_queue = args[3]
260 260 self.lock = args[4]
261 261 self.typeProc = args[5]
262 262 self.err_queue.put('#_start_#')
263 263 if self.inputId is not None:
264 264 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
265 265
266 266 def subscribe(self):
267 267 '''
268 268 Start the zmq socket receiver and subscribe to the input ID.
269 269 '''
270 270
271 271 self.queue.start()
272 272
273 273 def listen(self):
274 274 '''
275 275 This function waits for objects
276 276 '''
277 277
278 278 return self.queue.get()
279 279
280 280 def set_publisher(self):
281 281 '''
282 282 This function creates a zmq socket for publishing objects.
283 283 '''
284 284
285 285 time.sleep(0.5)
286 286
287 287 c = zmq.Context()
288 288 self.sender = c.socket(zmq.PUB)
289 289 self.sender.connect(
290 290 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
291 291
292 292 def publish(self, data, id):
293 293 '''
294 294 This function publishes an object to a specific topic.
295 295 It blocks publishing when the receiver queue is full to avoid data loss
296 296 '''
297 297
298 298 if self.inputId is None:
299 299 self.lock.wait()
300 300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301 301
302 302 def runReader(self):
303 303 '''
304 304 Run function for reader units
305 305 '''
306 306 while True:
307
308 307 try:
309 308 BaseClass.run(self, **self.kwargs)
310 309 except:
311 310 err = traceback.format_exc()
312 311 if 'No more files' in err:
313 312 log.warning('No more files to read', self.name)
314 313 else:
315 314 self.err_queue.put('{}|{}'.format(self.name, err))
316 315 self.dataOut.error = True
317 316
318 317 for op, optype, opId, kwargs in self.operations:
319 318 if optype == 'self' and not self.dataOut.flagNoData:
320 319 op(**kwargs)
321 320 elif optype == 'other' and not self.dataOut.flagNoData:
322 321 self.dataOut = op.run(self.dataOut, **self.kwargs)
323 322 elif optype == 'external':
324 323 self.publish(self.dataOut, opId)
325 324
326 325 if self.dataOut.flagNoData and not self.dataOut.error:
327 326 continue
328 327
329 328 self.publish(self.dataOut, self.id)
330 329
331 330 if self.dataOut.error:
332 331 break
333 332
334 333 time.sleep(0.5)
335 334
336 335 def runProc(self):
337 336 '''
338 337 Run function for processing units
339 338 '''
340 339
341 340 while True:
342 341 self.dataIn = self.listen()
343 342
344 343 if self.dataIn.flagNoData and self.dataIn.error is None:
345 344 continue
346 345 elif not self.dataIn.error:
347 346 try:
348 347 BaseClass.run(self, **self.kwargs)
349 348 except:
350 349 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
351 350 self.dataOut.error = True
352 351 elif self.dataIn.error:
353 352 self.dataOut.error = self.dataIn.error
354 353 self.dataOut.flagNoData = True
355 354
356 355 for op, optype, opId, kwargs in self.operations:
357 356 if optype == 'self' and not self.dataOut.flagNoData:
358 357 op(**kwargs)
359 358 elif optype == 'other' and not self.dataOut.flagNoData:
360 359 self.dataOut = op.run(self.dataOut, **kwargs)
361 360 elif optype == 'external' and not self.dataOut.flagNoData:
362 361 self.publish(self.dataOut, opId)
363 362
364 363 self.publish(self.dataOut, self.id)
365 364 for op, optype, opId, kwargs in self.operations:
366 365 if optype == 'external' and self.dataOut.error:
367 366 self.publish(self.dataOut, opId)
368 367
369 368 if self.dataOut.error:
370 369 break
371 370
372 371 time.sleep(0.5)
373 372
374 373 def runOp(self):
375 374 '''
376 375 Run function for external operations (these operations just receive data,
377 376 ex: plots, writers, publishers)
378 377 '''
379 378
380 379 while True:
381 380
382 381 dataOut = self.listen()
383 382
384 383 if not dataOut.error:
385 384 try:
386 385 BaseClass.run(self, dataOut, **self.kwargs)
387 386 except:
388 387 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 388 dataOut.error = True
390 389 else:
391 390 break
392 391
393 392 def run(self):
394 393 if self.typeProc == "ProcUnit":
395 394
396 395 if self.inputId is not None:
397 396 self.subscribe()
398 397
399 398 self.set_publisher()
400 399
401 400 if 'Reader' not in BaseClass.__name__:
402 401 self.runProc()
403 402 else:
404 403 self.runReader()
405 404
407 406 elif self.typeProc == "Operation":
407 406
408 407 self.subscribe()
409 408 self.runOp()
410 409
411 410 else:
412 411 raise ValueError("Unknown type")
413 412
414 413 self.close()
415 414
416 415 def close(self):
417 416
418 417 BaseClass.close(self)
419 418 self.err_queue.put('#_end_#')
420 419
421 420 if self.sender:
422 421 self.sender.close()
423 422
424 423 if self.receiver:
425 424 self.receiver.close()
426 425
427 426 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
428 427
429 428 return MPClass
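A minimal usage sketch of the decorator; the positional arguments follow MPClass.__init__ above (id, inputId, project_id, err_queue, lock, typeProc), and DummyReader plus the queue/lock stand-ins are made up for illustration:

from multiprocessing import Queue, Event

@MPDecorator
class DummyReader(ProcessingUnit):
    def run(self, **kwargs):
        pass                       # a real reader would fill self.dataOut here

err_queue, lock = Queue(), Event()
reader = DummyReader('1', None, '0001', err_queue, lock, 'ProcUnit')
# reader.start()                   # Process.start() -> run() -> runReader() in a child process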