##// END OF EJS Templates
Add input queues for processing units and external operations
Juan C. Espinoza -
r1235:fff8599e0346
parent child
Show More
@@ -1,408 +1,407
1 1 '''
2 2 Created on Nov 9, 2016
3 3
4 4 @author: roj- LouVD
5 5 '''
6 6
7 7
8 8 import os
9 9 import sys
10 10 import time
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15
16 16 from schainpy.model.proc.jroproc_base import ProcessingUnit, MPDecorator
17 17 from schainpy.model.data.jrodata import Parameters
18 18 from schainpy.model.io.jroIO_base import JRODataReader, isNumber
19 19 from schainpy.utils import log
20 20
# On-disk binary layouts of BLTR *.sswma files (little-endian, packed via
# numpy structured dtypes and read with numpy.fromfile).

# File header: appears once at the very start of the file.
FILE_HEADER_STRUCTURE = numpy.dtype([
    ('FMN', '<u4'),
    ('nrec', '<u4'),        # number of records in the file (drives the read loop)
    ('fr_offset', '<u4'),
    ('id', '<u4'),
    ('site', 'u1', (32,))   # fixed 32-byte site field
])

# Record header: precedes every data record; readHeader() extends it with
# per-channel arrays (antenna_coord, rx_gains, rx_analysis).
REC_HEADER_STRUCTURE = numpy.dtype([
    ('rmn', '<u4'),
    ('rcounter', '<u4'),
    ('nr_offset', '<u4'),
    ('tr_offset', '<u4'),
    ('time', '<u4'),        # epoch seconds (decoded with utcfromtimestamp)
    ('time_msec', '<u4'),
    ('tag', 'u1', (32,)),
    ('comments', 'u1', (32,)),
    ('lat', '<f4'),
    ('lon', '<f4'),
    ('gps_status', '<u4'),
    ('freq', '<u4'),
    ('freq0', '<u4'),
    ('nchan', '<u4'),       # channel count; reader uses nchan/2 channels
    ('delta_r', '<u4'),
    ('nranges', '<u4'),     # number of range gates per record
    ('r0', '<u4'),
    ('prf', '<u4'),
    ('ncoh', '<u4'),
    ('npoints', '<u4'),
    ('polarization', '<i4'),
    ('rx_filter', '<u4'),
    ('nmodes', '<u4'),      # number of modes stored back-to-back per record
    ('dmode_index', '<u4'),
    ('dmode_rngcorr', '<u4'),  # range correction subtracted from 'range'
    ('nrxs', '<u4'),        # receiver count (kchan)
    ('acf_length', '<u4'),
    ('acf_lags', '<u4'),
    ('sea_to_atmos', '<f4'),
    ('sea_notch', '<u4'),
    ('lh_sea', '<u4'),
    ('hh_sea', '<u4'),
    ('nbins_sea', '<u4'),
    ('min_snr', '<f4'),
    ('min_cc', '<f4'),
    ('max_time_diff', '<f4')
])

# Per-range-gate payload; readData() extends it with per-channel arrays.
DATA_STRUCTURE = numpy.dtype([
    ('range', '<u4'),
    ('status', '<u4'),            # quality flag, compared against status_value
    ('zonal', '<f4'),             # wind components (NaN'd when == -9999.)
    ('meridional', '<f4'),
    ('vertical', '<f4'),
    ('zonal_a', '<f4'),
    ('meridional_a', '<f4'),
    ('corrected_fading', '<f4'),  # seconds
    ('uncorrected_fading', '<f4'),  # seconds
    ('time_diff', '<f4'),
    ('major_axis', '<f4'),
    ('axial_ratio', '<f4'),
    ('orientation', '<f4'),
    ('sea_power', '<u4'),
    ('sea_algorithm', '<u4')
])
85 85
@MPDecorator
class BLTRParamReader(JRODataReader, ProcessingUnit):
    '''
    Boundary Layer and Tropospheric Radar (BLTR) reader: wind velocities and
    SNR from *.sswma files.

    A file holds one FILE_HEADER_STRUCTURE followed by records; each record
    carries, per mode, a REC_HEADER_STRUCTURE plus one DATA_STRUCTURE entry
    per range gate. Decoded data is published through self.dataOut
    (a Parameters object) by set_output().
    '''

    ext = '.sswma'  # default file extension searched by setup()

    def __init__(self):

        ProcessingUnit.__init__(self)

        self.dataOut = Parameters()   # output container filled by set_output()
        self.counter_records = 0      # records consumed in the current file
        self.flagNoMoreFiles = 0      # 1 once files/online waits are exhausted
        self.isConfig = False
        self.filename = None          # full path of the currently open file

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              ext=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              timezone=0,
              status_value=0,
              **kwargs):
        '''
        Configure the reader and open the first file.

        Parameters:
            path              : folder containing the .sswma files (required)
            startDate, endDate: datetime.date window to filter files by name
            ext               : file extension; defaults to self.ext
            startTime, endTime: datetime.time window to filter records
            timezone          : propagated verbatim to dataOut.timezone
            status_value      : samples whose 'status' differs are NaN'd in
                                readData()
            **kwargs          : delay (seconds between retries, default 10),
                                online (bool, default False),
                                nTries (retries waiting for a file, default 3)

        Raises:
            ValueError when path is None; Warning when no file matches.
        '''
        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.status_value = status_value
        # Sentinel "very old" datetime, replaced on the first readHeader().
        self.datatime = datetime.datetime(1900, 1, 1)
        self.delay = kwargs.get('delay', 10)
        self.online = kwargs.get('online', False)
        self.nTries = kwargs.get('nTries', 3)

        if self.path is None:
            raise ValueError("The path is not valid")

        if ext is None:
            ext = self.ext

        # search_files() is a generator; files are pulled one at a time by
        # setNextFile() via next().
        self.fileList = self.search_files(self.path, startDate, endDate, ext)
        self.timezone = timezone
        self.fileIndex = 0

        # NOTE(review): search_files() returns a generator object, which is
        # always truthy, so this emptiness guard can never fire — confirm
        # whether an eager list was intended here.
        if not self.fileList:
            raise Warning("There is no files matching these date in the folder: %s. \n Check 'startDate' and 'endDate' " % (
                path))

        self.setNextFile()

    def search_last_file(self):
        '''
        Get last file and add it to the list (online mode).

        Retries up to self.nTries times, sleeping self.delay seconds between
        tries. Returns the newest filename in self.path, or 0 when no file
        different from the one already open appears.
        '''

        for n in range(self.nTries + 1):
            if n > 0:
                log.warning(
                    "Waiting %0.2f seconds for the next file, try %03d ..." % (self.delay, n + 1),
                    self.name
                )
                time.sleep(self.delay)
            file_list = os.listdir(self.path)
            file_list.sort()
            if file_list:
                if self.filename:
                    # The newest entry is the file we already have open:
                    # keep waiting for a truly new one.
                    if file_list[-1] not in self.filename:
                        return file_list[-1]
                    else:
                        continue
                return file_list[-1]
        return 0

    def search_files(self, path, startDate, endDate, ext):
        '''
        Search for BLTR rawdata files in path (generator).

        Yields the names (without path) of files whose date, encoded in the
        filename tail as YYYYMMDD (characters [-14:-6]), lies inside
        [startDate, endDate].

        Input:
            path      - Path to find BLTR rawdata files
            startDate - Select files from this date
            endDate   - Select files until this date
            ext       - Extension of the files to read
        '''

        log.success('Searching files in {} '.format(path), 'BLTRParamReader')
        foldercounter = 0
        fileList0 = glob.glob1(path, "*%s" % ext)
        fileList0.sort()

        for thisFile in fileList0:
            # Names with non-numeric date fields are silently skipped.
            year = thisFile[-14:-10]
            if not isNumber(year):
                continue

            month = thisFile[-10:-8]
            if not isNumber(month):
                continue

            day = thisFile[-8:-6]
            if not isNumber(day):
                continue

            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            yield thisFile

        return

    def setNextFile(self):
        '''
        Open the next file (offline) or the newest file (online).

        Reads the file header, resets per-file counters and returns 1.
        Returns 0 and sets flagNoMoreFiles when nothing is left to read.
        '''

        if self.online:
            filename = self.search_last_file()
            if not filename:
                self.flagNoMoreFiles = 1
                return 0
        else:
            try:
                filename = next(self.fileList)
            except StopIteration:
                self.flagNoMoreFiles = 1
                return 0

        log.success('Opening {}'.format(filename), 'BLTRParamReader')

        dirname, name = os.path.split(filename)
        # 'peru2' ---> Piura - 'peru1' ---> Huancayo or Porcuya
        self.siteFile = filename.split('.')[0]
        if self.filename is not None:
            # Release the previous file before opening the next one.
            self.fp.close()
        self.filename = os.path.join(self.path, filename)
        self.fp = open(self.filename, 'rb')
        self.header_file = numpy.fromfile(self.fp, FILE_HEADER_STRUCTURE, 1)
        self.nrecords = self.header_file['nrec'][0]
        self.sizeOfFile = os.path.getsize(self.filename)
        self.counter_records = 0
        self.flagIsNewFile = 0
        # NOTE(review): per the r1235 diff, a trailing time.sleep(2) that
        # followed this increment was removed in this revision.
        self.fileIndex += 1

        return 1

    def readNextBlock(self):
        '''
        Advance to the next record inside the configured time window.

        Returns 1 on success; 0 when files are exhausted. Records outside
        [startDate+startTime, endDate+endTime] are logged and skipped.
        '''

        while True:
            if not self.online and self.counter_records == self.nrecords:
                # Current file exhausted (offline): move to the next one.
                self.flagIsNewFile = 1
                if not self.setNextFile():
                    return 0

            try:
                pointer = self.fp.tell()
                self.readBlock()
            # NOTE(review): bare except — any failure in readBlock() is
            # treated as "incomplete data on disk"; consider narrowing.
            except:
                if self.online and self.waitDataBlock(pointer, 38512) == 1:
                    # More bytes arrived meanwhile: retry the same record.
                    continue
                else:
                    if not self.setNextFile():
                        return 0

            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'BLTRParamReader')
                continue
            break

        log.log('Reading Record No. {} -> {}'.format(
            self.counter_records,
            # self.nrecords,
            self.datatime.ctime()), 'BLTRParamReader')

        return 1

    def readBlock(self):
        '''
        Read one record (all modes) into self.height, self.buffer (winds)
        and self.snr.
        '''

        pointer = self.fp.tell()
        # Peek at the record header to size the buffers, then rewind so
        # readHeader() can consume it again for each mode.
        header_rec = numpy.fromfile(self.fp, REC_HEADER_STRUCTURE, 1)
        self.nchannels = int(header_rec['nchan'][0] / 2)
        self.kchan = header_rec['nrxs'][0]
        self.nmodes = header_rec['nmodes'][0]
        self.nranges = header_rec['nranges'][0]
        self.fp.seek(pointer)
        self.height = numpy.empty((self.nmodes, self.nranges))
        self.snr = numpy.empty((self.nmodes, int(self.nchannels), self.nranges))
        self.buffer = numpy.empty((self.nmodes, 3, self.nranges))
        self.flagDiscontinuousBlock = 0

        for mode in range(self.nmodes):
            self.readHeader()
            data = self.readData()
            # Remove the mode's range correction and convert to km.
            self.height[mode] = (data[0] - self.correction) / 1000.
            self.buffer[mode] = data[1]
            self.snr[mode] = data[2]

        self.counter_records = self.counter_records + self.nmodes

        return

    def readHeader(self):
        '''
        Read one record header (REC_HEADER_STRUCTURE plus the per-channel
        antenna/gain arrays) and cache the fields used downstream.
        '''

        header_structure = numpy.dtype(
            REC_HEADER_STRUCTURE.descr + [
                ('antenna_coord', 'f4', (2, int(self.nchannels))),
                ('rx_gains', 'u4', (int(self.nchannels),)),
                ('rx_analysis', 'u4', (int(self.nchannels),))
            ]
        )

        self.header_rec = numpy.fromfile(self.fp, header_structure, 1)
        self.lat = self.header_rec['lat'][0]
        self.lon = self.header_rec['lon'][0]
        self.delta = self.header_rec['delta_r'][0]
        self.correction = self.header_rec['dmode_rngcorr'][0]
        self.imode = self.header_rec['dmode_index'][0]
        self.antenna = self.header_rec['antenna_coord']
        self.rx_gains = self.header_rec['rx_gains']
        self.time = self.header_rec['time'][0]
        dt = datetime.datetime.utcfromtimestamp(self.time)
        # A date change between consecutive records marks a discontinuity.
        if dt.date() > self.datatime.date():
            self.flagDiscontinuousBlock = 1
        self.datatime = dt

    def readData(self):
        '''
        Reading and filtering data block record of BLTR rawdata file,
        filtering is according to status_value.

        Samples carrying the -9999. sentinel, or whose 'status' field is not
        equal to self.status_value, are set to NaN.

        Return:
            (height, winds, snr) where winds stacks (zonal, meridional,
            vertical) and snr is converted from dB to linear power.
        '''
        self.nchannels = int(self.nchannels)

        data_structure = numpy.dtype(
            DATA_STRUCTURE.descr + [
                ('rx_saturation', 'u4', (self.nchannels,)),
                ('chan_offset', 'u4', (2 * self.nchannels,)),
                ('rx_amp', 'u4', (self.nchannels,)),
                ('rx_snr', 'f4', (self.nchannels,)),
                ('cross_snr', 'f4', (self.kchan,)),
                ('sea_power_relative', 'f4', (self.kchan,))]
        )

        data = numpy.fromfile(self.fp, data_structure, self.nranges)

        height = data['range']
        winds = numpy.array(
            (data['zonal'], data['meridional'], data['vertical']))
        snr = data['rx_snr'].T

        # -9999. is the file's missing-data sentinel.
        winds[numpy.where(winds == -9999.)] = numpy.nan
        winds[:, numpy.where(data['status'] != self.status_value)] = numpy.nan
        snr[numpy.where(snr == -9999.)] = numpy.nan
        snr[:, numpy.where(data['status'] != self.status_value)] = numpy.nan
        # dB -> linear power.
        snr = numpy.power(10, snr / 10)

        return height, winds, snr

    def set_output(self):
        '''
        Storing data from databuffer to dataOut object.
        '''

        self.dataOut.data_SNR = self.snr
        self.dataOut.height = self.height
        self.dataOut.data = self.buffer
        self.dataOut.utctimeInit = self.time
        self.dataOut.utctime = self.dataOut.utctimeInit
        self.dataOut.useLocalTime = False
        self.dataOut.paramInterval = 157  # NOTE(review): magic constant — presumably seconds; confirm
        self.dataOut.timezone = self.timezone
        self.dataOut.site = self.siteFile
        self.dataOut.nrecords = self.nrecords / self.nmodes
        self.dataOut.sizeOfFile = self.sizeOfFile
        self.dataOut.lat = self.lat
        self.dataOut.lon = self.lon
        self.dataOut.channelList = list(range(self.nchannels))
        self.dataOut.kchan = self.kchan
        self.dataOut.delta = self.delta
        self.dataOut.correction = self.correction
        self.dataOut.nmodes = self.nmodes
        self.dataOut.imode = self.imode
        self.dataOut.antenna = self.antenna
        self.dataOut.rx_gains = self.rx_gains
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Read the next record into dataOut.

        On exhaustion, sets dataOut.flagNoData and dataOut.error instead of
        raising; otherwise publishes the record via set_output().
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            self.dataOut.error = 'No More files to read'
            return

        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            self.dataOut.error = 'Time for wait new file reach!!!'

        self.set_output()

        return 1
408 407 No newline at end of file
@@ -1,1833 +1,1831
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import inspect
13 13 import time
14 14 import datetime
15 15 import traceback
16 16 import zmq
17 17
18 18 try:
19 19 from gevent import sleep
20 20 except:
21 21 from time import sleep
22 22
23 23 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
24 24 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
25 25 from schainpy.utils import log
26 26 import schainpy.admin
27 27
28 28 LOCALTIME = True
29 29
30 30
def isNumber(cad):
    """
    Check whether a value can be converted to a float.

    Input:
        cad : string (or any value) to test for numeric convertibility

    Return:
        True  : the value is numeric
        False : it is not
    """
    try:
        float(cad)
        return True
    except (TypeError, ValueError):
        # Narrowed from a bare ``except`` so unrelated exceptions
        # (e.g. KeyboardInterrupt) are no longer swallowed.
        return False
49 49
50 50
def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
    """
    Determine whether a Jicamarca data file holds data inside the given
    epoch range.

    Inputs:
        filename       : full path of the data file (Jicamarca format, .r)
        startUTSeconds : start of the selected range, in seconds since
                         01/01/1970
        endUTSeconds   : end of the selected range, in seconds since
                         01/01/1970

    Return:
        1 when the first basic header's utc timestamp lies inside
        [startUTSeconds, endUTSeconds); 0 otherwise, or when the file
        cannot be opened or its header is invalid (no exception raised).
    """
    basicHeaderObj = BasicHeader(LOCALTIME)

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return 0

    # Only the first basic header is needed: its 'utc' field carries the
    # timestamp of the first data block.
    sts = basicHeaderObj.read(fp)
    fp.close()

    if not(sts):
        print("Skipping the file %s because it has not a valid header" % (filename))
        return 0

    # Half-open comparison: start inclusive, end exclusive.
    if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
        return 0

    return 1
91 91
92 92
def isTimeInRange(thisTime, startTime, endTime):
    """Return 1 if thisTime falls inside [startTime, endTime], else 0.

    When endTime < startTime the window is taken to wrap past midnight,
    i.e. it covers [startTime, 24h) plus [0h, endTime].
    """
    if endTime >= startTime:
        # Plain, non-wrapping window.
        return 1 if startTime <= thisTime <= endTime else 0
    # Wrapping window: outside only when strictly between both edges.
    return 0 if endTime < thisTime < startTime else 1
102 102
103 103
def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
    """
    Return the datetime of the file's first block when the file overlaps
    the selected time window, otherwise None.

    Inputs:
        filename  : full path of a Jicamarca-format data file (.r)
        startDate : start of the selected range (datetime.date)
        endDate   : end of the selected range (datetime.date)
        startTime : start of the selected range (datetime.time)
        endTime   : end of the selected range (datetime.time)

    Return:
        datetime of the first basic header when the file overlaps the
        window; None when it does not, when the file cannot be opened,
        or when any header is invalid.
    """

    try:
        fp = open(filename, 'rb')
    except IOError:
        print("The file %s can't be opened" % (filename))
        return None

    try:
        firstBasicHeaderObj = BasicHeader(LOCALTIME)
        systemHeaderObj = SystemHeader()
        radarControllerHeaderObj = RadarControllerHeader()
        processingHeaderObj = ProcessingHeader()

        lastBasicHeaderObj = BasicHeader(LOCALTIME)

        sts = firstBasicHeaderObj.read(fp)

        if not(sts):
            print("[Reading] Skipping the file %s because it has not a valid header" % (filename))
            return None

        # The intermediate headers are read mainly to advance the file
        # pointer; processingHeaderObj also supplies blockSize below.
        if not systemHeaderObj.read(fp):
            return None

        if not radarControllerHeaderObj.read(fp):
            return None

        if not processingHeaderObj.read(fp):
            return None

        filesize = os.path.getsize(filename)

        offset = processingHeaderObj.blockSize + 24  # header size

        if filesize <= offset:
            print("[Reading] %s: This file has not enough data" % filename)
            return None

        # Jump to the last block's basic header to learn when the file ends.
        fp.seek(-offset, 2)

        sts = lastBasicHeaderObj.read(fp)
    finally:
        # Bug fix: the original leaked the file handle on every early
        # return above; finally guarantees it is always closed.
        fp.close()

    thisDatetime = lastBasicHeaderObj.datatime
    thisTime_last_block = thisDatetime.time()

    thisDatetime = firstBasicHeaderObj.datatime
    thisDate = thisDatetime.date()
    thisTime_first_block = thisDatetime.time()

    # General case
    #      o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
    #-----------o----------------------------o-----------
    #        startTime                     endTime

    if endTime >= startTime:
        if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
            return None

        return thisDatetime

    # If endTime < startTime then endTime belongs to the next day

    #<<<<<<<<<<<o                            o>>>>>>>>>>>
    #-----------o----------------------------o-----------
    #         endTime                    startTime

    if (thisDate == startDate) and (thisTime_last_block < startTime):
        return None

    if (thisDate == endDate) and (thisTime_first_block > endTime):
        return None

    if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
        return None

    return thisDatetime
205 205
206 206
def isFolderInDateRange(folder, startDate=None, endDate=None):
    """
    Return 1 when *folder* lies inside the optional date window, 0 otherwise.

    Inputs:
        folder    : full folder path; its basename must follow the radar
                    convention "?YYYYDDD" (YYYY = year, DDD = day of year)
        startDate : lower bound (datetime.date) or None
        endDate   : upper bound (datetime.date) or None

    Return:
        0 when the name is malformed or its date falls outside the window;
        1 otherwise. When either bound is missing only the format is checked.
    """

    name = os.path.basename(folder)

    # Reject malformed folder names up front.
    if not isRadarFolder(name):
        print("The folder %s has not the rigth format" % folder)
        return 0

    if startDate and endDate:
        folderDate = getDateFromRadarFolder(name)
        if not (startDate <= folderDate <= endDate):
            return 0

    return 1
246 246
247 247
def isFileInDateRange(filename, startDate=None, endDate=None):
    """
    Return 1 when *filename* lies inside the optional date window, 0 otherwise.

    Inputs:
        filename  : full path of a data file whose basename follows the radar
                    convention "?YYYYDDDsss" (YYYY = year, DDD = day of year,
                    sss = set)
        startDate : lower bound (datetime.date) or None
        endDate   : upper bound (datetime.date) or None

    Return:
        0 when the name is malformed or its date falls outside the window;
        1 otherwise. When either bound is missing only the format is checked.
    """

    name = os.path.basename(filename)

    # Reject malformed file names up front.
    if not isRadarFile(name):
        print("The filename %s has not the rigth format" % filename)
        return 0

    if startDate and endDate:
        fileDate = getDateFromRadarFile(name)
        if not (startDate <= fileDate <= endDate):
            return 0

    return 1
289 289
290 290
def getFileFromSet(path, ext, set):
    """
    Return the filename in *path* that matches the requested set number.

    Filenames follow the convention ``?YYYYDDDSSS.ext``. Year and day of
    year are taken from the files actually present; when no file with the
    requested set exists, the last valid file (case-insensitive sort) is
    returned instead, and None when the folder holds no valid file at all.

    Input:
        path : folder to scan
        ext  : file extension (e.g. '.r')
        set  : set number to look for

    Return:
        The matching filename, the last valid one, or None.
    """
    validFilelist = []
    fileList = os.listdir(path)

    # 0 1234 567 89A BCDE
    # H YYYY DDD SSS .ext

    year = doy = None
    for thisFile in fileList:
        try:
            # Narrowed from a bare ``except``: only a failed int() parse
            # means "not a radar file name".
            thisYear = int(thisFile[1:5])
            thisDoy = int(thisFile[5:8])
        except ValueError:
            continue

        if os.path.splitext(thisFile)[-1].lower() != ext.lower():
            continue

        year, doy = thisYear, thisDoy
        validFilelist.append(thisFile)

    # Bug fix: the original referenced year/doy even when no file had been
    # parsed, raising NameError on an empty or unrelated folder.
    if year is None:
        return None

    myfile = fnmatch.filter(
        validFilelist, '*%4.4d%3.3d%3.3d*' % (year, doy, set))

    if len(myfile) != 0:
        return myfile[0]

    filename = '*%4.4d%3.3d%3.3d%s' % (year, doy, set, ext.lower())
    print('the filename %s does not exist' % filename)
    print('...going to the last file: ')

    if validFilelist:
        validFilelist = sorted(validFilelist, key=str.lower)
        return validFilelist[-1]

    return None
325 325
326 326
def getlastFileFromPath(path, ext):
    """
    Return the last file (case-insensitive sort) in *path* whose name
    matches the "PYYYYDDDSSS.ext" convention.

    Input:
        path : folder containing the files
        ext  : extension of the files in the folder

    Return:
        The last matching filename (without path), or None when none match.
    """
    validFilelist = []

    # 0 1234 567 89A BCDE
    # H YYYY DDD SSS .ext
    for thisFile in os.listdir(path):
        try:
            # Robustness fix: parse year/doy with a single guarded int().
            # The old isNumber()+int() pair crashed with ValueError on
            # numeric-but-not-integer fields (e.g. "20.5"); such names are
            # now simply skipped.
            int(thisFile[1:5])
            int(thisFile[5:8])
        except ValueError:
            continue

        if os.path.splitext(thisFile)[-1].lower() != ext.lower():
            continue

        validFilelist.append(thisFile)

    if validFilelist:
        return sorted(validFilelist, key=str.lower)[-1]

    return None
368 368
369 369
def checkForRealPath(path, foldercounter, year, doy, set, ext):
    """
    Resolve the case-sensitive location of a data file on Linux.

    The expected file is .../[xYYYYDDD[_CC]/]xYYYYDDDSSS.ext, where the
    directory prefix may be absent, 'd' or 'D', and the file prefix is
    'd'/'D' for voltage (.r) or 'p'/'P' for spectra (.pdata). Every
    combination is probed with os.path.exists.

    Return:
        (fullfilename, filename) for the first existing combination;
        (None, last-name-tried) when nothing exists; (None, None) for an
        unsupported extension.
    """
    extLower = ext.lower()
    if extLower == ".r":        # voltage
        filePrefixes = ['d', 'D']
    elif extLower == ".pdata":  # spectra
        filePrefixes = ['p', 'P']
    else:
        # Unsupported extension: nothing to probe.
        return None, None

    filename = None
    for dirPrefix in (None, 'd', 'D'):
        # Candidate directory: the bare path, or path/xYYYYDDD[_CC].
        if dirPrefix is None:
            candidateDir = path
        elif foldercounter == 0:
            candidateDir = os.path.join(path, "%s%04d%03d" %
                                        (dirPrefix, year, doy))
        else:
            candidateDir = os.path.join(path, "%s%04d%03d_%02d" % (
                dirPrefix, year, doy, foldercounter))

        for filePrefix in filePrefixes:
            filename = "%s%04d%03d%03d%s" % (filePrefix, year, doy, set, ext)
            fullfilename = os.path.join(candidateDir, filename)
            if os.path.exists(fullfilename):
                return fullfilename, filename

    # No combination exists; report the last name tried.
    return None, filename
432 432
433 433
def isRadarFolder(folder):
    """
    Return 1 when *folder* follows the radar convention "?YYYYDDD"
    (characters 1-4 a year, 5-7 a day of year), 0 otherwise.
    """
    try:
        # Narrowed from a bare ``except``; TypeError covers non-string
        # input, ValueError a failed integer parse.
        int(folder[1:5])
        int(folder[5:8])
    except (TypeError, ValueError):
        return 0

    return 1
442 442
443 443
def isRadarFile(file):
    """
    Return 1 when *file* follows the radar convention "?YYYYDDDSSS*"
    (characters 1-4 a year, 5-7 a day of year, 8-10 a set number),
    0 otherwise.
    """
    try:
        # Narrowed from a bare ``except``; TypeError covers non-string
        # input, ValueError a failed integer parse.
        int(file[1:5])
        int(file[5:8])
        int(file[8:11])
    except (TypeError, ValueError):
        return 0

    return 1
453 453
454 454
def getDateFromRadarFile(file):
    """
    Extract the date encoded in a radar filename "?YYYYDDDSSS*".

    Input:
        file : filename whose characters 1-4 hold the year, 5-7 the day of
               the year and 8-10 the set number

    Return:
        The corresponding datetime.date, or None when the name does not
        follow the convention.
    """
    try:
        # Narrowed from a bare ``except``: only parse failures mean
        # "not a radar file name".
        year = int(file[1:5])
        doy = int(file[5:8])
        int(file[8:11])  # set number: validated but unused
    except (TypeError, ValueError):
        return None

    thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
    return thisDate
465 465
466 466
def getDateFromRadarFolder(folder):
    """
    Extract the date encoded in a radar folder name "?YYYYDDD".

    Input:
        folder : folder name whose characters 1-4 hold the year and
                 5-7 the day of the year

    Return:
        The corresponding datetime.date, or None when the name does not
        follow the convention.
    """
    try:
        # Narrowed from a bare ``except``: only parse failures mean
        # "not a radar folder name".
        year = int(folder[1:5])
        doy = int(folder[5:8])
    except (TypeError, ValueError):
        return None

    thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy - 1)
    return thisDate
476 476
477 477
class JRODataIO:
    """
    Base class for JRO data readers/writers.

    Holds the bookkeeping state shared by subclasses: header objects, the
    open file handle, file/block/profile indices and the current data
    block. Subclasses must implement __init__ and run.
    """

    c = 3E8  # speed of light [m/s]

    isConfig = False

    # Header objects populated while reading/writing a file
    basicHeaderObj = None

    systemHeaderObj = None

    radarControllerHeaderObj = None

    processingHeaderObj = None

    dtype = None

    pathList = []

    filenameList = []

    filename = None

    ext = None

    flagIsNewFile = 1

    flagDiscontinuousBlock = 0

    flagIsNewBlock = 0

    fp = None  # open file handle

    firstHeaderSize = 0

    basicHeaderSize = 24  # size in bytes of a basic header

    versionFile = 1103

    fileSize = None

    # ippSeconds = None

    fileSizeByHeader = None

    fileIndex = None

    profileIndex = None

    blockIndex = None

    nTotalBlocks = None

    maxTimeStep = 30

    lastUTTime = None

    datablock = None

    dataOut = None

    blocksize = None

    getByBlock = False

    def __init__(self):
        # Abstract: concrete readers/writers build their own state.
        raise NotImplementedError

    def run(self):
        # Abstract: entry point implemented by subclasses.
        raise NotImplementedError

    def getDtypeWidth(self):
        """Return the width in bytes of the current dtype."""
        dtype_index = get_dtype_index(self.dtype)
        dtype_width = get_dtype_width(dtype_index)

        return dtype_width

    def getAllowedArgs(self):
        """Return the argument names accepted by run(), or the explicit
        __attrs__ list when the subclass defines one."""
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # Fix: inspect.getargspec() was deprecated and removed in
            # Python 3.11; getfullargspec() provides the same .args.
            return inspect.getfullargspec(self.run).args
562 562
563 563
class JRODataReader(JRODataIO):
    # --- Reader-side defaults; instances override these during setup ---

    online = 0  # 1 when reading files while they are still being written

    realtime = 0

    nReadBlocks = 0

    delay = 10  # number of seconds waiting a new file

    nTries = 3  # quantity tries

    nFiles = 3  # number of files for searching

    path = None

    foldercounter = 0

    flagNoMoreFiles = 0

    # NOTE(review): mutable class attribute — shared across instances unless
    # rebound per instance; confirm every code path reassigns it.
    datetimeList = []

    __isFirstTimeOnline = 1

    __printInfo = True

    profileIndex = None

    nTxs = 1

    txIndex = None

    # Added--------------------

    selBlocksize = None

    selBlocktime = None
601 601
    def __init__(self):
        """
        This class is used to find data files

        Example:
            reader = JRODataReader()
            fileList = reader.findDataFiles()

        """
        # Intentionally empty: configuration happens later (e.g. in setup()).
        pass
612 612
    def createObjByDefault(self):
        """
        Abstract hook: must be implemented by concrete reader subclasses.
        """
        raise NotImplementedError
618 618
    def getBlockDimension(self):
        # Abstract hook: must be implemented by concrete reader subclasses.
        raise NotImplementedError
622 622
    def searchFilesOffLine(self,
                           path,
                           startDate=None,
                           endDate=None,
                           startTime=datetime.time(0, 0, 0),
                           endTime=datetime.time(23, 59, 59),
                           set=None,
                           expLabel='',
                           ext='.r',
                           cursor=None,
                           skip=None,
                           walk=True):
        """
        Build the list of data files under *path* whose dates and times lie
        inside [startDate, endDate] x [startTime, endTime].

        Results are stored in self.filenameList / self.datetimeList and also
        returned as (pathList, filenameList); ([], []) when nothing matches.
        When both *cursor* and *skip* are given, only the slice
        [cursor*skip : cursor*skip + skip] of the matches is kept.
        """

        self.filenameList = []
        self.datetimeList = []

        pathList = []

        # findDatafiles() (provided by the subclass) yields the dates and
        # folders that hold candidate files.
        dateList, pathList = self.findDatafiles(
            path, startDate, endDate, expLabel, ext, walk, include_path=True)

        if dateList == []:
            return [], []

        if len(dateList) > 1:
            print("[Reading] Data found for date range [%s - %s]: total days = %d" % (startDate, endDate, len(dateList)))
        else:
            print("[Reading] Data found for date range [%s - %s]: date = %s" % (startDate, endDate, dateList[0]))

        filenameList = []
        datetimeList = []

        for thisPath in pathList:

            fileList = glob.glob1(thisPath, "*%s" % ext)
            fileList.sort()

            for file in fileList:

                filename = os.path.join(thisPath, file)

                # Cheap name-based date filter first; the time filter below
                # is more expensive (it opens the file and reads headers).
                if not isFileInDateRange(filename, startDate, endDate):
                    continue

                thisDatetime = isFileInTimeRange(
                    filename, startDate, endDate, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        if cursor is not None and skip is not None:
            filenameList = filenameList[cursor * skip:cursor * skip + skip]
            datetimeList = datetimeList[cursor * skip:cursor * skip + skip]

        if not(filenameList):
            print("[Reading] Time range selected invalid [%s - %s]: No *%s files in %s)" % (startTime, endTime, ext, path))
            return [], []

        print("[Reading] %d file(s) was(were) found in time range: %s - %s" % (len(filenameList), startTime, endTime))

        # for i in range(len(filenameList)):
        #     print "[Reading] %s -> [%s]" %(filenameList[i], datetimeList[i].ctime())

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList
693 693
    def __searchFilesOnLine(self, path, expLabel="", ext=None, walk=True, set=None):
        """
        Find the newest file of the newest day folder (online mode) and
        return it together with its decoded name fields.

        Input:
            path : folder containing the data files
            expLabel : sub-experiment name (subfolder)
            ext : file extension to match
            walk : when True, look inside day subfolders; when False read *path* directly
            set : explicit set number to pick; None selects the latest file

        Return:
            (fullpath, foldercounter, filename, year, doy, set) on success,
            or six Nones when no usable file is found.
        """
        if not os.path.isdir(path):
            return None, None, None, None, None, None

        dirList = []

        if not walk:
            fullpath = path
            foldercounter = 0
        else:
            # keep only subdirectories that look like radar day folders
            for thisPath in os.listdir(path):
                if not os.path.isdir(os.path.join(path, thisPath)):
                    continue
                if not isRadarFolder(thisPath):
                    continue

                dirList.append(thisPath)

            if not(dirList):
                return None, None, None, None, None, None

            dirList = sorted(dirList, key=str.lower)

            # newest day folder; a '_N' suffix encodes a folder counter
            doypath = dirList[-1]
            foldercounter = int(doypath.split('_')[1]) if len(
                doypath.split('_')) > 1 else 0
            fullpath = os.path.join(path, doypath, expLabel)

        print("[Reading] %s folder was found: " % (fullpath))

        if set == None:
            filename = getlastFileFromPath(fullpath, ext)
        else:
            filename = getFileFromSet(fullpath, ext, set)

        if not(filename):
            return None, None, None, None, None, None

        print("[Reading] %s file was found" % (filename))

        if not(self.__verifyFile(os.path.join(fullpath, filename))):
            return None, None, None, None, None, None

        # file name layout: xYYYYDDDSSS.ext → year, day-of-year, set number
        year = int(filename[1:5])
        doy = int(filename[5:8])
        set = int(filename[8:11])

        return fullpath, foldercounter, filename, year, doy, set
765 765
    def __setNextFileOffline(self):
        """
        Advance to the next readable file in self.filenameList.

        Files that fail __verifyFile (unopenable or too short) are skipped.
        On success updates fileIndex/filename/fileSize/fp, marks the file as
        new and returns 1; returns 0 (and sets flagNoMoreFiles) when the
        list is exhausted.
        """
        idFile = self.fileIndex

        while (True):
            idFile += 1
            if not(idFile < len(self.filenameList)):
                self.flagNoMoreFiles = 1
                # print "[Reading] No more Files"
                return 0

            filename = self.filenameList[idFile]

            if not(self.__verifyFile(filename)):
                continue

            fileSize = os.path.getsize(filename)
            # fp is kept open on self; it is closed by setNextFile on rollover
            fp = open(filename, 'rb')
            break

        self.flagIsNewFile = 1
        self.fileIndex = idFile
        self.filename = filename
        self.fileSize = fileSize
        self.fp = fp

        # print "[Reading] Setting the file: %s"%self.filename

        return 1
795 795
    def __setNextFileOnline(self):
        """
        Find the next file with enough data to be read inside the current
        folder; when no valid file is found, wait self.delay seconds and
        retry over the next possible candidate files.

        Affected:
            self.flagIsNewFile
            self.filename
            self.fileSize
            self.fp
            self.set
            self.flagNoMoreFiles

        Return:
            0 : no valid next file could be found after all retries
            1 : the file was opened successfully and is ready to be read

        Exceptions:
            if a given file cannot be opened
        """
        nFiles = 0
        fileOk_flag = False
        firstTime_flag = True

        self.set += 1

        # set numbers wrap at 999; the folder counter distinguishes restarts
        if self.set > 999:
            self.set = 0
            self.foldercounter += 1

        # look for the first available file
        fullfilename, filename = checkForRealPath(
            self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
        if fullfilename:
            if self.__verifyFile(fullfilename, False):
                fileOk_flag = True

        # if no file was found, wait and search again
        if not(fileOk_flag):
            # try the next self.nFiles+1 candidate files
            for nFiles in range(self.nFiles + 1):

                if firstTime_flag:  # on the first pass retry self.nTries times
                    tries = self.nTries
                else:
                    tries = 1  # afterwards only one try per candidate

                for nTries in range(tries):
                    if firstTime_flag:
                        print("\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % (self.delay, filename, nTries + 1))
                        sleep(self.delay)
                    else:
                        print("\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext))

                    fullfilename, filename = checkForRealPath(
                        self.path, self.foldercounter, self.year, self.doy, self.set, self.ext)
                    if fullfilename:
                        if self.__verifyFile(fullfilename):
                            fileOk_flag = True
                            break

                if fileOk_flag:
                    break

                firstTime_flag = False

                log.warning('Skipping the file {} due to this file doesn\'t exist'.format(filename))
                self.set += 1

                # if still nothing, roll over to the next day's folder
                if nFiles == (self.nFiles - 1):
                    self.set = 0
                    self.doy += 1
                    self.foldercounter = 0

        if fileOk_flag:
            self.fileSize = os.path.getsize(fullfilename)
            self.filename = fullfilename
            self.flagIsNewFile = 1
            if self.fp != None:
                self.fp.close()
            self.fp = open(fullfilename, 'rb')
            self.flagNoMoreFiles = 0
            # print '[Reading] Setting the file: %s' % fullfilename
        else:
            self.fileSize = 0
            self.filename = None
            self.flagIsNewFile = 0
            self.fp = None
            self.flagNoMoreFiles = 1
            # print '[Reading] No more files to read'

        return fileOk_flag
890 890
891 891 def setNextFile(self):
892 892 if self.fp != None:
893 893 self.fp.close()
894 894
895 895 if self.online:
896 896 newFile = self.__setNextFileOnline()
897 897 else:
898 898 newFile = self.__setNextFileOffline()
899 899
900 900 if not(newFile):
901 901 self.dataOut.error = 'No more files to read'
902 902 return 0
903 903
904 904 if self.verbose:
905 905 print('[Reading] Setting the file: %s' % self.filename)
906 906
907 907 self.__readFirstHeader()
908 908 self.nReadBlocks = 0
909 909 return 1
910 910
    def __waitNewBlock(self):
        """
        Return 1 when a new data block became available in the current file,
        0 otherwise.

        In offline mode this always returns 0.
        """
        if not self.online:
            return 0

        if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
            return 0

        currentPointer = self.fp.tell()

        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize

        for nTries in range(self.nTries):

            # reopen to pick up data appended by the acquisition process
            self.fp.close()
            self.fp = open(self.filename, 'rb')
            self.fp.seek(currentPointer)

            self.fileSize = os.path.getsize(self.filename)
            currentSize = self.fileSize - currentPointer

            if (currentSize >= neededSize):
                self.basicHeaderObj.read(self.fp)
                return 1

            # file already has its full declared size: nothing more will come
            if self.fileSize == self.fileSizeByHeader:
                # self.flagEoF = True
                return 0

            print("[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1))
            sleep(self.delay)

        return 0
948 948
    def waitDataBlock(self, pointer_location, blocksize=None):
        """
        Poll the current file until at least one block of data is available
        past *pointer_location*, sleeping self.delay seconds between tries.

        Input:
            pointer_location : byte offset from which data is required
            blocksize : bytes needed; defaults to the processing header's blockSize

        Return:
            1 when enough data is present, 0 after self.nTries timeouts.
        """
        currentPointer = pointer_location
        if blocksize is None:
            neededSize = self.processingHeaderObj.blockSize  # + self.basicHeaderSize
        else:
            neededSize = blocksize

        for nTries in range(self.nTries):
            # reopen to refresh the size of a file that is still being written
            self.fp.close()
            self.fp = open(self.filename, 'rb')
            self.fp.seek(currentPointer)

            self.fileSize = os.path.getsize(self.filename)
            currentSize = self.fileSize - currentPointer

            if (currentSize >= neededSize):
                return 1

            log.warning(
                "Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries + 1),
                self.name
            )
            sleep(self.delay)

        return 0
975 975
    def __jumpToLastBlock(self):
        """
        On the first online read, advance the file pointer toward the end of
        the file so reading starts near the live edge; a no-op afterwards.
        """
        if not(self.__isFirstTimeOnline):
            return

        csize = self.fileSize - self.fp.tell()
        blocksize = self.processingHeaderObj.blockSize

        # skip the first data block
        if csize > self.processingHeaderObj.blockSize:
            self.fp.seek(self.fp.tell() + blocksize)
        else:
            return

        csize = self.fileSize - self.fp.tell()
        neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
        while True:
            # step forward block by block; once past EOF, back up one step
            if self.fp.tell() < self.fileSize:
                self.fp.seek(self.fp.tell() + neededsize)
            else:
                self.fp.seek(self.fp.tell() - neededsize)
                break

        # csize = self.fileSize - self.fp.tell()
        # neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
        # factor = int(csize/neededsize)
        # if factor > 0:
        #     self.fp.seek(self.fp.tell() + factor*neededsize)

        self.flagIsNewFile = 0
        self.__isFirstTimeOnline = 0
1008 1008
    def __setNewBlock(self):
        """
        Position the reader at the next basic header / data block: wait for
        new data in online mode and roll over to the next file when the
        current one is exhausted.

        Maintains lastUTTime and raises flagDiscontinuousBlock when the time
        gap between consecutive blocks exceeds self.maxTimeStep.

        Return:
            1 when a block is ready to be read, 0 otherwise.
        """
        # if self.server is None:
        if self.fp == None:
            return 0

        # if self.online:
        #     self.__jumpToLastBlock()

        if self.flagIsNewFile:
            self.lastUTTime = self.basicHeaderObj.utc
            return 1

        if self.realtime:
            # realtime mode jumps straight to the next file
            self.flagDiscontinuousBlock = 1
            if not(self.setNextFile()):
                return 0
            else:
                return 1
        # if self.server is None:
        currentSize = self.fileSize - self.fp.tell()
        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
        if (currentSize >= neededSize):
            self.basicHeaderObj.read(self.fp)
            self.lastUTTime = self.basicHeaderObj.utc
            return 1
        # else:
        #     self.basicHeaderObj.read(self.zHeader)
        #     self.lastUTTime = self.basicHeaderObj.utc
        #     return 1
        if self.__waitNewBlock():
            self.lastUTTime = self.basicHeaderObj.utc
            return 1
        # if self.server is None:
        if not(self.setNextFile()):
            return 0

        deltaTime = self.basicHeaderObj.utc - self.lastUTTime
        self.lastUTTime = self.basicHeaderObj.utc

        self.flagDiscontinuousBlock = 0

        if deltaTime > self.maxTimeStep:
            self.flagDiscontinuousBlock = 1

        return 1
1054 1054
    def readNextBlock(self):
        """
        Read the next data block, skipping blocks whose timestamp falls
        outside [startDate+startTime, endDate+endTime].

        Return:
            1 on success, 0 when there is nothing left to read.
        """
        # Skip block out of startTime and endTime
        while True:
            if not(self.__setNewBlock()):
                self.dataOut.error = 'No more files to read'
                return 0

            if not(self.readBlock()):
                return 0

            self.getBasicHeader()
            if (self.dataOut.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or (self.dataOut.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                print("[Reading] Block No. %d/%d -> %s [Skipping]" % (self.nReadBlocks,
                                                                      self.processingHeaderObj.dataBlocksPerFile,
                                                                      self.dataOut.datatime.ctime()))
                continue

            break

        if self.verbose:
            print("[Reading] Block No. %d/%d -> %s" % (self.nReadBlocks,
                                                       self.processingHeaderObj.dataBlocksPerFile,
                                                       self.dataOut.datatime.ctime()))
        return 1
1080 1080
1081 1081 def __readFirstHeader(self):
1082 1082
1083 1083 self.basicHeaderObj.read(self.fp)
1084 1084 self.systemHeaderObj.read(self.fp)
1085 1085 self.radarControllerHeaderObj.read(self.fp)
1086 1086 self.processingHeaderObj.read(self.fp)
1087 1087
1088 1088 self.firstHeaderSize = self.basicHeaderObj.size
1089 1089
1090 1090 datatype = int(numpy.log2((self.processingHeaderObj.processFlags &
1091 1091 PROCFLAG.DATATYPE_MASK)) - numpy.log2(PROCFLAG.DATATYPE_CHAR))
1092 1092 if datatype == 0:
1093 1093 datatype_str = numpy.dtype([('real', '<i1'), ('imag', '<i1')])
1094 1094 elif datatype == 1:
1095 1095 datatype_str = numpy.dtype([('real', '<i2'), ('imag', '<i2')])
1096 1096 elif datatype == 2:
1097 1097 datatype_str = numpy.dtype([('real', '<i4'), ('imag', '<i4')])
1098 1098 elif datatype == 3:
1099 1099 datatype_str = numpy.dtype([('real', '<i8'), ('imag', '<i8')])
1100 1100 elif datatype == 4:
1101 1101 datatype_str = numpy.dtype([('real', '<f4'), ('imag', '<f4')])
1102 1102 elif datatype == 5:
1103 1103 datatype_str = numpy.dtype([('real', '<f8'), ('imag', '<f8')])
1104 1104 else:
1105 1105 raise ValueError('Data type was not defined')
1106 1106
1107 1107 self.dtype = datatype_str
1108 1108 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
1109 1109 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + \
1110 1110 self.firstHeaderSize + self.basicHeaderSize * \
1111 1111 (self.processingHeaderObj.dataBlocksPerFile - 1)
1112 1112 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
1113 1113 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
1114 1114 self.getBlockDimension()
1115 1115
1116 1116 def __verifyFile(self, filename, msgFlag=True):
1117 1117
1118 1118 msg = None
1119 1119
1120 1120 try:
1121 1121 fp = open(filename, 'rb')
1122 1122 except IOError:
1123 1123
1124 1124 if msgFlag:
1125 1125 print("[Reading] File %s can't be opened" % (filename))
1126 1126
1127 1127 return False
1128 1128
1129 1129 currentPosition = fp.tell()
1130 1130 neededSize = self.processingHeaderObj.blockSize + self.firstHeaderSize
1131 1131
1132 1132 if neededSize == 0:
1133 1133 basicHeaderObj = BasicHeader(LOCALTIME)
1134 1134 systemHeaderObj = SystemHeader()
1135 1135 radarControllerHeaderObj = RadarControllerHeader()
1136 1136 processingHeaderObj = ProcessingHeader()
1137 1137
1138 1138 if not(basicHeaderObj.read(fp)):
1139 1139 fp.close()
1140 1140 return False
1141 1141
1142 1142 if not(systemHeaderObj.read(fp)):
1143 1143 fp.close()
1144 1144 return False
1145 1145
1146 1146 if not(radarControllerHeaderObj.read(fp)):
1147 1147 fp.close()
1148 1148 return False
1149 1149
1150 1150 if not(processingHeaderObj.read(fp)):
1151 1151 fp.close()
1152 1152 return False
1153 1153
1154 1154 neededSize = processingHeaderObj.blockSize + basicHeaderObj.size
1155 1155 else:
1156 1156 msg = "[Reading] Skipping the file %s due to it hasn't enough data" % filename
1157 1157
1158 1158 fp.close()
1159 1159
1160 1160 fileSize = os.path.getsize(filename)
1161 1161 currentSize = fileSize - currentPosition
1162 1162
1163 1163 if currentSize < neededSize:
1164 1164 if msgFlag and (msg != None):
1165 1165 print(msg)
1166 1166 return False
1167 1167
1168 1168 return True
1169 1169
    def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
        """
        List the dates (and optionally the data folders) that contain files
        matching *ext* in *path* between startDate and endDate.

        *path* may be a comma-separated list of folders.  With walk=True the
        radar day subfolders are scanned; with walk=False files are taken
        directly from each folder.

        Return:
            dateList, or (dateList, pathList) when include_path is True.
        """
        path_empty = True

        dateList = []
        pathList = []

        multi_path = path.split(',')

        if not walk:

            for single_path in multi_path:

                if not os.path.isdir(single_path):
                    continue

                fileList = glob.glob1(single_path, "*" + ext)

                if not fileList:
                    continue

                path_empty = False

                fileList.sort()

                for thisFile in fileList:

                    if not os.path.isfile(os.path.join(single_path, thisFile)):
                        continue

                    if not isRadarFile(thisFile):
                        continue

                    if not isFileInDateRange(thisFile, startDate, endDate):
                        continue

                    thisDate = getDateFromRadarFile(thisFile)

                    # one entry per (date, folder) pair
                    if thisDate in dateList or single_path in pathList:
                        continue

                    dateList.append(thisDate)
                    pathList.append(single_path)

        else:
            for single_path in multi_path:

                if not os.path.isdir(single_path):
                    continue

                dirList = []

                for thisPath in os.listdir(single_path):

                    if not os.path.isdir(os.path.join(single_path, thisPath)):
                        continue

                    if not isRadarFolder(thisPath):
                        continue

                    if not isFolderInDateRange(thisPath, startDate, endDate):
                        continue

                    dirList.append(thisPath)

                if not dirList:
                    continue

                dirList.sort()

                for thisDir in dirList:

                    datapath = os.path.join(single_path, thisDir, expLabel)
                    fileList = glob.glob1(datapath, "*" + ext)

                    if not fileList:
                        continue

                    path_empty = False

                    thisDate = getDateFromRadarFolder(thisDir)

                    pathList.append(datapath)
                    dateList.append(thisDate)

        dateList.sort()

        if walk:
            pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
        else:
            pattern_path = multi_path[0]

        if path_empty:
            print("[Reading] No *%s files in %s for %s to %s" % (ext, pattern_path, startDate, endDate))
        else:
            if not dateList:
                print("[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" % (startDate, endDate, ext, path))

        if include_path:
            return dateList, pathList

        return dateList
1272 1272
    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              set=None,
              expLabel="",
              ext=None,
              online=False,
              delay=60,
              walk=True,
              getblock=False,
              nTxs=1,
              realtime=False,
              blocksize=None,
              blocktime=None,
              skip=None,
              cursor=None,
              warnings=True,
              verbose=True,
              server=None,
              format=None,
              oneDDict=None,
              twoDDict=None,
              independentParam=None):
        """
        Configure the reader and open the first file.

        Three input modes:
          * server given  -> connect a ZMQ PULL socket to it
          * online=True   -> poll the newest folder/file on disk
          * otherwise     -> offline search over the date/time range

        NOTE(review): format/oneDDict/twoDDict/independentParam are accepted
        but not used here — presumably consumed by subclasses; verify.
        """
        if server is not None:
            # ZMQ source instead of files
            if 'tcp://' in server:
                address = server
            else:
                address = 'ipc:///tmp/%s' % server
            self.server = address
            self.context = zmq.Context()
            self.receiver = self.context.socket(zmq.PULL)
            self.receiver.connect(self.server)
            time.sleep(0.5)
            print('[Starting] ReceiverData from {}'.format(self.server))
        else:
            self.server = None
        if path == None:
            raise ValueError("[Reading] The path is not valid")

        if ext == None:
            ext = self.ext

        if online:
            print("[Reading] Searching files in online mode...")

            for nTries in range(self.nTries):
                fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(
                    path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)

                if fullpath:
                    break

                print('[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries + 1))
                sleep(self.delay)

            if not(fullpath):
                self.dataOut.error = 'There isn\'t any valid file in {}'.format(path)
                return

            self.year = year
            self.doy = doy
            # start one set before; __setNextFileOnline pre-increments
            self.set = set - 1
            self.path = path
            self.foldercounter = foldercounter
            last_set = None
        else:
            print("[Reading] Searching files in offline mode ...")
            pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
                                                             startTime=startTime, endTime=endTime,
                                                             set=set, expLabel=expLabel, ext=ext,
                                                             walk=walk, cursor=cursor,
                                                             skip=skip)

            if not(pathList):
                self.fileIndex = -1
                self.pathList = []
                self.filenameList = []
                return

            self.fileIndex = -1
            self.pathList = pathList
            self.filenameList = filenameList
            file_name = os.path.basename(filenameList[-1])
            basename, ext = os.path.splitext(file_name)
            # set number of the last file (xYYYYDDDSSS.ext layout)
            last_set = int(basename[-3:])

        self.online = online
        self.realtime = realtime
        self.delay = delay
        ext = ext.lower()
        self.ext = ext
        self.getByBlock = getblock
        self.nTxs = nTxs
        self.startTime = startTime
        self.endTime = endTime
        self.endDate = endDate
        self.startDate = startDate
        # Added-----------------
        self.selBlocksize = blocksize
        self.selBlocktime = blocktime

        # Verbose-----------
        self.verbose = verbose
        self.warnings = warnings

        if not(self.setNextFile()):
            if (startDate != None) and (endDate != None):
                print("[Reading] No files in range: %s - %s" % (datetime.datetime.combine(startDate, startTime).ctime(), datetime.datetime.combine(endDate, endTime).ctime()))
            elif startDate != None:
                print("[Reading] No files in range: %s" % (datetime.datetime.combine(startDate, startTime).ctime()))
            else:
                print("[Reading] No files")

            self.fileIndex = -1
            self.pathList = []
            self.filenameList = []
            return

        # self.getBasicHeader()

        if last_set != None:
            self.dataOut.last_block = last_set * \
                self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
        return
1400 1400
1401 1401 def getBasicHeader(self):
1402 1402
1403 1403 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond / \
1404 1404 1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1405 1405
1406 1406 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1407 1407
1408 1408 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1409 1409
1410 1410 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1411 1411
1412 1412 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1413 1413
1414 1414 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1415 1415
1416 1416 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds / self.nTxs
1417 1417
1418 1418 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1419 1419
    def getFirstHeader(self):
        # Abstract hook: implemented by concrete reader subclasses.
        raise NotImplementedError

    def getData(self):
        # Abstract hook: implemented by concrete reader subclasses.
        raise NotImplementedError

    def hasNotDataInBuffer(self):
        # Abstract hook: implemented by concrete reader subclasses.
        raise NotImplementedError

    def readBlock(self):
        # Abstract hook: implemented by concrete reader subclasses.
        raise NotImplementedError
1435 1435
1436 1436 def isEndProcess(self):
1437 1437
1438 1438 return self.flagNoMoreFiles
1439 1439
    def printReadBlocks(self):
        # Report how many blocks have been read from the current file.
        print("[Reading] Number of read blocks per file %04d" % self.nReadBlocks)

    def printTotalBlocks(self):
        # Report the total number of blocks read across all files.
        print("[Reading] Number of read blocks %04d" % self.nTotalBlocks)
1447 1447
    def printNumberOfBlock(self):
        """Report the current block number (implementation currently disabled)."""

        # if self.flagIsNewBlock:
        #     print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
        #     self.processingHeaderObj.dataBlocksPerFile,
        #     self.dataOut.datatime.ctime())
1455 1455
    def printInfo(self):
        """Print the four header summaries once; later calls are no-ops."""
        if self.__printInfo == False:
            return

        self.basicHeaderObj.printInfo()
        self.systemHeaderObj.printInfo()
        self.radarControllerHeaderObj.printInfo()
        self.processingHeaderObj.printInfo()

        # only print once per run
        self.__printInfo = False
1467 1467
    def run(self,
            path=None,
            startDate=None,
            endDate=None,
            startTime=datetime.time(0, 0, 0),
            endTime=datetime.time(23, 59, 59),
            set=None,
            expLabel="",
            ext=None,
            online=False,
            delay=60,
            walk=True,
            getblock=False,
            nTxs=1,
            realtime=False,
            blocksize=None,
            blocktime=None,
            skip=None,
            cursor=None,
            warnings=True,
            server=None,
            verbose=True,
            format=None,
            oneDDict=None,
            twoDDict=None,
            independentParam=None, **kwargs):
        """
        Processing-unit entry point: on the first call configure the reader
        (setup), then deliver data either from files (getData) or from a
        ZMQ server (getFromServer).  See setup for parameter meanings.
        """
        if not(self.isConfig):
            self.setup(path=path,
                       startDate=startDate,
                       endDate=endDate,
                       startTime=startTime,
                       endTime=endTime,
                       set=set,
                       expLabel=expLabel,
                       ext=ext,
                       online=online,
                       delay=delay,
                       walk=walk,
                       getblock=getblock,
                       nTxs=nTxs,
                       realtime=realtime,
                       blocksize=blocksize,
                       blocktime=blocktime,
                       skip=skip,
                       cursor=cursor,
                       warnings=warnings,
                       server=server,
                       verbose=verbose,
                       format=format,
                       oneDDict=oneDDict,
                       twoDDict=twoDDict,
                       independentParam=independentParam)
            self.isConfig = True
        if server is None:
            self.getData()
        else:
            self.getFromServer()
1526 1526
1527 1527
class JRODataWriter(JRODataIO):

    """
    Base class for writers of processed data files (.r or .pdata); data is
    always written to disk one block at a time.
    """

    # index of the block being written inside the current file
    blockIndex = 0

    # root output directory
    path = None

    # running set number used to build output file names (xYYYYDDDSSS.ext)
    setFile = None

    profilesPerBlock = None

    # blocks written per file before rolling over to a new one
    blocksPerFile = None

    # total number of blocks written so far
    nWriteBlocks = 0

    # date of the currently open file; a new day resets the set counter
    fileDate = None
1548 1548
    def __init__(self, dataOut=None):
        # Abstract: concrete writers must provide their own constructor.
        raise NotImplementedError

    def hasAllDataInBuffer(self):
        # Abstract hook: implemented by concrete writer subclasses.
        raise NotImplementedError

    def setBlockDimension(self):
        # Abstract hook: implemented by concrete writer subclasses.
        raise NotImplementedError

    def writeBlock(self):
        # Abstract hook: implemented by concrete writer subclasses.
        raise NotImplementedError

    def putData(self):
        # Abstract hook: implemented by concrete writer subclasses.
        raise NotImplementedError
1563 1563
    def getProcessFlags(self):
        """
        Build the processing-flags bitfield stored in the processing header
        from the on-disk dtype plus the flags set on self.dataOut.
        """
        processFlags = 0

        dtype_index = get_dtype_index(self.dtype)
        procflag_dtype = get_procflag_dtype(dtype_index)

        # encode the sample dtype first
        processFlags += procflag_dtype

        if self.dataOut.flagDecodeData:
            processFlags += PROCFLAG.DECODE_DATA

        if self.dataOut.flagDeflipData:
            processFlags += PROCFLAG.DEFLIP_DATA

        if self.dataOut.code is not None:
            processFlags += PROCFLAG.DEFINE_PROCESS_CODE

        if self.dataOut.nCohInt > 1:
            processFlags += PROCFLAG.COHERENT_INTEGRATION

        # spectra-only flags
        if self.dataOut.type == "Spectra":
            if self.dataOut.nIncohInt > 1:
                processFlags += PROCFLAG.INCOHERENT_INTEGRATION

            if self.dataOut.data_dc is not None:
                processFlags += PROCFLAG.SAVE_CHANNELS_DC

            if self.dataOut.flagShiftFFT:
                processFlags += PROCFLAG.SHIFT_FFT_DATA

        return processFlags
1596 1596
1597 1597 def setBasicHeader(self):
1598 1598
1599 1599 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1600 1600 self.basicHeaderObj.version = self.versionFile
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1602 log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime))
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1603 1602 utc = numpy.floor(self.dataOut.utctime)
1604 milisecond = (self.dataOut.utctime - utc) * 1000.0
1605 log.warning(milisecond)
1603 milisecond = (self.dataOut.utctime - utc) * 1000.0
1606 1604 self.basicHeaderObj.utc = utc
1607 1605 self.basicHeaderObj.miliSecond = milisecond
1608 1606 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1609 1607 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1610 1608 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1611 1609
    def setFirstHeader(self):
        """
        Build a copy of the first header for the output file.

        Affected:
            self.basicHeaderObj
            self.systemHeaderObj
            self.radarControllerHeaderObj
            self.processingHeaderObj

        Return:
            None
        """

        raise NotImplementedError
1628 1626
1629 1627 def __writeFirstHeader(self):
1630 1628 """
1631 1629 Escribe el primer header del file es decir el Basic header y el Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1632 1630
1633 1631 Affected:
1634 1632 __dataType
1635 1633
1636 1634 Return:
1637 1635 None
1638 1636 """
1639 1637
1640 1638 # CALCULAR PARAMETROS
1641 1639
1642 1640 sizeLongHeader = self.systemHeaderObj.size + \
1643 1641 self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1644 1642 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1645 1643
1646 1644 self.basicHeaderObj.write(self.fp)
1647 1645 self.systemHeaderObj.write(self.fp)
1648 1646 self.radarControllerHeaderObj.write(self.fp)
1649 1647 self.processingHeaderObj.write(self.fp)
1650 1648
    def __setNewBlock(self):
        """
        Prepare the output file for the next block: write the first header
        when a new file was just opened, otherwise write only a basic
        header, rolling over to a new file once dataBlocksPerFile is reached.

        Return:
            0 : nothing could be written
            1 : a basic or first header was written
        """
        if self.fp == None:
            self.setNextFile()

        # a freshly opened file already carries its first header
        if self.flagIsNewFile:
            return 1

        if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
            self.basicHeaderObj.write(self.fp)
            return 1

        if not(self.setNextFile()):
            return 0

        return 1
1673 1671
    def writeNextBlock(self):
        """
        Write the next block of buffered data to the current output file.

        Return:
            0 : the data block could not be written
            1 : the data block was written successfully
        """
        if not(self.__setNewBlock()):
            return 0

        self.writeBlock()

        print("[Writing] Block No. %d/%d" % (self.blockIndex,
                                             self.processingHeaderObj.dataBlocksPerFile))

        return 1
1691 1689
    def setNextFile(self):
        """
        Determine and open the next file to be written.

        Builds a 'dYYYYDDD' subfolder from the data timestamp, continues the
        set counter from the last file found there (resetting it on a new
        day), opens the new file and writes its first header.

        Affected:
            self.filename
            self.subfolder
            self.fp
            self.setFile
            self.flagIsNewFile

        Return:
            0 : the file cannot be written
            1 : the file is ready to be written
        """
        ext = self.ext
        path = self.path

        # Close the previous file, if any, before rolling over
        if self.fp != None:
            self.fp.close()

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year, timeTuple.tm_yday)

        fullpath = os.path.join(path, subfolder)
        setFile = self.setFile

        if not(os.path.exists(fullpath)):
            os.mkdir(fullpath)
            setFile = -1  # reset the set counter
        else:
            filesList = os.listdir(fullpath)
            if len(filesList) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # The filename must follow this layout:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # continue numbering from the last file's set
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # reset the set counter

        setFile += 1

        # If this is a new day it resets some values
        if self.dataOut.datatime.date() > self.fileDate:
            setFile = 0
            self.nTotalBlocks = 0

        filen = '{}{:04d}{:03d}{:03d}{}'.format(
            self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext)

        filename = os.path.join(path, subfolder, filen)

        fp = open(filename, 'wb')

        self.blockIndex = 0

        # store attributes describing the newly opened file
        self.filename = filename
        self.subfolder = subfolder
        self.fp = fp
        self.setFile = setFile
        self.flagIsNewFile = 1
        self.fileDate = self.dataOut.datatime.date()

        self.setFirstHeader()

        print('[Writing] Opening file: %s' % self.filename)

        self.__writeFirstHeader()

        return 1
1769 1767
1770 1768 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1771 1769 """
1772 1770 Setea el tipo de formato en la cual sera guardada la data y escribe el First Header
1773 1771
1774 1772 Inputs:
1775 1773 path : directory where data will be saved
1776 1774 profilesPerBlock : number of profiles per block
1777 1775 set : initial file set
1778 1776 datatype : An integer number that defines data type:
1779 1777 0 : int8 (1 byte)
1780 1778 1 : int16 (2 bytes)
1781 1779 2 : int32 (4 bytes)
1782 1780 3 : int64 (8 bytes)
1783 1781 4 : float32 (4 bytes)
1784 1782 5 : double64 (8 bytes)
1785 1783
1786 1784 Return:
1787 1785 0 : Si no realizo un buen seteo
1788 1786 1 : Si realizo un buen seteo
1789 1787 """
1790 1788
1791 1789 if ext == None:
1792 1790 ext = self.ext
1793 1791
1794 1792 self.ext = ext.lower()
1795 1793
1796 1794 self.path = path
1797 1795
1798 1796 if set is None:
1799 1797 self.setFile = -1
1800 1798 else:
1801 1799 self.setFile = set - 1
1802 1800
1803 1801 self.blocksPerFile = blocksPerFile
1804 1802
1805 1803 self.profilesPerBlock = profilesPerBlock
1806 1804
1807 1805 self.dataOut = dataOut
1808 1806 self.fileDate = self.dataOut.datatime.date()
1809 1807 # By default
1810 1808 self.dtype = self.dataOut.dtype
1811 1809
1812 1810 if datatype is not None:
1813 1811 self.dtype = get_numpy_dtype(datatype)
1814 1812
1815 1813 if not(self.setNextFile()):
1816 1814 print("[Writing] There isn't a next file")
1817 1815 return 0
1818 1816
1819 1817 self.setBlockDimension()
1820 1818
1821 1819 return 1
1822 1820
1823 1821 def run(self, dataOut, path, blocksPerFile=100, profilesPerBlock=64, set=None, ext=None, datatype=4, **kwargs):
1824 1822
1825 1823 if not(self.isConfig):
1826 1824
1827 1825 self.setup(dataOut, path, blocksPerFile, profilesPerBlock=profilesPerBlock,
1828 1826 set=set, ext=ext, datatype=datatype, **kwargs)
1829 1827 self.isConfig = True
1830 1828
1831 1829 self.dataOut = dataOut
1832 1830 self.putData()
1833 1831 return self.dataOut No newline at end of file
@@ -1,1543 +1,1544
1 1 import numpy
2 2 import time
3 3 import os
4 4 import h5py
5 5 import re
6 6 import datetime
7 7
8 8 from schainpy.model.data.jrodata import *
9 9 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 10 from schainpy.model.io.jroIO_base import *
11 11 from schainpy.utils import log
12 12
@MPDecorator
class ParamReader(JRODataReader,ProcessingUnit):
    '''
    Reads HDF5 format files.

    Configured through setup() kwargs:
        path      : directory containing the data files
        startDate / endDate : date range to read
        startTime / endTime : time-of-day range to read
    '''

    ext = ".hdf5"
    optchar = "D"
    timezone = None
    startTime = None
    endTime = None
    fileIndex = None
    utcList = None #To select data in the utctime list
    blockList = None #List to blocks to be read from the file
    blocksPerFile = None #Number of blocks to be read
    blockIndex = None
    path = None
    #List of Files
    filenameList = None
    datetimeList = None
    #Hdf5 File
    listMetaname = None
    listMeta = None
    listDataname = None
    listData = None
    listShapes = None
    fp = None
    #dataOut reconstruction
    dataOut = None

    def __init__(self):#, **kwargs):
        # Plain ProcessingUnit init; real configuration happens in setup()
        ProcessingUnit.__init__(self) #, **kwargs)
        self.dataOut = Parameters()
        return
52 52
53 53 def setup(self, **kwargs):
54 54
55 55 path = kwargs['path']
56 56 startDate = kwargs['startDate']
57 57 endDate = kwargs['endDate']
58 58 startTime = kwargs['startTime']
59 59 endTime = kwargs['endTime']
60 60 walk = kwargs['walk']
61 61 if 'ext' in kwargs:
62 62 ext = kwargs['ext']
63 63 else:
64 64 ext = '.hdf5'
65 65 if 'timezone' in kwargs:
66 66 self.timezone = kwargs['timezone']
67 67 else:
68 68 self.timezone = 'lt'
69 69
70 70 print("[Reading] Searching files in offline mode ...")
71 71 pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
72 72 startTime=startTime, endTime=endTime,
73 73 ext=ext, walk=walk)
74 74
75 75 if not(filenameList):
76 76 print("There is no files into the folder: %s"%(path))
77 77 sys.exit(-1)
78 78
79 79 self.fileIndex = -1
80 80 self.startTime = startTime
81 81 self.endTime = endTime
82 82
83 83 self.__readMetadata()
84 84
85 85 self.__setNextFileOffline()
86 86
87 87 return
88 88
89 89 def searchFilesOffLine(self,
90 90 path,
91 91 startDate=None,
92 92 endDate=None,
93 93 startTime=datetime.time(0,0,0),
94 94 endTime=datetime.time(23,59,59),
95 95 ext='.hdf5',
96 96 walk=True):
97 97
98 98 expLabel = ''
99 99 self.filenameList = []
100 100 self.datetimeList = []
101 101
102 102 pathList = []
103 103
104 104 JRODataObj = JRODataReader()
105 105 dateList, pathList = JRODataObj.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
106 106
107 107 if dateList == []:
108 108 print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
109 109 datetime.datetime.combine(startDate,startTime).ctime(),
110 110 datetime.datetime.combine(endDate,endTime).ctime()))
111 111
112 112 return None, None
113 113
114 114 if len(dateList) > 1:
115 115 print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
116 116 else:
117 117 print("[Reading] data was found for the date %s" %(dateList[0]))
118 118
119 119 filenameList = []
120 120 datetimeList = []
121 121
122 122 #----------------------------------------------------------------------------------
123 123
124 124 for thisPath in pathList:
125 125
126 126 fileList = glob.glob1(thisPath, "*%s" %ext)
127 127 fileList.sort()
128 128
129 129 for file in fileList:
130 130
131 131 filename = os.path.join(thisPath,file)
132 132
133 133 if not isFileInDateRange(filename, startDate, endDate):
134 134 continue
135 135
136 136 thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
137 137
138 138 if not(thisDatetime):
139 139 continue
140 140
141 141 filenameList.append(filename)
142 142 datetimeList.append(thisDatetime)
143 143
144 144 if not(filenameList):
145 145 print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
146 146 return None, None
147 147
148 148 print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
149 149 print()
150 150
151 151 self.filenameList = filenameList
152 152 self.datetimeList = datetimeList
153 153
154 154 return pathList, filenameList
155 155
    def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):

        """
        Check whether a data file contains data within the given time range.

        Inputs:
            filename  : full path of the data file
            startDate : start of the selected date range (datetime.date)
            endDate   : end of the selected date range (datetime.date)
            startTime : start of the selected time range (datetime.time)
            endTime   : end of the selected time range (datetime.time)

        Return:
            The datetime of the file's first sample when it holds data inside
            the range; otherwise None.

        Raises:
            IOError if the file does not exist or cannot be opened.
        """

        try:
            fp = h5py.File(filename,'r')
            grp1 = fp['Data']

        except IOError:
            traceback.print_exc()
            raise IOError("The file %s can't be opened" %(filename))

        #In case has utctime attribute
        grp2 = grp1['utctime']
        # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
        # NOTE(review): Dataset.value was removed in h5py >= 3.0; grp2[()]
        # is the supported equivalent -- confirm the h5py version pinned here.
        thisUtcTime = grp2.value[0]

        fp.close()

        # Stored timestamps are UTC; shift to local (UTC-5) when requested
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()

        #General case
        # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
        #-----------o----------------------------o-----------
        #        startTime                     endTime

        if endTime >= startTime:
            thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
            if numpy.any(thisUtcLog):   #If there is one block between the hours mentioned
                return thisDatetime
            return None

        #If endTime < startTime then endTime belongs to the next day
        #<<<<<<<<<<<o                          o>>>>>>>>>>>
        #-----------o----------------------------o-----------
        #         endTime                    startTime

        if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
            return None

        if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
            return None

        if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
            return None

        return thisDatetime
229 229
230 230 def __setNextFileOffline(self):
231 231
232 232 self.fileIndex += 1
233 233 idFile = self.fileIndex
234 234
235 235 if not(idFile < len(self.filenameList)):
236 236 self.dataOut.error = "No more Files"
237 237 return 0
238 238
239 239 filename = self.filenameList[idFile]
240 240 filePointer = h5py.File(filename,'r')
241 241 self.filename = filename
242 242 self.fp = filePointer
243 243
244 244 print("Setting the file: %s"%self.filename)
245 245
246 246 self.__setBlockList()
247 247 self.__readData()
248 248 self.blockIndex = 0
249 249 return 1
250 250
251 251 def __setBlockList(self):
252 252 '''
253 253 Selects the data within the times defined
254 254
255 255 self.fp
256 256 self.startTime
257 257 self.endTime
258 258
259 259 self.blockList
260 260 self.blocksPerFile
261 261
262 262 '''
263 263 fp = self.fp
264 264 startTime = self.startTime
265 265 endTime = self.endTime
266 266
267 267 grp = fp['Data']
268 268 thisUtcTime = grp['utctime'].value.astype(numpy.float)[0]
269 269
270 270 #ERROOOOR
271 271 if self.timezone == 'lt':
272 272 thisUtcTime -= 5*3600
273 273
274 274 thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)
275 275
276 276 thisDate = thisDatetime.date()
277 277 thisTime = thisDatetime.time()
278 278
279 279 startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
280 280 endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
281 281
282 282 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
283 283
284 284 self.blockList = ind
285 285 self.blocksPerFile = len(ind)
286 286
287 287 return
288 288
    def __readMetadata(self):
        '''
        Read the 'Metadata' group of the first file in the list.

        Sets:
            self.listShapes   : dict arrayName -> [nDims, dim2, dim1, dim0, mode]
            self.listMetaname : metadata attribute names
            self.listMeta     : metadata attribute values
        '''

        filename = self.filenameList[0]
        fp = h5py.File(filename,'r')
        gp = fp['Metadata']

        listMetaname = []
        listMetadata = []
        for item in list(gp.items()):
            name = item[0]

            if name=='array dimensions':
                # Shape-descriptor table: one row per stored array
                table = gp[name][:]
                listShapes = {}
                for shapes in table:
                    listShapes[shapes[0]] = numpy.array([shapes[1],shapes[2],shapes[3],shapes[4],shapes[5]])
            else:
                # NOTE(review): Dataset.value was removed in h5py >= 3.0;
                # gp[name][()] is the supported equivalent -- confirm version.
                data = gp[name].value
                listMetaname.append(name)
                listMetadata.append(data)

        self.listShapes = listShapes
        self.listMetaname = listMetaname
        self.listMeta = listMetadata

        fp.close()
        return
325 325
326 326 def __readData(self):
327 327 grp = self.fp['Data']
328 328 listdataname = []
329 329 listdata = []
330 330
331 331 for item in list(grp.items()):
332 332 name = item[0]
333 333 listdataname.append(name)
334 334
335 335 array = self.__setDataArray(grp[name],self.listShapes[name])
336 336 listdata.append(array)
337 337
338 338 self.listDataname = listdataname
339 339 self.listData = listdata
340 340 return
341 341
342 342 def __setDataArray(self, dataset, shapes):
343 343
344 344 nDims = shapes[0]
345 345 nDim2 = shapes[1] #Dimension 0
346 346 nDim1 = shapes[2] #Dimension 1, number of Points or Parameters
347 347 nDim0 = shapes[3] #Dimension 2, number of samples or ranges
348 348 mode = shapes[4] #Mode of storing
349 349 blockList = self.blockList
350 350 blocksPerFile = self.blocksPerFile
351 351
352 352 #Depending on what mode the data was stored
353 353 if mode == 0: #Divided in channels
354 354 arrayData = dataset.value.astype(numpy.float)[0][blockList]
355 355 if mode == 1: #Divided in parameter
356 356 strds = 'table'
357 357 nDatas = nDim1
358 358 newShapes = (blocksPerFile,nDim2,nDim0)
359 359 elif mode==2: #Concatenated in a table
360 360 strds = 'table0'
361 361 arrayData = dataset[strds].value
362 362 #Selecting part of the dataset
363 363 utctime = arrayData[:,0]
364 364 u, indices = numpy.unique(utctime, return_index=True)
365 365
366 366 if blockList.size != indices.size:
367 367 indMin = indices[blockList[0]]
368 368 if blockList[1] + 1 >= indices.size:
369 369 arrayData = arrayData[indMin:,:]
370 370 else:
371 371 indMax = indices[blockList[1] + 1]
372 372 arrayData = arrayData[indMin:indMax,:]
373 373 return arrayData
374 374
375 375 # One dimension
376 376 if nDims == 0:
377 377 arrayData = dataset.value.astype(numpy.float)[0][blockList]
378 378
379 379 # Two dimensions
380 380 elif nDims == 2:
381 381 arrayData = numpy.zeros((blocksPerFile,nDim1,nDim0))
382 382 newShapes = (blocksPerFile,nDim0)
383 383 nDatas = nDim1
384 384
385 385 for i in range(nDatas):
386 386 data = dataset[strds + str(i)].value
387 387 arrayData[:,i,:] = data[blockList,:]
388 388
389 389 # Three dimensions
390 390 else:
391 391 arrayData = numpy.zeros((blocksPerFile,nDim2,nDim1,nDim0))
392 392 for i in range(nDatas):
393 393
394 394 data = dataset[strds + str(i)].value
395 395
396 396 for b in range(blockList.size):
397 397 arrayData[b,:,i,:] = data[:,:,blockList[b]]
398 398
399 399 return arrayData
400 400
401 401 def __setDataOut(self):
402 402 listMeta = self.listMeta
403 403 listMetaname = self.listMetaname
404 404 listDataname = self.listDataname
405 405 listData = self.listData
406 406 listShapes = self.listShapes
407 407
408 408 blockIndex = self.blockIndex
409 409 # blockList = self.blockList
410 410
411 411 for i in range(len(listMeta)):
412 412 setattr(self.dataOut,listMetaname[i],listMeta[i])
413 413
414 414 for j in range(len(listData)):
415 415 nShapes = listShapes[listDataname[j]][0]
416 416 mode = listShapes[listDataname[j]][4]
417 417 if nShapes == 1:
418 418 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
419 419 elif nShapes > 1:
420 420 setattr(self.dataOut,listDataname[j],listData[j][blockIndex,:])
421 421 elif mode==0:
422 422 setattr(self.dataOut,listDataname[j],listData[j][blockIndex])
423 423 #Mode Meteors
424 424 elif mode ==2:
425 425 selectedData = self.__selectDataMode2(listData[j], blockIndex)
426 426 setattr(self.dataOut, listDataname[j], selectedData)
427 427 return
428 428
429 429 def __selectDataMode2(self, data, blockIndex):
430 430 utctime = data[:,0]
431 431 aux, indices = numpy.unique(utctime, return_inverse=True)
432 432 selInd = numpy.where(indices == blockIndex)[0]
433 433 selData = data[selInd,:]
434 434
435 435 return selData
436 436
437 437 def getData(self):
438 438
439 439 if self.blockIndex==self.blocksPerFile:
440 440 if not( self.__setNextFileOffline() ):
441 441 self.dataOut.flagNoData = True
442 442 return 0
443 443
444 444 self.__setDataOut()
445 445 self.dataOut.flagNoData = False
446 446
447 447 self.blockIndex += 1
448 448
449 449 return
450 450
451 451 def run(self, **kwargs):
452 452
453 453 if not(self.isConfig):
454 454 self.setup(**kwargs)
455 455 self.isConfig = True
456 456
457 457 self.getData()
458 458
459 459 return
460 460
@MPDecorator
class ParamWriter(Operation):
    '''
    HDF5 Writer, stores parameters data in HDF5 format files

    path: path where the files will be stored
    blocksPerFile: number of blocks that will be saved in per HDF5 format file
    mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
    metadataList: list of attributes that will be stored as metadata
    dataList: list of attributes that will be stored as data
    '''

    ext = ".hdf5"
    optchar = "D"
    metaoptchar = "M"
    metaFile = None
    filename = None
    path = None
    setFile = None
    fp = None
    grp = None
    ds = None
    firsttime = True
    #Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    #Data Arrays
    dataList = None
    metadataList = None
    dsList = None #List of dictionaries with dataset properties
    tableDim = None
    # Row layout of the 'array dimensions' metadata table
    dtype = [('arrayName', 'S20'),('nDimensions', 'i'), ('dim2', 'i'), ('dim1', 'i'),('dim0', 'i'),('mode', 'b')]
    currentDay = None
    lastTime = None
    setType = None

    def __init__(self):
        # Plain Operation init; all real configuration happens in setup()
        Operation.__init__(self)
        return
502 502
503 503 def setup(self, dataOut, path=None, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
504 504 self.path = path
505 505 self.blocksPerFile = blocksPerFile
506 506 self.metadataList = metadataList
507 507 self.dataList = dataList
508 508 self.dataOut = dataOut
509 509 self.mode = mode
510 510 if self.mode is not None:
511 511 self.mode = numpy.zeros(len(self.dataList)) + mode
512 512 else:
513 513 self.mode = numpy.ones(len(self.dataList))
514 514
515 515 self.setType = setType
516 516
517 517 arrayDim = numpy.zeros((len(self.dataList),5))
518 518
519 519 #Table dimensions
520 520 dtype0 = self.dtype
521 521 tableList = []
522 522
523 523 #Dictionary and list of tables
524 524 dsList = []
525 525
526 526 for i in range(len(self.dataList)):
527 527 dsDict = {}
528 528 dataAux = getattr(self.dataOut, self.dataList[i])
529 529 dsDict['variable'] = self.dataList[i]
530 530 #--------------------- Conditionals ------------------------
531 531 #There is no data
532 532
533 533 if dataAux is None:
534 534
535 535 return 0
536 536
537 537 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
538 538 dsDict['mode'] = 0
539 539 dsDict['nDim'] = 0
540 540 arrayDim[i,0] = 0
541 541 dsList.append(dsDict)
542 542
543 543 #Mode 2: meteors
544 544 elif self.mode[i] == 2:
545 545 dsDict['dsName'] = 'table0'
546 546 dsDict['mode'] = 2 # Mode meteors
547 547 dsDict['shape'] = dataAux.shape[-1]
548 548 dsDict['nDim'] = 0
549 549 dsDict['dsNumber'] = 1
550 550 arrayDim[i,3] = dataAux.shape[-1]
551 551 arrayDim[i,4] = self.mode[i] #Mode the data was stored
552 552 dsList.append(dsDict)
553 553
554 554 #Mode 1
555 555 else:
556 556 arrayDim0 = dataAux.shape #Data dimensions
557 557 arrayDim[i,0] = len(arrayDim0) #Number of array dimensions
558 558 arrayDim[i,4] = self.mode[i] #Mode the data was stored
559 559 strtable = 'table'
560 560 dsDict['mode'] = 1 # Mode parameters
561 561
562 562 # Three-dimension arrays
563 563 if len(arrayDim0) == 3:
564 564 arrayDim[i,1:-1] = numpy.array(arrayDim0)
565 565 nTables = int(arrayDim[i,2])
566 566 dsDict['dsNumber'] = nTables
567 567 dsDict['shape'] = arrayDim[i,2:4]
568 568 dsDict['nDim'] = 3
569 569
570 570 for j in range(nTables):
571 571 dsDict = dsDict.copy()
572 572 dsDict['dsName'] = strtable + str(j)
573 573 dsList.append(dsDict)
574 574
575 575 # Two-dimension arrays
576 576 elif len(arrayDim0) == 2:
577 577 arrayDim[i,2:-1] = numpy.array(arrayDim0)
578 578 nTables = int(arrayDim[i,2])
579 579 dsDict['dsNumber'] = nTables
580 580 dsDict['shape'] = arrayDim[i,3]
581 581 dsDict['nDim'] = 2
582 582
583 583 for j in range(nTables):
584 584 dsDict = dsDict.copy()
585 585 dsDict['dsName'] = strtable + str(j)
586 586 dsList.append(dsDict)
587 587
588 588 # One-dimension arrays
589 589 elif len(arrayDim0) == 1:
590 590 arrayDim[i,3] = arrayDim0[0]
591 591 dsDict['shape'] = arrayDim0[0]
592 592 dsDict['dsNumber'] = 1
593 593 dsDict['dsName'] = strtable + str(0)
594 594 dsDict['nDim'] = 1
595 595 dsList.append(dsDict)
596 596
597 597 table = numpy.array((self.dataList[i],) + tuple(arrayDim[i,:]),dtype = dtype0)
598 598 tableList.append(table)
599 599
600 600 self.dsList = dsList
601 601 self.tableDim = numpy.array(tableList, dtype = dtype0)
602 602 self.blockIndex = 0
603 603 timeTuple = time.localtime(dataOut.utctime)
604 604 self.currentDay = timeTuple.tm_yday
605 605
606 606 def putMetadata(self):
607 607
608 608 fp = self.createMetadataFile()
609 609 self.writeMetadata(fp)
610 610 fp.close()
611 611 return
612 612
    def createMetadataFile(self):
        """
        Create a new metadata HDF5 file ('M'-prefixed) inside the day
        subfolder, continuing the set counter from the last file found there.

        Return:
            An open h5py.File object in write mode (caller must close it).
        """
        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)

        subfolder = ''
        fullpath = os.path.join( path, subfolder )

        if not( os.path.exists(fullpath) ):
            os.mkdir(fullpath)
            setFile = -1 #reset the set counter

        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
        fullpath = os.path.join( path, subfolder )

        if not( os.path.exists(fullpath) ):
            os.mkdir(fullpath)
            setFile = -1 #reset the set counter

        else:
            filesList = os.listdir( fullpath )
            filesList = sorted( filesList, key=str.lower )
            if len( filesList ) > 0:
                # Only metadata files ('M' prefix) count for the set number
                filesList = [k for k in filesList if k.startswith(self.metaoptchar)]
                filen = filesList[-1]
                # The filename must follow this layout:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber( filen[8:11] ):
                    setFile = int( filen[8:11] ) #continue numbering from the last file's set
                else:
                    setFile = -1
            else:
                setFile = -1 #reset the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.metaoptchar,
                                             timeTuple.tm_year,
                                             timeTuple.tm_yday,
                                             setFile,
                                             ext )
        else:
            # Set number derived from time of day (minutes since midnight)
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.metaoptchar,
                                             timeTuple.tm_year,
                                             timeTuple.tm_yday,
                                             setFile,
                                             ext )

        filename = os.path.join( path, subfolder, file )
        self.metaFile = file
        #Setting HDF5 File
        fp = h5py.File(filename,'w')

        return fp
671 671
672 672 def writeMetadata(self, fp):
673 673
674 674 grp = fp.create_group("Metadata")
675 675 grp.create_dataset('array dimensions', data = self.tableDim, dtype = self.dtype)
676 676
677 677 for i in range(len(self.metadataList)):
678 678 grp.create_dataset(self.metadataList[i], data=getattr(self.dataOut, self.metadataList[i]))
679 679 return
680 680
681 681 def timeFlag(self):
682 682 currentTime = self.dataOut.utctime
683 683
684 684 if self.lastTime is None:
685 685 self.lastTime = currentTime
686 686
687 687 #Day
688 688 timeTuple = time.localtime(currentTime)
689 689 dataDay = timeTuple.tm_yday
690 690
691 691 #Time
692 692 timeDiff = currentTime - self.lastTime
693 693
694 694 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
695 695 if dataDay != self.currentDay:
696 696 self.currentDay = dataDay
697 697 return True
698 698 elif timeDiff > 3*60*60:
699 699 self.lastTime = currentTime
700 700 return True
701 701 else:
702 702 self.lastTime = currentTime
703 703 return False
704 704
    def setNextFile(self):
        '''
        Create the next HDF5 data file, write its metadata, and pre-create a
        resizable dataset for every entry of self.dsList.

        Affected:
            self.filename, self.ds, self.data, self.firsttime, self.blockIndex
        '''
        ext = self.ext
        path = self.path
        setFile = self.setFile
        mode = self.mode

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)

        fullpath = os.path.join( path, subfolder )

        if os.path.exists(fullpath):
            filesList = os.listdir( fullpath )
            filesList = [k for k in filesList if 'M' in k]
            if len( filesList ) > 0:
                filesList = sorted( filesList, key=str.lower )
                filen = filesList[-1]
                # The filename must follow this layout:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber( filen[8:11] ):
                    setFile = int( filen[8:11] ) #continue numbering from the last file's set
                else:
                    setFile = -1
            else:
                setFile = -1 #reset the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1 #reset the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                             timeTuple.tm_year,
                                             timeTuple.tm_yday,
                                             setFile,
                                             ext )
        else:
            # Set number derived from time of day (minutes since midnight)
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                             timeTuple.tm_year,
                                             timeTuple.tm_yday,
                                             setFile,
                                             ext )

        filename = os.path.join( path, subfolder, file )

        #Setting HDF5 File
        fp = h5py.File(filename,'w')
        #write metadata
        self.writeMetadata(fp)
        #Write data
        grp = fp.create_group("Data")
        ds = []
        data = []
        dsList = self.dsList
        i = 0
        while i < len(dsList):
            dsInfo = dsList[i]
            #One-dimension data: scalar per block, grows along axis 1
            if dsInfo['mode'] == 0:
                ds0 = grp.create_dataset(dsInfo['variable'], (1,1), maxshape=(1,self.blocksPerFile) , chunks = True, dtype=numpy.float64)
                ds.append(ds0)
                data.append([])
                i += 1
                continue

            elif dsInfo['mode'] == 2:
                # Meteor table: rows appended per block
                grp0 = grp.create_group(dsInfo['variable'])
                ds0 = grp0.create_dataset(dsInfo['dsName'], (1,dsInfo['shape']), data = numpy.zeros((1,dsInfo['shape'])) , maxshape=(None,dsInfo['shape']), chunks=True)
                ds.append(ds0)
                data.append([])
                i += 1
                continue

            elif dsInfo['mode'] == 1:
                # Parameter mode: a sub-group holding dsNumber tables
                grp0 = grp.create_group(dsInfo['variable'])

                for j in range(dsInfo['dsNumber']):
                    dsInfo = dsList[i]
                    tableName = dsInfo['dsName']


                    if dsInfo['nDim'] == 3:
                        shape = dsInfo['shape'].astype(int)
                        ds0 = grp0.create_dataset(tableName, (shape[0],shape[1],1) , data = numpy.zeros((shape[0],shape[1],1)), maxshape = (None,shape[1],None), chunks=True)
                    else:
                        shape = int(dsInfo['shape'])
                        ds0 = grp0.create_dataset(tableName, (1,shape), data = numpy.zeros((1,shape)) , maxshape=(None,shape), chunks=True)

                    ds.append(ds0)
                    data.append([])
                    i += 1

        fp.flush()
        fp.close()

        log.log('creating file: {}'.format(filename), 'Writing')
        self.filename = filename
        self.ds = ds
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return
810 810
811 811 def putData(self):
812 812
813 813 if self.blockIndex == self.blocksPerFile or self.timeFlag():
814 814 self.setNextFile()
815 815
816 816 self.readBlock()
817 817 self.setBlock() #Prepare data to be written
818 818 self.writeBlock() #Write data
819 819
820 820 return
821 821
    def readBlock(self):

        '''
        Re-open the current HDF5 file in read/write mode and rebind every
        dataset handle in self.ds so the next write targets live objects.

        Affected:
            self.fp, self.grp, self.ds
        '''
        dsList = self.dsList
        ds = self.ds
        #Setting HDF5 File
        fp = h5py.File(self.filename,'r+')
        grp = fp["Data"]
        ind = 0

        while ind < len(dsList):
            dsInfo = dsList[ind]

            if dsInfo['mode'] == 0:
                # Scalar variables live directly under /Data
                ds0 = grp[dsInfo['variable']]
                ds[ind] = ds0
                ind += 1
            else:

                # Array variables: one sub-group holding dsNumber tables
                grp0 = grp[dsInfo['variable']]

                for j in range(dsInfo['dsNumber']):
                    dsInfo = dsList[ind]
                    ds0 = grp0[dsInfo['dsName']]
                    ds[ind] = ds0
                    ind += 1

        self.fp = fp
        self.grp = grp
        self.ds = ds

        return
859 859
    def setBlock(self):
        '''
        Slice the attributes of self.dataOut into the per-dataset layout
        expected by writeBlock() and store them in self.data.
        '''
        #Creating Arrays
        dsList = self.dsList
        data = self.data
        ind = 0

        while ind < len(dsList):
            dsInfo = dsList[ind]
            dataAux = getattr(self.dataOut, dsInfo['variable'])

            mode = dsInfo['mode']
            nDim = dsInfo['nDim']

            if mode == 0 or mode == 2 or nDim == 1:
                # Scalars, meteor tables and 1-D arrays are stored whole
                data[ind] = dataAux
                ind += 1
            # elif nDim == 1:
            #     data[ind] = numpy.reshape(dataAux,(numpy.size(dataAux),1))
            #     ind += 1
            elif nDim == 2:
                # One table per row (axis 0)
                for j in range(dsInfo['dsNumber']):
                    data[ind] = dataAux[j,:]
                    ind += 1
            elif nDim == 3:
                # One table per middle axis (axis 1)
                for j in range(dsInfo['dsNumber']):
                    data[ind] = dataAux[:,j,:]
                    ind += 1

        self.data = data
        return
896 896
    def writeBlock(self):
        '''
        Save the current block (self.data) into the HDF5 datasets, resizing
        each dataset to make room according to its storage mode.
        '''
        dsList = self.dsList

        for i in range(len(self.ds)):
            dsInfo = dsList[i]
            nDim = dsInfo['nDim']
            mode = dsInfo['mode']

            # First time
            if self.firsttime:
                if type(self.data[i]) == numpy.ndarray:

                    if nDim == 3:
                        # Add the trailing "block" axis expected by the layout
                        self.data[i] = self.data[i].reshape((self.data[i].shape[0],self.data[i].shape[1],1))
                        self.ds[i].resize(self.data[i].shape)
                    if mode == 2:
                        self.ds[i].resize(self.data[i].shape)
                    self.ds[i][:] = self.data[i]
            else:

                # From second time
                # Meteors!
                if mode == 2:
                    # Append the new rows after the existing ones
                    dataShape = self.data[i].shape
                    dsShape = self.ds[i].shape
                    self.ds[i].resize((self.ds[i].shape[0] + dataShape[0],self.ds[i].shape[1]))
                    self.ds[i][dsShape[0]:,:] = self.data[i]
                # No dimension
                elif mode == 0:
                    # Grow along axis 1; one scalar per block
                    self.ds[i].resize((self.ds[i].shape[0], self.ds[i].shape[1] + 1))
                    self.ds[i][0,-1] = self.data[i]
                # One dimension
                elif nDim == 1:
                    self.ds[i].resize((self.ds[i].shape[0] + 1, self.ds[i].shape[1]))
                    self.ds[i][-1,:] = self.data[i]
                # Two dimension
                elif nDim == 2:
                    self.ds[i].resize((self.ds[i].shape[0] + 1,self.ds[i].shape[1]))
                    self.ds[i][self.blockIndex,:] = self.data[i]
                # Three dimensions
                elif nDim == 3:
                    self.ds[i].resize((self.ds[i].shape[0],self.ds[i].shape[1],self.ds[i].shape[2]+1))
                    self.ds[i][:,:,-1] = self.data[i]

        self.firsttime = False
        self.blockIndex += 1

        #Flush and close to persist the changes
        self.fp.flush()
        self.fp.close()
        return
951 951
    def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, mode=None, setType=None):
        '''
        Entry point, called once per incoming block: on the first call the
        writer is configured and the first output file opened, then the
        current block of dataOut is written.
        '''
        self.dataOut = dataOut
        if not(self.isConfig):
            self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
                    metadataList=metadataList, dataList=dataList, mode=mode,
                    setType=setType)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return
965 965
966 966
@MPDecorator
class ParameterReader(JRODataReader,ProcessingUnit):
    '''
    Reads HDF5 format files produced by the parameter writer.
    '''

    ext = ".hdf5"
    optchar = "D"
    timezone = None            # 'lt' (local time, UTC-5) or 'ut'
    startTime = None
    endTime = None
    fileIndex = None
    blockList = None           # list of blocks to be read from the file
    blocksPerFile = None       # number of blocks to be read
    blockIndex = None
    path = None
    # List of files
    filenameList = None
    datetimeList = None
    # HDF5 file contents
    listMetaname = None
    listMeta = None
    listDataname = None
    listData = None
    listShapes = None
    fp = None
    # dataOut reconstruction
    dataOut = None

    def __init__(self):
        ProcessingUnit.__init__(self)
        self.dataOut = Parameters()
        return
1000 1000
    def setup(self, **kwargs):
        '''
        Configure the reader from keyword arguments (path, startDate, endDate,
        startTime, endTime, walk; optional ext and timezone), search the files
        available offline and open the first one.

        NOTE(review): exits the process with sys.exit(-1) when no file is
        found — confirm this is the desired failure mode inside a
        multiprocessing unit.
        '''
        path = kwargs['path']
        startDate = kwargs['startDate']
        endDate = kwargs['endDate']
        startTime = kwargs['startTime']
        endTime = kwargs['endTime']
        walk = kwargs['walk']
        if 'ext' in kwargs:
            ext = kwargs['ext']
        else:
            ext = '.hdf5'
        if 'timezone' in kwargs:
            self.timezone = kwargs['timezone']
        else:
            self.timezone = 'lt'

        print("[Reading] Searching files in offline mode ...")
        pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
                                                         startTime=startTime, endTime=endTime,
                                                         ext=ext, walk=walk)

        if not(filenameList):
            print("There is no files into the folder: %s"%(path))
            sys.exit(-1)

        self.fileIndex = -1
        self.startTime = startTime
        self.endTime = endTime
        self.__readMetadata()
        self.__setNextFileOffline()

        return
1034 1034
    def searchFilesOffLine(self, path, startDate=None, endDate=None, startTime=datetime.time(0,0,0), endTime=datetime.time(23,59,59), ext='.hdf5', walk=True):
        '''
        Find the data files under `path` whose dates fall in
        [startDate, endDate] and whose first timestamp falls in
        [startTime, endTime].

        Returns:
            (pathList, filenameList) on success, (None, None) when nothing
            matches. Also stores the results in self.filenameList and
            self.datetimeList.
        '''
        expLabel = ''
        self.filenameList = []
        self.datetimeList = []
        pathList = []
        dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)

        if dateList == []:
            print("[Reading] No *%s files in %s from %s to %s)"%(ext, path,
                    datetime.datetime.combine(startDate,startTime).ctime(),
                    datetime.datetime.combine(endDate,endTime).ctime()))

            return None, None

        if len(dateList) > 1:
            print("[Reading] %d days were found in date range: %s - %s" %(len(dateList), startDate, endDate))
        else:
            print("[Reading] data was found for the date %s" %(dateList[0]))

        filenameList = []
        datetimeList = []

        for thisPath in pathList:

            fileList = glob.glob1(thisPath, "*%s" %ext)
            fileList.sort()

            for file in fileList:

                filename = os.path.join(thisPath,file)

                # NOTE(review): isFileInDateRange does not appear in this
                # module's visible imports — confirm it is provided by
                # jroIO_base (or another star import)
                if not isFileInDateRange(filename, startDate, endDate):
                    continue

                thisDatetime = self.__isFileInTimeRange(filename, startDate, endDate, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        if not(filenameList):
            print("[Reading] Any file was found int time range %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
            return None, None

        print("[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime))
        print()

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList
1089 1089
    def __isFileInTimeRange(self,filename, startDate, endDate, startTime, endTime):

        """
        Return the file's first datetime if the data file contains samples
        inside the specified time range, otherwise return None.

        Inputs:
            filename  : full path of the HDF5 data file
            startDate : first date of the selected range (datetime.date)
            endDate   : last date of the selected range (datetime.date)
            startTime : start time of the selected range (datetime.time)
            endTime   : end time of the selected range (datetime.time)

        Return:
            datetime of the first sample when the file has data inside the
            specified range; None otherwise.

        Exceptions:
            IOError when the file does not exist or cannot be opened, or the
            header cannot be read.
        """

        try:
            fp = h5py.File(filename, 'r')
            grp1 = fp['Data']

        except IOError:
            traceback.print_exc()
            raise IOError("The file %s can't be opened" %(filename))
        #In case has utctime attribute
        # NOTE(review): Dataset.value is deprecated in h5py >= 2.9 (removed
        # in 3.0); ds[()] is the modern equivalent — confirm h5py version
        grp2 = grp1['utctime']
        thisUtcTime = grp2.value[0]

        fp.close()

        # Shift to local time (UTC-5); the +5*3600 below converts back so the
        # wall-clock fields reflect the configured timezone
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime + 5*3600)
        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate,startTime)- datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate,endTime)- datetime.datetime(1970, 1, 1)).total_seconds()

        #General case
        #           o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
        #-----------o----------------------------o-----------
        #            startTime                     endTime

        if endTime >= startTime:
            thisUtcLog = numpy.logical_and(thisUtcTime > startUtcTime, thisUtcTime < endUtcTime)
            if numpy.any(thisUtcLog):  #If there is one block between the hours mentioned
                return thisDatetime
            return None

        #If endTime < startTime then endTime belongs to the next day
        #<<<<<<<<<<<o                            o>>>>>>>>>>>
        #-----------o----------------------------o-----------
        #            endTime                      startTime

        if (thisDate == startDate) and numpy.all(thisUtcTime < startUtcTime):
            return None

        if (thisDate == endDate) and numpy.all(thisUtcTime > endUtcTime):
            return None

        if numpy.all(thisUtcTime < startUtcTime) and numpy.all(thisUtcTime > endUtcTime):
            return None

        return thisDatetime
1161 1161
    def __setNextFileOffline(self):
        '''
        Advance to the next file in self.filenameList: open it, compute the
        block list for the configured time range and reset the block index.

        Return:
            1 on success, 0 when there are no more files (also sets
            self.dataOut.error).
        '''
        self.fileIndex += 1
        idFile = self.fileIndex

        if not(idFile < len(self.filenameList)):
            self.dataOut.error = 'No more files'
            return 0

        filename = self.filenameList[idFile]
        self.fp = h5py.File(filename, 'r')
        self.filename = filename

        print("Setting the file: %s"%self.filename)

        self.__setBlockList()
        self.__readData()
        self.blockIndex = 0
        return 1
1181 1181
    def __setBlockList(self):
        '''
        Selects the data within the times defined.

        Reads:
            self.fp, self.startTime, self.endTime
        Updates:
            self.blockList     : indices of the blocks inside the time range
            self.blocksPerFile : number of selected blocks
        '''
        fp = self.fp
        startTime = self.startTime
        endTime = self.endTime

        grp = fp['Data']
        thisUtcTime = grp['utctime'].value

        # Shift to local time (UTC-5); fromtimestamp below adds it back
        if self.timezone == 'lt':
            thisUtcTime -= 5*3600

        thisDatetime = datetime.datetime.fromtimestamp(thisUtcTime[0] + 5*3600)

        thisDate = thisDatetime.date()
        thisTime = thisDatetime.time()

        startUtcTime = (datetime.datetime.combine(thisDate,startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
        endUtcTime = (datetime.datetime.combine(thisDate,endTime) - datetime.datetime(1970, 1, 1)).total_seconds()

        ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]

        self.blockList = ind
        self.blocksPerFile = len(ind)
        return
1216 1216
1217 1217 def __readMetadata(self):
1218 1218 '''
1219 1219 Reads Metadata
1220 1220 '''
1221 1221
1222 1222 filename = self.filenameList[0]
1223 1223 fp = h5py.File(filename, 'r')
1224 1224 gp = fp['Metadata']
1225 1225 listMetaname = []
1226 1226 listMetadata = []
1227 1227
1228 1228 for item in list(gp.items()):
1229 1229 name = item[0]
1230 1230
1231 1231 if name=='variables':
1232 1232 table = gp[name][:]
1233 1233 listShapes = {}
1234 1234 for shapes in table:
1235 1235 listShapes[shapes[0].decode()] = numpy.array([shapes[1]])
1236 1236 else:
1237 1237 data = gp[name].value
1238 1238 listMetaname.append(name)
1239 1239 listMetadata.append(data)
1240 1240
1241 1241 self.listShapes = listShapes
1242 1242 self.listMetaname = listMetaname
1243 1243 self.listMeta = listMetadata
1244 1244
1245 1245 fp.close()
1246 1246 return
1247 1247
    def __readData(self):
        '''
        Load every variable of the file's "Data" group into memory.

        Scalar variables (dim == 0) are read whole; multi-table variables are
        read as 'tableNN' sub-datasets and stacked into one numpy array.

        Fills:
            self.listDataname, self.listData (parallel lists)
        '''
        grp = self.fp['Data']
        listdataname = []
        listdata = []

        for item in list(grp.items()):
            name = item[0]
            listdataname.append(name)
            dim = self.listShapes[name][0]
            if dim == 0:
                array = grp[name].value
            else:
                array = []
                for i in range(dim):
                    array.append(grp[name]['table{:02d}'.format(i)].value)
                array = numpy.array(array)

            listdata.append(array)

        self.listDataname = listdataname
        self.listData = listdata
        return
1271 1271
1272 1272 def getData(self):
1273 1273
1274 1274 for i in range(len(self.listMeta)):
1275 1275 setattr(self.dataOut, self.listMetaname[i], self.listMeta[i])
1276 1276
1277 1277 for j in range(len(self.listData)):
1278 1278 dim = self.listShapes[self.listDataname[j]][0]
1279 1279 if dim == 0:
1280 1280 setattr(self.dataOut, self.listDataname[j], self.listData[j][self.blockIndex])
1281 1281 else:
1282 1282 setattr(self.dataOut, self.listDataname[j], self.listData[j][:,self.blockIndex])
1283 1283
1284 1284 self.dataOut.flagNoData = False
1285 1285 self.blockIndex += 1
1286 1286
1287 1287 return
1288 1288
    def run(self, **kwargs):
        '''
        Entry point, called once per block: configure on the first call,
        move to the next file when the current one is exhausted, and expose
        the next block through self.dataOut.
        '''
        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        if self.blockIndex == self.blocksPerFile:
            if not(self.__setNextFileOffline()):
                self.dataOut.flagNoData = True
                return 0

        self.getData()

        return
1303 1303
@MPDecorator
class ParameterWriter(Operation):
    '''
    HDF5 Writer, stores parameters data in HDF5 format files

    path: path where the files will be stored
    blocksPerFile: number of blocks that will be saved per HDF5 format file
    mode: selects the data stacking mode: '0' channels, '1' parameters, '3' table (for meteors)
    metadataList: list of attributes that will be stored as metadata
    dataList: list of attributes that will be stored as data
    '''


    ext = ".hdf5"
    optchar = "D"
    metaoptchar = "M"
    metaFile = None
    filename = None
    path = None
    setFile = None             # per-day file counter (or HHMM when setType given)
    fp = None                  # current h5py.File handle
    grp = None
    ds = None                  # list of h5py dataset handles
    firsttime = True
    # Configurations
    blocksPerFile = None
    blockIndex = None
    dataOut = None
    # Data Arrays
    dataList = None
    metadataList = None
    dsList = None              # list of dictionaries with dataset properties
    tableDim = None
    dtype = [('name', 'S20'),('nDim', 'i')]
    currentDay = None
    lastTime = None

    def __init__(self):

        Operation.__init__(self)
        return
1345 1345
1346 1346 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
1347 1347 self.path = path
1348 1348 self.blocksPerFile = blocksPerFile
1349 1349 self.metadataList = metadataList
1350 1350 self.dataList = dataList
1351 1351 self.setType = setType
1352 1352
1353 1353 tableList = []
1354 1354 dsList = []
1355 1355
1356 1356 for i in range(len(self.dataList)):
1357 1357 dsDict = {}
1358 1358 dataAux = getattr(self.dataOut, self.dataList[i])
1359 1359 dsDict['variable'] = self.dataList[i]
1360 1360
1361 1361 if dataAux is None:
1362 1362 continue
1363 1363 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
1364 1364 dsDict['nDim'] = 0
1365 1365 else:
1366 1366 dsDict['nDim'] = len(dataAux.shape)
1367 1367 dsDict['shape'] = dataAux.shape
1368 1368 dsDict['dsNumber'] = dataAux.shape[0]
1369 1369
1370 1370 dsList.append(dsDict)
1371 1371 tableList.append((self.dataList[i], dsDict['nDim']))
1372 1372
1373 1373 self.dsList = dsList
1374 1374 self.tableDim = numpy.array(tableList, dtype=self.dtype)
1375 1375 self.currentDay = self.dataOut.datatime.date()
1376 1376
    def timeFlag(self):
        '''
        Return True when a new output file should be started: either the
        (local) day of the incoming data changed, or more than 3 hours passed
        since the last stored block.
        '''
        currentTime = self.dataOut.utctime
        timeTuple = time.localtime(currentTime)
        dataDay = timeTuple.tm_yday

        if self.lastTime is None:
            # First block: initialize the reference time and day
            self.lastTime = currentTime
            self.currentDay = dataDay
            return False

        timeDiff = currentTime - self.lastTime

        # New file if the day changed or if the gap between blocks exceeds 3h
        if dataDay != self.currentDay:
            # NOTE(review): lastTime is NOT refreshed on this branch, so the
            # next 3-hour comparison uses a stale reference — confirm whether
            # this is intentional
            self.currentDay = dataDay
            return True
        elif timeDiff > 3*60*60:
            self.lastTime = currentTime
            return True
        else:
            self.lastTime = currentTime
            return False
1399 1399
    def run(self, dataOut, path, blocksPerFile=10, metadataList=None, dataList=None, setType=None):
        '''
        Entry point, called once per incoming block: on the first call the
        writer is configured and the first output file opened, then the
        current block of dataOut is written.
        '''
        self.dataOut = dataOut
        if not(self.isConfig):
            self.setup(path=path, blocksPerFile=blocksPerFile,
                       metadataList=metadataList, dataList=dataList,
                       setType=setType)

            self.isConfig = True
            self.setNextFile()

        self.putData()
        return
1413 1413
    def setNextFile(self):
        '''
        Build the next output filename (D<year><doy><set>.hdf5, set being a
        per-day counter or HHMM when setType is given), create the day folder
        if needed, open the HDF5 file and write its metadata and empty
        datasets.
        '''
        ext = self.ext
        path = self.path
        setFile = self.setFile

        timeTuple = time.localtime(self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
        fullpath = os.path.join(path, subfolder)

        if os.path.exists(fullpath):
            filesList = os.listdir(fullpath)
            filesList = [k for k in filesList if k.startswith(self.optchar)]
            if len( filesList ) > 0:
                filesList = sorted(filesList, key=str.lower)
                filen = filesList[-1]
                # The filename must have the following format:
                # 0 1234 567 89A BCDE (hex)
                # x YYYY DDD SSS .ext
                if isNumber(filen[8:11]):
                    # Initialize the set counter from the last existing file
                    setFile = int(filen[8:11])
                else:
                    setFile = -1
            else:
                setFile = -1  # initialize the set counter
        else:
            os.makedirs(fullpath)
            setFile = -1  # initialize the set counter

        if self.setType is None:
            setFile += 1
            file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext )
        else:
            # setType given: encode the time of day (minutes) in the set field
            setFile = timeTuple.tm_hour*60+timeTuple.tm_min
            file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
                                           timeTuple.tm_year,
                                           timeTuple.tm_yday,
                                           setFile,
                                           ext )

        self.filename = os.path.join( path, subfolder, file )

        #Setting HDF5 File
        self.fp = h5py.File(self.filename, 'w')
        #write metadata
        self.writeMetadata(self.fp)
        #Write data
        self.writeData(self.fp)
1466 1466
1467 1467 def writeMetadata(self, fp):
1468 1468
1469 1469 grp = fp.create_group("Metadata")
1470 1470 grp.create_dataset('variables', data=self.tableDim, dtype=self.dtype)
1471 1471
1472 1472 for i in range(len(self.metadataList)):
1473 1473 if not hasattr(self.dataOut, self.metadataList[i]):
1474 1474 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
1475 1475 continue
1476 1476 value = getattr(self.dataOut, self.metadataList[i])
1477 1477 grp.create_dataset(self.metadataList[i], data=value)
1478 1478 return
1479 1479
    def writeData(self, fp):
        '''
        Create the "Data" group with one pre-sized dataset per scalar
        variable and one sub-group of 'tableNN' datasets per array variable,
        all with blocksPerFile slots along the block axis. Also resets the
        write state (ds handles, block index, firsttime flag).
        '''
        grp = fp.create_group("Data")
        dtsets = []
        data = []

        for dsInfo in self.dsList:
            if dsInfo['nDim'] == 0:
                # Scalar variable: one float per block
                ds = grp.create_dataset(
                    dsInfo['variable'],
                    (self.blocksPerFile, ),
                    chunks=True,
                    dtype=numpy.float64)
                dtsets.append(ds)
                data.append((dsInfo['variable'], -1))
            else:
                # Array variable: one table per channel (first axis)
                sgrp = grp.create_group(dsInfo['variable'])
                for i in range(dsInfo['dsNumber']):
                    ds = sgrp.create_dataset(
                        'table{:02d}'.format(i),
                        (self.blocksPerFile, ) + dsInfo['shape'][1:],
                        chunks=True)
                    dtsets.append(ds)
                    data.append((dsInfo['variable'], i))
        fp.flush()

        log.log('Creating file: {}'.format(fp.filename), self.name)

        self.ds = dtsets
        self.data = data
        self.firsttime = True
        self.blockIndex = 0
        return
1513 1513
    def putData(self):
        '''
        Write the current dataOut values into slot blockIndex of every
        dataset; roll over to a new file when the current one is full or
        the time flag fires.
        '''
        if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
            self.closeFile()
            self.setNextFile()

        for i, ds in enumerate(self.ds):
            attr, ch = self.data[i]
            if ch == -1:
                # Scalar variable: store the whole value
                ds[self.blockIndex] = getattr(self.dataOut, attr)
            else:
                # Array variable: store only this dataset's channel
                ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]

        self.fp.flush()
        self.blockIndex += 1
        log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)

        return
1531 1532
1532 1533 def closeFile(self):
1533 1534
1534 1535 if self.blockIndex != self.blocksPerFile:
1535 1536 for ds in self.ds:
1536 1537 ds.resize(self.blockIndex, axis=0)
1537 1538
1538 1539 self.fp.flush()
1539 1540 self.fp.close()
1540 1541
1541 1542 def close(self):
1542 1543
1543 1544 self.closeFile()
@@ -1,404 +1,424
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import os
16 16 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 from queue import Queue
21 from threading import Thread
20 22 from multiprocessing import Process
21 23 from zmq.utils.monitor import recv_monitor_message
22 24
23 25 from schainpy.utils import log
24 26
25 27
class ProcessingUnit(object):

    """
    Update - Jan 2018 - MULTIPROCESSING
    All the "call" methods present in the previous base were removed.
    The majority of operations are independent processes, thus the
    decorator is in charge of communicating the operation processes
    with the processing unit via IPC.

    The constructor does not receive any argument. The remaining methods
    are related to the operations to execute.
    """

    def __init__(self):

        self.dataIn = None        # input data object (set by the decorator)
        self.dataOut = None       # output data object produced by run()
        self.isConfig = False     # True once setup() has been executed
        self.operations = []      # list of (operation, type, id, kwargs)
        self.plots = []

    def getAllowedArgs(self):
        '''Return the argument names accepted by run(), or the explicit
        __attrs__ list when the subclass defines one.'''
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # getfullargspec replaces the deprecated getargspec (removed in
            # Python 3.11) and exposes the same .args list
            return inspect.getfullargspec(self.run).args

    def addOperation(self, conf, operation):
        """
        Register an Operation to be executed by this processing unit.

        Appends (operation, type, id, kwargs) to self.operations; the id is
        needed to communicate with the operation process (IPC purposes).

        Input:
            conf      : configuration object exposing type, id and getKwargs()
            operation : an object of class "Operation"
        """

        self.operations.append(
            (operation, conf.type, conf.id, conf.getKwargs()))

        if 'plot' in self.name.lower():
            self.plots.append(operation.CODE)

    def getOperationObj(self, objId):
        '''
        Return the registered operation whose id matches objId, or None.

        self.operations is a LIST of (operation, type, id, kwargs) tuples,
        so it is searched linearly; the previous implementation wrongly
        treated it as a dict and crashed on self.operations.keys().
        '''
        for operation, optype, opId, kwargs in self.operations:
            if opId == objId:
                return operation

        return None

    def operation(self, **kwargs):
        """
        Direct operation over the data (dataOut.data); subclasses must
        update the attributes of the dataOut object.

        Input:

            **kwargs : keyword arguments of the function to execute
        """

        raise NotImplementedError

    def setup(self):

        raise NotImplementedError

    def run(self):

        raise NotImplementedError

    def close(self):

        return
108 110
109 111
class Operation(object):

    """
    Update - Jan 2018 - MULTIPROCESSING

    Most of the methods remained the same. The decorator parses the
    arguments and executes the run() method for each process. The
    constructor does not receive any argument, neither does the base class.


    Base class for additional operations attached to a ProcessingUnit that
    need to accumulate information from previous data. Prefer keeping an
    accumulation buffer inside this class.

    Example: coherent integrations, which need the information of the n
    previous profiles (buffer).
    """

    def __init__(self):

        self.id = None            # operation id assigned by the controller
        self.isConfig = False     # True once setup() has been executed

        if not hasattr(self, 'name'):
            self.name = self.__class__.__name__

    def getAllowedArgs(self):
        '''Return the argument names accepted by run(), or the explicit
        __attrs__ list when the subclass defines one.'''
        if hasattr(self, '__attrs__'):
            return self.__attrs__
        else:
            # getfullargspec replaces the deprecated getargspec (removed in
            # Python 3.11) and exposes the same .args list
            return inspect.getfullargspec(self.run).args

    def setup(self):

        self.isConfig = True

        raise NotImplementedError

    def run(self, dataIn, **kwargs):
        """
        Perform the operation over dataIn.data and update the attributes of
        the dataIn object.

        Input:

            dataIn : a JROData object

        Return:

            None

        Affected:
            __buffer : data reception buffer.

        """
        if not self.isConfig:
            self.setup(**kwargs)

        raise NotImplementedError

    def close(self):

        return
172 174
class InputQueue(Thread):
    '''
    Thread that holds input data for Processing Units and external
    Operations.

    It subscribes to the project's ZMQ PUB socket on the topic `inputId`,
    buffers every received payload in an internal Queue, and hands the
    deserialized objects out through get().
    '''

    def __init__(self, project_id, inputId):
        '''
        project_id : project id, used to build the IPC endpoint name
        inputId    : topic to subscribe to (id of the upstream unit)
        '''
        Thread.__init__(self)
        self.queue = Queue()
        self.project_id = project_id
        self.inputId = inputId
        # Daemon thread: run() below loops forever, so it must not keep the
        # interpreter alive when the owning process shuts down (it is started
        # by the decorator and never joined)
        self.daemon = True

    def run(self):
        '''Receive loop: push every incoming payload into the queue.'''
        c = zmq.Context()
        self.receiver = c.socket(zmq.SUB)
        self.receiver.connect(
            'ipc:///tmp/schain/{}_pub'.format(self.project_id))
        self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())

        while True:
            # multipart message is [topic, payload]; keep only the payload
            self.queue.put(self.receiver.recv_multipart()[1])

    def get(self):
        '''
        Block until a message is available and return the deserialized
        object.

        NOTE: pickle.loads is only acceptable because the payload comes from
        this project's own IPC socket; never feed it untrusted data.
        '''
        return pickle.loads(self.queue.get())
173 201
174 202 def MPDecorator(BaseClass):
175 203 """
176 204 Multiprocessing class decorator
177 205
178 206 This function add multiprocessing features to a BaseClass. Also, it handle
179 207 the communication beetween processes (readers, procUnits and operations).
180 208 """
181 209
182 210 class MPClass(BaseClass, Process):
183 211
184 212 def __init__(self, *args, **kwargs):
185 213 super(MPClass, self).__init__()
186 214 Process.__init__(self)
187 215 self.operationKwargs = {}
188 216 self.args = args
189 217 self.kwargs = kwargs
190 218 self.sender = None
191 219 self.receiver = None
192 self.i = 0
220 self.i = 0
193 221 self.name = BaseClass.__name__
194 222 if 'plot' in self.name.lower() and not self.name.endswith('_'):
195 223 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
196 224 self.start_time = time.time()
197 225
198 226 if len(self.args) is 3:
199 227 self.typeProc = "ProcUnit"
200 228 self.id = args[0]
201 229 self.inputId = args[1]
202 230 self.project_id = args[2]
203 231 elif len(self.args) is 2:
204 232 self.id = args[0]
205 233 self.inputId = args[0]
206 234 self.project_id = args[1]
207 235 self.typeProc = "Operation"
208
209 def fix_publish(self,valor,multiple1):
210 return True if valor%multiple1 ==0 else False
236
237 self.queue = InputQueue(self.project_id, self.inputId)
211 238
212 239 def subscribe(self):
213 240 '''
214 This function create a socket to receive objects from the
215 topic `inputId`.
241 This function start the input queue.
216 242 '''
217
218 c = zmq.Context()
219 self.receiver = c.socket(zmq.SUB)
220 self.receiver.connect(
221 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
222 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
243
244 self.queue.start()
245
223 246
224 247 def listen(self):
225 248 '''
226 This function waits for objects and deserialize using pickle
249 This function waits for objects
227 250 '''
228 try:
229 data = pickle.loads(self.receiver.recv_multipart()[1])
230 except zmq.ZMQError as e:
231 if e.errno == zmq.ETERM:
232 print (e.errno)
233
234 return data
251
252 return self.queue.get()
235 253
236 254 def set_publisher(self):
237 255 '''
238 256 This function create a socket for publishing purposes.
239 257 '''
240 258
241 259 time.sleep(1)
242 260 c = zmq.Context()
243 261 self.sender = c.socket(zmq.PUB)
244 262 self.sender.connect(
245 263 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
246 264
247 265 def publish(self, data, id):
248 266 '''
249 267 This function publish an object, to a specific topic.
250 The fix method only affect inputId None which is Read Unit
251 Use value between 64 80, you should notice a little retard in processing
268 For Read Units (inputId == None) adds a little delay
269 to avoid data loss
252 270 '''
271
253 272 if self.inputId is None:
254 self.i+=1
255 if self.fix_publish(self.i,80) == True:# value n
256 time.sleep(0.01)
273 self.i += 1
274 if self.i % 100 == 0:
275 self.i = 0
276 time.sleep(0.01)
257 277
258 278 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
259 279
260 280 def runReader(self):
261 281 '''
262 282 Run fuction for read units
263 283 '''
264 284 while True:
265 285
266 286 BaseClass.run(self, **self.kwargs)
267 287
268 288 for op, optype, opId, kwargs in self.operations:
269 289 if optype == 'self' and not self.dataOut.flagNoData:
270 290 op(**kwargs)
271 291 elif optype == 'other' and not self.dataOut.flagNoData:
272 292 self.dataOut = op.run(self.dataOut, **self.kwargs)
273 293 elif optype == 'external':
274 294 self.publish(self.dataOut, opId)
275 295
276 296 if self.dataOut.flagNoData and not self.dataOut.error:
277 297 continue
278 298
279 299 self.publish(self.dataOut, self.id)
280 300
281 301 if self.dataOut.error:
282 302 log.error(self.dataOut.error, self.name)
283 303 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
284 304 break
285 305
286 306 time.sleep(1)
287 307
    def runProc(self):
        '''
        Run loop for processing units.

        Blocks on listen() for incoming data, runs the unit's processing
        (BaseClass.run), applies the attached operations to self.dataOut and
        publishes the result. Upstream errors are propagated onto dataOut and
        the operations are given one final pass so they can observe/forward
        the error state before the loop exits.
        '''

        while True:
            # block until the upstream unit publishes new data
            self.dataIn = self.listen()

            # no data and no error: keep waiting
            if self.dataIn.flagNoData and self.dataIn.error is None:
                continue

            BaseClass.run(self, **self.kwargs)

            if self.dataIn.error:
                # propagate upstream error so downstream consumers stop too
                self.dataOut.error = self.dataIn.error
                self.dataOut.flagNoData = True

            # normal pass: each operation runs only while dataOut holds data
            for op, optype, opId, kwargs in self.operations:
                if optype == 'self' and not self.dataOut.flagNoData:
                    op(**kwargs)
                elif optype == 'other' and not self.dataOut.flagNoData:
                    self.dataOut = op.run(self.dataOut, **kwargs)
                elif optype == 'external' and not self.dataOut.flagNoData:
                    self.publish(self.dataOut, opId)

            if not self.dataOut.flagNoData or self.dataOut.error:
                self.publish(self.dataOut, self.id)
                # error pass: re-run the operations (guarded on dataOut.error)
                # so plots/writers/publishers can handle shutdown
                for op, optype, opId, kwargs in self.operations:
                    if optype == 'self' and self.dataOut.error:
                        op(**kwargs)
                    elif optype == 'other' and self.dataOut.error:
                        self.dataOut = op.run(self.dataOut, **kwargs)
                    elif optype == 'external' and self.dataOut.error:
                        self.publish(self.dataOut, opId)

            if self.dataIn.error:
                break

            time.sleep(1)
327 347
328 348 def runOp(self):
329 349 '''
330 350 Run function for external operations (this operations just receive data
331 351 ex: plots, writers, publishers)
332 352 '''
333 353
334 354 while True:
335 355
336 356 dataOut = self.listen()
337 357
338 358 BaseClass.run(self, dataOut, **self.kwargs)
339 359
340 360 if dataOut.error:
341 361 break
342 362
343 363 time.sleep(1)
344 364
345 365 def run(self):
346 366 if self.typeProc is "ProcUnit":
347 367
348 368 if self.inputId is not None:
349 369
350 370 self.subscribe()
351 371
352 372 self.set_publisher()
353 373
354 374 if 'Reader' not in BaseClass.__name__:
355 375 self.runProc()
356 376 else:
357 377 self.runReader()
358 378
359 379 elif self.typeProc is "Operation":
360 380
361 381 self.subscribe()
362 382 self.runOp()
363 383
364 384 else:
365 385 raise ValueError("Unknown type")
366 386
367 387 self.close()
368 388
369 389 def event_monitor(self, monitor):
370 390
371 391 events = {}
372 392
373 393 for name in dir(zmq):
374 394 if name.startswith('EVENT_'):
375 395 value = getattr(zmq, name)
376 396 events[value] = name
377 397
378 398 while monitor.poll():
379 399 evt = recv_monitor_message(monitor)
380 400 if evt['event'] == 32:
381 401 self.connections += 1
382 402 if evt['event'] == 512:
383 403 pass
384 404
385 405 evt.update({'description': events[evt['event']]})
386 406
387 407 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
388 408 break
389 409 monitor.close()
390 410 print('event monitor thread done!')
391 411
392 412 def close(self):
393 413
394 414 BaseClass.close(self)
395 415
396 416 if self.sender:
397 417 self.sender.close()
398 418
399 419 if self.receiver:
400 420 self.receiver.close()
401 421
402 422 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
403 423
404 424 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now