##// END OF EJS Templates
NCDFReader -> PXReader, add PXParametersProc to select ppi or rhi modes
jespinoza -
r1140:cd2e2ed64989
parent child
Show More
@@ -0,0 +1,24
1 """
2 Utilities for IO modules
3 """
4
5 import os
6 from datetime import datetime
7
8 def folder_in_range(folder, start_date, end_date, pattern):
9 """
10 Check whether folder is bettwen start_date and end_date
11
12 Args:
13 folder (str): Folder to check
14 start_date (date): Initial date
15 end_date (date): Final date
16 pattern (str): Datetime format of the folder
17 Returns:
18 bool: True for success, False otherwise
19 """
20 try:
21 dt = datetime.strptime(folder, pattern)
22 except:
23 raise ValueError('Folder {} does not match {} format'.format(folder, pattern))
24 return start_date <= dt.date() <= end_date
@@ -0,0 +1,64
1 '''
2 Created on Oct 24, 2016
3
4 @author: roj- LouVD
5 '''
6
7 import numpy
8 import datetime
9 import time
10 from time import gmtime
11
12 from numpy import transpose
13
14 from jroproc_base import ProcessingUnit, Operation
15 from schainpy.model.data.jrodata import Parameters
16
17
18 class PXParametersProc(ProcessingUnit):
19 '''
20 Processing unit for PX parameters data
21 '''
22
23 def __init__(self, **kwargs):
24 """
25 Inputs: None
26 """
27 ProcessingUnit.__init__(self, **kwargs)
28 self.dataOut = Parameters()
29 self.isConfig = False
30
31 def setup(self, mode):
32 """
33 """
34 self.dataOut.mode = mode
35
36 def run(self, mode):
37 """
38 Args:
39 mode (str): select independent variable 'E' for elevation or 'A' for azimuth
40 """
41
42 if not self.isConfig:
43 self.setup(mode)
44 self.isConfig = True
45
46 if self.dataIn.type == 'Parameters':
47 self.dataOut.copy(self.dataIn)
48
49 self.dataOut.data_param = numpy.array([self.dataOut.data[var] for var in self.dataOut.parameters])
50 self.dataOut.data_param[self.dataOut.data_param == self.dataOut.missing] = numpy.nan
51
52 if mode.upper()=='E':
53 self.dataOut.heightList = self.dataOut.data['Azimuth']
54 else:
55 self.dataOut.heightList = self.dataOut.data['Elevation']
56
57 attrs = ['units', 'elevation', 'azimuth', 'max_range']
58 meta = {}
59
60 for attr in attrs:
61 meta[attr] = getattr(self.dataOut, attr)
62
63 meta['mode'] = mode
64 self.dataOut.meta = meta No newline at end of file
@@ -1,354 +1,361
1 '''
1 '''
2 Created on Dec 27, 2017
2 Created on Dec 27, 2017
3
3
4 @author: Juan C. Espinoza
4 @author: Juan C. Espinoza
5 '''
5 '''
6
6
7 import os
7 import os
8 import sys
8 import sys
9 import time
9 import time
10 import json
10 import json
11 import glob
11 import glob
12 import datetime
12 import datetime
13 import tarfile
13 import tarfile
14
14
15 import numpy
15 import numpy
16 from netCDF4 import Dataset
16 from netCDF4 import Dataset
17
17
18 from utils import folder_in_range
19
18 from schainpy.model.io.jroIO_base import JRODataReader
20 from schainpy.model.io.jroIO_base import JRODataReader
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
21 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
20 from schainpy.model.data.jrodata import Parameters
22 from schainpy.model.data.jrodata import Parameters
21 from schainpy.utils import log
23 from schainpy.utils import log
22
24
23 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
25 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
24
26
25 def load_json(obj):
27 def load_json(obj):
26 '''
28 '''
27 Parse json as string instead of unicode
29 Parse json as string instead of unicode
28 '''
30 '''
29
31
30 if isinstance(obj, str):
32 if isinstance(obj, str):
31 iterable = json.loads(obj)
33 iterable = json.loads(obj)
32 else:
34 else:
33 iterable = obj
35 iterable = obj
34
36
35 if isinstance(iterable, dict):
37 if isinstance(iterable, dict):
36 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, unicode) else v
38 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, unicode) else v
37 for k, v in iterable.items()}
39 for k, v in iterable.items()}
38 elif isinstance(iterable, (list, tuple)):
40 elif isinstance(iterable, (list, tuple)):
39 return [str(v) if isinstance(v, unicode) else v for v in iterable]
41 return [str(v) if isinstance(v, unicode) else v for v in iterable]
40
42
41 return iterable
43 return iterable
42
44
43
45
44 class NCDFReader(JRODataReader, ProcessingUnit):
46 class PXReader(JRODataReader, ProcessingUnit):
45
47
46 def __init__(self, **kwargs):
48 def __init__(self, **kwargs):
47
49
48 ProcessingUnit.__init__(self, **kwargs)
50 ProcessingUnit.__init__(self, **kwargs)
49
51
50 self.dataOut = Parameters()
52 self.dataOut = Parameters()
51 self.counter_records = 0
53 self.counter_records = 0
52 self.nrecords = None
54 self.nrecords = None
53 self.flagNoMoreFiles = 0
55 self.flagNoMoreFiles = 0
54 self.isConfig = False
56 self.isConfig = False
55 self.filename = None
57 self.filename = None
56 self.intervals = set()
58 self.intervals = set()
57 self.ext = ('.nc', '.tgz')
59 self.ext = ('.nc', '.tgz')
58 self.online_mode = False
60 self.online_mode = False
59
61
60 def setup(self,
62 def setup(self,
61 path=None,
63 path=None,
62 startDate=None,
64 startDate=None,
63 endDate=None,
65 endDate=None,
64 format=None,
66 format=None,
65 startTime=datetime.time(0, 0, 0),
67 startTime=datetime.time(0, 0, 0),
66 endTime=datetime.time(23, 59, 59),
68 endTime=datetime.time(23, 59, 59),
67 walk=False,
69 walk=False,
68 **kwargs):
70 **kwargs):
69
71
70 self.path = path
72 self.path = path
71 self.startDate = startDate
73 self.startDate = startDate
72 self.endDate = endDate
74 self.endDate = endDate
73 self.startTime = startTime
75 self.startTime = startTime
74 self.endTime = endTime
76 self.endTime = endTime
75 self.datatime = datetime.datetime(1900,1,1)
77 self.datatime = datetime.datetime(1900,1,1)
76 self.walk = walk
78 self.walk = walk
77 self.nTries = kwargs.get('nTries', 3)
79 self.nTries = kwargs.get('nTries', 3)
78 self.online = kwargs.get('online', False)
80 self.online = kwargs.get('online', False)
79 self.delay = kwargs.get('delay', 30)
81 self.delay = kwargs.get('delay', 30)
80 self.ele = kwargs.get('ext', '')
82 self.ele = kwargs.get('ext', '')
81
83
82 if self.path is None:
84 if self.path is None:
83 raise ValueError, 'The path is not valid'
85 raise ValueError, 'The path is not valid'
84
86
85 self.search_files(path, startDate, endDate, startTime, endTime, walk)
87 self.search_files(path, startDate, endDate, startTime, endTime, walk)
86 self.cursor = 0
88 self.cursor = 0
87 self.counter_records = 0
89 self.counter_records = 0
88
90
89 if not self.files:
91 if not self.files:
90 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
92 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
91
93
92 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
94 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
93 '''
95 '''
94 Searching for NCDF files in path
96 Searching for NCDF files in path
95 Creating a list of files to procces included in [startDate,endDate]
97 Creating a list of files to procces included in [startDate,endDate]
96
98
97 Input:
99 Input:
98 path - Path to find files
100 path - Path to find files
99 '''
101 '''
100
102
101 log.log('Searching files {} in {} '.format(self.ext, path), 'NCDFReader')
103 log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
102 if walk:
104 if walk:
103 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
105 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
104 paths.sort()
106 paths.sort()
105 else:
107 else:
106 paths = [path]
108 paths = [path]
107
109
108 fileList0 = []
110 fileList0 = []
109
111
110 for subpath in paths:
112 for subpath in paths:
111 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
113 if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
114 continue
115 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
112
116
113 fileList0.sort()
117 fileList0.sort()
114 if self.online:
118 if self.online:
115 fileList0 = fileList0[-1:]
119 fileList0 = fileList0[-1:]
116
120
117 self.files = {}
121 self.files = {}
118
122
119 startDate = startDate - datetime.timedelta(1)
123 startDate = startDate - datetime.timedelta(1)
120 endDate = endDate + datetime.timedelta(1)
124 endDate = endDate + datetime.timedelta(1)
121
125
122 for fullname in fileList0:
126 for fullname in fileList0:
123 thisFile = fullname.split('/')[-1]
127 thisFile = fullname.split('/')[-1]
124 year = thisFile[3:7]
128 year = thisFile[3:7]
125 if not year.isdigit():
129 if not year.isdigit():
126 continue
130 continue
127
131
128 month = thisFile[7:9]
132 month = thisFile[7:9]
129 if not month.isdigit():
133 if not month.isdigit():
130 continue
134 continue
131
135
132 day = thisFile[9:11]
136 day = thisFile[9:11]
133 if not day.isdigit():
137 if not day.isdigit():
134 continue
138 continue
135
139
136 year, month, day = int(year), int(month), int(day)
140 year, month, day = int(year), int(month), int(day)
137 dateFile = datetime.date(year, month, day)
141 dateFile = datetime.date(year, month, day)
138 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
142 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
139
143
140 if (startDate > dateFile) or (endDate < dateFile):
144 if (startDate > dateFile) or (endDate < dateFile):
141 continue
145 continue
142
146
143 dt = datetime.datetime.combine(dateFile, timeFile)
147 dt = datetime.datetime.combine(dateFile, timeFile)
144 if dt not in self.files:
148 if dt not in self.files:
145 self.files[dt] = []
149 self.files[dt] = []
146 self.files[dt].append(fullname)
150 self.files[dt].append(fullname)
147
151
148 self.dates = self.files.keys()
152 self.dates = self.files.keys()
149 self.dates.sort()
153 self.dates.sort()
150
154
151 return
155 return
152
156
153 def search_files_online(self):
157 def search_files_online(self):
154 '''
158 '''
155 Searching for NCDF files in online mode path
159 Searching for NCDF files in online mode path
156 Creating a list of files to procces included in [startDate,endDate]
160 Creating a list of files to procces included in [startDate,endDate]
157
161
158 Input:
162 Input:
159 path - Path to find files
163 path - Path to find files
160 '''
164 '''
161
165
162 self.files = {}
166 self.files = {}
163
167
164 for n in range(self.nTries):
168 for n in range(self.nTries):
165
169
166 if self.walk:
170 if self.walk:
167 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
171 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
168 path = paths[-1]
172 path = paths[-1]
169 else:
173 else:
170 paths = self.path
174 paths = self.path
171
175
172 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
176 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
173
177
174 new_files.sort()
178 new_files.sort()
175
179
176 for fullname in new_files:
180 for fullname in new_files:
177 thisFile = fullname.split('/')[-1]
181 thisFile = fullname.split('/')[-1]
178 year = thisFile[3:7]
182 year = thisFile[3:7]
179 if not year.isdigit():
183 if not year.isdigit():
180 continue
184 continue
181
185
182 month = thisFile[7:9]
186 month = thisFile[7:9]
183 if not month.isdigit():
187 if not month.isdigit():
184 continue
188 continue
185
189
186 day = thisFile[9:11]
190 day = thisFile[9:11]
187 if not day.isdigit():
191 if not day.isdigit():
188 continue
192 continue
189
193
190 year, month, day = int(year), int(month), int(day)
194 year, month, day = int(year), int(month), int(day)
191 dateFile = datetime.date(year, month, day)
195 dateFile = datetime.date(year, month, day)
192 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
196 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
193
197
194 dt = datetime.datetime.combine(dateFile, timeFile)
198 dt = datetime.datetime.combine(dateFile, timeFile)
195
199
196 if self.dt >= dt:
200 if self.dt >= dt:
197 continue
201 continue
198
202
199 if dt not in self.files:
203 if dt not in self.files:
200 self.dt = dt
204 self.dt = dt
201 self.files[dt] = []
205 self.files[dt] = []
202
206
203 self.files[dt].append(fullname)
207 self.files[dt].append(fullname)
204 break
208 break
205
209
206 if self.files:
210 if self.files:
207 break
211 break
208 else:
212 else:
209 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'NCDFReader')
213 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
210 time.sleep(self.delay)
214 time.sleep(self.delay)
211
215
212 if not self.files:
216 if not self.files:
213 return 0
217 return 0
214
218
215 self.dates = self.files.keys()
219 self.dates = self.files.keys()
216 self.dates.sort()
220 self.dates.sort()
217 self.cursor = 0
221 self.cursor = 0
218
222
219 return 1
223 return 1
220
224
221 def parseFile(self):
225 def parseFile(self):
222 '''
226 '''
223 '''
227 '''
224
228
225 self.header = {}
229 header = {}
226
230
227 for attr in self.fp.ncattrs():
231 for attr in self.fp.ncattrs():
228 self.header[str(attr)] = getattr(self.fp, attr)
232 header[str(attr)] = getattr(self.fp, attr)
229
233
230 self.data[self.header['TypeName']] = numpy.array(self.fp.variables[self.header['TypeName']])
234 self.header.append(header)
231
235
232 if 'Azimuth' not in self.data:
236 self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
233 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
234
235
237
236 def setNextFile(self):
238 def setNextFile(self):
237 '''
239 '''
238 Open next files for the current datetime
240 Open next files for the current datetime
239 '''
241 '''
240
242
241 cursor = self.cursor
243 cursor = self.cursor
242 if not self.online_mode:
244 if not self.online_mode:
243 if cursor == len(self.dates):
245 if cursor == len(self.dates):
244 if self.online:
246 if self.online:
245 cursor = 0
247 cursor = 0
246 self.dt = self.dates[cursor]
248 self.dt = self.dates[cursor]
247 self.online_mode = True
249 self.online_mode = True
248 if not self.search_files_online():
250 if not self.search_files_online():
249 log.success('No more files', 'NCDFReader')
251 log.success('No more files', 'PXReader')
250 return 0
252 return 0
251 else:
253 else:
252 log.success('No more files', 'NCDFReader')
254 log.success('No more files', 'PXReader')
253 self.flagNoMoreFiles = 1
255 self.flagNoMoreFiles = 1
254 return 0
256 return 0
255 else:
257 else:
256 if not self.search_files_online():
258 if not self.search_files_online():
257 return 0
259 return 0
258 cursor = self.cursor
260 cursor = self.cursor
259
260 log.log(
261 'Opening: {}\'s files'.format(self.dates[cursor]),
262 'NCDFReader'
263 )
264
261
265 self.data = {}
262 self.data = {}
263 self.header = []
266
264
267 for fullname in self.files[self.dates[cursor]]:
265 for fullname in self.files[self.dates[cursor]]:
268
266
267 log.log('Opening: {}'.format(fullname), 'PXReader')
268
269 if os.path.splitext(fullname)[-1] == '.tgz':
269 if os.path.splitext(fullname)[-1] == '.tgz':
270 tar = tarfile.open(fullname, 'r:gz')
270 tar = tarfile.open(fullname, 'r:gz')
271 tar.extractall('/tmp')
271 tar.extractall('/tmp')
272 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
272 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
273 else:
273 else:
274 files = [fullname]
274 files = [fullname]
275
275
276 for filename in files:
276 for filename in files:
277 if self.filename is not None:
277 if self.filename is not None:
278 self.fp.close()
278 self.fp.close()
279
279
280 self.filename = filename
280 self.filename = filename
281 self.filedate = self.dates[cursor]
281 self.filedate = self.dates[cursor]
282 self.fp = Dataset(self.filename, 'r')
282 self.fp = Dataset(self.filename, 'r')
283 self.parseFile()
283 self.parseFile()
284
284
285 self.counter_records += 1
285 self.counter_records += 1
286 self.cursor += 1
286 self.cursor += 1
287 return 1
287 return 1
288
288
289 def readNextFile(self):
289 def readNextFile(self):
290
290
291 while True:
291 while True:
292 self.flagDiscontinuousBlock = 0
292 self.flagDiscontinuousBlock = 0
293 if not self.setNextFile():
293 if not self.setNextFile():
294 return 0
294 return 0
295
295
296 self.datatime = datetime.datetime.utcfromtimestamp(self.header['Time'])
296 self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
297
297
298 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
298 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
299 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
299 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
300 log.warning(
300 log.warning(
301 'Reading Record No. {}/{} -> {} [Skipping]'.format(
301 'Reading Record No. {}/{} -> {} [Skipping]'.format(
302 self.counter_records,
302 self.counter_records,
303 self.nrecords,
303 self.nrecords,
304 self.datatime.ctime()),
304 self.datatime.ctime()),
305 'NCDFReader')
305 'PXReader')
306 continue
306 continue
307 break
307 break
308
308
309 log.log(
309 log.log(
310 'Reading Record No. {}/{} -> {}'.format(
310 'Reading Record No. {}/{} -> {}'.format(
311 self.counter_records,
311 self.counter_records,
312 self.nrecords,
312 self.nrecords,
313 self.datatime.ctime()),
313 self.datatime.ctime()),
314 'NCDFReader')
314 'PXReader')
315
315
316 return 1
316 return 1
317
317
318
318
319 def set_output(self):
319 def set_output(self):
320 '''
320 '''
321 Storing data from buffer to dataOut object
321 Storing data from buffer to dataOut object
322 '''
322 '''
323
323
324 self.dataOut.heightList = self.data.pop('Azimuth')
324 self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
325
325 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
326 log.log('Parameters found: {}'.format(','.join(self.data.keys())),
326 self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
327 'PXReader')
327 self.dataOut.data = self.data
328
328 self.dataOut.units = [h['Unit-value'] for h in self.header]
329 self.dataOut.data_param = numpy.array(self.data.values())
329 self.dataOut.parameters = [h['TypeName'] for h in self.header]
330 self.dataOut.data_param[self.dataOut.data_param == -99900.] = numpy.nan
330 self.dataOut.missing = self.header[0]['MissingData']
331 self.dataOut.parameters = self.data.keys()
331 self.dataOut.max_range = self.header[0]['MaximumRange-value']
332 self.dataOut.utctime = self.header['Time']
332 self.dataOut.elevation = self.header[0]['Elevation']
333 self.dataOut.azimuth = self.header[0]['Azimuth']
334 self.dataOut.latitude = self.header[0]['Latitude']
335 self.dataOut.longitude = self.header[0]['Longitude']
336 self.dataOut.utctime = self.header[0]['Time']
333 self.dataOut.utctimeInit = self.dataOut.utctime
337 self.dataOut.utctimeInit = self.dataOut.utctime
334 self.dataOut.useLocalTime = False
338 self.dataOut.useLocalTime = True
335 self.dataOut.flagNoData = False
339 self.dataOut.flagNoData = False
336 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
340 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
337
341
342 log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
343 'PXReader')
344
338 def getData(self):
345 def getData(self):
339 '''
346 '''
340 Storing data from databuffer to dataOut object
347 Storing data from databuffer to dataOut object
341 '''
348 '''
342 if self.flagNoMoreFiles:
349 if self.flagNoMoreFiles:
343 self.dataOut.flagNoData = True
350 self.dataOut.flagNoData = True
344 log.error('No file left to process', 'NCDFReader')
351 log.error('No file left to process', 'PXReader')
345 return 0
352 return 0
346
353
347 if not self.readNextFile():
354 if not self.readNextFile():
348 self.dataOut.flagNoData = True
355 self.dataOut.flagNoData = True
349 return 0
356 return 0
350
357
351 self.set_output()
358 self.set_output()
352
359
353 return 1
360 return 1
354
361
@@ -1,15 +1,16
1 '''
1 '''
2
2
3 $Author: murco $
3 $Author: murco $
4 $Id: Processor.py 1 2012-11-12 18:56:07Z murco $
4 $Id: Processor.py 1 2012-11-12 18:56:07Z murco $
5 '''
5 '''
6
6
7 from jroproc_voltage import *
7 from jroproc_voltage import *
8 from jroproc_spectra import *
8 from jroproc_spectra import *
9 from jroproc_heispectra import *
9 from jroproc_heispectra import *
10 from jroproc_amisr import *
10 from jroproc_amisr import *
11 from jroproc_correlation import *
11 from jroproc_correlation import *
12 from jroproc_parameters import *
12 from jroproc_parameters import *
13 from jroproc_spectra_lags import *
13 from jroproc_spectra_lags import *
14 from jroproc_spectra_acf import *
14 from jroproc_spectra_acf import *
15 from bltrproc_parameters import *
15 from bltrproc_parameters import *
16 from pxproc_parameters import *
General Comments 0
You need to be logged in to leave comments. Login now