##// END OF EJS Templates
Hot fix
Juan C. Espinoza -
r1166:ce0987c48654
parent child
Show More
@@ -1,350 +1,351
1 1 '''
2 2 Created on Jan 15, 2018
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import glob
11 11 import datetime
12 12 import tarfile
13 13
14 14 import numpy
15 try:
16 from netCDF4 import Dataset
17 except:
18 log.warning(
19 'You should install "netCDF4" module if you want to read/write NCDF files'
20 )
21 15
22 16 from utils import folder_in_range
23 17
24 18 from schainpy.model.io.jroIO_base import JRODataReader
25 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
26 20 from schainpy.model.data.jrodata import Parameters
27 21 from schainpy.utils import log
28 22
23 try:
24 from netCDF4 import Dataset
25 except:
26 log.warning(
27 'You should install "netCDF4" module if you want to read/write NCDF files'
28 )
29
29 30 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
30 31
31 32
32 33 class PXReader(JRODataReader, ProcessingUnit):
33 34
34 35 def __init__(self, **kwargs):
35 36
36 37 ProcessingUnit.__init__(self, **kwargs)
37 38
38 39 self.dataOut = Parameters()
39 40 self.counter_records = 0
40 41 self.nrecords = None
41 42 self.flagNoMoreFiles = 0
42 43 self.isConfig = False
43 44 self.filename = None
44 45 self.intervals = set()
45 46 self.ext = ('.nc', '.tgz')
46 47 self.online_mode = False
47 48
48 49 def setup(self,
49 50 path=None,
50 51 startDate=None,
51 52 endDate=None,
52 53 format=None,
53 54 startTime=datetime.time(0, 0, 0),
54 55 endTime=datetime.time(23, 59, 59),
55 56 walk=False,
56 57 **kwargs):
57 58
58 59 self.path = path
59 60 self.startDate = startDate
60 61 self.endDate = endDate
61 62 self.startTime = startTime
62 63 self.endTime = endTime
63 64 self.datatime = datetime.datetime(1900,1,1)
64 65 self.walk = walk
65 66 self.nTries = kwargs.get('nTries', 10)
66 67 self.online = kwargs.get('online', False)
67 68 self.delay = kwargs.get('delay', 60)
68 69 self.ele = kwargs.get('ext', '')
69 70
70 71 if self.path is None:
71 72 raise ValueError, 'The path is not valid'
72 73
73 74 self.search_files(path, startDate, endDate, startTime, endTime, walk)
74 75 self.cursor = 0
75 76 self.counter_records = 0
76 77
77 78 if not self.files:
78 79 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
79 80
80 81 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
81 82 '''
82 83 Searching for NCDF files in path
83 84 Creating a list of files to procces included in [startDate,endDate]
84 85
85 86 Input:
86 87 path - Path to find files
87 88 '''
88 89
89 90 log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
90 91 if walk:
91 92 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
92 93 paths.sort()
93 94 else:
94 95 paths = [path]
95 96
96 97 fileList0 = []
97 98
98 99 for subpath in paths:
99 100 if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
100 101 continue
101 102 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
102 103
103 104 fileList0.sort()
104 105 if self.online:
105 106 fileList0 = fileList0[-1:]
106 107
107 108 self.files = {}
108 109
109 110 startDate = startDate - datetime.timedelta(1)
110 111 endDate = endDate + datetime.timedelta(1)
111 112
112 113 for fullname in fileList0:
113 114 thisFile = fullname.split('/')[-1]
114 115 year = thisFile[3:7]
115 116 if not year.isdigit():
116 117 continue
117 118
118 119 month = thisFile[7:9]
119 120 if not month.isdigit():
120 121 continue
121 122
122 123 day = thisFile[9:11]
123 124 if not day.isdigit():
124 125 continue
125 126
126 127 year, month, day = int(year), int(month), int(day)
127 128 dateFile = datetime.date(year, month, day)
128 129 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
129 130
130 131 if (startDate > dateFile) or (endDate < dateFile):
131 132 continue
132 133
133 134 dt = datetime.datetime.combine(dateFile, timeFile)
134 135 if dt not in self.files:
135 136 self.files[dt] = []
136 137 self.files[dt].append(fullname)
137 138
138 139 self.dates = self.files.keys()
139 140 self.dates.sort()
140 141
141 142 return
142 143
143 144 def search_files_online(self):
144 145 '''
145 146 Searching for NCDF files in online mode path
146 147 Creating a list of files to procces included in [startDate,endDate]
147 148
148 149 Input:
149 150 path - Path to find files
150 151 '''
151 152
152 153 self.files = {}
153 154
154 155 for n in range(self.nTries):
155 156
156 157 if self.walk:
157 158 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
158 159 paths.sort()
159 160 path = paths[-1]
160 161 else:
161 162 path = self.path
162 163
163 164 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
164 165 new_files.sort()
165 166
166 167 for fullname in new_files:
167 168 thisFile = fullname.split('/')[-1]
168 169 year = thisFile[3:7]
169 170 if not year.isdigit():
170 171 continue
171 172
172 173 month = thisFile[7:9]
173 174 if not month.isdigit():
174 175 continue
175 176
176 177 day = thisFile[9:11]
177 178 if not day.isdigit():
178 179 continue
179 180
180 181 year, month, day = int(year), int(month), int(day)
181 182 dateFile = datetime.date(year, month, day)
182 183 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
183 184
184 185 dt = datetime.datetime.combine(dateFile, timeFile)
185 186
186 187 if self.dt >= dt:
187 188 continue
188 189
189 190 if dt not in self.files:
190 191 self.dt = dt
191 192 self.files[dt] = []
192 193
193 194 self.files[dt].append(fullname)
194 195 break
195 196
196 197 if self.files:
197 198 break
198 199 else:
199 200 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
200 201 time.sleep(self.delay)
201 202
202 203 if not self.files:
203 204 return 0
204 205
205 206 self.dates = self.files.keys()
206 207 self.dates.sort()
207 208 self.cursor = 0
208 209
209 210 return 1
210 211
211 212 def parseFile(self):
212 213 '''
213 214 '''
214 215
215 216 header = {}
216 217
217 218 for attr in self.fp.ncattrs():
218 219 header[str(attr)] = getattr(self.fp, attr)
219 220
220 221 self.header.append(header)
221 222
222 223 self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
223 224
224 225 def setNextFile(self):
225 226 '''
226 227 Open next files for the current datetime
227 228 '''
228 229
229 230 cursor = self.cursor
230 231 if not self.online_mode:
231 232 if cursor == len(self.dates):
232 233 if self.online:
233 234 cursor = 0
234 235 self.dt = self.dates[cursor]
235 236 self.online_mode = True
236 237 if not self.search_files_online():
237 238 log.success('No more files', 'PXReader')
238 239 return 0
239 240 else:
240 241 log.success('No more files', 'PXReader')
241 242 self.flagNoMoreFiles = 1
242 243 return 0
243 244 else:
244 245 if not self.search_files_online():
245 246 return 0
246 247 cursor = self.cursor
247 248
248 249 self.data = {}
249 250 self.header = []
250 251
251 252 for fullname in self.files[self.dates[cursor]]:
252 253
253 254 log.log('Opening: {}'.format(fullname), 'PXReader')
254 255
255 256 if os.path.splitext(fullname)[-1] == '.tgz':
256 257 tar = tarfile.open(fullname, 'r:gz')
257 258 tar.extractall('/tmp')
258 259 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
259 260 else:
260 261 files = [fullname]
261 262
262 263 for filename in files:
263 264 if self.filename is not None:
264 265 self.fp.close()
265 266
266 267 self.filename = filename
267 268 self.filedate = self.dates[cursor]
268 269 self.fp = Dataset(self.filename, 'r')
269 270 self.parseFile()
270 271
271 272 self.counter_records += 1
272 273 self.cursor += 1
273 274 return 1
274 275
275 276 def readNextFile(self):
276 277
277 278 while True:
278 279 self.flagDiscontinuousBlock = 0
279 280 if not self.setNextFile():
280 281 return 0
281 282
282 283 self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
283 284
284 285 if self.online:
285 286 break
286 287
287 288 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
288 289 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
289 290 log.warning(
290 291 'Reading Record No. {}/{} -> {} [Skipping]'.format(
291 292 self.counter_records,
292 293 self.nrecords,
293 294 self.datatime.ctime()),
294 295 'PXReader')
295 296 continue
296 297 break
297 298
298 299 log.log(
299 300 'Reading Record No. {}/{} -> {}'.format(
300 301 self.counter_records,
301 302 self.nrecords,
302 303 self.datatime.ctime()),
303 304 'PXReader')
304 305
305 306 return 1
306 307
307 308
308 309 def set_output(self):
309 310 '''
310 311 Storing data from buffer to dataOut object
311 312 '''
312 313
313 314 self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
314 315 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
315 316 self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
316 317 self.dataOut.data = self.data
317 318 self.dataOut.units = [h['Unit-value'] for h in self.header]
318 319 self.dataOut.parameters = [h['TypeName'] for h in self.header]
319 320 self.dataOut.missing = self.header[0]['MissingData']
320 321 self.dataOut.max_range = self.header[0]['MaximumRange-value']
321 322 self.dataOut.elevation = self.header[0]['Elevation']
322 323 self.dataOut.azimuth = self.header[0]['Azimuth']
323 324 self.dataOut.latitude = self.header[0]['Latitude']
324 325 self.dataOut.longitude = self.header[0]['Longitude']
325 326 self.dataOut.utctime = self.header[0]['Time']
326 327 self.dataOut.utctimeInit = self.dataOut.utctime
327 328 self.dataOut.useLocalTime = True
328 329 self.dataOut.flagNoData = False
329 330 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
330 331
331 332 log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
332 333 'PXReader')
333 334
334 335 def getData(self):
335 336 '''
336 337 Storing data from databuffer to dataOut object
337 338 '''
338 339 if self.flagNoMoreFiles:
339 340 self.dataOut.flagNoData = True
340 341 log.error('No file left to process', 'PXReader')
341 342 return 0
342 343
343 344 if not self.readNextFile():
344 345 self.dataOut.flagNoData = True
345 346 return 0
346 347
347 348 self.set_output()
348 349
349 350 return 1
350 351
@@ -1,864 +1,869
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import os
6 6 import glob
7 7 import time
8 8 import json
9 9 import numpy
10 10 import paho.mqtt.client as mqtt
11 11 import zmq
12 12 import datetime
13 13 import ftplib
14 14 from zmq.utils.monitor import recv_monitor_message
15 15 from functools import wraps
16 16 from threading import Thread
17 17 from multiprocessing import Process
18 18
19 19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 20 from schainpy.model.data.jrodata import JROData
21 21 from schainpy.utils import log
22 22
23 23 MAXNUMX = 500
24 24 MAXNUMY = 500
25 25
26 26 PLOT_CODES = {
27 27 'rti': 0, # Range time intensity (RTI).
28 28 'spc': 1, # Spectra (and Cross-spectra) information.
29 29 'cspc': 2, # Cross-Correlation information.
30 30 'coh': 3, # Coherence map.
31 31 'base': 4, # Base lines graphic.
32 32 'row': 5, # Row Spectra.
33 33 'total': 6, # Total Power.
34 34 'drift': 7, # Drifts graphics.
35 35 'height': 8, # Height profile.
36 36 'phase': 9, # Signal Phase.
37 37 'power': 16,
38 38 'noise': 17,
39 39 'beacon': 18,
40 40 'wind': 22,
41 41 'skymap': 23,
42 42 'Unknown': 24,
43 43 'V-E': 25, # PIP Velocity.
44 44 'Z-E': 26, # PIP Reflectivity.
45 45 'V-A': 27, # RHI Velocity.
46 46 'Z-A': 28, # RHI Reflectivity.
47 47 }
48 48
49 49 def get_plot_code(s):
50 50 label = s.split('_')[0]
51 51 codes = [key for key in PLOT_CODES if key in label]
52 52 if codes:
53 53 return PLOT_CODES[codes[0]]
54 54 else:
55 55 return 24
56 56
57 57 def roundFloats(obj):
58 58 if isinstance(obj, list):
59 59 return map(roundFloats, obj)
60 60 elif isinstance(obj, float):
61 61 return round(obj, 2)
62 62
63 63 def decimate(z, MAXNUMY):
64 64 dy = int(len(z[0])/MAXNUMY) + 1
65 65
66 66 return z[::, ::dy]
67 67
68 68 class throttle(object):
69 69 '''
70 70 Decorator that prevents a function from being called more than once every
71 71 time period.
72 72 To create a function that cannot be called more than once a minute, but
73 73 will sleep until it can be called:
74 74 @throttle(minutes=1)
75 75 def foo():
76 76 pass
77 77
78 78 for i in range(10):
79 79 foo()
80 80 print "This function has run %s times." % i
81 81 '''
82 82
83 83 def __init__(self, seconds=0, minutes=0, hours=0):
84 84 self.throttle_period = datetime.timedelta(
85 85 seconds=seconds, minutes=minutes, hours=hours
86 86 )
87 87
88 88 self.time_of_last_call = datetime.datetime.min
89 89
90 90 def __call__(self, fn):
91 91 @wraps(fn)
92 92 def wrapper(*args, **kwargs):
93 93 coerce = kwargs.pop('coerce', None)
94 94 if coerce:
95 95 self.time_of_last_call = datetime.datetime.now()
96 96 return fn(*args, **kwargs)
97 97 else:
98 98 now = datetime.datetime.now()
99 99 time_since_last_call = now - self.time_of_last_call
100 100 time_left = self.throttle_period - time_since_last_call
101 101
102 102 if time_left > datetime.timedelta(seconds=0):
103 103 return
104 104
105 105 self.time_of_last_call = datetime.datetime.now()
106 106 return fn(*args, **kwargs)
107 107
108 108 return wrapper
109 109
110 110 class Data(object):
111 111 '''
112 112 Object to hold data to be plotted
113 113 '''
114 114
115 115 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
116 116 self.plottypes = plottypes
117 117 self.throttle = throttle_value
118 118 self.exp_code = exp_code
119 119 self.buffering = buffering
120 120 self.ended = False
121 121 self.localtime = False
122 122 self.meta = {}
123 123 self.__times = []
124 124 self.__heights = []
125 125
126 126 def __str__(self):
127 127 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
128 128 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
129 129
130 130 def __len__(self):
131 131 return len(self.__times)
132 132
133 133 def __getitem__(self, key):
134 134 if key not in self.data:
135 135 raise KeyError(log.error('Missing key: {}'.format(key)))
136 136
137 137 if 'spc' in key or not self.buffering:
138 138 ret = self.data[key]
139 139 else:
140 140 ret = numpy.array([self.data[key][x] for x in self.times])
141 141 if ret.ndim > 1:
142 142 ret = numpy.swapaxes(ret, 0, 1)
143 143 return ret
144 144
145 145 def __contains__(self, key):
146 146 return key in self.data
147 147
148 148 def setup(self):
149 149 '''
150 150 Configure object
151 151 '''
152 152
153 153 self.type = ''
154 154 self.ended = False
155 155 self.data = {}
156 156 self.__times = []
157 157 self.__heights = []
158 158 self.__all_heights = set()
159 159 for plot in self.plottypes:
160 160 if 'snr' in plot:
161 161 plot = 'snr'
162 162 self.data[plot] = {}
163 163
164 164 def shape(self, key):
165 165 '''
166 166 Get the shape of the one-element data for the given key
167 167 '''
168 168
169 169 if len(self.data[key]):
170 170 if 'spc' in key or not self.buffering:
171 171 return self.data[key].shape
172 172 return self.data[key][self.__times[0]].shape
173 173 return (0,)
174 174
175 175 def update(self, dataOut, tm):
176 176 '''
177 177 Update data object with new dataOut
178 178 '''
179 179
180 180 if tm in self.__times:
181 181 return
182 182
183 183 self.type = dataOut.type
184 184 self.parameters = getattr(dataOut, 'parameters', [])
185 185 if hasattr(dataOut, 'pairsList'):
186 186 self.pairs = dataOut.pairsList
187 187 if hasattr(dataOut, 'meta'):
188 188 self.meta = dataOut.meta
189 189 self.channels = dataOut.channelList
190 190 self.interval = dataOut.getTimeInterval()
191 191 self.localtime = dataOut.useLocalTime
192 192 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
193 193 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
194 194 self.__heights.append(dataOut.heightList)
195 195 self.__all_heights.update(dataOut.heightList)
196 196 self.__times.append(tm)
197 197
198 198 for plot in self.plottypes:
199 199 if plot == 'spc':
200 200 z = dataOut.data_spc/dataOut.normFactor
201 201 buffer = 10*numpy.log10(z)
202 202 if plot == 'cspc':
203 203 buffer = dataOut.data_cspc
204 204 if plot == 'noise':
205 205 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
206 206 if plot == 'rti':
207 207 buffer = dataOut.getPower()
208 208 if plot == 'snr_db':
209 209 buffer = dataOut.data_SNR
210 210 if plot == 'snr':
211 211 buffer = 10*numpy.log10(dataOut.data_SNR)
212 212 if plot == 'dop':
213 213 buffer = 10*numpy.log10(dataOut.data_DOP)
214 214 if plot == 'mean':
215 215 buffer = dataOut.data_MEAN
216 216 if plot == 'std':
217 217 buffer = dataOut.data_STD
218 218 if plot == 'coh':
219 219 buffer = dataOut.getCoherence()
220 220 if plot == 'phase':
221 221 buffer = dataOut.getCoherence(phase=True)
222 222 if plot == 'output':
223 223 buffer = dataOut.data_output
224 224 if plot == 'param':
225 225 buffer = dataOut.data_param
226 226
227 227 if 'spc' in plot:
228 228 self.data[plot] = buffer
229 229 else:
230 230 if self.buffering:
231 231 self.data[plot][tm] = buffer
232 232 else:
233 233 self.data[plot] = buffer
234 234
235 235 def normalize_heights(self):
236 236 '''
237 237 Ensure same-dimension of the data for different heighList
238 238 '''
239 239
240 240 H = numpy.array(list(self.__all_heights))
241 241 H.sort()
242 242 for key in self.data:
243 243 shape = self.shape(key)[:-1] + H.shape
244 244 for tm, obj in self.data[key].items():
245 245 h = self.__heights[self.__times.index(tm)]
246 246 if H.size == h.size:
247 247 continue
248 248 index = numpy.where(numpy.in1d(H, h))[0]
249 249 dummy = numpy.zeros(shape) + numpy.nan
250 250 if len(shape) == 2:
251 251 dummy[:, index] = obj
252 252 else:
253 253 dummy[index] = obj
254 254 self.data[key][tm] = dummy
255 255
256 256 self.__heights = [H for tm in self.__times]
257 257
258 258 def jsonify(self, decimate=False):
259 259 '''
260 260 Convert data to json
261 261 '''
262 262
263 263 data = {}
264 264 tm = self.times[-1]
265 265 dy = int(self.heights.size/MAXNUMY) + 1
266 266 for key in self.data:
267 267 if key in ('spc', 'cspc') or not self.buffering:
268 268 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
269 269 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
270 270 else:
271 271 data[key] = roundFloats(self.data[key][tm].tolist())
272 272
273 273 ret = {'data': data}
274 274 ret['exp_code'] = self.exp_code
275 275 ret['time'] = tm
276 276 ret['interval'] = self.interval
277 277 ret['localtime'] = self.localtime
278 278 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
279 279 if 'spc' in self.data or 'cspc' in self.data:
280 280 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
281 281 else:
282 282 ret['xrange'] = []
283 283 if hasattr(self, 'pairs'):
284 284 ret['pairs'] = self.pairs
285 285 else:
286 286 ret['pairs'] = []
287 287
288 288 for key, value in self.meta.items():
289 289 ret[key] = value
290 290
291 291 return json.dumps(ret)
292 292
293 293 @property
294 294 def times(self):
295 295 '''
296 296 Return the list of times of the current data
297 297 '''
298 298
299 299 ret = numpy.array(self.__times)
300 300 ret.sort()
301 301 return ret
302 302
303 303 @property
304 304 def heights(self):
305 305 '''
306 306 Return the list of heights of the current data
307 307 '''
308 308
309 309 return numpy.array(self.__heights[-1])
310 310
311 311 class PublishData(Operation):
312 312 '''
313 313 Operation to send data over zmq.
314 314 '''
315 315
316 316 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
317 317
318 318 def __init__(self, **kwargs):
319 319 """Inicio."""
320 320 Operation.__init__(self, **kwargs)
321 321 self.isConfig = False
322 322 self.client = None
323 323 self.zeromq = None
324 324 self.mqtt = None
325 325
326 326 def on_disconnect(self, client, userdata, rc):
327 327 if rc != 0:
328 328 log.warning('Unexpected disconnection.')
329 329 self.connect()
330 330
331 331 def connect(self):
332 332 log.warning('trying to connect')
333 333 try:
334 334 self.client.connect(
335 335 host=self.host,
336 336 port=self.port,
337 337 keepalive=60*10,
338 338 bind_address='')
339 339 self.client.loop_start()
340 340 # self.client.publish(
341 341 # self.topic + 'SETUP',
342 342 # json.dumps(setup),
343 343 # retain=True
344 344 # )
345 345 except:
346 346 log.error('MQTT Conection error.')
347 347 self.client = False
348 348
349 349 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
350 350 self.counter = 0
351 351 self.topic = kwargs.get('topic', 'schain')
352 352 self.delay = kwargs.get('delay', 0)
353 353 self.plottype = kwargs.get('plottype', 'spectra')
354 354 self.host = kwargs.get('host', "10.10.10.82")
355 355 self.port = kwargs.get('port', 3000)
356 356 self.clientId = clientId
357 357 self.cnt = 0
358 358 self.zeromq = zeromq
359 359 self.mqtt = kwargs.get('plottype', 0)
360 360 self.client = None
361 361 self.verbose = verbose
362 362 setup = []
363 363 if mqtt is 1:
364 364 self.client = mqtt.Client(
365 365 client_id=self.clientId + self.topic + 'SCHAIN',
366 366 clean_session=True)
367 367 self.client.on_disconnect = self.on_disconnect
368 368 self.connect()
369 369 for plot in self.plottype:
370 370 setup.append({
371 371 'plot': plot,
372 372 'topic': self.topic + plot,
373 373 'title': getattr(self, plot + '_' + 'title', False),
374 374 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
375 375 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
376 376 'xrange': getattr(self, plot + '_' + 'xrange', False),
377 377 'yrange': getattr(self, plot + '_' + 'yrange', False),
378 378 'zrange': getattr(self, plot + '_' + 'zrange', False),
379 379 })
380 380 if zeromq is 1:
381 381 context = zmq.Context()
382 382 self.zmq_socket = context.socket(zmq.PUSH)
383 383 server = kwargs.get('server', 'zmq.pipe')
384 384
385 385 if 'tcp://' in server:
386 386 address = server
387 387 else:
388 388 address = 'ipc:///tmp/%s' % server
389 389
390 390 self.zmq_socket.connect(address)
391 391 time.sleep(1)
392 392
393 393
394 394 def publish_data(self):
395 395 self.dataOut.finished = False
396 396 if self.mqtt is 1:
397 397 yData = self.dataOut.heightList[:2].tolist()
398 398 if self.plottype == 'spectra':
399 399 data = getattr(self.dataOut, 'data_spc')
400 400 z = data/self.dataOut.normFactor
401 401 zdB = 10*numpy.log10(z)
402 402 xlen, ylen = zdB[0].shape
403 403 dx = int(xlen/MAXNUMX) + 1
404 404 dy = int(ylen/MAXNUMY) + 1
405 405 Z = [0 for i in self.dataOut.channelList]
406 406 for i in self.dataOut.channelList:
407 407 Z[i] = zdB[i][::dx, ::dy].tolist()
408 408 payload = {
409 409 'timestamp': self.dataOut.utctime,
410 410 'data': roundFloats(Z),
411 411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
412 412 'interval': self.dataOut.getTimeInterval(),
413 413 'type': self.plottype,
414 414 'yData': yData
415 415 }
416 416
417 417 elif self.plottype in ('rti', 'power'):
418 418 data = getattr(self.dataOut, 'data_spc')
419 419 z = data/self.dataOut.normFactor
420 420 avg = numpy.average(z, axis=1)
421 421 avgdB = 10*numpy.log10(avg)
422 422 xlen, ylen = z[0].shape
423 423 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
424 424 AVG = [0 for i in self.dataOut.channelList]
425 425 for i in self.dataOut.channelList:
426 426 AVG[i] = avgdB[i][::dy].tolist()
427 427 payload = {
428 428 'timestamp': self.dataOut.utctime,
429 429 'data': roundFloats(AVG),
430 430 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
431 431 'interval': self.dataOut.getTimeInterval(),
432 432 'type': self.plottype,
433 433 'yData': yData
434 434 }
435 435 elif self.plottype == 'noise':
436 436 noise = self.dataOut.getNoise()/self.dataOut.normFactor
437 437 noisedB = 10*numpy.log10(noise)
438 438 payload = {
439 439 'timestamp': self.dataOut.utctime,
440 440 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
441 441 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
442 442 'interval': self.dataOut.getTimeInterval(),
443 443 'type': self.plottype,
444 444 'yData': yData
445 445 }
446 446 elif self.plottype == 'snr':
447 447 data = getattr(self.dataOut, 'data_SNR')
448 448 avgdB = 10*numpy.log10(data)
449 449
450 450 ylen = data[0].size
451 451 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
452 452 AVG = [0 for i in self.dataOut.channelList]
453 453 for i in self.dataOut.channelList:
454 454 AVG[i] = avgdB[i][::dy].tolist()
455 455 payload = {
456 456 'timestamp': self.dataOut.utctime,
457 457 'data': roundFloats(AVG),
458 458 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
459 459 'type': self.plottype,
460 460 'yData': yData
461 461 }
462 462 else:
463 463 print "Tipo de grafico invalido"
464 464 payload = {
465 465 'data': 'None',
466 466 'timestamp': 'None',
467 467 'type': None
468 468 }
469 469
470 470 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
471 471
472 472 if self.zeromq is 1:
473 473 if self.verbose:
474 474 log.log(
475 475 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
476 476 self.name
477 477 )
478 478 self.zmq_socket.send_pyobj(self.dataOut)
479 479
480 480 def run(self, dataOut, **kwargs):
481 481 self.dataOut = dataOut
482 482 if not self.isConfig:
483 483 self.setup(**kwargs)
484 484 self.isConfig = True
485 485
486 486 self.publish_data()
487 487 time.sleep(self.delay)
488 488
489 489 def close(self):
490 490 if self.zeromq is 1:
491 491 self.dataOut.finished = True
492 492 self.zmq_socket.send_pyobj(self.dataOut)
493 493 time.sleep(0.1)
494 494 self.zmq_socket.close()
495 495 if self.client:
496 496 self.client.loop_stop()
497 497 self.client.disconnect()
498 498
499 499
500 500 class ReceiverData(ProcessingUnit):
501 501
502 502 __attrs__ = ['server']
503 503
504 504 def __init__(self, **kwargs):
505 505
506 506 ProcessingUnit.__init__(self, **kwargs)
507 507
508 508 self.isConfig = False
509 509 server = kwargs.get('server', 'zmq.pipe')
510 510 if 'tcp://' in server:
511 511 address = server
512 512 else:
513 513 address = 'ipc:///tmp/%s' % server
514 514
515 515 self.address = address
516 516 self.dataOut = JROData()
517 517
518 518 def setup(self):
519 519
520 520 self.context = zmq.Context()
521 521 self.receiver = self.context.socket(zmq.PULL)
522 522 self.receiver.bind(self.address)
523 523 time.sleep(0.5)
524 524 log.success('ReceiverData from {}'.format(self.address))
525 525
526 526
527 527 def run(self):
528 528
529 529 if not self.isConfig:
530 530 self.setup()
531 531 self.isConfig = True
532 532
533 533 self.dataOut = self.receiver.recv_pyobj()
534 534 log.log('{} - {}'.format(self.dataOut.type,
535 535 self.dataOut.datatime.ctime(),),
536 536 'Receiving')
537 537
538 538
539 539 class PlotterReceiver(ProcessingUnit, Process):
540 540
541 541 throttle_value = 5
542 542 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
543 543 'exp_code', 'web_server', 'buffering']
544 544
545 545 def __init__(self, **kwargs):
546 546
547 547 ProcessingUnit.__init__(self, **kwargs)
548 548 Process.__init__(self)
549 549 self.mp = False
550 550 self.isConfig = False
551 551 self.isWebConfig = False
552 552 self.connections = 0
553 553 server = kwargs.get('server', 'zmq.pipe')
554 554 web_server = kwargs.get('web_server', None)
555 555 if 'tcp://' in server:
556 556 address = server
557 557 else:
558 558 address = 'ipc:///tmp/%s' % server
559 559 self.address = address
560 560 self.web_address = web_server
561 561 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
562 562 self.realtime = kwargs.get('realtime', False)
563 563 self.localtime = kwargs.get('localtime', True)
564 564 self.buffering = kwargs.get('buffering', True)
565 565 self.throttle_value = kwargs.get('throttle', 5)
566 566 self.exp_code = kwargs.get('exp_code', None)
567 567 self.sendData = self.initThrottle(self.throttle_value)
568 568 self.dates = []
569 569 self.setup()
570 570
571 571 def setup(self):
572 572
573 573 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
574 574 self.isConfig = True
575 575
576 576 def event_monitor(self, monitor):
577 577
578 578 events = {}
579 579
580 580 for name in dir(zmq):
581 581 if name.startswith('EVENT_'):
582 582 value = getattr(zmq, name)
583 583 events[value] = name
584 584
585 585 while monitor.poll():
586 586 evt = recv_monitor_message(monitor)
587 587 if evt['event'] == 32:
588 588 self.connections += 1
589 589 if evt['event'] == 512:
590 590 pass
591 591
592 592 evt.update({'description': events[evt['event']]})
593 593
594 594 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
595 595 break
596 596 monitor.close()
597 597 print('event monitor thread done!')
598 598
599 599 def initThrottle(self, throttle_value):
600 600
601 601 @throttle(seconds=throttle_value)
602 602 def sendDataThrottled(fn_sender, data):
603 603 fn_sender(data)
604 604
605 605 return sendDataThrottled
606 606
607 607 def send(self, data):
608 608 log.log('Sending {}'.format(data), self.name)
609 609 self.sender.send_pyobj(data)
610 610
611 611 def run(self):
612 612
613 613 log.log(
614 614 'Starting from {}'.format(self.address),
615 615 self.name
616 616 )
617 617
618 618 self.context = zmq.Context()
619 619 self.receiver = self.context.socket(zmq.PULL)
620 620 self.receiver.bind(self.address)
621 621 monitor = self.receiver.get_monitor_socket()
622 622 self.sender = self.context.socket(zmq.PUB)
623 623 if self.web_address:
624 624 log.success(
625 625 'Sending to web: {}'.format(self.web_address),
626 626 self.name
627 627 )
628 628 self.sender_web = self.context.socket(zmq.REQ)
629 629 self.sender_web.connect(self.web_address)
630 630 self.poll = zmq.Poller()
631 631 self.poll.register(self.sender_web, zmq.POLLIN)
632 632 time.sleep(1)
633 633
634 634 if 'server' in self.kwargs:
635 635 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
636 636 else:
637 637 self.sender.bind("ipc:///tmp/zmq.plots")
638 638
639 639 time.sleep(2)
640 640
641 641 t = Thread(target=self.event_monitor, args=(monitor,))
642 642 t.start()
643 643
644 644 while True:
645 645 dataOut = self.receiver.recv_pyobj()
646 646 if not dataOut.flagNoData:
647 647 if dataOut.type == 'Parameters':
648 648 tm = dataOut.utctimeInit
649 649 else:
650 650 tm = dataOut.utctime
651 651 if dataOut.useLocalTime:
652 652 if not self.localtime:
653 653 tm += time.timezone
654 654 dt = datetime.datetime.fromtimestamp(tm).date()
655 655 else:
656 656 if self.localtime:
657 657 tm -= time.timezone
658 658 dt = datetime.datetime.utcfromtimestamp(tm).date()
659 659 coerce = False
660 660 if dt not in self.dates:
661 661 if self.data:
662 662 self.data.ended = True
663 663 self.send(self.data)
664 664 coerce = True
665 665 self.data.setup()
666 666 self.dates.append(dt)
667 667
668 668 self.data.update(dataOut, tm)
669 669
670 670 if dataOut.finished is True:
671 671 self.connections -= 1
672 672 if self.connections == 0 and dt in self.dates:
673 673 self.data.ended = True
674 674 self.send(self.data)
675 675 # self.data.setup()
676 676 time.sleep(1)
677 677 break
678 678 else:
679 679 if self.realtime:
680 680 self.send(self.data)
681 681 if self.web_address:
682 682 retries = 5
683 683 while True:
684 684 self.sender_web.send(self.data.jsonify())
685 685 socks = dict(self.poll.poll(5000))
686 686 if socks.get(self.sender_web) == zmq.POLLIN:
687 687 reply = self.sender_web.recv_string()
688 688 if reply == 'ok':
689 log.log("Response from server ok", self.name)
689 690 break
690 691 else:
691 print("Malformed reply from server: %s" % reply)
692 log.warning("Malformed reply from server: {}".format(reply), self.name)
692 693
693 694 else:
694 print("No response from server, retrying...")
695 log.warning("No response from server, retrying...", self.name)
695 696 self.sender_web.setsockopt(zmq.LINGER, 0)
696 697 self.sender_web.close()
697 698 self.poll.unregister(self.sender_web)
698 699 retries -= 1
699 700 if retries == 0:
700 print("Server seems to be offline, abandoning")
701 log.error("Server seems to be offline, abandoning", self.name)
702 self.sender_web = self.context.socket(zmq.REQ)
703 self.sender_web.connect(self.web_address)
704 self.poll.register(self.sender_web, zmq.POLLIN)
705 time.sleep(1)
701 706 break
702 707 self.sender_web = self.context.socket(zmq.REQ)
703 708 self.sender_web.connect(self.web_address)
704 709 self.poll.register(self.sender_web, zmq.POLLIN)
705 710 time.sleep(1)
706 711 else:
707 712 self.sendData(self.send, self.data, coerce=coerce)
708 713 coerce = False
709 714
710 715 return
711 716
712 717
713 718 class SendToFTP(Operation, Process):
714 719
715 720 '''
716 721 Operation to send data over FTP.
717 722 '''
718 723
719 724 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
720 725
721 726 def __init__(self, **kwargs):
722 727 '''
723 728 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
724 729 '''
725 730 Operation.__init__(self, **kwargs)
726 731 Process.__init__(self)
727 732 self.server = kwargs.get('server')
728 733 self.username = kwargs.get('username')
729 734 self.password = kwargs.get('password')
730 735 self.patterns = kwargs.get('patterns')
731 736 self.timeout = kwargs.get('timeout', 30)
732 737 self.times = [time.time() for p in self.patterns]
733 738 self.latest = ['' for p in self.patterns]
734 739 self.mp = False
735 740 self.ftp = None
736 741
737 742 def setup(self):
738 743
739 744 log.log('Connecting to ftp://{}'.format(self.server), self.name)
740 745 try:
741 746 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
742 747 except ftplib.all_errors:
743 748 log.error('Server connection fail: {}'.format(self.server), self.name)
744 749 if self.ftp is not None:
745 750 self.ftp.close()
746 751 self.ftp = None
747 752 self.isConfig = False
748 753 return
749 754
750 755 try:
751 756 self.ftp.login(self.username, self.password)
752 757 except ftplib.all_errors:
753 758 log.error('The given username y/o password are incorrect', self.name)
754 759 if self.ftp is not None:
755 760 self.ftp.close()
756 761 self.ftp = None
757 762 self.isConfig = False
758 763 return
759 764
760 765 log.success('Connection success', self.name)
761 766 self.isConfig = True
762 767 return
763 768
764 769 def check(self):
765 770
766 771 try:
767 772 self.ftp.voidcmd("NOOP")
768 773 except:
769 774 log.warning('Connection lost... trying to reconnect', self.name)
770 775 if self.ftp is not None:
771 776 self.ftp.close()
772 777 self.ftp = None
773 778 self.setup()
774 779
775 780 def find_files(self, path, ext):
776 781
777 782 files = glob.glob1(path, '*{}'.format(ext))
778 783 files.sort()
779 784 if files:
780 785 return files[-1]
781 786 return None
782 787
783 788 def getftpname(self, filename, exp_code, sub_exp_code):
784 789
785 790 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
786 791 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
787 792 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
788 793 exp_code = '%3.3d'%exp_code
789 794 sub_exp_code = '%2.2d'%sub_exp_code
790 795 plot_code = '%2.2d'% get_plot_code(filename)
791 796 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
792 797 return name
793 798
794 799 def upload(self, src, dst):
795 800
796 801 log.log('Uploading {} '.format(src), self.name, nl=False)
797 802
798 803 fp = open(src, 'rb')
799 804 command = 'STOR {}'.format(dst)
800 805
801 806 try:
802 807 self.ftp.storbinary(command, fp, blocksize=1024)
803 808 except Exception, e:
804 809 log.error('{}'.format(e), self.name)
805 810 if self.ftp is not None:
806 811 self.ftp.close()
807 812 self.ftp = None
808 813 return 0
809 814
810 815 try:
811 816 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
812 817 except Exception, e:
813 818 log.error('{}'.format(e), self.name)
814 819 if self.ftp is not None:
815 820 self.ftp.close()
816 821 self.ftp = None
817 822 return 0
818 823
819 824 fp.close()
820 825 log.success('OK', tag='')
821 826 return 1
822 827
823 828 def send_files(self):
824 829
825 830 for x, pattern in enumerate(self.patterns):
826 831 local, remote, ext, delay, exp_code, sub_exp_code = pattern
827 832 if time.time()-self.times[x] >= delay:
828 833 srcname = self.find_files(local, ext)
829 834 src = os.path.join(local, srcname)
830 835 if os.path.getmtime(src) < time.time() - 30*60:
831 836 continue
832 837
833 838 if srcname is None or srcname == self.latest[x]:
834 839 continue
835 840
836 841 if 'png' in ext:
837 842 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
838 843 else:
839 844 dstname = srcname
840 845
841 846 dst = os.path.join(remote, dstname)
842 847
843 848 if self.upload(src, dst):
844 849 self.times[x] = time.time()
845 850 self.latest[x] = srcname
846 851 else:
847 852 self.isConfig = False
848 853 break
849 854
850 855 def run(self):
851 856
852 857 while True:
853 858 if not self.isConfig:
854 859 self.setup()
855 860 if self.ftp is not None:
856 861 self.check()
857 862 self.send_files()
858 863 time.sleep(10)
859 864
860 865 def close():
861 866
862 867 if self.ftp is not None:
863 868 self.ftp.close()
864 869 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now