##// END OF EJS Templates
NCDFReader -> PXReader, add PXParametersProc to select ppi or rhi modes
jespinoza -
r1140:cd2e2ed64989
parent child
Show More
@@ -0,0 +1,24
1 """
2 Utilities for IO modules
3 """
4
5 import os
6 from datetime import datetime
7
8 def folder_in_range(folder, start_date, end_date, pattern):
9 """
10 Check whether folder is bettwen start_date and end_date
11
12 Args:
13 folder (str): Folder to check
14 start_date (date): Initial date
15 end_date (date): Final date
16 pattern (str): Datetime format of the folder
17 Returns:
18 bool: True for success, False otherwise
19 """
20 try:
21 dt = datetime.strptime(folder, pattern)
22 except:
23 raise ValueError('Folder {} does not match {} format'.format(folder, pattern))
24 return start_date <= dt.date() <= end_date
@@ -0,0 +1,64
1 '''
2 Created on Oct 24, 2016
3
4 @author: roj- LouVD
5 '''
6
7 import numpy
8 import datetime
9 import time
10 from time import gmtime
11
12 from numpy import transpose
13
14 from jroproc_base import ProcessingUnit, Operation
15 from schainpy.model.data.jrodata import Parameters
16
17
18 class PXParametersProc(ProcessingUnit):
19 '''
20 Processing unit for PX parameters data
21 '''
22
23 def __init__(self, **kwargs):
24 """
25 Inputs: None
26 """
27 ProcessingUnit.__init__(self, **kwargs)
28 self.dataOut = Parameters()
29 self.isConfig = False
30
31 def setup(self, mode):
32 """
33 """
34 self.dataOut.mode = mode
35
36 def run(self, mode):
37 """
38 Args:
39 mode (str): select independent variable 'E' for elevation or 'A' for azimuth
40 """
41
42 if not self.isConfig:
43 self.setup(mode)
44 self.isConfig = True
45
46 if self.dataIn.type == 'Parameters':
47 self.dataOut.copy(self.dataIn)
48
49 self.dataOut.data_param = numpy.array([self.dataOut.data[var] for var in self.dataOut.parameters])
50 self.dataOut.data_param[self.dataOut.data_param == self.dataOut.missing] = numpy.nan
51
52 if mode.upper()=='E':
53 self.dataOut.heightList = self.dataOut.data['Azimuth']
54 else:
55 self.dataOut.heightList = self.dataOut.data['Elevation']
56
57 attrs = ['units', 'elevation', 'azimuth', 'max_range']
58 meta = {}
59
60 for attr in attrs:
61 meta[attr] = getattr(self.dataOut, attr)
62
63 meta['mode'] = mode
64 self.dataOut.meta = meta No newline at end of file
@@ -1,354 +1,361
1 1 '''
2 2 Created on Dec 27, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13 import tarfile
14 14
15 15 import numpy
16 16 from netCDF4 import Dataset
17 17
18 from utils import folder_in_range
19
18 20 from schainpy.model.io.jroIO_base import JRODataReader
19 21 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
20 22 from schainpy.model.data.jrodata import Parameters
21 23 from schainpy.utils import log
22 24
23 25 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
24 26
25 27 def load_json(obj):
26 28 '''
27 29 Parse json as string instead of unicode
28 30 '''
29 31
30 32 if isinstance(obj, str):
31 33 iterable = json.loads(obj)
32 34 else:
33 35 iterable = obj
34 36
35 37 if isinstance(iterable, dict):
36 38 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, unicode) else v
37 39 for k, v in iterable.items()}
38 40 elif isinstance(iterable, (list, tuple)):
39 41 return [str(v) if isinstance(v, unicode) else v for v in iterable]
40 42
41 43 return iterable
42 44
43 45
44 class NCDFReader(JRODataReader, ProcessingUnit):
46 class PXReader(JRODataReader, ProcessingUnit):
45 47
46 48 def __init__(self, **kwargs):
47 49
48 50 ProcessingUnit.__init__(self, **kwargs)
49 51
50 52 self.dataOut = Parameters()
51 53 self.counter_records = 0
52 54 self.nrecords = None
53 55 self.flagNoMoreFiles = 0
54 56 self.isConfig = False
55 57 self.filename = None
56 58 self.intervals = set()
57 59 self.ext = ('.nc', '.tgz')
58 60 self.online_mode = False
59 61
60 62 def setup(self,
61 63 path=None,
62 64 startDate=None,
63 65 endDate=None,
64 66 format=None,
65 67 startTime=datetime.time(0, 0, 0),
66 68 endTime=datetime.time(23, 59, 59),
67 69 walk=False,
68 70 **kwargs):
69 71
70 72 self.path = path
71 73 self.startDate = startDate
72 74 self.endDate = endDate
73 75 self.startTime = startTime
74 76 self.endTime = endTime
75 77 self.datatime = datetime.datetime(1900,1,1)
76 78 self.walk = walk
77 79 self.nTries = kwargs.get('nTries', 3)
78 80 self.online = kwargs.get('online', False)
79 81 self.delay = kwargs.get('delay', 30)
80 82 self.ele = kwargs.get('ext', '')
81 83
82 84 if self.path is None:
83 85 raise ValueError, 'The path is not valid'
84 86
85 87 self.search_files(path, startDate, endDate, startTime, endTime, walk)
86 88 self.cursor = 0
87 89 self.counter_records = 0
88 90
89 91 if not self.files:
90 92 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
91 93
92 94 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
93 95 '''
94 96 Searching for NCDF files in path
95 97 Creating a list of files to procces included in [startDate,endDate]
96 98
97 99 Input:
98 100 path - Path to find files
99 101 '''
100 102
101 log.log('Searching files {} in {} '.format(self.ext, path), 'NCDFReader')
103 log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
102 104 if walk:
103 105 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
104 106 paths.sort()
105 107 else:
106 108 paths = [path]
107 109
108 110 fileList0 = []
109 111
110 112 for subpath in paths:
111 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
113 if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
114 continue
115 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
112 116
113 117 fileList0.sort()
114 118 if self.online:
115 119 fileList0 = fileList0[-1:]
116 120
117 121 self.files = {}
118 122
119 123 startDate = startDate - datetime.timedelta(1)
120 124 endDate = endDate + datetime.timedelta(1)
121 125
122 126 for fullname in fileList0:
123 127 thisFile = fullname.split('/')[-1]
124 128 year = thisFile[3:7]
125 129 if not year.isdigit():
126 130 continue
127 131
128 132 month = thisFile[7:9]
129 133 if not month.isdigit():
130 134 continue
131 135
132 136 day = thisFile[9:11]
133 137 if not day.isdigit():
134 138 continue
135 139
136 140 year, month, day = int(year), int(month), int(day)
137 141 dateFile = datetime.date(year, month, day)
138 142 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
139 143
140 144 if (startDate > dateFile) or (endDate < dateFile):
141 145 continue
142 146
143 147 dt = datetime.datetime.combine(dateFile, timeFile)
144 148 if dt not in self.files:
145 149 self.files[dt] = []
146 150 self.files[dt].append(fullname)
147 151
148 152 self.dates = self.files.keys()
149 153 self.dates.sort()
150 154
151 155 return
152 156
153 157 def search_files_online(self):
154 158 '''
155 159 Searching for NCDF files in online mode path
156 160 Creating a list of files to procces included in [startDate,endDate]
157 161
158 162 Input:
159 163 path - Path to find files
160 164 '''
161 165
162 166 self.files = {}
163 167
164 168 for n in range(self.nTries):
165 169
166 170 if self.walk:
167 171 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
168 172 path = paths[-1]
169 173 else:
170 174 paths = self.path
171 175
172 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and 'E{}'.format(self.ele) in s]
176 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
173 177
174 178 new_files.sort()
175 179
176 180 for fullname in new_files:
177 181 thisFile = fullname.split('/')[-1]
178 182 year = thisFile[3:7]
179 183 if not year.isdigit():
180 184 continue
181 185
182 186 month = thisFile[7:9]
183 187 if not month.isdigit():
184 188 continue
185 189
186 190 day = thisFile[9:11]
187 191 if not day.isdigit():
188 192 continue
189 193
190 194 year, month, day = int(year), int(month), int(day)
191 195 dateFile = datetime.date(year, month, day)
192 196 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
193 197
194 198 dt = datetime.datetime.combine(dateFile, timeFile)
195 199
196 200 if self.dt >= dt:
197 201 continue
198 202
199 203 if dt not in self.files:
200 204 self.dt = dt
201 205 self.files[dt] = []
202 206
203 207 self.files[dt].append(fullname)
204 208 break
205 209
206 210 if self.files:
207 211 break
208 212 else:
209 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'NCDFReader')
213 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
210 214 time.sleep(self.delay)
211 215
212 216 if not self.files:
213 217 return 0
214 218
215 219 self.dates = self.files.keys()
216 220 self.dates.sort()
217 221 self.cursor = 0
218 222
219 223 return 1
220 224
221 225 def parseFile(self):
222 226 '''
223 227 '''
224 228
225 self.header = {}
229 header = {}
226 230
227 231 for attr in self.fp.ncattrs():
228 self.header[str(attr)] = getattr(self.fp, attr)
232 header[str(attr)] = getattr(self.fp, attr)
229 233
230 self.data[self.header['TypeName']] = numpy.array(self.fp.variables[self.header['TypeName']])
231
232 if 'Azimuth' not in self.data:
233 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
234 self.header.append(header)
234 235
236 self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
235 237
236 238 def setNextFile(self):
237 239 '''
238 240 Open next files for the current datetime
239 241 '''
240 242
241 243 cursor = self.cursor
242 244 if not self.online_mode:
243 245 if cursor == len(self.dates):
244 246 if self.online:
245 247 cursor = 0
246 248 self.dt = self.dates[cursor]
247 249 self.online_mode = True
248 250 if not self.search_files_online():
249 log.success('No more files', 'NCDFReader')
251 log.success('No more files', 'PXReader')
250 252 return 0
251 253 else:
252 log.success('No more files', 'NCDFReader')
254 log.success('No more files', 'PXReader')
253 255 self.flagNoMoreFiles = 1
254 256 return 0
255 257 else:
256 258 if not self.search_files_online():
257 259 return 0
258 260 cursor = self.cursor
259 261
260 log.log(
261 'Opening: {}\'s files'.format(self.dates[cursor]),
262 'NCDFReader'
263 )
264
265 262 self.data = {}
263 self.header = []
266 264
267 265 for fullname in self.files[self.dates[cursor]]:
268 266
267 log.log('Opening: {}'.format(fullname), 'PXReader')
268
269 269 if os.path.splitext(fullname)[-1] == '.tgz':
270 270 tar = tarfile.open(fullname, 'r:gz')
271 271 tar.extractall('/tmp')
272 272 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
273 273 else:
274 274 files = [fullname]
275 275
276 276 for filename in files:
277 277 if self.filename is not None:
278 278 self.fp.close()
279 279
280 280 self.filename = filename
281 281 self.filedate = self.dates[cursor]
282 282 self.fp = Dataset(self.filename, 'r')
283 283 self.parseFile()
284 284
285 285 self.counter_records += 1
286 286 self.cursor += 1
287 287 return 1
288 288
289 289 def readNextFile(self):
290 290
291 291 while True:
292 292 self.flagDiscontinuousBlock = 0
293 293 if not self.setNextFile():
294 294 return 0
295 295
296 self.datatime = datetime.datetime.utcfromtimestamp(self.header['Time'])
296 self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
297 297
298 298 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
299 299 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
300 300 log.warning(
301 301 'Reading Record No. {}/{} -> {} [Skipping]'.format(
302 302 self.counter_records,
303 303 self.nrecords,
304 304 self.datatime.ctime()),
305 'NCDFReader')
305 'PXReader')
306 306 continue
307 307 break
308 308
309 309 log.log(
310 310 'Reading Record No. {}/{} -> {}'.format(
311 311 self.counter_records,
312 312 self.nrecords,
313 313 self.datatime.ctime()),
314 'NCDFReader')
314 'PXReader')
315 315
316 316 return 1
317 317
318 318
319 319 def set_output(self):
320 320 '''
321 321 Storing data from buffer to dataOut object
322 322 '''
323 323
324 self.dataOut.heightList = self.data.pop('Azimuth')
325
326 log.log('Parameters found: {}'.format(','.join(self.data.keys())),
327 'PXReader')
328
329 self.dataOut.data_param = numpy.array(self.data.values())
330 self.dataOut.data_param[self.dataOut.data_param == -99900.] = numpy.nan
331 self.dataOut.parameters = self.data.keys()
332 self.dataOut.utctime = self.header['Time']
324 self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
325 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
326 self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
327 self.dataOut.data = self.data
328 self.dataOut.units = [h['Unit-value'] for h in self.header]
329 self.dataOut.parameters = [h['TypeName'] for h in self.header]
330 self.dataOut.missing = self.header[0]['MissingData']
331 self.dataOut.max_range = self.header[0]['MaximumRange-value']
332 self.dataOut.elevation = self.header[0]['Elevation']
333 self.dataOut.azimuth = self.header[0]['Azimuth']
334 self.dataOut.latitude = self.header[0]['Latitude']
335 self.dataOut.longitude = self.header[0]['Longitude']
336 self.dataOut.utctime = self.header[0]['Time']
333 337 self.dataOut.utctimeInit = self.dataOut.utctime
334 self.dataOut.useLocalTime = False
338 self.dataOut.useLocalTime = True
335 339 self.dataOut.flagNoData = False
336 340 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
337 341
342 log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
343 'PXReader')
344
338 345 def getData(self):
339 346 '''
340 347 Storing data from databuffer to dataOut object
341 348 '''
342 349 if self.flagNoMoreFiles:
343 350 self.dataOut.flagNoData = True
344 log.error('No file left to process', 'NCDFReader')
351 log.error('No file left to process', 'PXReader')
345 352 return 0
346 353
347 354 if not self.readNextFile():
348 355 self.dataOut.flagNoData = True
349 356 return 0
350 357
351 358 self.set_output()
352 359
353 360 return 1
354 361
@@ -1,15 +1,16
1 1 '''
2 2
3 3 $Author: murco $
4 4 $Id: Processor.py 1 2012-11-12 18:56:07Z murco $
5 5 '''
6 6
7 7 from jroproc_voltage import *
8 8 from jroproc_spectra import *
9 9 from jroproc_heispectra import *
10 10 from jroproc_amisr import *
11 11 from jroproc_correlation import *
12 12 from jroproc_parameters import *
13 13 from jroproc_spectra_lags import *
14 14 from jroproc_spectra_acf import *
15 15 from bltrproc_parameters import *
16 from pxproc_parameters import *
General Comments 0
You need to be logged in to leave comments. Login now