##// END OF EJS Templates
Update ParamReader to support diferent HDF5 files, fix Read/Write Madrigal files
jespinoza -
r1254:6b25d3b79646
parent child
Show More
@@ -1,674 +1,629
1 '''
1 '''
2 Created on Set 9, 2015
2 Created on Set 9, 2015
3
3
4 @author: roj-idl71 Karim Kuyeng
4 @author: roj-idl71 Karim Kuyeng
5 '''
5 '''
6
6
7 import os
7 import os
8 import sys
8 import sys
9 import glob
9 import glob
10 import fnmatch
10 import fnmatch
11 import datetime
11 import datetime
12 import time
12 import time
13 import re
13 import re
14 import h5py
14 import h5py
15 import numpy
15 import numpy
16
16
17 try:
17 try:
18 from gevent import sleep
18 from gevent import sleep
19 except:
19 except:
20 from time import sleep
20 from time import sleep
21
21
22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
23 from schainpy.model.data.jrodata import Voltage
23 from schainpy.model.data.jrodata import Voltage
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
25 from numpy import imag
25 from numpy import imag
26
26
27 class AMISRReader(ProcessingUnit):
27 class AMISRReader(ProcessingUnit):
28 '''
28 '''
29 classdocs
29 classdocs
30 '''
30 '''
31
31
32 def __init__(self):
32 def __init__(self):
33 '''
33 '''
34 Constructor
34 Constructor
35 '''
35 '''
36
36
37 ProcessingUnit.__init__(self)
37 ProcessingUnit.__init__(self)
38
38
39 self.set = None
39 self.set = None
40 self.subset = None
40 self.subset = None
41 self.extension_file = '.h5'
41 self.extension_file = '.h5'
42 self.dtc_str = 'dtc'
42 self.dtc_str = 'dtc'
43 self.dtc_id = 0
43 self.dtc_id = 0
44 self.status = True
44 self.status = True
45 self.isConfig = False
45 self.isConfig = False
46 self.dirnameList = []
46 self.dirnameList = []
47 self.filenameList = []
47 self.filenameList = []
48 self.fileIndex = None
48 self.fileIndex = None
49 self.flagNoMoreFiles = False
49 self.flagNoMoreFiles = False
50 self.flagIsNewFile = 0
50 self.flagIsNewFile = 0
51 self.filename = ''
51 self.filename = ''
52 self.amisrFilePointer = None
52 self.amisrFilePointer = None
53
53
54
54
55 self.dataset = None
55 self.dataset = None
56
56
57
57
58
58
59
59
60 self.profileIndex = 0
60 self.profileIndex = 0
61
61
62
62
63 self.beamCodeByFrame = None
63 self.beamCodeByFrame = None
64 self.radacTimeByFrame = None
64 self.radacTimeByFrame = None
65
65
66 self.dataset = None
66 self.dataset = None
67
67
68
68
69
69
70
70
71 self.__firstFile = True
71 self.__firstFile = True
72
72
73 self.buffer = None
73 self.buffer = None
74
74
75
75
76 self.timezone = 'ut'
76 self.timezone = 'ut'
77
77
78 self.__waitForNewFile = 20
78 self.__waitForNewFile = 20
79 self.__filename_online = None
79 self.__filename_online = None
80 #Is really necessary create the output object in the initializer
80 #Is really necessary create the output object in the initializer
81 self.dataOut = Voltage()
81 self.dataOut = Voltage()
82
82
83 def setup(self,path=None,
83 def setup(self,path=None,
84 startDate=None,
84 startDate=None,
85 endDate=None,
85 endDate=None,
86 startTime=None,
86 startTime=None,
87 endTime=None,
87 endTime=None,
88 walk=True,
88 walk=True,
89 timezone='ut',
89 timezone='ut',
90 all=0,
90 all=0,
91 code = None,
91 code = None,
92 nCode = 0,
92 nCode = 0,
93 nBaud = 0,
93 nBaud = 0,
94 online=False):
94 online=False):
95
95
96 self.timezone = timezone
96 self.timezone = timezone
97 self.all = all
97 self.all = all
98 self.online = online
98 self.online = online
99
99
100 self.code = code
100 self.code = code
101 self.nCode = int(nCode)
101 self.nCode = int(nCode)
102 self.nBaud = int(nBaud)
102 self.nBaud = int(nBaud)
103
103
104
104
105
105
106 #self.findFiles()
106 #self.findFiles()
107 if not(online):
107 if not(online):
108 #Busqueda de archivos offline
108 #Busqueda de archivos offline
109 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
109 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
110 else:
110 else:
111 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
111 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
112
112
113 if not(self.filenameList):
113 if not(self.filenameList):
114 print("There is no files into the folder: %s"%(path))
114 print("There is no files into the folder: %s"%(path))
115
115
116 sys.exit(-1)
116 sys.exit(-1)
117
117
118 self.fileIndex = -1
118 self.fileIndex = -1
119
119
120 self.readNextFile(online)
120 self.readNextFile(online)
121
121
122 '''
122 '''
123 Add code
123 Add code
124 '''
124 '''
125 self.isConfig = True
125 self.isConfig = True
126
126
127 pass
127 pass
128
128
129
129
130 def readAMISRHeader(self,fp):
130 def readAMISRHeader(self,fp):
131 header = 'Raw11/Data/RadacHeader'
131 header = 'Raw11/Data/RadacHeader'
132 self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAMS PER PROFILE, TO BE USED ON REARRANGE
132 self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAMS PER PROFILE, TO BE USED ON REARRANGE
133 self.beamCode = fp.get('Raw11/Data/Beamcodes') # NUMBER OF CHANNELS AND IDENTIFY POSITION TO CREATE A FILE WITH THAT INFO
133 self.beamCode = fp.get('Raw11/Data/Beamcodes') # NUMBER OF CHANNELS AND IDENTIFY POSITION TO CREATE A FILE WITH THAT INFO
134 #self.code = fp.get(header+'/Code') # NOT USE FOR THIS
134 #self.code = fp.get(header+'/Code') # NOT USE FOR THIS
135 self.frameCount = fp.get(header+'/FrameCount')# NOT USE FOR THIS
135 self.frameCount = fp.get(header+'/FrameCount')# NOT USE FOR THIS
136 self.modeGroup = fp.get(header+'/ModeGroup')# NOT USE FOR THIS
136 self.modeGroup = fp.get(header+'/ModeGroup')# NOT USE FOR THIS
137 self.nsamplesPulse = fp.get(header+'/NSamplesPulse')# TO GET NSA OR USING DATA FOR THAT
137 self.nsamplesPulse = fp.get(header+'/NSamplesPulse')# TO GET NSA OR USING DATA FOR THAT
138 self.pulseCount = fp.get(header+'/PulseCount')# NOT USE FOR THIS
138 self.pulseCount = fp.get(header+'/PulseCount')# NOT USE FOR THIS
139 self.radacTime = fp.get(header+'/RadacTime')# 1st TIME ON FILE ANDE CALCULATE THE REST WITH IPP*nindexprofile
139 self.radacTime = fp.get(header+'/RadacTime')# 1st TIME ON FILE ANDE CALCULATE THE REST WITH IPP*nindexprofile
140 self.timeCount = fp.get(header+'/TimeCount')# NOT USE FOR THIS
140 self.timeCount = fp.get(header+'/TimeCount')# NOT USE FOR THIS
141 self.timeStatus = fp.get(header+'/TimeStatus')# NOT USE FOR THIS
141 self.timeStatus = fp.get(header+'/TimeStatus')# NOT USE FOR THIS
142 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
142 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
143 self.frequency = fp.get('Rx/Frequency')
143 self.frequency = fp.get('Rx/Frequency')
144 txAus = fp.get('Raw11/Data/Pulsewidth')
144 txAus = fp.get('Raw11/Data/Pulsewidth')
145
145
146
146
147 self.nblocks = self.pulseCount.shape[0] #nblocks
147 self.nblocks = self.pulseCount.shape[0] #nblocks
148
148
149 self.nprofiles = self.pulseCount.shape[1] #nprofile
149 self.nprofiles = self.pulseCount.shape[1] #nprofile
150 self.nsa = self.nsamplesPulse[0,0] #ngates
150 self.nsa = self.nsamplesPulse[0,0] #ngates
151 self.nchannels = self.beamCode.shape[1]
151 self.nchannels = self.beamCode.shape[1]
152 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
152 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
153 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
153 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
154 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
154 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
155
155
156 #filling radar controller header parameters
156 #filling radar controller header parameters
157 self.__ippKm = self.ippSeconds *.15*1e6 # in km
157 self.__ippKm = self.ippSeconds *.15*1e6 # in km
158 self.__txA = (txAus.value)*.15 #(ipp[us]*.15km/1us) in km
158 self.__txA = (txAus.value)*.15 #(ipp[us]*.15km/1us) in km
159 self.__txB = 0
159 self.__txB = 0
160 nWindows=1
160 nWindows=1
161 self.__nSamples = self.nsa
161 self.__nSamples = self.nsa
162 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
162 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
163 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
163 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
164
164
165 #for now until understand why the code saved is different (code included even though code not in tuf file)
165 #for now until understand why the code saved is different (code included even though code not in tuf file)
166 #self.__codeType = 0
166 #self.__codeType = 0
167 # self.__nCode = None
167 # self.__nCode = None
168 # self.__nBaud = None
168 # self.__nBaud = None
169 self.__code = self.code
169 self.__code = self.code
170 self.__codeType = 0
170 self.__codeType = 0
171 if self.code != None:
171 if self.code != None:
172 self.__codeType = 1
172 self.__codeType = 1
173 self.__nCode = self.nCode
173 self.__nCode = self.nCode
174 self.__nBaud = self.nBaud
174 self.__nBaud = self.nBaud
175 #self.__code = 0
175 #self.__code = 0
176
176
177 #filling system header parameters
177 #filling system header parameters
178 self.__nSamples = self.nsa
178 self.__nSamples = self.nsa
179 self.newProfiles = self.nprofiles/self.nchannels
179 self.newProfiles = self.nprofiles/self.nchannels
180 self.__channelList = list(range(self.nchannels))
180 self.__channelList = list(range(self.nchannels))
181
181
182 self.__frequency = self.frequency[0][0]
182 self.__frequency = self.frequency[0][0]
183
183
184
184
185
185
186 def createBuffers(self):
186 def createBuffers(self):
187
187
188 pass
188 pass
189
189
190 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
190 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
191 self.path = path
191 self.path = path
192 self.startDate = startDate
192 self.startDate = startDate
193 self.endDate = endDate
193 self.endDate = endDate
194 self.startTime = startTime
194 self.startTime = startTime
195 self.endTime = endTime
195 self.endTime = endTime
196 self.walk = walk
196 self.walk = walk
197
197
198 def __checkPath(self):
198 def __checkPath(self):
199 if os.path.exists(self.path):
199 if os.path.exists(self.path):
200 self.status = 1
200 self.status = 1
201 else:
201 else:
202 self.status = 0
202 self.status = 0
203 print('Path:%s does not exists'%self.path)
203 print('Path:%s does not exists'%self.path)
204
204
205 return
205 return
206
206
207
207
208 def __selDates(self, amisr_dirname_format):
208 def __selDates(self, amisr_dirname_format):
209 try:
209 try:
210 year = int(amisr_dirname_format[0:4])
210 year = int(amisr_dirname_format[0:4])
211 month = int(amisr_dirname_format[4:6])
211 month = int(amisr_dirname_format[4:6])
212 dom = int(amisr_dirname_format[6:8])
212 dom = int(amisr_dirname_format[6:8])
213 thisDate = datetime.date(year,month,dom)
213 thisDate = datetime.date(year,month,dom)
214
214
215 if (thisDate>=self.startDate and thisDate <= self.endDate):
215 if (thisDate>=self.startDate and thisDate <= self.endDate):
216 return amisr_dirname_format
216 return amisr_dirname_format
217 except:
217 except:
218 return None
218 return None
219
219
220
220
221 def __findDataForDates(self,online=False):
221 def __findDataForDates(self,online=False):
222
222
223 if not(self.status):
223 if not(self.status):
224 return None
224 return None
225
225
226 pat = '\d+.\d+'
226 pat = '\d+.\d+'
227 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
227 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
228 dirnameList = [x for x in dirnameList if x!=None]
228 dirnameList = [x for x in dirnameList if x!=None]
229 dirnameList = [x.string for x in dirnameList]
229 dirnameList = [x.string for x in dirnameList]
230 if not(online):
230 if not(online):
231 dirnameList = [self.__selDates(x) for x in dirnameList]
231 dirnameList = [self.__selDates(x) for x in dirnameList]
232 dirnameList = [x for x in dirnameList if x!=None]
232 dirnameList = [x for x in dirnameList if x!=None]
233 if len(dirnameList)>0:
233 if len(dirnameList)>0:
234 self.status = 1
234 self.status = 1
235 self.dirnameList = dirnameList
235 self.dirnameList = dirnameList
236 self.dirnameList.sort()
236 self.dirnameList.sort()
237 else:
237 else:
238 self.status = 0
238 self.status = 0
239 return None
239 return None
240
240
241 def __getTimeFromData(self):
241 def __getTimeFromData(self):
242 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
242 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
243 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
243 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
244
244
245 print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
245 print('Filtering Files from %s to %s'%(startDateTime_Reader, endDateTime_Reader))
246 print('........................................')
246 print('........................................')
247 filter_filenameList = []
247 filter_filenameList = []
248 self.filenameList.sort()
248 self.filenameList.sort()
249 #for i in range(len(self.filenameList)-1):
249 #for i in range(len(self.filenameList)-1):
250 for i in range(len(self.filenameList)):
250 for i in range(len(self.filenameList)):
251 filename = self.filenameList[i]
251 filename = self.filenameList[i]
252 fp = h5py.File(filename,'r')
252 fp = h5py.File(filename,'r')
253 time_str = fp.get('Time/RadacTimeString')
253 time_str = fp.get('Time/RadacTimeString')
254
254
255 startDateTimeStr_File = time_str[0][0].split('.')[0]
255 startDateTimeStr_File = time_str[0][0].split('.')[0]
256 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
256 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
257 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
257 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
258
258
259 endDateTimeStr_File = time_str[-1][-1].split('.')[0]
259 endDateTimeStr_File = time_str[-1][-1].split('.')[0]
260 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
260 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
261 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
261 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
262
262
263 fp.close()
263 fp.close()
264
264
265 if self.timezone == 'lt':
265 if self.timezone == 'lt':
266 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
266 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
267 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
267 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
268
268
269 if (endDateTime_File>=startDateTime_Reader and endDateTime_File<endDateTime_Reader):
269 if (endDateTime_File>=startDateTime_Reader and endDateTime_File<endDateTime_Reader):
270 #self.filenameList.remove(filename)
270 #self.filenameList.remove(filename)
271 filter_filenameList.append(filename)
271 filter_filenameList.append(filename)
272
272
273 if (endDateTime_File>=endDateTime_Reader):
273 if (endDateTime_File>=endDateTime_Reader):
274 break
274 break
275
275
276
276
277 filter_filenameList.sort()
277 filter_filenameList.sort()
278 self.filenameList = filter_filenameList
278 self.filenameList = filter_filenameList
279 return 1
279 return 1
280
280
281 def __filterByGlob1(self, dirName):
281 def __filterByGlob1(self, dirName):
282 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
282 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
283 filter_files.sort()
283 filter_files.sort()
284 filterDict = {}
284 filterDict = {}
285 filterDict.setdefault(dirName)
285 filterDict.setdefault(dirName)
286 filterDict[dirName] = filter_files
286 filterDict[dirName] = filter_files
287 return filterDict
287 return filterDict
288
288
289 def __getFilenameList(self, fileListInKeys, dirList):
289 def __getFilenameList(self, fileListInKeys, dirList):
290 for value in fileListInKeys:
290 for value in fileListInKeys:
291 dirName = list(value.keys())[0]
291 dirName = list(value.keys())[0]
292 for file in value[dirName]:
292 for file in value[dirName]:
293 filename = os.path.join(dirName, file)
293 filename = os.path.join(dirName, file)
294 self.filenameList.append(filename)
294 self.filenameList.append(filename)
295
295
296
296
297 def __selectDataForTimes(self, online=False):
297 def __selectDataForTimes(self, online=False):
298 #aun no esta implementado el filtro for tiempo
298 #aun no esta implementado el filtro for tiempo
299 if not(self.status):
299 if not(self.status):
300 return None
300 return None
301
301
302 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
302 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
303
303
304 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
304 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
305
305
306 self.__getFilenameList(fileListInKeys, dirList)
306 self.__getFilenameList(fileListInKeys, dirList)
307 if not(online):
307 if not(online):
308 #filtro por tiempo
308 #filtro por tiempo
309 if not(self.all):
309 if not(self.all):
310 self.__getTimeFromData()
310 self.__getTimeFromData()
311
311
312 if len(self.filenameList)>0:
312 if len(self.filenameList)>0:
313 self.status = 1
313 self.status = 1
314 self.filenameList.sort()
314 self.filenameList.sort()
315 else:
315 else:
316 self.status = 0
316 self.status = 0
317 return None
317 return None
318
318
319 else:
319 else:
320 #get the last file - 1
320 #get the last file - 1
321 self.filenameList = [self.filenameList[-2]]
321 self.filenameList = [self.filenameList[-2]]
322
322
323 new_dirnameList = []
323 new_dirnameList = []
324 for dirname in self.dirnameList:
324 for dirname in self.dirnameList:
325 junk = numpy.array([dirname in x for x in self.filenameList])
325 junk = numpy.array([dirname in x for x in self.filenameList])
326 junk_sum = junk.sum()
326 junk_sum = junk.sum()
327 if junk_sum > 0:
327 if junk_sum > 0:
328 new_dirnameList.append(dirname)
328 new_dirnameList.append(dirname)
329 self.dirnameList = new_dirnameList
329 self.dirnameList = new_dirnameList
330 return 1
330 return 1
331
331
332 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
332 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
333 endTime=datetime.time(23,59,59),walk=True):
333 endTime=datetime.time(23,59,59),walk=True):
334
334
335 if endDate ==None:
335 if endDate ==None:
336 startDate = datetime.datetime.utcnow().date()
336 startDate = datetime.datetime.utcnow().date()
337 endDate = datetime.datetime.utcnow().date()
337 endDate = datetime.datetime.utcnow().date()
338
338
339 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
339 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
340
340
341 self.__checkPath()
341 self.__checkPath()
342
342
343 self.__findDataForDates(online=True)
343 self.__findDataForDates(online=True)
344
344
345 self.dirnameList = [self.dirnameList[-1]]
345 self.dirnameList = [self.dirnameList[-1]]
346
346
347 self.__selectDataForTimes(online=True)
347 self.__selectDataForTimes(online=True)
348
348
349 return
349 return
350
350
351
351
352 def searchFilesOffLine(self,
352 def searchFilesOffLine(self,
353 path,
353 path,
354 startDate,
354 startDate,
355 endDate,
355 endDate,
356 startTime=datetime.time(0,0,0),
356 startTime=datetime.time(0,0,0),
357 endTime=datetime.time(23,59,59),
357 endTime=datetime.time(23,59,59),
358 walk=True):
358 walk=True):
359
359
360 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
360 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
361
361
362 self.__checkPath()
362 self.__checkPath()
363
363
364 self.__findDataForDates()
364 self.__findDataForDates()
365
365
366 self.__selectDataForTimes()
366 self.__selectDataForTimes()
367
367
368 for i in range(len(self.filenameList)):
368 for i in range(len(self.filenameList)):
369 print("%s" %(self.filenameList[i]))
369 print("%s" %(self.filenameList[i]))
370
370
371 return
371 return
372
372
373 def __setNextFileOffline(self):
373 def __setNextFileOffline(self):
374 idFile = self.fileIndex
374 idFile = self.fileIndex
375
375
376 while (True):
376 while (True):
377 idFile += 1
377 idFile += 1
378 if not(idFile < len(self.filenameList)):
378 if not(idFile < len(self.filenameList)):
379 self.flagNoMoreFiles = 1
379 self.flagNoMoreFiles = 1
380 print("No more Files")
380 print("No more Files")
381 return 0
381 return 0
382
382
383 filename = self.filenameList[idFile]
383 filename = self.filenameList[idFile]
384
384
385 amisrFilePointer = h5py.File(filename,'r')
385 amisrFilePointer = h5py.File(filename,'r')
386
386
387 break
387 break
388
388
389 self.flagIsNewFile = 1
389 self.flagIsNewFile = 1
390 self.fileIndex = idFile
390 self.fileIndex = idFile
391 self.filename = filename
391 self.filename = filename
392
392
393 self.amisrFilePointer = amisrFilePointer
393 self.amisrFilePointer = amisrFilePointer
394
394
395 print("Setting the file: %s"%self.filename)
395 print("Setting the file: %s"%self.filename)
396
396
397 return 1
397 return 1
398
398
399
399
400 def __setNextFileOnline(self):
400 def __setNextFileOnline(self):
401 filename = self.filenameList[0]
401 filename = self.filenameList[0]
402 if self.__filename_online != None:
402 if self.__filename_online != None:
403 self.__selectDataForTimes(online=True)
403 self.__selectDataForTimes(online=True)
404 filename = self.filenameList[0]
404 filename = self.filenameList[0]
405 wait = 0
405 wait = 0
406 while self.__filename_online == filename:
406 while self.__filename_online == filename:
407 print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
407 print('waiting %d seconds to get a new file...'%(self.__waitForNewFile))
408 if wait == 5:
408 if wait == 5:
409 return 0
409 return 0
410 sleep(self.__waitForNewFile)
410 sleep(self.__waitForNewFile)
411 self.__selectDataForTimes(online=True)
411 self.__selectDataForTimes(online=True)
412 filename = self.filenameList[0]
412 filename = self.filenameList[0]
413 wait += 1
413 wait += 1
414
414
415 self.__filename_online = filename
415 self.__filename_online = filename
416
416
417 self.amisrFilePointer = h5py.File(filename,'r')
417 self.amisrFilePointer = h5py.File(filename,'r')
418 self.flagIsNewFile = 1
418 self.flagIsNewFile = 1
419 self.filename = filename
419 self.filename = filename
420 print("Setting the file: %s"%self.filename)
420 print("Setting the file: %s"%self.filename)
421 return 1
421 return 1
422
422
423
423
424 def readData(self):
424 def readData(self):
425 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
425 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
426 re = buffer[:,:,:,0]
426 re = buffer[:,:,:,0]
427 im = buffer[:,:,:,1]
427 im = buffer[:,:,:,1]
428 dataset = re + im*1j
428 dataset = re + im*1j
429 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
429 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
430 timeset = self.radacTime[:,0]
430 timeset = self.radacTime[:,0]
431 return dataset,timeset
431 return dataset,timeset
432
432
433 def reshapeData(self):
433 def reshapeData(self):
434 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
434 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
435 channels = self.beamCodeByPulse[0,:]
435 channels = self.beamCodeByPulse[0,:]
436 nchan = self.nchannels
436 nchan = self.nchannels
437 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
437 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
438 nblocks = self.nblocks
438 nblocks = self.nblocks
439 nsamples = self.nsa
439 nsamples = self.nsa
440
440
441 #Dimensions : nChannels, nProfiles, nSamples
441 #Dimensions : nChannels, nProfiles, nSamples
442 new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")
442 new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")
443 ############################################
443 ############################################
444
444
445 for thisChannel in range(nchan):
445 for thisChannel in range(nchan):
446 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[0][thisChannel])[0],:]
446 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[0][thisChannel])[0],:]
447
447
448
448
449 new_block = numpy.transpose(new_block, (1,0,2,3))
449 new_block = numpy.transpose(new_block, (1,0,2,3))
450 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
450 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
451
451
452 return new_block
452 return new_block
453
453
454 def updateIndexes(self):
454 def updateIndexes(self):
455
455
456 pass
456 pass
457
457
458 def fillJROHeader(self):
458 def fillJROHeader(self):
459
459
460 #fill radar controller header
460 #fill radar controller header
461 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
461 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
462 txA=self.__txA,
462 txA=self.__txA,
463 txB=0,
463 txB=0,
464 nWindows=1,
464 nWindows=1,
465 nHeights=self.__nSamples,
465 nHeights=self.__nSamples,
466 firstHeight=self.__firstHeight,
466 firstHeight=self.__firstHeight,
467 deltaHeight=self.__deltaHeight,
467 deltaHeight=self.__deltaHeight,
468 codeType=self.__codeType,
468 codeType=self.__codeType,
469 nCode=self.__nCode, nBaud=self.__nBaud,
469 nCode=self.__nCode, nBaud=self.__nBaud,
470 code = self.__code,
470 code = self.__code,
471 fClock=1)
471 fClock=1)
472
472
473
473
474
474
475 #fill system header
475 #fill system header
476 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
476 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
477 nProfiles=self.newProfiles,
477 nProfiles=self.newProfiles,
478 nChannels=len(self.__channelList),
478 nChannels=len(self.__channelList),
479 adcResolution=14,
479 adcResolution=14,
480 pciDioBusWith=32)
480 pciDioBusWith=32)
481
481
482 self.dataOut.type = "Voltage"
482 self.dataOut.type = "Voltage"
483
483
484 self.dataOut.data = None
484 self.dataOut.data = None
485
485
486 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
486 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
487
487
488 # self.dataOut.nChannels = 0
488 # self.dataOut.nChannels = 0
489
489
490 # self.dataOut.nHeights = 0
490 # self.dataOut.nHeights = 0
491
491
492 self.dataOut.nProfiles = self.newProfiles*self.nblocks
492 self.dataOut.nProfiles = self.newProfiles*self.nblocks
493
493
494 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
494 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
495 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
495 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
496 self.dataOut.heightList = ranges/1000.0 #km
496 self.dataOut.heightList = ranges/1000.0 #km
497
497
498
498
499 self.dataOut.channelList = self.__channelList
499 self.dataOut.channelList = self.__channelList
500
500
501 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
501 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
502
502
503 # self.dataOut.channelIndexList = None
503 # self.dataOut.channelIndexList = None
504
504
505 self.dataOut.flagNoData = True
505 self.dataOut.flagNoData = True
506
506
507 #Set to TRUE if the data is discontinuous
507 #Set to TRUE if the data is discontinuous
508 self.dataOut.flagDiscontinuousBlock = False
508 self.dataOut.flagDiscontinuousBlock = False
509
509
510 self.dataOut.utctime = None
510 self.dataOut.utctime = None
511
511
512 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
512 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
513 if self.timezone == 'lt':
513 if self.timezone == 'lt':
514 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
514 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
515 else:
515 else:
516 self.dataOut.timeZone = 0 #by default time is UTC
516 self.dataOut.timeZone = 0 #by default time is UTC
517
517
518 self.dataOut.dstFlag = 0
518 self.dataOut.dstFlag = 0
519
519
520 self.dataOut.errorCount = 0
520 self.dataOut.errorCount = 0
521
521
522 self.dataOut.nCohInt = 1
522 self.dataOut.nCohInt = 1
523
523
524 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
524 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
525
525
526 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
526 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
527
527
528 self.dataOut.flagShiftFFT = False
528 self.dataOut.flagShiftFFT = False
529
529
530 self.dataOut.ippSeconds = self.ippSeconds
530 self.dataOut.ippSeconds = self.ippSeconds
531
531
532 #Time interval between profiles
532 #Time interval between profiles
533 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
533 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
534
534
535 self.dataOut.frequency = self.__frequency
535 self.dataOut.frequency = self.__frequency
536
536
537 self.dataOut.realtime = self.online
537 self.dataOut.realtime = self.online
538 pass
538 pass
539
539
540 def readNextFile(self,online=False):
540 def readNextFile(self,online=False):
541
541
542 if not(online):
542 if not(online):
543 newFile = self.__setNextFileOffline()
543 newFile = self.__setNextFileOffline()
544 else:
544 else:
545 newFile = self.__setNextFileOnline()
545 newFile = self.__setNextFileOnline()
546
546
547 if not(newFile):
547 if not(newFile):
548 return 0
548 return 0
549
549
550 #if self.__firstFile:
550 #if self.__firstFile:
551 self.readAMISRHeader(self.amisrFilePointer)
551 self.readAMISRHeader(self.amisrFilePointer)
552 self.createBuffers()
552 self.createBuffers()
553 self.fillJROHeader()
553 self.fillJROHeader()
554 #self.__firstFile = False
554 #self.__firstFile = False
555
555
556
556
557
557
558 self.dataset,self.timeset = self.readData()
558 self.dataset,self.timeset = self.readData()
559
559
560 if self.endDate!=None:
560 if self.endDate!=None:
561 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
561 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
562 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
562 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
563 startDateTimeStr_File = time_str[0][0].split('.')[0]
563 startDateTimeStr_File = time_str[0][0].split('.')[0]
564 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
564 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
565 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
565 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
566 if self.timezone == 'lt':
566 if self.timezone == 'lt':
567 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
567 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
568 if (startDateTime_File>endDateTime_Reader):
568 if (startDateTime_File>endDateTime_Reader):
569 return 0
569 return 0
570
570
571 self.jrodataset = self.reshapeData()
571 self.jrodataset = self.reshapeData()
572 #----self.updateIndexes()
572 #----self.updateIndexes()
573 self.profileIndex = 0
573 self.profileIndex = 0
574
574
575 return 1
575 return 1
576
576
577
577
578 def __hasNotDataInBuffer(self):
578 def __hasNotDataInBuffer(self):
579 if self.profileIndex >= (self.newProfiles*self.nblocks):
579 if self.profileIndex >= (self.newProfiles*self.nblocks):
580 return 1
580 return 1
581 return 0
581 return 0
582
582
583
583
584 def getData(self):
584 def getData(self):
585
585
586 if self.flagNoMoreFiles:
586 if self.flagNoMoreFiles:
587 self.dataOut.flagNoData = True
587 self.dataOut.flagNoData = True
588 return 0
588 return 0
589
589
590 if self.__hasNotDataInBuffer():
590 if self.__hasNotDataInBuffer():
591 if not (self.readNextFile(self.online)):
591 if not (self.readNextFile(self.online)):
592 return 0
592 return 0
593
593
594
594
595 if self.dataset is None: # setear esta condicion cuando no hayan datos por leers
595 if self.dataset is None: # setear esta condicion cuando no hayan datos por leers
596 self.dataOut.flagNoData = True
596 self.dataOut.flagNoData = True
597 return 0
597 return 0
598
598
599 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
599 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
600
600
601 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
601 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
602
602
603 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
603 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
604 #verificar basic header de jro data y ver si es compatible con este valor
604 #verificar basic header de jro data y ver si es compatible con este valor
605 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
605 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
606 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
606 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
607 indexblock = self.profileIndex/self.newProfiles
607 indexblock = self.profileIndex/self.newProfiles
608 #print indexblock, indexprof
608 #print indexblock, indexprof
609 self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
609 self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
610 self.dataOut.profileIndex = self.profileIndex
610 self.dataOut.profileIndex = self.profileIndex
611 self.dataOut.flagNoData = False
611 self.dataOut.flagNoData = False
612 # if indexprof == 0:
612 # if indexprof == 0:
613 # print self.dataOut.utctime
613 # print self.dataOut.utctime
614
614
615 self.profileIndex += 1
615 self.profileIndex += 1
616
616
617 return self.dataOut.data
617 return self.dataOut.data
618
618
619
619
620 def run(self, **kwargs):
620 def run(self, **kwargs):
621 '''
621 '''
622 This method will be called many times so here you should put all your code
622 This method will be called many times so here you should put all your code
623 '''
623 '''
624
624
625 if not self.isConfig:
625 if not self.isConfig:
626 self.setup(**kwargs)
626 self.setup(**kwargs)
627 self.isConfig = True
627 self.isConfig = True
628
628
629 self.getData()
629 self.getData()
630
631 class Writer(Operation):
632 '''
633 classdocs
634 '''
635
636 def __init__(self):
637 '''
638 Constructor
639 '''
640 self.dataOut = None
641
642 self.isConfig = False
643
644 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
645 '''
646 In this method we should set all initial parameters.
647
648 Input:
649 dataIn : Input data will also be outputa data
650
651 '''
652 self.dataOut = dataIn
653
654
655
656
657
658 self.isConfig = True
659
660 return
661
662 def run(self, dataIn, **kwargs):
663 '''
664 This method will be called many times so here you should put all your code
665
666 Inputs:
667
668 dataIn : object with the data
669
670 '''
671
672 if not self.isConfig:
673 self.setup(dataIn, **kwargs)
674 No newline at end of file
@@ -1,639 +1,597
1 '''
1 '''
2 Created on Aug 1, 2017
2 Created on Aug 1, 2017
3
3
4 @author: Juan C. Espinoza
4 @author: Juan C. Espinoza
5 '''
5 '''
6
6
7 import os
7 import os
8 import sys
8 import sys
9 import time
9 import time
10 import json
10 import json
11 import glob
11 import glob
12 import datetime
12 import datetime
13
13
14 import numpy
14 import numpy
15 import h5py
15 import h5py
16
16
17 import schainpy.admin
17 import schainpy.admin
18 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 from schainpy.model.data.jrodata import Parameters
20 from schainpy.model.data.jrodata import Parameters
21 from schainpy.utils import log
21 from schainpy.utils import log
22
22
23 try:
23 try:
24 import madrigal.cedar
24 import madrigal.cedar
25 except:
25 except:
26 log.warning(
26 log.warning(
27 'You should install "madrigal library" module if you want to read/write Madrigal data'
27 'You should install "madrigal library" module if you want to read/write Madrigal data'
28 )
28 )
29
29
30 try:
30 try:
31 basestring
31 basestring
32 except:
32 except:
33 basestring = str
33 basestring = str
34
34
35 DEF_CATALOG = {
35 DEF_CATALOG = {
36 'principleInvestigator': 'Marco Milla',
36 'principleInvestigator': 'Marco Milla',
37 'expPurpose': '',
37 'expPurpose': '',
38 'cycleTime': '',
38 'cycleTime': '',
39 'correlativeExp': '',
39 'correlativeExp': '',
40 'sciRemarks': '',
40 'sciRemarks': '',
41 'instRemarks': ''
41 'instRemarks': ''
42 }
42 }
43
43
44 DEF_HEADER = {
44 DEF_HEADER = {
45 'kindatDesc': '',
45 'kindatDesc': '',
46 'analyst': 'Jicamarca User',
46 'analyst': 'Jicamarca User',
47 'comments': '',
47 'comments': '',
48 'history': ''
48 'history': ''
49 }
49 }
50
50
51 MNEMONICS = {
51 MNEMONICS = {
52 10: 'jro',
52 10: 'jro',
53 11: 'jbr',
53 11: 'jbr',
54 840: 'jul',
54 840: 'jul',
55 13: 'jas',
55 13: 'jas',
56 1000: 'pbr',
56 1000: 'pbr',
57 1001: 'hbr',
57 1001: 'hbr',
58 1002: 'obr',
58 1002: 'obr',
59 400: 'clr'
59 400: 'clr'
60
60
61 }
61 }
62
62
63 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
63 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
64
64
65 def load_json(obj):
65 def load_json(obj):
66 '''
66 '''
67 Parse json as string instead of unicode
67 Parse json as string instead of unicode
68 '''
68 '''
69
69
70 if isinstance(obj, str):
70 if isinstance(obj, str):
71 iterable = json.loads(obj)
71 iterable = json.loads(obj)
72 else:
72 else:
73 iterable = obj
73 iterable = obj
74
74
75 if isinstance(iterable, dict):
75 if isinstance(iterable, dict):
76 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, basestring) else v
76 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, basestring) else v
77 for k, v in list(iterable.items())}
77 for k, v in list(iterable.items())}
78 elif isinstance(iterable, (list, tuple)):
78 elif isinstance(iterable, (list, tuple)):
79 return [str(v) if isinstance(v, basestring) else v for v in iterable]
79 return [str(v) if isinstance(v, basestring) else v for v in iterable]
80
80
81 return iterable
81 return iterable
82
82
83 @MPDecorator
83 @MPDecorator
84 class MADReader(JRODataReader, ProcessingUnit):
84 class MADReader(Reader, ProcessingUnit):
85
85
86 def __init__(self):
86 def __init__(self):
87
87
88 ProcessingUnit.__init__(self)
88 ProcessingUnit.__init__(self)
89
89
90 self.dataOut = Parameters()
90 self.dataOut = Parameters()
91 self.counter_records = 0
91 self.counter_records = 0
92 self.nrecords = None
92 self.nrecords = None
93 self.flagNoMoreFiles = 0
93 self.flagNoMoreFiles = 0
94 self.isConfig = False
95 self.filename = None
94 self.filename = None
96 self.intervals = set()
95 self.intervals = set()
96 self.datatime = datetime.datetime(1900,1,1)
97 self.format = None
98 self.filefmt = "***%Y%m%d*******"
97
99
98 def setup(self,
100 def setup(self, **kwargs):
99 path=None,
100 startDate=None,
101 endDate=None,
102 format=None,
103 startTime=datetime.time(0, 0, 0),
104 endTime=datetime.time(23, 59, 59),
105 **kwargs):
106
101
107 self.path = path
102 self.set_kwargs(**kwargs)
108 self.startDate = startDate
103 self.oneDDict = load_json(self.oneDDict)
109 self.endDate = endDate
104 self.twoDDict = load_json(self.twoDDict)
110 self.startTime = startTime
105 self.ind2DList = load_json(self.ind2DList)
111 self.endTime = endTime
106 self.independentParam = self.ind2DList[0]
112 self.datatime = datetime.datetime(1900,1,1)
113 self.oneDDict = load_json(kwargs.get('oneDDict',
114 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
115 self.twoDDict = load_json(kwargs.get('twoDDict',
116 "{\"GDALT\": \"heightList\"}"))
117 self.independentParam = 'GDALT'
118
107
119 if self.path is None:
108 if self.path is None:
120 raise ValueError('The path is not valid')
109 raise ValueError('The path is not valid')
121
110
122 if format is None:
111 self.open_file = open
112 self.open_mode = 'rb'
113
114 if self.format is None:
123 raise ValueError('The format is not valid choose simple or hdf5')
115 raise ValueError('The format is not valid choose simple or hdf5')
124 elif format.lower() in ('simple', 'txt'):
116 elif self.format.lower() in ('simple', 'txt'):
125 self.ext = '.txt'
117 self.ext = '.txt'
126 elif format.lower() in ('cedar',):
118 elif self.format.lower() in ('cedar',):
127 self.ext = '.001'
119 self.ext = '.001'
128 else:
120 else:
129 self.ext = '.hdf5'
121 self.ext = '.hdf5'
122 self.open_file = h5py.File
123 self.open_mode = 'r'
130
124
131 self.search_files(self.path)
125 if self.online:
132 self.fileId = 0
126 log.log("Searching files in online mode...", self.name)
133
134 if not self.fileList:
135 raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))
136
137 self.setNextFile()
138
139 def search_files(self, path):
140 '''
141 Searching for madrigal files in path
142 Creating a list of files to procces included in [startDate,endDate]
143
144 Input:
145 path - Path to find files
146 '''
147
127
148 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
128 for nTries in range(self.nTries):
149 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
129 fullpath = self.searchFilesOnLine(self.path, self.startDate,
150 fileList0.sort()
130 self.endDate, self.expLabel, self.ext, self.walk,
131 self.filefmt, self.folderfmt)
151
132
152 self.fileList = []
133 try:
153 self.dateFileList = []
134 fullpath = next(fullpath)
135 except:
136 fullpath = None
154
137
155 startDate = self.startDate - datetime.timedelta(1)
138 if fullpath:
156 endDate = self.endDate + datetime.timedelta(1)
139 break
157
140
158 for thisFile in fileList0:
141 log.warning(
159 year = thisFile[3:7]
142 'Waiting {} sec for a valid file in {}: try {} ...'.format(
160 if not year.isdigit():
143 self.delay, self.path, nTries + 1),
161 continue
144 self.name)
145 time.sleep(self.delay)
162
146
163 month = thisFile[7:9]
147 if not(fullpath):
164 if not month.isdigit():
148 raise schainpy.admin.SchainError(
165 continue
149 'There isn\'t any valid file in {}'.format(self.path))
166
150
167 day = thisFile[9:11]
151 else:
168 if not day.isdigit():
152 log.log("Searching files in {}".format(self.path), self.name)
169 continue
153 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
170
155
171 year, month, day = int(year), int(month), int(day)
156 self.setNextFile()
172 dateFile = datetime.date(year, month, day)
173
157
174 if (startDate > dateFile) or (endDate < dateFile):
158 def readFirstHeader(self):
175 continue
159 '''Read header and data'''
176
160
177 self.fileList.append(thisFile)
161 self.parseHeader()
178 self.dateFileList.append(dateFile)
162 self.parseData()
163 self.blockIndex = 0
179
164
180 return
165 return
181
166
182 def parseHeader(self):
167 def parseHeader(self):
183 '''
168 '''
184 '''
169 '''
185
170
186 self.output = {}
171 self.output = {}
187 self.version = '2'
172 self.version = '2'
188 s_parameters = None
173 s_parameters = None
189 if self.ext == '.txt':
174 if self.ext == '.txt':
190 self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
175 self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
191 elif self.ext == '.hdf5':
176 elif self.ext == '.hdf5':
192 self.metadata = self.fp['Metadata']
177 self.metadata = self.fp['Metadata']
193 if '_record_layout' in self.metadata:
178 if '_record_layout' in self.metadata:
194 s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
179 s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
195 self.version = '3'
180 self.version = '3'
196 self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]
181 self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]
197
182
198 log.success('Parameters found: {}'.format(self.parameters),
183 log.success('Parameters found: {}'.format(self.parameters),
199 'MADReader')
184 'MADReader')
200 if s_parameters:
185 if s_parameters:
201 log.success('Spatial parameters found: {}'.format(s_parameters),
186 log.success('Spatial parameters found: {}'.format(s_parameters),
202 'MADReader')
187 'MADReader')
203
188
204 for param in list(self.oneDDict.keys()):
189 for param in list(self.oneDDict.keys()):
205 if param.lower() not in self.parameters:
190 if param.lower() not in self.parameters:
206 log.warning(
191 log.warning(
207 'Parameter {} not found will be ignored'.format(
192 'Parameter {} not found will be ignored'.format(
208 param),
193 param),
209 'MADReader')
194 'MADReader')
210 self.oneDDict.pop(param, None)
195 self.oneDDict.pop(param, None)
211
196
212 for param, value in list(self.twoDDict.items()):
197 for param, value in list(self.twoDDict.items()):
213 if param.lower() not in self.parameters:
198 if param.lower() not in self.parameters:
214 log.warning(
199 log.warning(
215 'Parameter {} not found, it will be ignored'.format(
200 'Parameter {} not found, it will be ignored'.format(
216 param),
201 param),
217 'MADReader')
202 'MADReader')
218 self.twoDDict.pop(param, None)
203 self.twoDDict.pop(param, None)
219 continue
204 continue
220 if isinstance(value, list):
205 if isinstance(value, list):
221 if value[0] not in self.output:
206 if value[0] not in self.output:
222 self.output[value[0]] = []
207 self.output[value[0]] = []
223 self.output[value[0]].append([])
208 self.output[value[0]].append([])
224
209
225 def parseData(self):
210 def parseData(self):
226 '''
211 '''
227 '''
212 '''
228
213
229 if self.ext == '.txt':
214 if self.ext == '.txt':
230 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
215 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
231 self.nrecords = self.data.shape[0]
216 self.nrecords = self.data.shape[0]
232 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
217 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
233 self.counter_records = 0
218 self.counter_records = 0
234 elif self.ext == '.hdf5':
219 elif self.ext == '.hdf5':
235 self.data = self.fp['Data']
220 self.data = self.fp['Data']
236 self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
221 self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
237 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
222 self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
238 self.counter_records = int(self.data['Table Layout']['recno'][0])
223 self.counter_records = int(self.data['Table Layout']['recno'][0])
239 self.nrecords = int(self.data['Table Layout']['recno'][-1])
224 self.nrecords = int(self.data['Table Layout']['recno'][-1])
240
225
241 def setNextFile(self):
242 '''
243 '''
244
245 file_id = self.fileId
246
247 if file_id == len(self.fileList):
248 log.success('No more files', 'MADReader')
249 self.flagNoMoreFiles = 1
250 return 0
251
252 log.success(
253 'Opening: {}'.format(self.fileList[file_id]),
254 'MADReader'
255 )
256
257 filename = os.path.join(self.path, self.fileList[file_id])
258
259 if self.filename is not None:
260 self.fp.close()
261
262 self.filename = filename
263 self.filedate = self.dateFileList[file_id]
264
265 if self.ext=='.hdf5':
266 self.fp = h5py.File(self.filename, 'r')
267 else:
268 self.fp = open(self.filename, 'rb')
269
270 self.parseHeader()
271 self.parseData()
272 self.sizeOfFile = os.path.getsize(self.filename)
273 self.flagIsNewFile = 0
274 self.fileId += 1
275
276 return 1
277
278 def readNextBlock(self):
226 def readNextBlock(self):
279
227
280 while True:
228 while True:
281 self.flagDiscontinuousBlock = 0
229 self.flagDiscontinuousBlock = 0
282 if self.flagIsNewFile:
230 if self.counter_records == self.nrecords:
283 if not self.setNextFile():
231 self.setNextFile()
284 return 0
285
232
286 self.readBlock()
233 self.readBlock()
287
234
288 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
235 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
289 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
236 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
290 log.warning(
237 log.warning(
291 'Reading Record No. {}/{} -> {} [Skipping]'.format(
238 'Reading Record No. {}/{} -> {} [Skipping]'.format(
292 self.counter_records,
239 self.counter_records,
293 self.nrecords,
240 self.nrecords,
294 self.datatime.ctime()),
241 self.datatime.ctime()),
295 'MADReader')
242 'MADReader')
296 continue
243 continue
297 break
244 break
298
245
299 log.log(
246 log.log(
300 'Reading Record No. {}/{} -> {}'.format(
247 'Reading Record No. {}/{} -> {}'.format(
301 self.counter_records,
248 self.counter_records,
302 self.nrecords,
249 self.nrecords,
303 self.datatime.ctime()),
250 self.datatime.ctime()),
304 'MADReader')
251 'MADReader')
305
252
306 return 1
253 return 1
307
254
308 def readBlock(self):
255 def readBlock(self):
309 '''
256 '''
310 '''
257 '''
311 dum = []
258 dum = []
312 if self.ext == '.txt':
259 if self.ext == '.txt':
313 dt = self.data[self.counter_records][:6].astype(int)
260 dt = self.data[self.counter_records][:6].astype(int)
314 if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
261 if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
315 self.flagDiscontinuousBlock = 1
262 self.flagDiscontinuousBlock = 1
316 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
263 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
317 while True:
264 while True:
318 dt = self.data[self.counter_records][:6].astype(int)
265 dt = self.data[self.counter_records][:6].astype(int)
319 datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
266 datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
320 if datatime == self.datatime:
267 if datatime == self.datatime:
321 dum.append(self.data[self.counter_records])
268 dum.append(self.data[self.counter_records])
322 self.counter_records += 1
269 self.counter_records += 1
323 if self.counter_records == self.nrecords:
270 if self.counter_records == self.nrecords:
324 self.flagIsNewFile = True
325 break
271 break
326 continue
272 continue
327 self.intervals.add((datatime-self.datatime).seconds)
273 self.intervals.add((datatime-self.datatime).seconds)
328 break
274 break
329 elif self.ext == '.hdf5':
275 elif self.ext == '.hdf5':
330 datatime = datetime.datetime.utcfromtimestamp(
276 datatime = datetime.datetime.utcfromtimestamp(
331 self.times[self.counter_records])
277 self.times[self.counter_records])
332 dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
278 dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
333 self.intervals.add((datatime-self.datatime).seconds)
279 self.intervals.add((datatime-self.datatime).seconds)
334 if datatime.date()>self.datatime.date():
280 if datatime.date()>self.datatime.date():
335 self.flagDiscontinuousBlock = 1
281 self.flagDiscontinuousBlock = 1
336 self.datatime = datatime
282 self.datatime = datatime
337 self.counter_records += 1
283 self.counter_records += 1
338 if self.counter_records == self.nrecords:
339 self.flagIsNewFile = True
340
284
341 self.buffer = numpy.array(dum)
285 self.buffer = numpy.array(dum)
342 return
286 return
343
287
344 def set_output(self):
288 def set_output(self):
345 '''
289 '''
346 Storing data from buffer to dataOut object
290 Storing data from buffer to dataOut object
347 '''
291 '''
348
292
349 parameters = [None for __ in self.parameters]
293 parameters = [None for __ in self.parameters]
350
294
351 for param, attr in list(self.oneDDict.items()):
295 for param, attr in list(self.oneDDict.items()):
352 x = self.parameters.index(param.lower())
296 x = self.parameters.index(param.lower())
353 setattr(self.dataOut, attr, self.buffer[0][x])
297 setattr(self.dataOut, attr, self.buffer[0][x])
354
298
355 for param, value in list(self.twoDDict.items()):
299 for param, value in list(self.twoDDict.items()):
356 dummy = numpy.zeros(self.ranges.shape) + numpy.nan
300 dummy = numpy.zeros(self.ranges.shape) + numpy.nan
357 if self.ext == '.txt':
301 if self.ext == '.txt':
358 x = self.parameters.index(param.lower())
302 x = self.parameters.index(param.lower())
359 y = self.parameters.index(self.independentParam.lower())
303 y = self.parameters.index(self.independentParam.lower())
360 ranges = self.buffer[:,y]
304 ranges = self.buffer[:,y]
361 #if self.ranges.size == ranges.size:
305 #if self.ranges.size == ranges.size:
362 # continue
306 # continue
363 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
307 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
364 dummy[index] = self.buffer[:,x]
308 dummy[index] = self.buffer[:,x]
365 else:
309 else:
366 ranges = self.buffer[self.independentParam.lower()]
310 ranges = self.buffer[self.independentParam.lower()]
367 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
311 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
368 dummy[index] = self.buffer[param.lower()]
312 dummy[index] = self.buffer[param.lower()]
369
313
370 if isinstance(value, str):
314 if isinstance(value, str):
371 if value not in self.independentParam:
315 if value not in self.independentParam:
372 setattr(self.dataOut, value, dummy.reshape(1,-1))
316 setattr(self.dataOut, value, dummy.reshape(1,-1))
373 elif isinstance(value, list):
317 elif isinstance(value, list):
374 self.output[value[0]][value[1]] = dummy
318 self.output[value[0]][value[1]] = dummy
375 parameters[value[1]] = param
319 parameters[value[1]] = param
376 for key, value in list(self.output.items()):
320 for key, value in list(self.output.items()):
377 setattr(self.dataOut, key, numpy.array(value))
321 setattr(self.dataOut, key, numpy.array(value))
378
322
379 self.dataOut.parameters = [s for s in parameters if s]
323 self.dataOut.parameters = [s for s in parameters if s]
380 self.dataOut.heightList = self.ranges
324 self.dataOut.heightList = self.ranges
381 self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
325 self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
382 self.dataOut.utctimeInit = self.dataOut.utctime
326 self.dataOut.utctimeInit = self.dataOut.utctime
383 self.dataOut.paramInterval = min(self.intervals)
327 self.dataOut.paramInterval = min(self.intervals)
384 self.dataOut.useLocalTime = False
328 self.dataOut.useLocalTime = False
385 self.dataOut.flagNoData = False
329 self.dataOut.flagNoData = False
386 self.dataOut.nrecords = self.nrecords
330 self.dataOut.nrecords = self.nrecords
387 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
331 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
388
332
389 def getData(self):
333 def getData(self):
390 '''
334 '''
391 Storing data from databuffer to dataOut object
335 Storing data from databuffer to dataOut object
392 '''
336 '''
393 if self.flagNoMoreFiles:
394 self.dataOut.flagNoData = True
395 raise schainpy.admin.SchainError('No file left to process')
396 return 0
397
337
398 if not self.readNextBlock():
338 if not self.readNextBlock():
399 self.dataOut.flagNoData = True
339 self.dataOut.flagNoData = True
400 return 0
340 return 0
401
341
402 self.set_output()
342 self.set_output()
403
343
404 return 1
344 return 1
405
345
406 @MPDecorator
346 def run(self, **kwargs):
407 class MADWriter(Operation):
408
347
409 missing = -32767
348 if not(self.isConfig):
349 self.setup(**kwargs)
350 self.isConfig = True
410
351
411 def __init__(self):
352 self.getData()
412
353
413 Operation.__init__(self)
354 return
414 self.dataOut = Parameters()
355
415 self.counter = 0
356 @MPDecorator
416 self.path = None
357 class MADWriter(Operation):
417 self.fp = None
358 '''Writing module for Madrigal files
359
360 type: external
418
361
419 def run(self, dataOut, path, oneDDict, independentParam='[]', twoDDict='{}',
420 metadata='{}', format='cedar', **kwargs):
421 '''
422 Inputs:
362 Inputs:
423 path - path where files will be created
363 path path where files will be created
424 oneDDict - json of one-dimensional parameters in record where keys
364 oneDDict json of one-dimensional parameters in record where keys
425 are Madrigal codes (integers or mnemonics) and values the corresponding
365 are Madrigal codes (integers or mnemonics) and values the corresponding
426 dataOut attribute e.g: {
366 dataOut attribute e.g: {
427 'gdlatr': 'lat',
367 'gdlatr': 'lat',
428 'gdlonr': 'lon',
368 'gdlonr': 'lon',
429 'gdlat2':'lat',
369 'gdlat2':'lat',
430 'glon2':'lon'}
370 'glon2':'lon'}
431 independentParam - list of independent spatial two-dimensional parameters e.g:
371 ind2DList list of independent spatial two-dimensional parameters e.g:
432 ['heigthList']
372 ['heigthList']
433 twoDDict - json of two-dimensional parameters in record where keys
373 twoDDict json of two-dimensional parameters in record where keys
434 are Madrigal codes (integers or mnemonics) and values the corresponding
374 are Madrigal codes (integers or mnemonics) and values the corresponding
435 dataOut attribute if multidimensional array specify as tupple
375 dataOut attribute if multidimensional array specify as tupple
436 ('attr', pos) e.g: {
376 ('attr', pos) e.g: {
437 'gdalt': 'heightList',
377 'gdalt': 'heightList',
438 'vn1p2': ('data_output', 0),
378 'vn1p2': ('data_output', 0),
439 'vn2p2': ('data_output', 1),
379 'vn2p2': ('data_output', 1),
440 'vn3': ('data_output', 2),
380 'vn3': ('data_output', 2),
441 'snl': ('data_SNR', 'db')
381 'snl': ('data_SNR', 'db')
442 }
382 }
443 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
383 metadata json of madrigal metadata (kinst, kindat, catalog and header)
444 '''
384 format hdf5, cedar
385 blocks number of blocks per file'''
386
387 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 missing = -32767
389
390 def __init__(self):
391
392 Operation.__init__(self)
393 self.dataOut = Parameters()
394 self.counter = 0
395 self.path = None
396 self.fp = None
397
398 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
399 metadata='{}', format='cedar', **kwargs):
400
445 if not self.isConfig:
401 if not self.isConfig:
446 self.setup(path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs)
402 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
447 self.isConfig = True
403 self.isConfig = True
448
404
449 self.dataOut = dataOut
405 self.dataOut = dataOut
450 self.putData()
406 self.putData()
451 return 1
407 return 1
452
408
453 def setup(self, path, oneDDict, independentParam, twoDDict, metadata, format, **kwargs):
409 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
454 '''
410 '''
455 Configure Operation
411 Configure Operation
456 '''
412 '''
457
413
458 self.path = path
414 self.path = path
459 self.blocks = kwargs.get('blocks', None)
415 self.blocks = kwargs.get('blocks', None)
460 self.counter = 0
416 self.counter = 0
461 self.oneDDict = load_json(oneDDict)
417 self.oneDDict = load_json(oneDDict)
462 self.twoDDict = load_json(twoDDict)
418 self.twoDDict = load_json(twoDDict)
463 self.independentParam = load_json(independentParam)
419 self.ind2DList = load_json(ind2DList)
464 meta = load_json(metadata)
420 meta = load_json(metadata)
465 self.kinst = meta.get('kinst')
421 self.kinst = meta.get('kinst')
466 self.kindat = meta.get('kindat')
422 self.kindat = meta.get('kindat')
467 self.catalog = meta.get('catalog', DEF_CATALOG)
423 self.catalog = meta.get('catalog', DEF_CATALOG)
468 self.header = meta.get('header', DEF_HEADER)
424 self.header = meta.get('header', DEF_HEADER)
469 if format == 'cedar':
425 if format == 'cedar':
470 self.ext = '.dat'
426 self.ext = '.dat'
471 self.extra_args = {}
427 self.extra_args = {}
472 elif format == 'hdf5':
428 elif format == 'hdf5':
473 self.ext = '.hdf5'
429 self.ext = '.hdf5'
474 self.extra_args = {'independentParam': self.independentParam}
430 self.extra_args = {'ind2DList': self.ind2DList}
475
431
476 self.keys = [k.lower() for k in self.twoDDict]
432 self.keys = [k.lower() for k in self.twoDDict]
477 if 'range' in self.keys:
433 if 'range' in self.keys:
478 self.keys.remove('range')
434 self.keys.remove('range')
479 if 'gdalt' in self.keys:
435 if 'gdalt' in self.keys:
480 self.keys.remove('gdalt')
436 self.keys.remove('gdalt')
481
437
482 def setFile(self):
438 def setFile(self):
483 '''
439 '''
484 Create new cedar file object
440 Create new cedar file object
485 '''
441 '''
486
442
487 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
443 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
488 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
444 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
489
445
490 filename = '{}{}{}'.format(self.mnemonic,
446 filename = '{}{}{}'.format(self.mnemonic,
491 date.strftime('%Y%m%d_%H%M%S'),
447 date.strftime('%Y%m%d_%H%M%S'),
492 self.ext)
448 self.ext)
493
449
494 self.fullname = os.path.join(self.path, filename)
450 self.fullname = os.path.join(self.path, filename)
495
451
496 if os.path.isfile(self.fullname) :
452 if os.path.isfile(self.fullname) :
497 log.warning(
453 log.warning(
498 'Destination file {} already exists, previous file deleted.'.format(
454 'Destination file {} already exists, previous file deleted.'.format(
499 self.fullname),
455 self.fullname),
500 'MADWriter')
456 'MADWriter')
501 os.remove(self.fullname)
457 os.remove(self.fullname)
502
458
503 try:
459 try:
504 log.success(
460 log.success(
505 'Creating file: {}'.format(self.fullname),
461 'Creating file: {}'.format(self.fullname),
506 'MADWriter')
462 'MADWriter')
463 if not os.path.exists(self.path):
464 os.makedirs(self.path)
507 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
465 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
508 except ValueError as e:
466 except ValueError as e:
509 log.error(
467 log.error(
510 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
468 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
511 'MADWriter')
469 'MADWriter')
512 return
470 return
513
471
514 return 1
472 return 1
515
473
516 def writeBlock(self):
474 def writeBlock(self):
517 '''
475 '''
518 Add data records to cedar file taking data from oneDDict and twoDDict
476 Add data records to cedar file taking data from oneDDict and twoDDict
519 attributes.
477 attributes.
520 Allowed parameters in: parcodes.tab
478 Allowed parameters in: parcodes.tab
521 '''
479 '''
522
480
523 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
481 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
524 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
482 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
525 heights = self.dataOut.heightList
483 heights = self.dataOut.heightList
526
484
527 if self.ext == '.dat':
485 if self.ext == '.dat':
528 for key, value in list(self.twoDDict.items()):
486 for key, value in list(self.twoDDict.items()):
529 if isinstance(value, str):
487 if isinstance(value, str):
530 data = getattr(self.dataOut, value)
488 data = getattr(self.dataOut, value)
531 invalid = numpy.isnan(data)
489 invalid = numpy.isnan(data)
532 data[invalid] = self.missing
490 data[invalid] = self.missing
533 elif isinstance(value, (tuple, list)):
491 elif isinstance(value, (tuple, list)):
534 attr, key = value
492 attr, key = value
535 data = getattr(self.dataOut, attr)
493 data = getattr(self.dataOut, attr)
536 invalid = numpy.isnan(data)
494 invalid = numpy.isnan(data)
537 data[invalid] = self.missing
495 data[invalid] = self.missing
538
496
539 out = {}
497 out = {}
540 for key, value in list(self.twoDDict.items()):
498 for key, value in list(self.twoDDict.items()):
541 key = key.lower()
499 key = key.lower()
542 if isinstance(value, str):
500 if isinstance(value, str):
543 if 'db' in value.lower():
501 if 'db' in value.lower():
544 tmp = getattr(self.dataOut, value.replace('_db', ''))
502 tmp = getattr(self.dataOut, value.replace('_db', ''))
545 SNRavg = numpy.average(tmp, axis=0)
503 SNRavg = numpy.average(tmp, axis=0)
546 tmp = 10*numpy.log10(SNRavg)
504 tmp = 10*numpy.log10(SNRavg)
547 else:
505 else:
548 tmp = getattr(self.dataOut, value)
506 tmp = getattr(self.dataOut, value)
549 out[key] = tmp.flatten()
507 out[key] = tmp.flatten()[:len(heights)]
550 elif isinstance(value, (tuple, list)):
508 elif isinstance(value, (tuple, list)):
551 attr, x = value
509 attr, x = value
552 data = getattr(self.dataOut, attr)
510 data = getattr(self.dataOut, attr)
553 out[key] = data[int(x)]
511 out[key] = data[int(x)][:len(heights)]
554
512
555 a = numpy.array([out[k] for k in self.keys])
513 a = numpy.array([out[k] for k in self.keys])
556 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
514 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
557 index = numpy.where(nrows == False)[0]
515 index = numpy.where(nrows == False)[0]
558
516
559 rec = madrigal.cedar.MadrigalDataRecord(
517 rec = madrigal.cedar.MadrigalDataRecord(
560 self.kinst,
518 self.kinst,
561 self.kindat,
519 self.kindat,
562 startTime.year,
520 startTime.year,
563 startTime.month,
521 startTime.month,
564 startTime.day,
522 startTime.day,
565 startTime.hour,
523 startTime.hour,
566 startTime.minute,
524 startTime.minute,
567 startTime.second,
525 startTime.second,
568 startTime.microsecond/10000,
526 startTime.microsecond/10000,
569 endTime.year,
527 endTime.year,
570 endTime.month,
528 endTime.month,
571 endTime.day,
529 endTime.day,
572 endTime.hour,
530 endTime.hour,
573 endTime.minute,
531 endTime.minute,
574 endTime.second,
532 endTime.second,
575 endTime.microsecond/10000,
533 endTime.microsecond/10000,
576 list(self.oneDDict.keys()),
534 list(self.oneDDict.keys()),
577 list(self.twoDDict.keys()),
535 list(self.twoDDict.keys()),
578 len(index),
536 len(index),
579 **self.extra_args
537 **self.extra_args
580 )
538 )
581
539
582 # Setting 1d values
540 # Setting 1d values
583 for key in self.oneDDict:
541 for key in self.oneDDict:
584 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
542 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
585
543
586 # Setting 2d values
544 # Setting 2d values
587 nrec = 0
545 nrec = 0
588 for n in index:
546 for n in index:
589 for key in out:
547 for key in out:
590 rec.set2D(key, nrec, out[key][n])
548 rec.set2D(key, nrec, out[key][n])
591 nrec += 1
549 nrec += 1
592
550
593 self.fp.append(rec)
551 self.fp.append(rec)
594 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
552 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
595 self.fp.dump()
553 self.fp.dump()
596 if self.counter % 20 == 0 and self.counter > 0:
554 if self.counter % 20 == 0 and self.counter > 0:
597 log.log(
555 log.log(
598 'Writing {} records'.format(
556 'Writing {} records'.format(
599 self.counter),
557 self.counter),
600 'MADWriter')
558 'MADWriter')
601
559
602 def setHeader(self):
560 def setHeader(self):
603 '''
561 '''
604 Create an add catalog and header to cedar file
562 Create an add catalog and header to cedar file
605 '''
563 '''
606
564
607 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
565 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
608
566
609 if self.ext == '.dat':
567 if self.ext == '.dat':
610 self.fp.write()
568 self.fp.write()
611 else:
569 else:
612 self.fp.dump()
570 self.fp.dump()
613 self.fp.close()
571 self.fp.close()
614
572
615 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
573 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
616 header.createCatalog(**self.catalog)
574 header.createCatalog(**self.catalog)
617 header.createHeader(**self.header)
575 header.createHeader(**self.header)
618 header.write()
576 header.write()
619
577
620 def putData(self):
578 def putData(self):
621
579
622 if self.dataOut.flagNoData:
580 if self.dataOut.flagNoData:
623 return 0
581 return 0
624
582
625 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
583 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
626 if self.counter > 0:
584 if self.counter > 0:
627 self.setHeader()
585 self.setHeader()
628 self.counter = 0
586 self.counter = 0
629
587
630 if self.counter == 0:
588 if self.counter == 0:
631 self.setFile()
589 self.setFile()
632
590
633 self.writeBlock()
591 self.writeBlock()
634 self.counter += 1
592 self.counter += 1
635
593
636 def close(self):
594 def close(self):
637
595
638 if self.counter > 0:
596 if self.counter > 0:
639 self.setHeader() No newline at end of file
597 self.setHeader()
@@ -1,1544 +1,1435
1 import numpy
1 import numpy
2 import time
2 import time
3 import os
3 import os
4 import h5py
4 import h5py
5 import re
5 import re
6 import datetime
6 import datetime
7
7
8 import schainpy.admin
8 import schainpy.admin
9 from schainpy.model.data.jrodata import *
9 from schainpy.model.data.jrodata import *
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 from schainpy.model.io.jroIO_base import *
11 from schainpy.model.io.jroIO_base import *
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14 @MPDecorator
14 @MPDecorator
15 class ParamReader(JRODataReader,ProcessingUnit):
15 class ParamReader(JRODataReader,ProcessingUnit):
16 '''
16 '''
17 Reads HDF5 format files
17 Reads HDF5 format files
18 path
18 path
19 startDate
19 startDate
20 endDate
20 endDate
21 startTime
21 startTime
22 endTime
22 endTime
23 '''
23 '''
24
24
25 ext = ".hdf5"
25 ext = ".hdf5"
26 optchar = "D"
26 optchar = "D"
27 timezone = None
27 timezone = None
28 startTime = None
28 startTime = None
29 endTime = None
29 endTime = None
30 fileIndex = None
30 fileIndex = None
31 utcList = None #To select data in the utctime list
31 utcList = None #To select data in the utctime list
32 blockList = None #List to blocks to be read from the file
32 blockList = None #List to blocks to be read from the file
33 blocksPerFile = None #Number of blocks to be read
33 blocksPerFile = None #Number of blocks to be read
34 blockIndex = None
34 blockIndex = None
35 path = None
35 path = None
36 #List of Files
36 #List of Files
37 filenameList = None
37 filenameList = None
38 datetimeList = None
38 datetimeList = None
39 #Hdf5 File
39 #Hdf5 File
40 listMetaname = None
40 listMetaname = None
41 listMeta = None
41 listMeta = None
42 listDataname = None
42 listDataname = None
43 listData = None
43 listData = None
44 listShapes = None
44 listShapes = None
45 fp = None
45 fp = None
46 #dataOut reconstruction
46 #dataOut reconstruction
47 dataOut = None
47 dataOut = None
48
48
49 def __init__(self):#, **kwargs):
49 def __init__(self):#, **kwargs):
50 ProcessingUnit.__init__(self) #, **kwargs)
50 ProcessingUnit.__init__(self) #, **kwargs)
51 self.dataOut = Parameters()
51 self.dataOut = Parameters()
52 return
52 return
53
53
54 def setup(self, **kwargs):
54 def setup(self, **kwargs):
55
55
56 path = kwargs['path']
56 path = kwargs['path']
57 startDate = kwargs['startDate']
57 startDate = kwargs['startDate']
58 endDate = kwargs['endDate']
58 endDate = kwargs['endDate']
59 startTime = kwargs['startTime']
59 startTime = kwargs['startTime']
60 endTime = kwargs['endTime']
60 endTime = kwargs['endTime']
61 walk = kwargs['walk']
61 walk = kwargs['walk']
62 if 'ext' in kwargs:
62 if 'ext' in kwargs:
63 ext = kwargs['ext']
63 ext = kwargs['ext']
64 else:
64 else:
65 ext = '.hdf5'
65 ext = '.hdf5'
66 if 'timezone' in kwargs:
66 if 'timezone' in kwargs:
67 self.timezone = kwargs['timezone']
67 self.timezone = kwargs['timezone']
68 else:
68 else:
69 self.timezone = 'lt'
69 self.timezone = 'lt'
70
70
71 print("[Reading] Searching files in offline mode ...")
71 print("[Reading] Searching files in offline mode ...")
72 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
72 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
73 startTime=startTime, endTime=endTime,
73 startTime=startTime, endTime=endTime,
74 ext=ext, walk=walk)
74 ext=ext, walk=walk)
75
75
76 if not(filenameList):
76 if not(filenameList):
77 print("There is no files into the folder: %s"%(path))
77 print("There is no files into the folder: %s"%(path))
78 sys.exit(-1)
78 sys.exit(-1)
79
79
80 self.fileIndex = -1
80 self.fileIndex = -1
81 self.startTime = startTime
81 self.startTime = startTime
82 self.endTime = endTime
82 self.endTime = endTime
83
83
84 self.__readMetadata()
84 self.__readMetadata()
85
85
86 self.__setNextFileOffline()
86 self.__setNextFileOffline()
87
87
88 return
88 return
89
89
90 def searchFilesOffLine(self,
90 def searchFilesOffLine(self,
91 path,
91 path,
92 startDate=None,
92 startDate=None,
93 endDate=None,
93 endDate=None,
94 startTime=datetime.time(0,0,0),
94 startTime=datetime.time(0,0,0),
95 endTime=datetime.time(23,59,59),
95 endTime=datetime.time(23,59,59),
96 ext='.hdf5',
96 ext='.hdf5',
97 walk=True):
97 walk=True):
98
98
99 expLabel = ''
99 expLabel = ''
100 self.filenameList = []
100 self.filenameList = []
101 self.datetimeList = []
101 self.datetimeList = []
102
102
103 pathList = []
103 pathList = []
104
104
105 JRODataObj = JRODataReader()
105 JRODataObj = JRODataReader()
106 dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
106 dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
107
107
108 if dateList == []:
108 if dateList == []:
109 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
109 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
110 datetime.datetime.combine(startDate,startTime).ctime(),
110 datetime.datetime.combine(startDate,startTime).ctime(),
111 datetime.datetime.combine(endDate,endTime).ctime()))
111 datetime.datetime.combine(endDate,endTime).ctime()))
112
112
113 return None, None
113 return None, None
114
114
115 if len(dateList) > 1:
115 if len(dateList) > 1:
116 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
116 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
117 else:
117 else:
118 print("[Reading] data was found for the date %s" %(dateList[0]))
118 print("[Reading] data was found for the date %s" %(dateList[0]))
119
119
120 filenameList = []
120 filenameList = []
121 datetimeList = []
121 datetimeList = []
122
122
123 #----------------------------------------------------------------------------------
123 #----------------------------------------------------------------------------------
124
124
125 for thisPath in pathList:
125 for thisPath in pathList:
126
126
127 fileList = glob.glob1(thisPath, "*%s" %ext)
127 fileList = glob.glob1(thisPath, "*%s" %ext)
128 fileList.sort()
128 fileList.sort()
129
129
130 for file in fileList:
130 for file in fileList:
131
131
132 filename = os.path.join(thisPath,file)
132 filename = os.path.join(thisPath,file)
133
133
134 if not isFileInDateRange(filename, startDate, endDate):
134 if not isFileInDateRange(filename, startDate, endDate):
135 continue
135 continue
136
136
137 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
137 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
138
138
139 if not(thisDatetime):
139 if not(thisDatetime):
140 continue
140 continue
141
141
142 filenameList.append(filename)
142 filenameList.append(filename)
143 datetimeList.append(thisDatetime)
143 datetimeList.append(thisDatetime)
144
144
145 if not(filenameList):
145 if not(filenameList):
146 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
146 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
147 return None, None
147 return None, None
148
148
149 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
149 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
150 print()
150 print()
151
151
152 self.filenameList = filenameList
152 self.filenameList = filenameList
153 self.datetimeList = datetimeList
153 self.datetimeList = datetimeList
154
154
155 return pathList, filenameList
155 return pathList, filenameList
156
156
157 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
157 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
158
158
159 """
159 """
160 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
160 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
161
161
162 Inputs:
162 Inputs:
163 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
163 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
164 startDate : fecha inicial del rango seleccionado en formato datetime.date
164 startDate : fecha inicial del rango seleccionado en formato datetime.date
165 endDate : fecha final del rango seleccionado en formato datetime.date
165 endDate : fecha final del rango seleccionado en formato datetime.date
166 startTime : tiempo inicial del rango seleccionado en formato datetime.time
166 startTime : tiempo inicial del rango seleccionado en formato datetime.time
167 endTime : tiempo final del rango seleccionado en formato datetime.time
167 endTime : tiempo final del rango seleccionado en formato datetime.time
168
168
169 Return:
169 Return:
170 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
170 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
171 fecha especificado, de lo contrario retorna False.
171 fecha especificado, de lo contrario retorna False.
172
172
173 Excepciones:
173 Excepciones:
174 Si el archivo no existe o no puede ser abierto
174 Si el archivo no existe o no puede ser abierto
175 Si la cabecera no puede ser leida.
175 Si la cabecera no puede ser leida.
176
176
177 """
177 """
178
178
179 try:
179 try:
180 fp = h5py.File(filename,'r')
180 fp = h5py.File(filename,'r')
181 grp1 = fp['Data']
181 grp1 = fp['Data']
182
182
183 except IOError:
183 except IOError:
184 traceback.print_exc()
184 traceback.print_exc()
185 raise IOError("The file %s can't be opened" %(filename))
185 raise IOError("The file %s can't be opened" %(filename))
186
186
187 #In case has utctime attribute
187 #In case has utctime attribute
188 grp2 = grp1['utctime']
188 grp2 = grp1['utctime']
189 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
189 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
190 thisUtcTime = grp2.value[0]
190 thisUtcTime = grp2.value[0]
191
191
192 fp.close()
192 fp.close()
193
193
194 if self.timezone == 'lt':
194 if self.timezone == 'lt':
195 thisUtcTime -= 5*3600
195 thisUtcTime -= 5*3600
196
196
197 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
197 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
198 thisDate = thisDatetime.date()
198 thisDate = thisDatetime.date()
199 thisTime = thisDatetime.time()
199 thisTime = thisDatetime.time()
200
200
201 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
201 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
202 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
202 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
203
203
204 #General case
204 #General case
205 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
205 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
206 #-----------o----------------------------o-----------
206 #-----------o----------------------------o-----------
207 # startTime endTime
207 # startTime endTime
208
208
209 if endTime >= startTime:
209 if endTime >= startTime:
210 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
210 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
211 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
211 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
212 return thisDatetime
212 return thisDatetime
213 return None
213 return None
214
214
215 #If endTime < startTime then endTime belongs to the next day
215 #If endTime < startTime then endTime belongs to the next day
216 #<<<<<<<<<<<o o>>>>>>>>>>>
216 #<<<<<<<<<<<o o>>>>>>>>>>>
217 #-----------o----------------------------o-----------
217 #-----------o----------------------------o-----------
218 # endTime startTime
218 # endTime startTime
219
219
220 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
220 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
221 return None
221 return None
222
222
223 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
223 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
224 return None
224 return None
225
225
226 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
226 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
227 return None
227 return None
228
228
229 return thisDatetime
229 return thisDatetime
230
230
231 def __setNextFileOffline(self):
231 def __setNextFileOffline(self):
232
232
233 self.fileIndex += 1
233 self.fileIndex += 1
234 idFile = self.fileIndex
234 idFile = self.fileIndex
235
235
236 if not(idFile < len(self.filenameList)):
236 if not(idFile < len(self.filenameList)):
237 raise schainpy.admin.SchainError("No more Files")
237 raise schainpy.admin.SchainError("No more Files")
238 return 0
238 return 0
239
239
240 filename = self.filenameList[idFile]
240 filename = self.filenameList[idFile]
241 filePointer = h5py.File(filename,'r')
241 filePointer = h5py.File(filename,'r')
242 self.filename = filename
242 self.filename = filename
243 self.fp = filePointer
243 self.fp = filePointer
244
244
245 print("Setting the file: %s"%self.filename)
245 print("Setting the file: %s"%self.filename)
246
246
247 self.__setBlockList()
247 self.__setBlockList()
248 self.__readData()
248 self.__readData()
249 self.blockIndex = 0
249 self.blockIndex = 0
250 return 1
250 return 1
251
251
252 def __setBlockList(self):
252 def __setBlockList(self):
253 '''
253 '''
254 Selects the data within the times defined
254 Selects the data within the times defined
255
255
256 self.fp
256 self.fp
257 self.startTime
257 self.startTime
258 self.endTime
258 self.endTime
259
259
260 self.blockList
260 self.blockList
261 self.blocksPerFile
261 self.blocksPerFile
262
262
263 '''
263 '''
264 fp = self.fp
264 fp = self.fp
265 startTime = self.startTime
265 startTime = self.startTime
266 endTime = self.endTime
266 endTime = self.endTime
267
267
268 grp = fp['Data']
268 grp = fp['Data']
269 thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]
269 thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]
270
270
271 #ERROOOOR
271 #ERROOOOR
272 if self.timezone == 'lt':
272 if self.timezone == 'lt':
273 thisUtcTime -= 5*3600
273 thisUtcTime -= 5*3600
274
274
275 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
275 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
276
276
277 thisDate = thisDatetime.date()
277 thisDate = thisDatetime.date()
278 thisTime = thisDatetime.time()
278 thisTime = thisDatetime.time()
279
279
280 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
280 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
281 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
281 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
282
282
283 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
283 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
284
284
285 self.blockList = ind
285 self.blockList = ind
286 self.blocksPerFile = len(ind)
286 self.blocksPerFile = len(ind)
287
287
288 return
288 return
289
289
290 def __readMetadata(self):
290 def __readMetadata(self):
291 '''
291 '''
292 Reads Metadata
292 Reads Metadata
293
293
294 self.pathMeta
294 self.pathMeta
295 self.listShapes
295 self.listShapes
296 self.listMetaname
296 self.listMetaname
297 self.listMeta
297 self.listMeta
298
298
299 '''
299 '''
300
300
301 filename = self.filenameList[0]
301 filename = self.filenameList[0]
302 fp = h5py.File(filename,'r')
302 fp = h5py.File(filename,'r')
303 gp = fp['Metadata']
303 gp = fp['Metadata']
304
304
305 listMetaname = []
305 listMetaname = []
306 listMetadata = []
306 listMetadata = []
307 for item in list(gp.items()):
307 for item in list(gp.items()):
308 name = item[0]
308 name = item[0]
309
309
310 if name=='array dimensions':
310 if name=='array dimensions':
311 table = gp[name][:]
311 table = gp[name][:]
312 listShapes = {}
312 listShapes = {}
313 for shapes in table:
313 for shapes in table:
314 listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
314 listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
315 else:
315 else:
316 data = gp[name].value
316 data = gp[name].value
317 listMetaname.append(name)
317 listMetaname.append(name)
318 listMetadata.append(data)
318 listMetadata.append(data)
319
319
320 self.listShapes = listShapes
320 self.listShapes = listShapes
321 self.listMetaname = listMetaname
321 self.listMetaname = listMetaname
322 self.listMeta = listMetadata
322 self.listMeta = listMetadata
323
323
324 fp.close()
324 fp.close()
325 return
325 return
326
326
327 def __readData(self):
327 def __readData(self):
328 grp = self.fp['Data']
328 grp = self.fp['Data']
329 listdataname = []
329 listdataname = []
330 listdata = []
330 listdata = []
331
331
332 for item in list(grp.items()):
332 for item in list(grp.items()):
333 name = item[0]
333 name = item[0]
334 listdataname.append(name)
334 listdataname.append(name)
335
335
336 array = self.__setDataArray(grp[name],self.listShapes[name])
336 array = self.__setDataArray(grp[name],self.listShapes[name])
337 listdata.append(array)
337 listdata.append(array)
338
338
339 self.listDataname = listdataname
339 self.listDataname = listdataname
340 self.listData = listdata
340 self.listData = listdata
341 return
341 return
342
342
343 def __setDataArray(self, dataset, shapes):
343 def __setDataArray(self, dataset, shapes):
344
344
345 nDims = shapes[0]
345 nDims = shapes[0]
346 nDim2 = shapes[1] #Dimension 0
346 nDim2 = shapes[1] #Dimension 0
347 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
347 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
348 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
348 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
349 mode = shapes[4] #Mode of storing
349 mode = shapes[4] #Mode of storing
350 blockList = self.blockList
350 blockList = self.blockList
351 blocksPerFile = self.blocksPerFile
351 blocksPerFile = self.blocksPerFile
352
352
353 #Depending on what mode the data was stored
353 #Depending on what mode the data was stored
354 if mode == 0: #Divided in channels
354 if mode == 0: #Divided in channels
355 arrayData = dataset.value.astype(numpy.float)[0][blockList]
355 arrayData = dataset.value.astype(numpy.float)[0][blockList]
356 if mode == 1: #Divided in parameter
356 if mode == 1: #Divided in parameter
357 strds = 'table'
357 strds = 'table'
358 nDatas = nDim1
358 nDatas = nDim1
359 newShapes = (blocksPerFile,nDim2,nDim0)
359 newShapes = (blocksPerFile,nDim2,nDim0)
360 elif mode==2: #Concatenated in a table
360 elif mode==2: #Concatenated in a table
361 strds = 'table0'
361 strds = 'table0'
362 arrayData = dataset[strds].value
362 arrayData = dataset[strds].value
363 #Selecting part of the dataset
363 #Selecting part of the dataset
364 utctime = arrayData[:,0]
364 utctime = arrayData[:,0]
365 u, indices = numpy.unique(utctime, return_index=True)
365 u, indices = numpy.unique(utctime, return_index=True)
366
366
367 if blockList.size != indices.size:
367 if blockList.size != indices.size:
368 indMin = indices[blockList[0]]
368 indMin = indices[blockList[0]]
369 if blockList[1] + 1 >= indices.size:
369 if blockList[1] + 1 >= indices.size:
370 arrayData = arrayData[indMin:,:]
370 arrayData = arrayData[indMin:,:]
371 else:
371 else:
372 indMax = indices[blockList[1] + 1]
372 indMax = indices[blockList[1] + 1]
373 arrayData = arrayData[indMin:indMax,:]
373 arrayData = arrayData[indMin:indMax,:]
374 return arrayData
374 return arrayData
375
375
376 # One dimension
376 # One dimension
377 if nDims == 0:
377 if nDims == 0:
378 arrayData = dataset.value.astype(numpy.float)[0][blockList]
378 arrayData = dataset.value.astype(numpy.float)[0][blockList]
379
379
380 # Two dimensions
380 # Two dimensions
381 elif nDims == 2:
381 elif nDims == 2:
382 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
382 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
383 newShapes = (blocksPerFile,nDim0)
383 newShapes = (blocksPerFile,nDim0)
384 nDatas = nDim1
384 nDatas = nDim1
385
385
386 for i in range(nDatas):
386 for i in range(nDatas):
387 data = dataset[strds + str(i)].value
387 data = dataset[strds + str(i)].value
388 arrayData[:,i,:] = data[blockList,:]
388 arrayData[:,i,:] = data[blockList,:]
389
389
390 # Three dimensions
390 # Three dimensions
391 else:
391 else:
392 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
392 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
393 for i in range(nDatas):
393 for i in range(nDatas):
394
394
395 data = dataset[strds + str(i)].value
395 data = dataset[strds + str(i)].value
396
396
397 for b in range(blockList.size):
397 for b in range(blockList.size):
398 arrayData[b,:,i,:] = data[:,:,blockList[b]]
398 arrayData[b,:,i,:] = data[:,:,blockList[b]]
399
399
400 return arrayData
400 return arrayData
401
401
402 def __setDataOut(self):
402 def __setDataOut(self):
403 listMeta = self.listMeta
403 listMeta = self.listMeta
404 listMetaname = self.listMetaname
404 listMetaname = self.listMetaname
405 listDataname = self.listDataname
405 listDataname = self.listDataname
406 listData = self.listData
406 listData = self.listData
407 listShapes = self.listShapes
407 listShapes = self.listShapes
408
408
409 blockIndex = self.blockIndex
409 blockIndex = self.blockIndex
410 # blockList = self.blockList
410 # blockList = self.blockList
411
411
412 for i in range(len(listMeta)):
412 for i in range(len(listMeta)):
413 setattr(self.dataOut,listMetaname[i],listMeta[i])
413 setattr(self.dataOut,listMetaname[i],listMeta[i])
414
414
415 for j in range(len(listData)):
415 for j in range(len(listData)):
416 nShapes = listShapes[listDataname[j]][0]
416 nShapes = listShapes[listDataname[j]][0]
417 mode = listShapes[listDataname[j]][4]
417 mode = listShapes[listDataname[j]][4]
418 if nShapes == 1:
418 if nShapes == 1:
419 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
419 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
420 elif nShapes > 1:
420 elif nShapes > 1:
421 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
421 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
422 elif mode==0:
422 elif mode==0:
423 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
423 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
424 #Mode Meteors
424 #Mode Meteors
425 elif mode ==2:
425 elif mode ==2:
426 selectedData = self.__selectDataMode2(listData[j], blockIndex)
426 selectedData = self.__selectDataMode2(listData[j], blockIndex)
427 setattr(self.dataOut, listDataname[j], selectedData)
427 setattr(self.dataOut, listDataname[j], selectedData)
428 return
428 return
429
429
430 def __selectDataMode2(self, data, blockIndex):
430 def __selectDataMode2(self, data, blockIndex):
431 utctime = data[:,0]
431 utctime = data[:,0]
432 aux, indices = numpy.unique(utctime, return_inverse=True)
432 aux, indices = numpy.unique(utctime, return_inverse=True)
433 selInd = numpy.where(indices == blockIndex)[0]
433 selInd = numpy.where(indices == blockIndex)[0]
434 selData = data[selInd,:]
434 selData = data[selInd,:]
435
435
436 return selData
436 return selData
437
437
438 def getData(self):
438 def getData(self):
439
439
440 if self.blockIndex==self.blocksPerFile:
440 if self.blockIndex==self.blocksPerFile:
441 if not( self.__setNextFileOffline() ):
441 if not( self.__setNextFileOffline() ):
442 self.dataOut.flagNoData = True
442 self.dataOut.flagNoData = True
443 return 0
443 return 0
444
444
445 self.__setDataOut()
445 self.__setDataOut()
446 self.dataOut.flagNoData = False
446 self.dataOut.flagNoData = False
447
447
448 self.blockIndex += 1
448 self.blockIndex += 1
449
449
450 return
450 return
451
451
452 def run(self, **kwargs):
452 def run(self, **kwargs):
453
453
454 if not(self.isConfig):
454 if not(self.isConfig):
455 self.setup(**kwargs)
455 self.setup(**kwargs)
456 self.isConfig = True
456 self.isConfig = True
457
457
458 self.getData()
458 self.getData()
459
459
460 return
460 return
461
461
462 @MPDecorator
462 @MPDecorator
463 class ParamWriter(Operation):
463 class ParamWriter(Operation):
464 '''
464 '''
465 HDF5 Writer, stores parameters data in HDF5 format files
465 HDF5 Writer, stores parameters data in HDF5 format files
466
466
467 path: path where the files will be stored
467 path: path where the files will be stored
468 blocksPerFile: number of blocks that will be saved in per HDF5 format file
468 blocksPerFile: number of blocks that will be saved in per HDF5 format file
469 mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
469 mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
470 metadataList: list of attributes that will be stored as metadata
470 metadataList: list of attributes that will be stored as metadata
471 dataList: list of attributes that will be stores as data
471 dataList: list of attributes that will be stores as data
472 '''
472 '''
473
473
474 ext = ".hdf5"
474 ext = ".hdf5"
475 optchar = "D"
475 optchar = "D"
476 metaoptchar = "M"
476 metaoptchar = "M"
477 metaFile = None
477 metaFile = None
478 filename = None
478 filename = None
479 path = None
479 path = None
480 setFile = None
480 setFile = None
481 fp = None
481 fp = None
482 grp = None
482 grp = None
483 ds = None
483 ds = None
484 firsttime = True
484 firsttime = True
485 #Configurations
485 #Configurations
486 blocksPerFile = None
486 blocksPerFile = None
487 blockIndex = None
487 blockIndex = None
488 dataOut = None
488 dataOut = None
489 #Data Arrays
489 #Data Arrays
490 dataList = None
490 dataList = None
491 metadataList = None
491 metadataList = None
492 dsList = None #List of dictionaries with dataset properties
492 dsList = None #List of dictionaries with dataset properties
493 tableDim = None
493 tableDim = None
494 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
494 dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
495 currentDay = None
495 currentDay = None
496 lastTime = None
496 lastTime = None
497 setType = None
497 setType = None
498
498
499 def __init__(self):
499 def __init__(self):
500
500
501 Operation.__init__(self)
501 Operation.__init__(self)
502 return
502 return
503
503
504 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
504 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
505 self.path = path
505 self.path = path
506 self.blocksPerFile = blocksPerFile
506 self.blocksPerFile = blocksPerFile
507 self.metadataList = metadataList
507 self.metadataList = metadataList
508 self.dataList = dataList
508 self.dataList = dataList
509 self.dataOut = dataOut
509 self.dataOut = dataOut
510 self.mode = mode
510 self.mode = mode
511 if self.mode is not None:
511 if self.mode is not None:
512 self.mode = numpy.zeros(len(self.dataList)) + mode
512 self.mode = numpy.zeros(len(self.dataList)) + mode
513 else:
513 else:
514 self.mode = numpy.ones(len(self.dataList))
514 self.mode = numpy.ones(len(self.dataList))
515
515
516 self.setType = setType
516 self.setType = setType
517
517
518 arrayDim = numpy.zeros((len(self.dataList),5))
518 arrayDim = numpy.zeros((len(self.dataList),5))
519
519
520 #Table dimensions
520 #Table dimensions
521 dtype0 = self.dtype
521 dtype0 = self.dtype
522 tableList = []
522 tableList = []
523
523
524 #Dictionary and list of tables
524 #Dictionary and list of tables
525 dsList = []
525 dsList = []
526
526
527 for i in range(len(self.dataList)):
527 for i in range(len(self.dataList)):
528 dsDict = {}
528 dsDict = {}
529 dataAux = getattr(self.dataOut, self.dataList[i])
529 dataAux = getattr(self.dataOut, self.dataList[i])
530 dsDict['variable'] = self.dataList[i]
530 dsDict['variable'] = self.dataList[i]
531 #--------------------- Conditionals ------------------------
531 #--------------------- Conditionals ------------------------
532 #There is no data
532 #There is no data
533
533
534 if dataAux is None:
534 if dataAux is None:
535
535
536 return 0
536 return 0
537
537
538 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
538 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
539 dsDict['mode'] = 0
539 dsDict['mode'] = 0
540 dsDict['nDim'] = 0
540 dsDict['nDim'] = 0
541 arrayDim[i,0] = 0
541 arrayDim[i,0] = 0
542 dsList.append(dsDict)
542 dsList.append(dsDict)
543
543
544 #Mode 2: meteors
544 #Mode 2: meteors
545 elif self.mode[i] == 2:
545 elif self.mode[i] == 2:
546 dsDict['dsName'] = 'table0'
546 dsDict['dsName'] = 'table0'
547 dsDict['mode'] = 2 # Mode meteors
547 dsDict['mode'] = 2 # Mode meteors
548 dsDict['shape'] = dataAux.shape[-1]
548 dsDict['shape'] = dataAux.shape[-1]
549 dsDict['nDim'] = 0
549 dsDict['nDim'] = 0
550 dsDict['dsNumber'] = 1
550 dsDict['dsNumber'] = 1
551 arrayDim[i,3] = dataAux.shape[-1]
551 arrayDim[i,3] = dataAux.shape[-1]
552 arrayDim[i,4] = self.mode[i] #Mode the data was stored
552 arrayDim[i,4] = self.mode[i] #Mode the data was stored
553 dsList.append(dsDict)
553 dsList.append(dsDict)
554
554
555 #Mode 1
555 #Mode 1
556 else:
556 else:
557 arrayDim0 = dataAux.shape #Data dimensions
557 arrayDim0 = dataAux.shape #Data dimensions
558 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
558 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
559 arrayDim[i,4] = self.mode[i] #Mode the data was stored
559 arrayDim[i,4] = self.mode[i] #Mode the data was stored
560 strtable = 'table'
560 strtable = 'table'
561 dsDict['mode'] = 1 # Mode parameters
561 dsDict['mode'] = 1 # Mode parameters
562
562
563 # Three-dimension arrays
563 # Three-dimension arrays
564 if len(arrayDim0) == 3:
564 if len(arrayDim0) == 3:
565 arrayDim[i,1:-1] = numpy.array(arrayDim0)
565 arrayDim[i,1:-1] = numpy.array(arrayDim0)
566 nTables = int(arrayDim[i,2])
566 nTables = int(arrayDim[i,2])
567 dsDict['dsNumber'] = nTables
567 dsDict['dsNumber'] = nTables
568 dsDict['shape'] = arrayDim[i,2:4]
568 dsDict['shape'] = arrayDim[i,2:4]
569 dsDict['nDim'] = 3
569 dsDict['nDim'] = 3
570
570
571 for j in range(nTables):
571 for j in range(nTables):
572 dsDict = dsDict.copy()
572 dsDict = dsDict.copy()
573 dsDict['dsName'] = strtable + str(j)
573 dsDict['dsName'] = strtable + str(j)
574 dsList.append(dsDict)
574 dsList.append(dsDict)
575
575
576 # Two-dimension arrays
576 # Two-dimension arrays
577 elif len(arrayDim0) == 2:
577 elif len(arrayDim0) == 2:
578 arrayDim[i,2:-1] = numpy.array(arrayDim0)
578 arrayDim[i,2:-1] = numpy.array(arrayDim0)
579 nTables = int(arrayDim[i,2])
579 nTables = int(arrayDim[i,2])
580 dsDict['dsNumber'] = nTables
580 dsDict['dsNumber'] = nTables
581 dsDict['shape'] = arrayDim[i,3]
581 dsDict['shape'] = arrayDim[i,3]
582 dsDict['nDim'] = 2
582 dsDict['nDim'] = 2
583
583
584 for j in range(nTables):
584 for j in range(nTables):
585 dsDict = dsDict.copy()
585 dsDict = dsDict.copy()
586 dsDict['dsName'] = strtable + str(j)
586 dsDict['dsName'] = strtable + str(j)
587 dsList.append(dsDict)
587 dsList.append(dsDict)
588
588
589 # One-dimension arrays
589 # One-dimension arrays
590 elif len(arrayDim0) == 1:
590 elif len(arrayDim0) == 1:
591 arrayDim[i,3] = arrayDim0[0]
591 arrayDim[i,3] = arrayDim0[0]
592 dsDict['shape'] = arrayDim0[0]
592 dsDict['shape'] = arrayDim0[0]
593 dsDict['dsNumber'] = 1
593 dsDict['dsNumber'] = 1
594 dsDict['dsName'] = strtable + str(0)
594 dsDict['dsName'] = strtable + str(0)
595 dsDict['nDim'] = 1
595 dsDict['nDim'] = 1
596 dsList.append(dsDict)
596 dsList.append(dsDict)
597
597
598 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
598 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
599 tableList.append(table)
599 tableList.append(table)
600
600
601 self.dsList = dsList
601 self.dsList = dsList
602 self.tableDim = numpy.array(tableList, dtype = dtype0)
602 self.tableDim = numpy.array(tableList, dtype = dtype0)
603 self.blockIndex = 0
603 self.blockIndex = 0
604 timeTuple = time.localtime(dataOut.utctime)
604 timeTuple = time.localtime(dataOut.utctime)
605 self.currentDay = timeTuple.tm_yday
605 self.currentDay = timeTuple.tm_yday
606
606
607 def putMetadata(self):
607 def putMetadata(self):
608
608
609 fp = self.createMetadataFile()
609 fp = self.createMetadataFile()
610 self.writeMetadata(fp)
610 self.writeMetadata(fp)
611 fp.close()
611 fp.close()
612 return
612 return
613
613
614 def createMetadataFile(self):
614 def createMetadataFile(self):
615 ext = self.ext
615 ext = self.ext
616 path = self.path
616 path = self.path
617 setFile = self.setFile
617 setFile = self.setFile
618
618
619 timeTuple = time.localtime(self.dataOut.utctime)
619 timeTuple = time.localtime(self.dataOut.utctime)
620
620
621 subfolder = ''
621 subfolder = ''
622 fullpath = os.path.join( path, subfolder )
622 fullpath = os.path.join( path, subfolder )
623
623
624 if not( os.path.exists(fullpath) ):
624 if not( os.path.exists(fullpath) ):
625 os.mkdir(fullpath)
625 os.mkdir(fullpath)
626 setFile = -1 #inicializo mi contador de seteo
626 setFile = -1 #inicializo mi contador de seteo
627
627
628 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
628 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
629 fullpath = os.path.join( path, subfolder )
629 fullpath = os.path.join( path, subfolder )
630
630
631 if not( os.path.exists(fullpath) ):
631 if not( os.path.exists(fullpath) ):
632 os.mkdir(fullpath)
632 os.mkdir(fullpath)
633 setFile = -1 #inicializo mi contador de seteo
633 setFile = -1 #inicializo mi contador de seteo
634
634
635 else:
635 else:
636 filesList = os.listdir( fullpath )
636 filesList = os.listdir( fullpath )
637 filesList = sorted( filesList, key=str.lower )
637 filesList = sorted( filesList, key=str.lower )
638 if len( filesList ) > 0:
638 if len( filesList ) > 0:
639 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
639 filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
640 filen = filesList[-1]
640 filen = filesList[-1]
641 # el filename debera tener el siguiente formato
641 # el filename debera tener el siguiente formato
642 # 0 1234 567 89A BCDE (hex)
642 # 0 1234 567 89A BCDE (hex)
643 # x YYYY DDD SSS .ext
643 # x YYYY DDD SSS .ext
644 if isNumber( filen[8:11] ):
644 if isNumber( filen[8:11] ):
645 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
645 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
646 else:
646 else:
647 setFile = -1
647 setFile = -1
648 else:
648 else:
649 setFile = -1 #inicializo mi contador de seteo
649 setFile = -1 #inicializo mi contador de seteo
650
650
651 if self.setType is None:
651 if self.setType is None:
652 setFile += 1
652 setFile += 1
653 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
653 file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
654 timeTuple.tm_year,
654 timeTuple.tm_year,
655 timeTuple.tm_yday,
655 timeTuple.tm_yday,
656 setFile,
656 setFile,
657 ext )
657 ext )
658 else:
658 else:
659 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
659 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
660 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
660 file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
661 timeTuple.tm_year,
661 timeTuple.tm_year,
662 timeTuple.tm_yday,
662 timeTuple.tm_yday,
663 setFile,
663 setFile,
664 ext )
664 ext )
665
665
666 filename = os.path.join( path, subfolder, file )
666 filename = os.path.join( path, subfolder, file )
667 self.metaFile = file
667 self.metaFile = file
668 #Setting HDF5 File
668 #Setting HDF5 File
669 fp = h5py.File(filename,'w')
669 fp = h5py.File(filename,'w')
670
670
671 return fp
671 return fp
672
672
673 def writeMetadata(self, fp):
673 def writeMetadata(self, fp):
674
674
675 grp = fp.create_group("Metadata")
675 grp = fp.create_group("Metadata")
676 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
676 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
677
677
678 for i in range(len(self.metadataList)):
678 for i in range(len(self.metadataList)):
679 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
679 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
680 return
680 return
681
681
682 def timeFlag(self):
682 def timeFlag(self):
683 currentTime = self.dataOut.utctime
683 currentTime = self.dataOut.utctime
684
684
685 if self.lastTime is None:
685 if self.lastTime is None:
686 self.lastTime = currentTime
686 self.lastTime = currentTime
687
687
688 #Day
688 #Day
689 timeTuple = time.localtime(currentTime)
689 timeTuple = time.localtime(currentTime)
690 dataDay = timeTuple.tm_yday
690 dataDay = timeTuple.tm_yday
691
691
692 #Time
692 #Time
693 timeDiff = currentTime - self.lastTime
693 timeDiff = currentTime - self.lastTime
694
694
695 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
695 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
696 if dataDay != self.currentDay:
696 if dataDay != self.currentDay:
697 self.currentDay = dataDay
697 self.currentDay = dataDay
698 return True
698 return True
699 elif timeDiff > 3*60*60:
699 elif timeDiff > 3*60*60:
700 self.lastTime = currentTime
700 self.lastTime = currentTime
701 return True
701 return True
702 else:
702 else:
703 self.lastTime = currentTime
703 self.lastTime = currentTime
704 return False
704 return False
705
705
706 def setNextFile(self):
706 def setNextFile(self):
707
707
708 ext = self.ext
708 ext = self.ext
709 path = self.path
709 path = self.path
710 setFile = self.setFile
710 setFile = self.setFile
711 mode = self.mode
711 mode = self.mode
712
712
713 timeTuple = time.localtime(self.dataOut.utctime)
713 timeTuple = time.localtime(self.dataOut.utctime)
714 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
714 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
715
715
716 fullpath = os.path.join( path, subfolder )
716 fullpath = os.path.join( path, subfolder )
717
717
718 if os.path.exists(fullpath):
718 if os.path.exists(fullpath):
719 filesList = os.listdir( fullpath )
719 filesList = os.listdir( fullpath )
720 filesList = [k for k in filesList if 'M' in k]
720 filesList = [k for k in filesList if 'M' in k]
721 if len( filesList ) > 0:
721 if len( filesList ) > 0:
722 filesList = sorted( filesList, key=str.lower )
722 filesList = sorted( filesList, key=str.lower )
723 filen = filesList[-1]
723 filen = filesList[-1]
724 # el filename debera tener el siguiente formato
724 # el filename debera tener el siguiente formato
725 # 0 1234 567 89A BCDE (hex)
725 # 0 1234 567 89A BCDE (hex)
726 # x YYYY DDD SSS .ext
726 # x YYYY DDD SSS .ext
727 if isNumber( filen[8:11] ):
727 if isNumber( filen[8:11] ):
728 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
728 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
729 else:
729 else:
730 setFile = -1
730 setFile = -1
731 else:
731 else:
732 setFile = -1 #inicializo mi contador de seteo
732 setFile = -1 #inicializo mi contador de seteo
733 else:
733 else:
734 os.makedirs(fullpath)
734 os.makedirs(fullpath)
735 setFile = -1 #inicializo mi contador de seteo
735 setFile = -1 #inicializo mi contador de seteo
736
736
737 if self.setType is None:
737 if self.setType is None:
738 setFile += 1
738 setFile += 1
739 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
739 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
740 timeTuple.tm_year,
740 timeTuple.tm_year,
741 timeTuple.tm_yday,
741 timeTuple.tm_yday,
742 setFile,
742 setFile,
743 ext )
743 ext )
744 else:
744 else:
745 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
745 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
746 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
746 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
747 timeTuple.tm_year,
747 timeTuple.tm_year,
748 timeTuple.tm_yday,
748 timeTuple.tm_yday,
749 setFile,
749 setFile,
750 ext )
750 ext )
751
751
752 filename = os.path.join( path, subfolder, file )
752 filename = os.path.join( path, subfolder, file )
753
753
754 #Setting HDF5 File
754 #Setting HDF5 File
755 fp = h5py.File(filename,'w')
755 fp = h5py.File(filename,'w')
756 #write metadata
756 #write metadata
757 self.writeMetadata(fp)
757 self.writeMetadata(fp)
758 #Write data
758 #Write data
759 grp = fp.create_group("Data")
759 grp = fp.create_group("Data")
760 ds = []
760 ds = []
761 data = []
761 data = []
762 dsList = self.dsList
762 dsList = self.dsList
763 i = 0
763 i = 0
764 while i < len(dsList):
764 while i < len(dsList):
765 dsInfo = dsList[i]
765 dsInfo = dsList[i]
766 #One-dimension data
766 #One-dimension data
767 if dsInfo['mode'] == 0:
767 if dsInfo['mode'] == 0:
768 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
768 ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
769 ds.append(ds0)
769 ds.append(ds0)
770 data.append([])
770 data.append([])
771 i += 1
771 i += 1
772 continue
772 continue
773
773
774 elif dsInfo['mode'] == 2:
774 elif dsInfo['mode'] == 2:
775 grp0 = grp.create_group(dsInfo['variable'])
775 grp0 = grp.create_group(dsInfo['variable'])
776 ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
776 ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
777 ds.append(ds0)
777 ds.append(ds0)
778 data.append([])
778 data.append([])
779 i += 1
779 i += 1
780 continue
780 continue
781
781
782 elif dsInfo['mode'] == 1:
782 elif dsInfo['mode'] == 1:
783 grp0 = grp.create_group(dsInfo['variable'])
783 grp0 = grp.create_group(dsInfo['variable'])
784
784
785 for j in range(dsInfo['dsNumber']):
785 for j in range(dsInfo['dsNumber']):
786 dsInfo = dsList[i]
786 dsInfo = dsList[i]
787 tableName = dsInfo['dsName']
787 tableName = dsInfo['dsName']
788
788
789
789
790 if dsInfo['nDim'] == 3:
790 if dsInfo['nDim'] == 3:
791 shape = dsInfo['shape'].astype(int)
791 shape = dsInfo['shape'].astype(int)
792 ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
792 ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
793 else:
793 else:
794 shape = int(dsInfo['shape'])
794 shape = int(dsInfo['shape'])
795 ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)
795 ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)
796
796
797 ds.append(ds0)
797 ds.append(ds0)
798 data.append([])
798 data.append([])
799 i += 1
799 i += 1
800
800
801 fp.flush()
801 fp.flush()
802 fp.close()
802 fp.close()
803
803
804 log.log('creating file: {}'.format(filename), 'Writing')
804 log.log('creating file: {}'.format(filename), 'Writing')
805 self.filename = filename
805 self.filename = filename
806 self.ds = ds
806 self.ds = ds
807 self.data = data
807 self.data = data
808 self.firsttime = True
808 self.firsttime = True
809 self.blockIndex = 0
809 self.blockIndex = 0
810 return
810 return
811
811
812 def putData(self):
812 def putData(self):
813
813
814 if self.blockIndex == self.blocksPerFile or self.timeFlag():
814 if self.blockIndex == self.blocksPerFile or self.timeFlag():
815 self.setNextFile()
815 self.setNextFile()
816
816
817 self.readBlock()
817 self.readBlock()
818 self.setBlock() #Prepare data to be written
818 self.setBlock() #Prepare data to be written
819 self.writeBlock() #Write data
819 self.writeBlock() #Write data
820
820
821 return
821 return
822
822
823 def readBlock(self):
823 def readBlock(self):
824
824
825 '''
825 '''
826 data Array configured
826 data Array configured
827
827
828
828
829 self.data
829 self.data
830 '''
830 '''
831 dsList = self.dsList
831 dsList = self.dsList
832 ds = self.ds
832 ds = self.ds
833 #Setting HDF5 File
833 #Setting HDF5 File
834 fp = h5py.File(self.filename,'r+')
834 fp = h5py.File(self.filename,'r+')
835 grp = fp["Data"]
835 grp = fp["Data"]
836 ind = 0
836 ind = 0
837
837
838 while ind < len(dsList):
838 while ind < len(dsList):
839 dsInfo = dsList[ind]
839 dsInfo = dsList[ind]
840
840
841 if dsInfo['mode'] == 0:
841 if dsInfo['mode'] == 0:
842 ds0 = grp[dsInfo['variable']]
842 ds0 = grp[dsInfo['variable']]
843 ds[ind] = ds0
843 ds[ind] = ds0
844 ind += 1
844 ind += 1
845 else:
845 else:
846
846
847 grp0 = grp[dsInfo['variable']]
847 grp0 = grp[dsInfo['variable']]
848
848
849 for j in range(dsInfo['dsNumber']):
849 for j in range(dsInfo['dsNumber']):
850 dsInfo = dsList[ind]
850 dsInfo = dsList[ind]
851 ds0 = grp0[dsInfo['dsName']]
851 ds0 = grp0[dsInfo['dsName']]
852 ds[ind] = ds0
852 ds[ind] = ds0
853 ind += 1
853 ind += 1
854
854
855 self.fp = fp
855 self.fp = fp
856 self.grp = grp
856 self.grp = grp
857 self.ds = ds
857 self.ds = ds
858
858
859 return
859 return
860
860
861 def setBlock(self):
861 def setBlock(self):
862 '''
862 '''
863 data Array configured
863 data Array configured
864
864
865
865
866 self.data
866 self.data
867 '''
867 '''
868 #Creating Arrays
868 #Creating Arrays
869 dsList = self.dsList
869 dsList = self.dsList
870 data = self.data
870 data = self.data
871 ind = 0
871 ind = 0
872
872
873 while ind < len(dsList):
873 while ind < len(dsList):
874 dsInfo = dsList[ind]
874 dsInfo = dsList[ind]
875 dataAux = getattr(self.dataOut, dsInfo['variable'])
875 dataAux = getattr(self.dataOut, dsInfo['variable'])
876
876
877 mode = dsInfo['mode']
877 mode = dsInfo['mode']
878 nDim = dsInfo['nDim']
878 nDim = dsInfo['nDim']
879
879
880 if mode == 0 or mode == 2 or nDim == 1:
880 if mode == 0 or mode == 2 or nDim == 1:
881 data[ind] = dataAux
881 data[ind] = dataAux
882 ind += 1
882 ind += 1
883 # elif nDim == 1:
883 # elif nDim == 1:
884 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
884 # data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
885 # ind += 1
885 # ind += 1
886 elif nDim == 2:
886 elif nDim == 2:
887 for j in range(dsInfo['dsNumber']):
887 for j in range(dsInfo['dsNumber']):
888 data[ind] = dataAux[j,:]
888 data[ind] = dataAux[j,:]
889 ind += 1
889 ind += 1
890 elif nDim == 3:
890 elif nDim == 3:
891 for j in range(dsInfo['dsNumber']):
891 for j in range(dsInfo['dsNumber']):
892 data[ind] = dataAux[:,j,:]
892 data[ind] = dataAux[:,j,:]
893 ind += 1
893 ind += 1
894
894
895 self.data = data
895 self.data = data
896 return
896 return
897
897
898 def writeBlock(self):
898 def writeBlock(self):
899 '''
899 '''
900 Saves the block in the HDF5 file
900 Saves the block in the HDF5 file
901 '''
901 '''
902 dsList = self.dsList
902 dsList = self.dsList
903
903
904 for i in range(len(self.ds)):
904 for i in range(len(self.ds)):
905 dsInfo = dsList[i]
905 dsInfo = dsList[i]
906 nDim = dsInfo['nDim']
906 nDim = dsInfo['nDim']
907 mode = dsInfo['mode']
907 mode = dsInfo['mode']
908
908
909 # First time
909 # First time
910 if self.firsttime:
910 if self.firsttime:
911 if type(self.data[i]) == numpy.ndarray:
911 if type(self.data[i]) == numpy.ndarray:
912
912
913 if nDim == 3:
913 if nDim == 3:
914 self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
914 self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
915 self.ds[i].resize(self.data[i].shape)
915 self.ds[i].resize(self.data[i].shape)
916 if mode == 2:
916 if mode == 2:
917 self.ds[i].resize(self.data[i].shape)
917 self.ds[i].resize(self.data[i].shape)
918 self.ds[i][:] = self.data[i]
918 self.ds[i][:] = self.data[i]
919 else:
919 else:
920
920
921 # From second time
921 # From second time
922 # Meteors!
922 # Meteors!
923 if mode == 2:
923 if mode == 2:
924 dataShape = self.data[i].shape
924 dataShape = self.data[i].shape
925 dsShape = self.ds[i].shape
925 dsShape = self.ds[i].shape
926 self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
926 self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
927 self.ds[i][dsShape[0]:,:] = self.data[i]
927 self.ds[i][dsShape[0]:,:] = self.data[i]
928 # No dimension
928 # No dimension
929 elif mode == 0:
929 elif mode == 0:
930 self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
930 self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
931 self.ds[i][0,-1] = self.data[i]
931 self.ds[i][0,-1] = self.data[i]
932 # One dimension
932 # One dimension
933 elif nDim == 1:
933 elif nDim == 1:
934 self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
934 self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
935 self.ds[i][-1,:] = self.data[i]
935 self.ds[i][-1,:] = self.data[i]
936 # Two dimension
936 # Two dimension
937 elif nDim == 2:
937 elif nDim == 2:
938 self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
938 self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
939 self.ds[i][self.blockIndex,:] = self.data[i]
939 self.ds[i][self.blockIndex,:] = self.data[i]
940 # Three dimensions
940 # Three dimensions
941 elif nDim == 3:
941 elif nDim == 3:
942 self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
942 self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
943 self.ds[i][:,:,-1] = self.data[i]
943 self.ds[i][:,:,-1] = self.data[i]
944
944
945 self.firsttime = False
945 self.firsttime = False
946 self.blockIndex += 1
946 self.blockIndex += 1
947
947
948 #Close to save changes
948 #Close to save changes
949 self.fp.flush()
949 self.fp.flush()
950 self.fp.close()
950 self.fp.close()
951 return
951 return
952
952
953 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
953 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
954
954
955 self.dataOut = dataOut
955 self.dataOut = dataOut
956 if not(self.isConfig):
956 if not(self.isConfig):
957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
958 metadataList=metadataList, dataList=dataList, mode=mode,
958 metadataList=metadataList, dataList=dataList, mode=mode,
959 setType=setType)
959 setType=setType)
960
960
961 self.isConfig = True
961 self.isConfig = True
962 self.setNextFile()
962 self.setNextFile()
963
963
964 self.putData()
964 self.putData()
965 return
965 return
966
966
967
967
968 @MPDecorator
968 @MPDecorator
969 class ParameterReader(JRODataReader,ProcessingUnit):
969 class ParameterReader(Reader, ProcessingUnit):
970 '''
970 '''
971 Reads HDF5 format files
971 Reads HDF5 format files
972 '''
972 '''
973
973
974 ext = ".hdf5"
975 optchar = "D"
976 timezone = None
977 startTime = None
978 endTime = None
979 fileIndex = None
980 blockList = None #List to blocks to be read from the file
981 blocksPerFile = None #Number of blocks to be read
982 blockIndex = None
983 path = None
984 #List of Files
985 filenameList = None
986 datetimeList = None
987 #Hdf5 File
988 listMetaname = None
989 listMeta = None
990 listDataname = None
991 listData = None
992 listShapes = None
993 fp = None
994 #dataOut reconstruction
995 dataOut = None
996
997 def __init__(self):
974 def __init__(self):
998 ProcessingUnit.__init__(self)
975 ProcessingUnit.__init__(self)
999 self.dataOut = Parameters()
976 self.dataOut = Parameters()
1000 return
977 self.ext = ".hdf5"
978 self.optchar = "D"
979 self.timezone = "lt"
980 self.listMetaname = []
981 self.listMeta = []
982 self.listDataname = []
983 self.listData = []
984 self.listShapes = []
985 self.open_file = h5py.File
986 self.open_mode = 'r'
987 self.metadata = False
988 self.filefmt = "*%Y%j***"
989 self.folderfmt = "*%Y%j"
1001
990
1002 def setup(self, **kwargs):
991 def setup(self, **kwargs):
1003
992
1004 path = kwargs['path']
993 self.set_kwargs(**kwargs)
1005 startDate = kwargs['startDate']
994 if not self.ext.startswith('.'):
1006 endDate = kwargs['endDate']
995 self.ext = '.{}'.format(self.ext)
1007 startTime = kwargs['startTime']
1008 endTime = kwargs['endTime']
1009 walk = kwargs['walk']
1010 if 'ext' in kwargs:
1011 ext = kwargs['ext']
1012 else:
1013 ext = '.hdf5'
1014 if 'timezone' in kwargs:
1015 self.timezone = kwargs['timezone']
1016 else:
1017 self.timezone = 'lt'
1018
1019 print("[Reading] Searching files in offline mode ...")
1020 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1021 startTime=startTime, endTime=endTime,
1022 ext=ext, walk=walk)
1023
1024 if not(filenameList):
1025 print("There is no files into the folder: %s"%(path))
1026 sys.exit(-1)
1027
1028 self.fileIndex = -1
1029 self.startTime = startTime
1030 self.endTime = endTime
1031 self.__readMetadata()
1032 self.__setNextFileOffline()
1033
1034 return
1035
1036 def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
1037
1038 expLabel = ''
1039 self.filenameList = []
1040 self.datetimeList = []
1041 pathList = []
1042 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
1043
1044 if dateList == []:
1045 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
1046 datetime.datetime.combine(startDate,startTime).ctime(),
1047 datetime.datetime.combine(endDate,endTime).ctime()))
1048
1049 return None, None
1050
1051 if len(dateList) > 1:
1052 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
1053 else:
1054 print("[Reading] data was found for the date %s" %(dateList[0]))
1055
1056 filenameList = []
1057 datetimeList = []
1058
1059 for thisPath in pathList:
1060
1061 fileList = glob.glob1(thisPath, "*%s" %ext)
1062 fileList.sort()
1063
1064 for file in fileList:
1065
1066 filename = os.path.join(thisPath,file)
1067
1068 if not isFileInDateRange(filename, startDate, endDate):
1069 continue
1070
1071 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
1072
1073 if not(thisDatetime):
1074 continue
1075
1076 filenameList.append(filename)
1077 datetimeList.append(thisDatetime)
1078
1079 if not(filenameList):
1080 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
1081 return None, None
1082
1083 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
1084 print()
1085
1086 self.filenameList = filenameList
1087 self.datetimeList = datetimeList
1088
1089 return pathList, filenameList
1090
1091 def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):
1092
1093 """
1094 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
1095
1096 Inputs:
1097 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
1098 startDate : fecha inicial del rango seleccionado en formato datetime.date
1099 endDate : fecha final del rango seleccionado en formato datetime.date
1100 startTime : tiempo inicial del rango seleccionado en formato datetime.time
1101 endTime : tiempo final del rango seleccionado en formato datetime.time
1102
1103 Return:
1104 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
1105 fecha especificado, de lo contrario retorna False.
1106
996
1107 Excepciones:
997 if self.online:
1108 Si el archivo no existe o no puede ser abierto
998 log.log("Searching files in online mode...", self.name)
1109 Si la cabecera no puede ser leida.
1110
999
1111 """
1000 for nTries in range(self.nTries):
1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 self.filefmt, self.folderfmt)
1112
1004
1113 try:
1005 try:
1114 fp = h5py.File(filename, 'r')
1006 fullpath = next(fullpath)
1115 grp1 = fp['Data']
1007 except:
1116
1008 fullpath = None
1117 except IOError:
1009
1118 traceback.print_exc()
1010 if fullpath:
1119 raise IOError("The file %s can't be opened" %(filename))
1011 break
1120 #In case has utctime attribute
1012
1121 grp2 = grp1['utctime']
1013 log.warning(
1122 thisUtcTime = grp2.value[0]
1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1123
1015 self.delay, self.path, nTries + 1),
1124 fp.close()
1016 self.name)
1125
1017 time.sleep(self.delay)
1126 if self.timezone == 'lt':
1018
1127 thisUtcTime -= 5*3600
1019 if not(fullpath):
1128
1020 raise schainpy.admin.SchainError(
1129 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
1021 'There isn\'t any valid file in {}'.format(self.path))
1130 thisDate = thisDatetime.date()
1022
1131 thisTime = thisDatetime.time()
1023 pathname, filename = os.path.split(fullpath)
1132
1024 self.year = int(filename[1:5])
1133 startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1025 self.doy = int(filename[5:8])
1134 endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()
1026 self.set = int(filename[8:11]) - 1
1135
1027 else:
1136 #General case
1028 log.log("Searching files in {}".format(self.path), self.name)
1137 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1138 #-----------o----------------------------o-----------
1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1139 # startTime endTime
1140
1141 if endTime >= startTime:
1142 thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
1143 if numpy.any(thisUtcLog): #If there is one block between the hours mentioned
1144 return thisDatetime
1145 return None
1146
1147 #If endTime < startTime then endTime belongs to the next day
1148 #<<<<<<<<<<<o o>>>>>>>>>>>
1149 #-----------o----------------------------o-----------
1150 # endTime startTime
1151
1152 if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
1153 return None
1154
1155 if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
1156 return None
1157
1158 if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
1159 return None
1160
1161 return thisDatetime
1162
1163 def __setNextFileOffline(self):
1164
1165 self.fileIndex += 1
1166 idFile = self.fileIndex
1167
1031
1168 if not(idFile < len(self.filenameList)):
1032 self.setNextFile()
1169 raise schainpy.admin.SchainError('No more files')
1170
1033
1171 filename = self.filenameList[idFile]
1034 return
1172 self.fp = h5py.File(filename, 'r')
1173 self.filename = filename
1174
1035
1175 print("Setting the file: %s"%self.filename)
1036 def readFirstHeader(self):
1037 '''Read metadata and data'''
1176
1038
1177 self.__setBlockList()
1039 self.__readMetadata()
1178 self.__readData()
1040 self.__readData()
1041 self.__setBlockList()
1179 self.blockIndex = 0
1042 self.blockIndex = 0
1180 return 1
1043
1044 return
1181
1045
1182 def __setBlockList(self):
1046 def __setBlockList(self):
1183 '''
1047 '''
1184 Selects the data within the times defined
1048 Selects the data within the times defined
1185
1049
1186 self.fp
1050 self.fp
1187 self.startTime
1051 self.startTime
1188 self.endTime
1052 self.endTime
1189 self.blockList
1053 self.blockList
1190 self.blocksPerFile
1054 self.blocksPerFile
1191
1055
1192 '''
1056 '''
1193 fp = self.fp
1057
1194 startTime = self.startTime
1058 startTime = self.startTime
1195 endTime = self.endTime
1059 endTime = self.endTime
1196
1060
1197 grp = fp['Data']
1061 index = self.listDataname.index('utctime')
1198 thisUtcTime = grp['utctime'].value
1062 thisUtcTime = self.listData[index]
1063 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
1199
1064
1200 if self.timezone == 'lt':
1065 if self.timezone == 'lt':
1201 thisUtcTime -= 5*3600
1066 thisUtcTime -= 5*3600
1202
1067
1203 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
1068 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
1204
1069
1205 thisDate = thisDatetime.date()
1070 thisDate = thisDatetime.date()
1206 thisTime = thisDatetime.time()
1071 thisTime = thisDatetime.time()
1207
1072
1208 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1073 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1209 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1074 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
1210
1075
1211 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
1076 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
1212
1077
1213 self.blockList = ind
1078 self.blockList = ind
1214 self.blocksPerFile = len(ind)
1079 self.blocksPerFile = len(ind)
1215 return
1080 return
1216
1081
1217 def __readMetadata(self):
1082 def __readMetadata(self):
1218 '''
1083 '''
1219 Reads Metadata
1084 Reads Metadata
1220 '''
1085 '''
1221
1086
1222 filename = self.filenameList[0]
1223 fp = h5py.File(filename, 'r')
1224 gp = fp['Metadata']
1225 listMetaname = []
1087 listMetaname = []
1226 listMetadata = []
1088 listMetadata = []
1227
1089 if 'Metadata' in self.fp:
1090 gp = self.fp['Metadata']
1228 for item in list(gp.items()):
1091 for item in list(gp.items()):
1229 name = item[0]
1092 name = item[0]
1230
1093
1231 if name=='variables':
1094 if name=='variables':
1232 table = gp[name][:]
1095 table = gp[name][:]
1233 listShapes = {}
1096 listShapes = {}
1234 for shapes in table:
1097 for shapes in table:
1235 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1098 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1236 else:
1099 else:
1237 data = gp[name].value
1100 data = gp[name].value
1238 listMetaname.append(name)
1101 listMetaname.append(name)
1239 listMetadata.append(data)
1102 listMetadata.append(data)
1103 elif self.metadata:
1104 metadata = json.loads(self.metadata)
1105 listShapes = {}
1106 for tup in metadata:
1107 name, values, dim = tup
1108 if dim == -1:
1109 listMetaname.append(name)
1110 listMetadata.append(self.fp[values].value)
1111 else:
1112 listShapes[name] = numpy.array([dim])
1113 else:
1114 raise IOError('Missing Metadata group in file or metadata info')
1240
1115
1241 self.listShapes = listShapes
1116 self.listShapes = listShapes
1242 self.listMetaname = listMetaname
1117 self.listMetaname = listMetaname
1243 self.listMeta = listMetadata
1118 self.listMeta = listMetadata
1244
1119
1245 fp.close()
1246 return
1120 return
1247
1121
1248 def __readData(self):
1122 def __readData(self):
1249
1123
1250 grp = self.fp['Data']
1251 listdataname = []
1124 listdataname = []
1252 listdata = []
1125 listdata = []
1253
1126
1127 if 'Data' in self.fp:
1128 grp = self.fp['Data']
1254 for item in list(grp.items()):
1129 for item in list(grp.items()):
1255 name = item[0]
1130 name = item[0]
1256 listdataname.append(name)
1131 listdataname.append(name)
1257 dim = self.listShapes[name][0]
1132 dim = self.listShapes[name][0]
1258 if dim == 0:
1133 if dim == 0:
1259 array = grp[name].value
1134 array = grp[name].value
1260 else:
1135 else:
1261 array = []
1136 array = []
1262 for i in range(dim):
1137 for i in range(dim):
1263 array.append(grp[name]['table{:02d}'.format(i)].value)
1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1264 array = numpy.array(array)
1139 array = numpy.array(array)
1265
1140
1266 listdata.append(array)
1141 listdata.append(array)
1142 elif self.metadata:
1143 metadata = json.loads(self.metadata)
1144 for tup in metadata:
1145 name, values, dim = tup
1146 listdataname.append(name)
1147 if dim == -1:
1148 continue
1149 elif dim == 0:
1150 array = self.fp[values].value
1151 else:
1152 array = []
1153 for var in values:
1154 array.append(self.fp[var].value)
1155 array = numpy.array(array)
1156 listdata.append(array)
1157 else:
1158 raise IOError('Missing Data group in file or metadata info')
1267
1159
1268 self.listDataname = listdataname
1160 self.listDataname = listdataname
1269 self.listData = listdata
1161 self.listData = listdata
1270 return
1162 return
1271
1163
1272 def getData(self):
1164 def getData(self):
1273
1165
1274 for i in range(len(self.listMeta)):
1166 for i in range(len(self.listMeta)):
1275 setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])
1167 setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])
1276
1168
1277 for j in range(len(self.listData)):
1169 for j in range(len(self.listData)):
1278 dim = self.listShapes[self.listDataname[j]][0]
1170 dim = self.listShapes[self.listDataname[j]][0]
1279 if dim == 0:
1171 if dim == 0:
1280 setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
1172 setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
1281 else:
1173 else:
1282 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1174 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283
1175
1176 self.dataOut.paramInterval = self.interval
1284 self.dataOut.flagNoData = False
1177 self.dataOut.flagNoData = False
1285 self.blockIndex += 1
1178 self.blockIndex += 1
1286
1179
1287 return
1180 return
1288
1181
1289 def run(self, **kwargs):
1182 def run(self, **kwargs):
1290
1183
1291 if not(self.isConfig):
1184 if not(self.isConfig):
1292 self.setup(**kwargs)
1185 self.setup(**kwargs)
1293 self.isConfig = True
1186 self.isConfig = True
1294
1187
1295 if self.blockIndex == self.blocksPerFile:
1188 if self.blockIndex == self.blocksPerFile:
1296 if not(self.__setNextFileOffline()):
1189 self.setNextFile()
1297 self.dataOut.flagNoData = True
1298 return 0
1299
1190
1300 self.getData()
1191 self.getData()
1301
1192
1302 return
1193 return
1303
1194
1304 @MPDecorator
1195 @MPDecorator
1305 class ParameterWriter(Operation):
1196 class ParameterWriter(Operation):
1306 '''
1197 '''
1307 HDF5 Writer, stores parameters data in HDF5 format files
1198 HDF5 Writer, stores parameters data in HDF5 format files
1308
1199
1309 path: path where the files will be stored
1200 path: path where the files will be stored
1310 blocksPerFile: number of blocks that will be saved in per HDF5 format file
1201 blocksPerFile: number of blocks that will be saved in per HDF5 format file
1311 mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
1202 mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
1312 metadataList: list of attributes that will be stored as metadata
1203 metadataList: list of attributes that will be stored as metadata
1313 dataList: list of attributes that will be stores as data
1204 dataList: list of attributes that will be stores as data
1314 '''
1205 '''
1315
1206
1316
1207
1317 ext = ".hdf5"
1208 ext = ".hdf5"
1318 optchar = "D"
1209 optchar = "D"
1319 metaoptchar = "M"
1210 metaoptchar = "M"
1320 metaFile = None
1211 metaFile = None
1321 filename = None
1212 filename = None
1322 path = None
1213 path = None
1323 setFile = None
1214 setFile = None
1324 fp = None
1215 fp = None
1325 grp = None
1216 grp = None
1326 ds = None
1217 ds = None
1327 firsttime = True
1218 firsttime = True
1328 #Configurations
1219 #Configurations
1329 blocksPerFile = None
1220 blocksPerFile = None
1330 blockIndex = None
1221 blockIndex = None
1331 dataOut = None
1222 dataOut = None
1332 #Data Arrays
1223 #Data Arrays
1333 dataList = None
1224 dataList = None
1334 metadataList = None
1225 metadataList = None
1335 dsList = None #List of dictionaries with dataset properties
1226 dsList = None #List of dictionaries with dataset properties
1336 tableDim = None
1227 tableDim = None
1337 dtype = [('name', 'S20'),('nDim', 'i')]
1228 dtype = [('name', 'S20'),('nDim', 'i')]
1338 currentDay = None
1229 currentDay = None
1339 lastTime = None
1230 lastTime = None
1340
1231
1341 def __init__(self):
1232 def __init__(self):
1342
1233
1343 Operation.__init__(self)
1234 Operation.__init__(self)
1344 return
1235 return
1345
1236
1346 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1237 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1347 self.path = path
1238 self.path = path
1348 self.blocksPerFile = blocksPerFile
1239 self.blocksPerFile = blocksPerFile
1349 self.metadataList = metadataList
1240 self.metadataList = metadataList
1350 self.dataList = dataList
1241 self.dataList = dataList
1351 self.setType = setType
1242 self.setType = setType
1352
1243
1353 tableList = []
1244 tableList = []
1354 dsList = []
1245 dsList = []
1355
1246
1356 for i in range(len(self.dataList)):
1247 for i in range(len(self.dataList)):
1357 dsDict = {}
1248 dsDict = {}
1358 dataAux = getattr(self.dataOut, self.dataList[i])
1249 dataAux = getattr(self.dataOut, self.dataList[i])
1359 dsDict['variable'] = self.dataList[i]
1250 dsDict['variable'] = self.dataList[i]
1360
1251
1361 if dataAux is None:
1252 if dataAux is None:
1362 continue
1253 continue
1363 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
1254 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
1364 dsDict['nDim'] = 0
1255 dsDict['nDim'] = 0
1365 else:
1256 else:
1366 dsDict['nDim'] = len(dataAux.shape)
1257 dsDict['nDim'] = len(dataAux.shape)
1367 dsDict['shape'] = dataAux.shape
1258 dsDict['shape'] = dataAux.shape
1368 dsDict['dsNumber'] = dataAux.shape[0]
1259 dsDict['dsNumber'] = dataAux.shape[0]
1369
1260
1370 dsList.append(dsDict)
1261 dsList.append(dsDict)
1371 tableList.append((self.dataList[i], dsDict['nDim']))
1262 tableList.append((self.dataList[i], dsDict['nDim']))
1372
1263
1373 self.dsList = dsList
1264 self.dsList = dsList
1374 self.tableDim = numpy.array(tableList, dtype=self.dtype)
1265 self.tableDim = numpy.array(tableList, dtype=self.dtype)
1375 self.currentDay = self.dataOut.datatime.date()
1266 self.currentDay = self.dataOut.datatime.date()
1376
1267
1377 def timeFlag(self):
1268 def timeFlag(self):
1378 currentTime = self.dataOut.utctime
1269 currentTime = self.dataOut.utctime
1379 timeTuple = time.localtime(currentTime)
1270 timeTuple = time.localtime(currentTime)
1380 dataDay = timeTuple.tm_yday
1271 dataDay = timeTuple.tm_yday
1381
1272
1382 if self.lastTime is None:
1273 if self.lastTime is None:
1383 self.lastTime = currentTime
1274 self.lastTime = currentTime
1384 self.currentDay = dataDay
1275 self.currentDay = dataDay
1385 return False
1276 return False
1386
1277
1387 timeDiff = currentTime - self.lastTime
1278 timeDiff = currentTime - self.lastTime
1388
1279
1389 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
1280 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
1390 if dataDay != self.currentDay:
1281 if dataDay != self.currentDay:
1391 self.currentDay = dataDay
1282 self.currentDay = dataDay
1392 return True
1283 return True
1393 elif timeDiff > 3*60*60:
1284 elif timeDiff > 3*60*60:
1394 self.lastTime = currentTime
1285 self.lastTime = currentTime
1395 return True
1286 return True
1396 else:
1287 else:
1397 self.lastTime = currentTime
1288 self.lastTime = currentTime
1398 return False
1289 return False
1399
1290
1400 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1291 def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1401
1292
1402 self.dataOut = dataOut
1293 self.dataOut = dataOut
1403 if not(self.isConfig):
1294 if not(self.isConfig):
1404 self.setup(path=path, blocksPerFile=blocksPerFile,
1295 self.setup(path=path, blocksPerFile=blocksPerFile,
1405 metadataList=metadataList, dataList=dataList,
1296 metadataList=metadataList, dataList=dataList,
1406 setType=setType)
1297 setType=setType)
1407
1298
1408 self.isConfig = True
1299 self.isConfig = True
1409 self.setNextFile()
1300 self.setNextFile()
1410
1301
1411 self.putData()
1302 self.putData()
1412 return
1303 return
1413
1304
1414 def setNextFile(self):
1305 def setNextFile(self):
1415
1306
1416 ext = self.ext
1307 ext = self.ext
1417 path = self.path
1308 path = self.path
1418 setFile = self.setFile
1309 setFile = self.setFile
1419
1310
1420 timeTuple = time.localtime(self.dataOut.utctime)
1311 timeTuple = time.localtime(self.dataOut.utctime)
1421 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
1312 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
1422 fullpath = os.path.join(path, subfolder)
1313 fullpath = os.path.join(path, subfolder)
1423
1314
1424 if os.path.exists(fullpath):
1315 if os.path.exists(fullpath):
1425 filesList = os.listdir(fullpath)
1316 filesList = os.listdir(fullpath)
1426 filesList = [k for k in filesList if k.startswith(self.optchar)]
1317 filesList = [k for k in filesList if k.startswith(self.optchar)]
1427 if len( filesList ) > 0:
1318 if len( filesList ) > 0:
1428 filesList = sorted(filesList, key=str.lower)
1319 filesList = sorted(filesList, key=str.lower)
1429 filen = filesList[-1]
1320 filen = filesList[-1]
1430 # el filename debera tener el siguiente formato
1321 # el filename debera tener el siguiente formato
1431 # 0 1234 567 89A BCDE (hex)
1322 # 0 1234 567 89A BCDE (hex)
1432 # x YYYY DDD SSS .ext
1323 # x YYYY DDD SSS .ext
1433 if isNumber(filen[8:11]):
1324 if isNumber(filen[8:11]):
1434 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
1325 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
1435 else:
1326 else:
1436 setFile = -1
1327 setFile = -1
1437 else:
1328 else:
1438 setFile = -1 #inicializo mi contador de seteo
1329 setFile = -1 #inicializo mi contador de seteo
1439 else:
1330 else:
1440 os.makedirs(fullpath)
1331 os.makedirs(fullpath)
1441 setFile = -1 #inicializo mi contador de seteo
1332 setFile = -1 #inicializo mi contador de seteo
1442
1333
1443 if self.setType is None:
1334 if self.setType is None:
1444 setFile += 1
1335 setFile += 1
1445 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
1336 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
1446 timeTuple.tm_year,
1337 timeTuple.tm_year,
1447 timeTuple.tm_yday,
1338 timeTuple.tm_yday,
1448 setFile,
1339 setFile,
1449 ext )
1340 ext )
1450 else:
1341 else:
1451 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
1342 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
1452 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
1343 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
1453 timeTuple.tm_year,
1344 timeTuple.tm_year,
1454 timeTuple.tm_yday,
1345 timeTuple.tm_yday,
1455 setFile,
1346 setFile,
1456 ext )
1347 ext )
1457
1348
1458 self.filename = os.path.join( path, subfolder, file )
1349 self.filename = os.path.join( path, subfolder, file )
1459
1350
1460 #Setting HDF5 File
1351 #Setting HDF5 File
1461 self.fp = h5py.File(self.filename, 'w')
1352 self.fp = h5py.File(self.filename, 'w')
1462 #write metadata
1353 #write metadata
1463 self.writeMetadata(self.fp)
1354 self.writeMetadata(self.fp)
1464 #Write data
1355 #Write data
1465 self.writeData(self.fp)
1356 self.writeData(self.fp)
1466
1357
1467 def writeMetadata(self, fp):
1358 def writeMetadata(self, fp):
1468
1359
1469 grp = fp.create_group("Metadata")
1360 grp = fp.create_group("Metadata")
1470 grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)
1361 grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)
1471
1362
1472 for i in range(len(self.metadataList)):
1363 for i in range(len(self.metadataList)):
1473 if not hasattr(self.dataOut, self.metadataList[i]):
1364 if not hasattr(self.dataOut, self.metadataList[i]):
1474 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
1365 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
1475 continue
1366 continue
1476 value = getattr(self.dataOut, self.metadataList[i])
1367 value = getattr(self.dataOut, self.metadataList[i])
1477 grp.create_dataset(self.metadataList[i], data=value)
1368 grp.create_dataset(self.metadataList[i], data=value)
1478 return
1369 return
1479
1370
1480 def writeData(self, fp):
1371 def writeData(self, fp):
1481
1372
1482 grp = fp.create_group("Data")
1373 grp = fp.create_group("Data")
1483 dtsets = []
1374 dtsets = []
1484 data = []
1375 data = []
1485
1376
1486 for dsInfo in self.dsList:
1377 for dsInfo in self.dsList:
1487 if dsInfo['nDim'] == 0:
1378 if dsInfo['nDim'] == 0:
1488 ds = grp.create_dataset(
1379 ds = grp.create_dataset(
1489 dsInfo['variable'],
1380 dsInfo['variable'],
1490 (self.blocksPerFile, ),
1381 (self.blocksPerFile, ),
1491 chunks=True,
1382 chunks=True,
1492 dtype=numpy.float64)
1383 dtype=numpy.float64)
1493 dtsets.append(ds)
1384 dtsets.append(ds)
1494 data.append((dsInfo['variable'], -1))
1385 data.append((dsInfo['variable'], -1))
1495 else:
1386 else:
1496 sgrp = grp.create_group(dsInfo['variable'])
1387 sgrp = grp.create_group(dsInfo['variable'])
1497 for i in range(dsInfo['dsNumber']):
1388 for i in range(dsInfo['dsNumber']):
1498 ds = sgrp.create_dataset(
1389 ds = sgrp.create_dataset(
1499 'table{:02d}'.format(i),
1390 'table{:02d}'.format(i),
1500 (self.blocksPerFile, ) + dsInfo['shape'][1:],
1391 (self.blocksPerFile, ) + dsInfo['shape'][1:],
1501 chunks=True)
1392 chunks=True)
1502 dtsets.append(ds)
1393 dtsets.append(ds)
1503 data.append((dsInfo['variable'], i))
1394 data.append((dsInfo['variable'], i))
1504 fp.flush()
1395 fp.flush()
1505
1396
1506 log.log('Creating file: {}'.format(fp.filename), self.name)
1397 log.log('Creating file: {}'.format(fp.filename), self.name)
1507
1398
1508 self.ds = dtsets
1399 self.ds = dtsets
1509 self.data = data
1400 self.data = data
1510 self.firsttime = True
1401 self.firsttime = True
1511 self.blockIndex = 0
1402 self.blockIndex = 0
1512 return
1403 return
1513
1404
1514 def putData(self):
1405 def putData(self):
1515
1406
1516 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
1407 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
1517 self.closeFile()
1408 self.closeFile()
1518 self.setNextFile()
1409 self.setNextFile()
1519
1410
1520 for i, ds in enumerate(self.ds):
1411 for i, ds in enumerate(self.ds):
1521 attr, ch = self.data[i]
1412 attr, ch = self.data[i]
1522 if ch == -1:
1413 if ch == -1:
1523 ds[self.blockIndex] = getattr(self.dataOut, attr)
1414 ds[self.blockIndex] = getattr(self.dataOut, attr)
1524 else:
1415 else:
1525 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
1416 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
1526
1417
1527 self.fp.flush()
1418 self.fp.flush()
1528 self.blockIndex += 1
1419 self.blockIndex += 1
1529 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1420 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1530
1421
1531 return
1422 return
1532
1423
1533 def closeFile(self):
1424 def closeFile(self):
1534
1425
1535 if self.blockIndex != self.blocksPerFile:
1426 if self.blockIndex != self.blocksPerFile:
1536 for ds in self.ds:
1427 for ds in self.ds:
1537 ds.resize(self.blockIndex, axis=0)
1428 ds.resize(self.blockIndex, axis=0)
1538
1429
1539 self.fp.flush()
1430 self.fp.flush()
1540 self.fp.close()
1431 self.fp.close()
1541
1432
1542 def close(self):
1433 def close(self):
1543
1434
1544 self.closeFile()
1435 self.closeFile()
@@ -1,433 +1,415
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import os
16 import inspect
16 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 import traceback
20 import traceback
21 try:
21 try:
22 from queue import Queue
22 from queue import Queue
23 except:
23 except:
24 from Queue import Queue
24 from Queue import Queue
25 from threading import Thread
25 from threading import Thread
26 from multiprocessing import Process
26 from multiprocessing import Process
27
27
28 from schainpy.utils import log
28 from schainpy.utils import log
29
29
30
30
31 class ProcessingUnit(object):
31 class ProcessingUnit(object):
32
32
33 """
33 """
34 Update - Jan 2018 - MULTIPROCESSING
34 Update - Jan 2018 - MULTIPROCESSING
35 All the "call" methods present in the previous base were removed.
35 All the "call" methods present in the previous base were removed.
36 The majority of operations are independant processes, thus
36 The majority of operations are independant processes, thus
37 the decorator is in charge of communicate the operation processes
37 the decorator is in charge of communicate the operation processes
38 with the proccessing unit via IPC.
38 with the proccessing unit via IPC.
39
39
40 The constructor does not receive any argument. The remaining methods
40 The constructor does not receive any argument. The remaining methods
41 are related with the operations to execute.
41 are related with the operations to execute.
42
42
43
43
44 """
44 """
45 proc_type = 'processing'
46 __attrs__ = []
45
47
46 def __init__(self):
48 def __init__(self):
47
49
48 self.dataIn = None
50 self.dataIn = None
49 self.dataOut = None
51 self.dataOut = None
50 self.isConfig = False
52 self.isConfig = False
51 self.operations = []
53 self.operations = []
52 self.plots = []
54 self.plots = []
53
55
54 def getAllowedArgs(self):
56 def getAllowedArgs(self):
55 if hasattr(self, '__attrs__'):
57 if hasattr(self, '__attrs__'):
56 return self.__attrs__
58 return self.__attrs__
57 else:
59 else:
58 return inspect.getargspec(self.run).args
60 return inspect.getargspec(self.run).args
59
61
60 def addOperation(self, conf, operation):
62 def addOperation(self, conf, operation):
61 """
63 """
62 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
64 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
63 posses the id of the operation process (IPC purposes)
65 posses the id of the operation process (IPC purposes)
64
66
65 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
67 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
66 identificador asociado a este objeto.
68 identificador asociado a este objeto.
67
69
68 Input:
70 Input:
69
71
70 object : objeto de la clase "Operation"
72 object : objeto de la clase "Operation"
71
73
72 Return:
74 Return:
73
75
74 objId : identificador del objeto, necesario para comunicar con master(procUnit)
76 objId : identificador del objeto, necesario para comunicar con master(procUnit)
75 """
77 """
76
78
77 self.operations.append(
79 self.operations.append(
78 (operation, conf.type, conf.id, conf.getKwargs()))
80 (operation, conf.type, conf.id, conf.getKwargs()))
79
81
80 if 'plot' in self.name.lower():
82 if 'plot' in self.name.lower():
81 self.plots.append(operation.CODE)
83 self.plots.append(operation.CODE)
82
84
83 def getOperationObj(self, objId):
85 def getOperationObj(self, objId):
84
86
85 if objId not in list(self.operations.keys()):
87 if objId not in list(self.operations.keys()):
86 return None
88 return None
87
89
88 return self.operations[objId]
90 return self.operations[objId]
89
91
90 def operation(self, **kwargs):
92 def operation(self, **kwargs):
91 """
93 """
92 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
94 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
93 atributos del objeto dataOut
95 atributos del objeto dataOut
94
96
95 Input:
97 Input:
96
98
97 **kwargs : Diccionario de argumentos de la funcion a ejecutar
99 **kwargs : Diccionario de argumentos de la funcion a ejecutar
98 """
100 """
99
101
100 raise NotImplementedError
102 raise NotImplementedError
101
103
102 def setup(self):
104 def setup(self):
103
105
104 raise NotImplementedError
106 raise NotImplementedError
105
107
106 def run(self):
108 def run(self):
107
109
108 raise NotImplementedError
110 raise NotImplementedError
109
111
110 def close(self):
112 def close(self):
111
113
112 return
114 return
113
115
114
116
115 class Operation(object):
117 class Operation(object):
116
118
117 """
119 """
118 Update - Jan 2018 - MULTIPROCESSING
120 Update - Jan 2018 - MULTIPROCESSING
119
121
120 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
122 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
121 The constructor doe snot receive any argument, neither the baseclass.
123 The constructor doe snot receive any argument, neither the baseclass.
122
124
123
125
124 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
126 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
125 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
127 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
126 acumulacion dentro de esta clase
128 acumulacion dentro de esta clase
127
129
128 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
130 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
129
131
130 """
132 """
133 proc_type = 'operation'
134 __attrs__ = []
131
135
132 def __init__(self):
136 def __init__(self):
133
137
134 self.id = None
138 self.id = None
135 self.isConfig = False
139 self.isConfig = False
136
140
137 if not hasattr(self, 'name'):
141 if not hasattr(self, 'name'):
138 self.name = self.__class__.__name__
142 self.name = self.__class__.__name__
139
143
140 def getAllowedArgs(self):
144 def getAllowedArgs(self):
141 if hasattr(self, '__attrs__'):
145 if hasattr(self, '__attrs__'):
142 return self.__attrs__
146 return self.__attrs__
143 else:
147 else:
144 return inspect.getargspec(self.run).args
148 return inspect.getargspec(self.run).args
145
149
146 def setup(self):
150 def setup(self):
147
151
148 self.isConfig = True
152 self.isConfig = True
149
153
150 raise NotImplementedError
154 raise NotImplementedError
151
155
152 def run(self, dataIn, **kwargs):
156 def run(self, dataIn, **kwargs):
153 """
157 """
154 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
158 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
155 atributos del objeto dataIn.
159 atributos del objeto dataIn.
156
160
157 Input:
161 Input:
158
162
159 dataIn : objeto del tipo JROData
163 dataIn : objeto del tipo JROData
160
164
161 Return:
165 Return:
162
166
163 None
167 None
164
168
165 Affected:
169 Affected:
166 __buffer : buffer de recepcion de datos.
170 __buffer : buffer de recepcion de datos.
167
171
168 """
172 """
169 if not self.isConfig:
173 if not self.isConfig:
170 self.setup(**kwargs)
174 self.setup(**kwargs)
171
175
172 raise NotImplementedError
176 raise NotImplementedError
173
177
174 def close(self):
178 def close(self):
175
179
176 return
180 return
177
181
178 class InputQueue(Thread):
182 class InputQueue(Thread):
179
183
180 '''
184 '''
181
182 Class to hold input data for Proccessing Units and external Operations,
185 Class to hold input data for Proccessing Units and external Operations,
183
184 '''
186 '''
185
187
186
187
188 def __init__(self, project_id, inputId):
188 def __init__(self, project_id, inputId):
189
189
190 Thread.__init__(self)
190 Thread.__init__(self)
191
192 self.queue = Queue()
191 self.queue = Queue()
193
194 self.project_id = project_id
192 self.project_id = project_id
195
196 self.inputId = inputId
193 self.inputId = inputId
197
194
198
199
200 def run(self):
195 def run(self):
201
196
202
203
204 c = zmq.Context()
197 c = zmq.Context()
205
206 self.receiver = c.socket(zmq.SUB)
198 self.receiver = c.socket(zmq.SUB)
207
208 self.receiver.connect(
199 self.receiver.connect(
209
210 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
211
212 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
213
202
214
215
216 while True:
203 while True:
217
218 self.queue.put(self.receiver.recv_multipart()[1])
204 self.queue.put(self.receiver.recv_multipart()[1])
219
205
220
221
222 def get(self):
206 def get(self):
223
207
224
225
226 return pickle.loads(self.queue.get())
208 return pickle.loads(self.queue.get())
227
209
228
210
229
230 def MPDecorator(BaseClass):
211 def MPDecorator(BaseClass):
231 """
212 """
232 Multiprocessing class decorator
213 Multiprocessing class decorator
233
214
234 This function add multiprocessing features to a BaseClass. Also, it handle
215 This function add multiprocessing features to a BaseClass. Also, it handle
235 the communication beetween processes (readers, procUnits and operations).
216 the communication beetween processes (readers, procUnits and operations).
236 """
217 """
237
218
238 class MPClass(BaseClass, Process):
219 class MPClass(BaseClass, Process):
239
220
240 def __init__(self, *args, **kwargs):
221 def __init__(self, *args, **kwargs):
241 super(MPClass, self).__init__()
222 super(MPClass, self).__init__()
242 Process.__init__(self)
223 Process.__init__(self)
243 self.operationKwargs = {}
224 self.operationKwargs = {}
244 self.args = args
225 self.args = args
245 self.kwargs = kwargs
226 self.kwargs = kwargs
246 self.sender = None
227 self.sender = None
247 self.receiver = None
228 self.receiver = None
248 self.i = 0
229 self.i = 0
249 self.t = time.time()
230 self.t = time.time()
250 self.name = BaseClass.__name__
231 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
251
233
252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254
236
255 self.start_time = time.time()
237 self.start_time = time.time()
256 self.id = args[0]
238 self.id = args[0]
257 self.inputId = args[1]
239 self.inputId = args[1]
258 self.project_id = args[2]
240 self.project_id = args[2]
259 self.err_queue = args[3]
241 self.err_queue = args[3]
260 self.typeProc = args[4]
242 self.typeProc = args[4]
261 self.err_queue.put('#_start_#')
243 self.err_queue.put('#_start_#')
262 self.queue = InputQueue(self.project_id, self.inputId)
244 self.queue = InputQueue(self.project_id, self.inputId)
263
245
264 def subscribe(self):
246 def subscribe(self):
265 '''
247 '''
266 Start the zmq socket receiver and subcribe to input ID.
248 Start the zmq socket receiver and subcribe to input ID.
267 '''
249 '''
268
250
269 self.queue.start()
251 self.queue.start()
270
252
271 def listen(self):
253 def listen(self):
272 '''
254 '''
273 This function waits for objects
255 This function waits for objects
274 '''
256 '''
275
257
276 return self.queue.get()
258 return self.queue.get()
277
259
278 def set_publisher(self):
260 def set_publisher(self):
279 '''
261 '''
280 This function create a zmq socket for publishing objects.
262 This function create a zmq socket for publishing objects.
281 '''
263 '''
282
264
283 time.sleep(0.5)
265 time.sleep(0.5)
284
266
285 c = zmq.Context()
267 c = zmq.Context()
286 self.sender = c.socket(zmq.PUB)
268 self.sender = c.socket(zmq.PUB)
287 self.sender.connect(
269 self.sender.connect(
288 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
270 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
289
271
290 def publish(self, data, id):
272 def publish(self, data, id):
291 '''
273 '''
292 This function publish an object, to an specific topic.
274 This function publish an object, to an specific topic.
293 For Read Units (inputId == None) adds a little delay
275 For Read Units (inputId == None) adds a little delay
294 to avoid data loss
276 to avoid data loss
295 '''
277 '''
296
278
297 if self.inputId is None:
279 if self.inputId is None:
298 self.i += 1
280 self.i += 1
299 if self.i % 40 == 0 and time.time()-self.t > 0.1:
281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
300 self.i = 0
282 self.i = 0
301 self.t = time.time()
283 self.t = time.time()
302 time.sleep(0.05)
284 time.sleep(0.05)
303 elif self.i % 40 == 0:
285 elif self.i % 40 == 0:
304 self.i = 0
286 self.i = 0
305 self.t = time.time()
287 self.t = time.time()
306 time.sleep(0.01)
288 time.sleep(0.01)
307
289
308 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
290 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
309
291
310 def runReader(self):
292 def runReader(self):
311 '''
293 '''
312 Run fuction for read units
294 Run fuction for read units
313 '''
295 '''
314 while True:
296 while True:
315
297
316 try:
298 try:
317 BaseClass.run(self, **self.kwargs)
299 BaseClass.run(self, **self.kwargs)
318 except:
300 except:
319 err = traceback.format_exc()
301 err = traceback.format_exc()
320 if 'No more files' in err:
302 if 'No more files' in err:
321 log.warning('No more files to read', self.name)
303 log.warning('No more files to read', self.name)
322 else:
304 else:
323 self.err_queue.put('{}|{}'.format(self.name, err))
305 self.err_queue.put('{}|{}'.format(self.name, err))
324 self.dataOut.error = True
306 self.dataOut.error = True
325
307
326 for op, optype, opId, kwargs in self.operations:
308 for op, optype, opId, kwargs in self.operations:
327 if optype == 'self' and not self.dataOut.flagNoData:
309 if optype == 'self' and not self.dataOut.flagNoData:
328 op(**kwargs)
310 op(**kwargs)
329 elif optype == 'other' and not self.dataOut.flagNoData:
311 elif optype == 'other' and not self.dataOut.flagNoData:
330 self.dataOut = op.run(self.dataOut, **self.kwargs)
312 self.dataOut = op.run(self.dataOut, **self.kwargs)
331 elif optype == 'external':
313 elif optype == 'external':
332 self.publish(self.dataOut, opId)
314 self.publish(self.dataOut, opId)
333
315
334 if self.dataOut.flagNoData and not self.dataOut.error:
316 if self.dataOut.flagNoData and not self.dataOut.error:
335 continue
317 continue
336
318
337 self.publish(self.dataOut, self.id)
319 self.publish(self.dataOut, self.id)
338
320
339 if self.dataOut.error:
321 if self.dataOut.error:
340 break
322 break
341
323
342 time.sleep(0.5)
324 time.sleep(0.5)
343
325
344 def runProc(self):
326 def runProc(self):
345 '''
327 '''
346 Run function for proccessing units
328 Run function for proccessing units
347 '''
329 '''
348
330
349 while True:
331 while True:
350 self.dataIn = self.listen()
332 self.dataIn = self.listen()
351
333
352 if self.dataIn.flagNoData and self.dataIn.error is None:
334 if self.dataIn.flagNoData and self.dataIn.error is None:
353 continue
335 continue
354 elif not self.dataIn.error:
336 elif not self.dataIn.error:
355 try:
337 try:
356 BaseClass.run(self, **self.kwargs)
338 BaseClass.run(self, **self.kwargs)
357 except:
339 except:
358 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
340 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
359 self.dataOut.error = True
341 self.dataOut.error = True
360 elif self.dataIn.error:
342 elif self.dataIn.error:
361 self.dataOut.error = self.dataIn.error
343 self.dataOut.error = self.dataIn.error
362 self.dataOut.flagNoData = True
344 self.dataOut.flagNoData = True
363
345
364 for op, optype, opId, kwargs in self.operations:
346 for op, optype, opId, kwargs in self.operations:
365 if optype == 'self' and not self.dataOut.flagNoData:
347 if optype == 'self' and not self.dataOut.flagNoData:
366 op(**kwargs)
348 op(**kwargs)
367 elif optype == 'other' and not self.dataOut.flagNoData:
349 elif optype == 'other' and not self.dataOut.flagNoData:
368 self.dataOut = op.run(self.dataOut, **kwargs)
350 self.dataOut = op.run(self.dataOut, **kwargs)
369 elif optype == 'external' and not self.dataOut.flagNoData:
351 elif optype == 'external' and not self.dataOut.flagNoData:
370 self.publish(self.dataOut, opId)
352 self.publish(self.dataOut, opId)
371
353
372 self.publish(self.dataOut, self.id)
354 self.publish(self.dataOut, self.id)
373 for op, optype, opId, kwargs in self.operations:
355 for op, optype, opId, kwargs in self.operations:
374 if optype == 'external' and self.dataOut.error:
356 if optype == 'external' and self.dataOut.error:
375 self.publish(self.dataOut, opId)
357 self.publish(self.dataOut, opId)
376
358
377 if self.dataOut.error:
359 if self.dataOut.error:
378 break
360 break
379
361
380 time.sleep(0.5)
362 time.sleep(0.5)
381
363
382 def runOp(self):
364 def runOp(self):
383 '''
365 '''
384 Run function for external operations (this operations just receive data
366 Run function for external operations (this operations just receive data
385 ex: plots, writers, publishers)
367 ex: plots, writers, publishers)
386 '''
368 '''
387
369
388 while True:
370 while True:
389
371
390 dataOut = self.listen()
372 dataOut = self.listen()
391
373
392 if not dataOut.error:
374 if not dataOut.error:
393 BaseClass.run(self, dataOut, **self.kwargs)
375 BaseClass.run(self, dataOut, **self.kwargs)
394 else:
376 else:
395 break
377 break
396
378
397 def run(self):
379 def run(self):
398 if self.typeProc is "ProcUnit":
380 if self.typeProc is "ProcUnit":
399
381
400 if self.inputId is not None:
382 if self.inputId is not None:
401 self.subscribe()
383 self.subscribe()
402
384
403 self.set_publisher()
385 self.set_publisher()
404
386
405 if 'Reader' not in BaseClass.__name__:
387 if 'Reader' not in BaseClass.__name__:
406 self.runProc()
388 self.runProc()
407 else:
389 else:
408 self.runReader()
390 self.runReader()
409
391
410 elif self.typeProc is "Operation":
392 elif self.typeProc is "Operation":
411
393
412 self.subscribe()
394 self.subscribe()
413 self.runOp()
395 self.runOp()
414
396
415 else:
397 else:
416 raise ValueError("Unknown type")
398 raise ValueError("Unknown type")
417
399
418 self.close()
400 self.close()
419
401
420 def close(self):
402 def close(self):
421
403
422 BaseClass.close(self)
404 BaseClass.close(self)
423 self.err_queue.put('#_end_#')
405 self.err_queue.put('#_end_#')
424
406
425 if self.sender:
407 if self.sender:
426 self.sender.close()
408 self.sender.close()
427
409
428 if self.receiver:
410 if self.receiver:
429 self.receiver.close()
411 self.receiver.close()
430
412
431 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
413 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
432
414
433 return MPClass
415 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now