##// END OF EJS Templates
Hot fix
Juan C. Espinoza -
r1166:ce0987c48654
parent child
Show More
@@ -1,350 +1,351
1 '''
1 '''
2 Created on Jan 15, 2018
2 Created on Jan 15, 2018
3
3
4 @author: Juan C. Espinoza
4 @author: Juan C. Espinoza
5 '''
5 '''
6
6
7 import os
7 import os
8 import sys
8 import sys
9 import time
9 import time
10 import glob
10 import glob
11 import datetime
11 import datetime
12 import tarfile
12 import tarfile
13
13
14 import numpy
14 import numpy
15 try:
16 from netCDF4 import Dataset
17 except:
18 log.warning(
19 'You should install "netCDF4" module if you want to read/write NCDF files'
20 )
21
15
22 from utils import folder_in_range
16 from utils import folder_in_range
23
17
24 from schainpy.model.io.jroIO_base import JRODataReader
18 from schainpy.model.io.jroIO_base import JRODataReader
25 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
26 from schainpy.model.data.jrodata import Parameters
20 from schainpy.model.data.jrodata import Parameters
27 from schainpy.utils import log
21 from schainpy.utils import log
28
22
23 try:
24 from netCDF4 import Dataset
25 except:
26 log.warning(
27 'You should install "netCDF4" module if you want to read/write NCDF files'
28 )
29
29 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
30 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
30
31
31
32
32 class PXReader(JRODataReader, ProcessingUnit):
33 class PXReader(JRODataReader, ProcessingUnit):
33
34
34 def __init__(self, **kwargs):
35 def __init__(self, **kwargs):
35
36
36 ProcessingUnit.__init__(self, **kwargs)
37 ProcessingUnit.__init__(self, **kwargs)
37
38
38 self.dataOut = Parameters()
39 self.dataOut = Parameters()
39 self.counter_records = 0
40 self.counter_records = 0
40 self.nrecords = None
41 self.nrecords = None
41 self.flagNoMoreFiles = 0
42 self.flagNoMoreFiles = 0
42 self.isConfig = False
43 self.isConfig = False
43 self.filename = None
44 self.filename = None
44 self.intervals = set()
45 self.intervals = set()
45 self.ext = ('.nc', '.tgz')
46 self.ext = ('.nc', '.tgz')
46 self.online_mode = False
47 self.online_mode = False
47
48
48 def setup(self,
49 def setup(self,
49 path=None,
50 path=None,
50 startDate=None,
51 startDate=None,
51 endDate=None,
52 endDate=None,
52 format=None,
53 format=None,
53 startTime=datetime.time(0, 0, 0),
54 startTime=datetime.time(0, 0, 0),
54 endTime=datetime.time(23, 59, 59),
55 endTime=datetime.time(23, 59, 59),
55 walk=False,
56 walk=False,
56 **kwargs):
57 **kwargs):
57
58
58 self.path = path
59 self.path = path
59 self.startDate = startDate
60 self.startDate = startDate
60 self.endDate = endDate
61 self.endDate = endDate
61 self.startTime = startTime
62 self.startTime = startTime
62 self.endTime = endTime
63 self.endTime = endTime
63 self.datatime = datetime.datetime(1900,1,1)
64 self.datatime = datetime.datetime(1900,1,1)
64 self.walk = walk
65 self.walk = walk
65 self.nTries = kwargs.get('nTries', 10)
66 self.nTries = kwargs.get('nTries', 10)
66 self.online = kwargs.get('online', False)
67 self.online = kwargs.get('online', False)
67 self.delay = kwargs.get('delay', 60)
68 self.delay = kwargs.get('delay', 60)
68 self.ele = kwargs.get('ext', '')
69 self.ele = kwargs.get('ext', '')
69
70
70 if self.path is None:
71 if self.path is None:
71 raise ValueError, 'The path is not valid'
72 raise ValueError, 'The path is not valid'
72
73
73 self.search_files(path, startDate, endDate, startTime, endTime, walk)
74 self.search_files(path, startDate, endDate, startTime, endTime, walk)
74 self.cursor = 0
75 self.cursor = 0
75 self.counter_records = 0
76 self.counter_records = 0
76
77
77 if not self.files:
78 if not self.files:
78 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
79 raise Warning, 'There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path)
79
80
80 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
81 def search_files(self, path, startDate, endDate, startTime, endTime, walk):
81 '''
82 '''
82 Searching for NCDF files in path
83 Searching for NCDF files in path
83 Creating a list of files to procces included in [startDate,endDate]
84 Creating a list of files to procces included in [startDate,endDate]
84
85
85 Input:
86 Input:
86 path - Path to find files
87 path - Path to find files
87 '''
88 '''
88
89
89 log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
90 log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
90 if walk:
91 if walk:
91 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
92 paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
92 paths.sort()
93 paths.sort()
93 else:
94 else:
94 paths = [path]
95 paths = [path]
95
96
96 fileList0 = []
97 fileList0 = []
97
98
98 for subpath in paths:
99 for subpath in paths:
99 if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
100 if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
100 continue
101 continue
101 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
102 fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
102
103
103 fileList0.sort()
104 fileList0.sort()
104 if self.online:
105 if self.online:
105 fileList0 = fileList0[-1:]
106 fileList0 = fileList0[-1:]
106
107
107 self.files = {}
108 self.files = {}
108
109
109 startDate = startDate - datetime.timedelta(1)
110 startDate = startDate - datetime.timedelta(1)
110 endDate = endDate + datetime.timedelta(1)
111 endDate = endDate + datetime.timedelta(1)
111
112
112 for fullname in fileList0:
113 for fullname in fileList0:
113 thisFile = fullname.split('/')[-1]
114 thisFile = fullname.split('/')[-1]
114 year = thisFile[3:7]
115 year = thisFile[3:7]
115 if not year.isdigit():
116 if not year.isdigit():
116 continue
117 continue
117
118
118 month = thisFile[7:9]
119 month = thisFile[7:9]
119 if not month.isdigit():
120 if not month.isdigit():
120 continue
121 continue
121
122
122 day = thisFile[9:11]
123 day = thisFile[9:11]
123 if not day.isdigit():
124 if not day.isdigit():
124 continue
125 continue
125
126
126 year, month, day = int(year), int(month), int(day)
127 year, month, day = int(year), int(month), int(day)
127 dateFile = datetime.date(year, month, day)
128 dateFile = datetime.date(year, month, day)
128 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
129 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
129
130
130 if (startDate > dateFile) or (endDate < dateFile):
131 if (startDate > dateFile) or (endDate < dateFile):
131 continue
132 continue
132
133
133 dt = datetime.datetime.combine(dateFile, timeFile)
134 dt = datetime.datetime.combine(dateFile, timeFile)
134 if dt not in self.files:
135 if dt not in self.files:
135 self.files[dt] = []
136 self.files[dt] = []
136 self.files[dt].append(fullname)
137 self.files[dt].append(fullname)
137
138
138 self.dates = self.files.keys()
139 self.dates = self.files.keys()
139 self.dates.sort()
140 self.dates.sort()
140
141
141 return
142 return
142
143
143 def search_files_online(self):
144 def search_files_online(self):
144 '''
145 '''
145 Searching for NCDF files in online mode path
146 Searching for NCDF files in online mode path
146 Creating a list of files to procces included in [startDate,endDate]
147 Creating a list of files to procces included in [startDate,endDate]
147
148
148 Input:
149 Input:
149 path - Path to find files
150 path - Path to find files
150 '''
151 '''
151
152
152 self.files = {}
153 self.files = {}
153
154
154 for n in range(self.nTries):
155 for n in range(self.nTries):
155
156
156 if self.walk:
157 if self.walk:
157 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
158 paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
158 paths.sort()
159 paths.sort()
159 path = paths[-1]
160 path = paths[-1]
160 else:
161 else:
161 path = self.path
162 path = self.path
162
163
163 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
164 new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
164 new_files.sort()
165 new_files.sort()
165
166
166 for fullname in new_files:
167 for fullname in new_files:
167 thisFile = fullname.split('/')[-1]
168 thisFile = fullname.split('/')[-1]
168 year = thisFile[3:7]
169 year = thisFile[3:7]
169 if not year.isdigit():
170 if not year.isdigit():
170 continue
171 continue
171
172
172 month = thisFile[7:9]
173 month = thisFile[7:9]
173 if not month.isdigit():
174 if not month.isdigit():
174 continue
175 continue
175
176
176 day = thisFile[9:11]
177 day = thisFile[9:11]
177 if not day.isdigit():
178 if not day.isdigit():
178 continue
179 continue
179
180
180 year, month, day = int(year), int(month), int(day)
181 year, month, day = int(year), int(month), int(day)
181 dateFile = datetime.date(year, month, day)
182 dateFile = datetime.date(year, month, day)
182 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
183 timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))
183
184
184 dt = datetime.datetime.combine(dateFile, timeFile)
185 dt = datetime.datetime.combine(dateFile, timeFile)
185
186
186 if self.dt >= dt:
187 if self.dt >= dt:
187 continue
188 continue
188
189
189 if dt not in self.files:
190 if dt not in self.files:
190 self.dt = dt
191 self.dt = dt
191 self.files[dt] = []
192 self.files[dt] = []
192
193
193 self.files[dt].append(fullname)
194 self.files[dt].append(fullname)
194 break
195 break
195
196
196 if self.files:
197 if self.files:
197 break
198 break
198 else:
199 else:
199 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
200 log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
200 time.sleep(self.delay)
201 time.sleep(self.delay)
201
202
202 if not self.files:
203 if not self.files:
203 return 0
204 return 0
204
205
205 self.dates = self.files.keys()
206 self.dates = self.files.keys()
206 self.dates.sort()
207 self.dates.sort()
207 self.cursor = 0
208 self.cursor = 0
208
209
209 return 1
210 return 1
210
211
211 def parseFile(self):
212 def parseFile(self):
212 '''
213 '''
213 '''
214 '''
214
215
215 header = {}
216 header = {}
216
217
217 for attr in self.fp.ncattrs():
218 for attr in self.fp.ncattrs():
218 header[str(attr)] = getattr(self.fp, attr)
219 header[str(attr)] = getattr(self.fp, attr)
219
220
220 self.header.append(header)
221 self.header.append(header)
221
222
222 self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
223 self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])
223
224
224 def setNextFile(self):
225 def setNextFile(self):
225 '''
226 '''
226 Open next files for the current datetime
227 Open next files for the current datetime
227 '''
228 '''
228
229
229 cursor = self.cursor
230 cursor = self.cursor
230 if not self.online_mode:
231 if not self.online_mode:
231 if cursor == len(self.dates):
232 if cursor == len(self.dates):
232 if self.online:
233 if self.online:
233 cursor = 0
234 cursor = 0
234 self.dt = self.dates[cursor]
235 self.dt = self.dates[cursor]
235 self.online_mode = True
236 self.online_mode = True
236 if not self.search_files_online():
237 if not self.search_files_online():
237 log.success('No more files', 'PXReader')
238 log.success('No more files', 'PXReader')
238 return 0
239 return 0
239 else:
240 else:
240 log.success('No more files', 'PXReader')
241 log.success('No more files', 'PXReader')
241 self.flagNoMoreFiles = 1
242 self.flagNoMoreFiles = 1
242 return 0
243 return 0
243 else:
244 else:
244 if not self.search_files_online():
245 if not self.search_files_online():
245 return 0
246 return 0
246 cursor = self.cursor
247 cursor = self.cursor
247
248
248 self.data = {}
249 self.data = {}
249 self.header = []
250 self.header = []
250
251
251 for fullname in self.files[self.dates[cursor]]:
252 for fullname in self.files[self.dates[cursor]]:
252
253
253 log.log('Opening: {}'.format(fullname), 'PXReader')
254 log.log('Opening: {}'.format(fullname), 'PXReader')
254
255
255 if os.path.splitext(fullname)[-1] == '.tgz':
256 if os.path.splitext(fullname)[-1] == '.tgz':
256 tar = tarfile.open(fullname, 'r:gz')
257 tar = tarfile.open(fullname, 'r:gz')
257 tar.extractall('/tmp')
258 tar.extractall('/tmp')
258 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
259 files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
259 else:
260 else:
260 files = [fullname]
261 files = [fullname]
261
262
262 for filename in files:
263 for filename in files:
263 if self.filename is not None:
264 if self.filename is not None:
264 self.fp.close()
265 self.fp.close()
265
266
266 self.filename = filename
267 self.filename = filename
267 self.filedate = self.dates[cursor]
268 self.filedate = self.dates[cursor]
268 self.fp = Dataset(self.filename, 'r')
269 self.fp = Dataset(self.filename, 'r')
269 self.parseFile()
270 self.parseFile()
270
271
271 self.counter_records += 1
272 self.counter_records += 1
272 self.cursor += 1
273 self.cursor += 1
273 return 1
274 return 1
274
275
275 def readNextFile(self):
276 def readNextFile(self):
276
277
277 while True:
278 while True:
278 self.flagDiscontinuousBlock = 0
279 self.flagDiscontinuousBlock = 0
279 if not self.setNextFile():
280 if not self.setNextFile():
280 return 0
281 return 0
281
282
282 self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
283 self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])
283
284
284 if self.online:
285 if self.online:
285 break
286 break
286
287
287 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
288 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
288 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
289 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
289 log.warning(
290 log.warning(
290 'Reading Record No. {}/{} -> {} [Skipping]'.format(
291 'Reading Record No. {}/{} -> {} [Skipping]'.format(
291 self.counter_records,
292 self.counter_records,
292 self.nrecords,
293 self.nrecords,
293 self.datatime.ctime()),
294 self.datatime.ctime()),
294 'PXReader')
295 'PXReader')
295 continue
296 continue
296 break
297 break
297
298
298 log.log(
299 log.log(
299 'Reading Record No. {}/{} -> {}'.format(
300 'Reading Record No. {}/{} -> {}'.format(
300 self.counter_records,
301 self.counter_records,
301 self.nrecords,
302 self.nrecords,
302 self.datatime.ctime()),
303 self.datatime.ctime()),
303 'PXReader')
304 'PXReader')
304
305
305 return 1
306 return 1
306
307
307
308
308 def set_output(self):
309 def set_output(self):
309 '''
310 '''
310 Storing data from buffer to dataOut object
311 Storing data from buffer to dataOut object
311 '''
312 '''
312
313
313 self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
314 self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
314 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
315 self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
315 self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
316 self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
316 self.dataOut.data = self.data
317 self.dataOut.data = self.data
317 self.dataOut.units = [h['Unit-value'] for h in self.header]
318 self.dataOut.units = [h['Unit-value'] for h in self.header]
318 self.dataOut.parameters = [h['TypeName'] for h in self.header]
319 self.dataOut.parameters = [h['TypeName'] for h in self.header]
319 self.dataOut.missing = self.header[0]['MissingData']
320 self.dataOut.missing = self.header[0]['MissingData']
320 self.dataOut.max_range = self.header[0]['MaximumRange-value']
321 self.dataOut.max_range = self.header[0]['MaximumRange-value']
321 self.dataOut.elevation = self.header[0]['Elevation']
322 self.dataOut.elevation = self.header[0]['Elevation']
322 self.dataOut.azimuth = self.header[0]['Azimuth']
323 self.dataOut.azimuth = self.header[0]['Azimuth']
323 self.dataOut.latitude = self.header[0]['Latitude']
324 self.dataOut.latitude = self.header[0]['Latitude']
324 self.dataOut.longitude = self.header[0]['Longitude']
325 self.dataOut.longitude = self.header[0]['Longitude']
325 self.dataOut.utctime = self.header[0]['Time']
326 self.dataOut.utctime = self.header[0]['Time']
326 self.dataOut.utctimeInit = self.dataOut.utctime
327 self.dataOut.utctimeInit = self.dataOut.utctime
327 self.dataOut.useLocalTime = True
328 self.dataOut.useLocalTime = True
328 self.dataOut.flagNoData = False
329 self.dataOut.flagNoData = False
329 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
330 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
330
331
331 log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
332 log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
332 'PXReader')
333 'PXReader')
333
334
334 def getData(self):
335 def getData(self):
335 '''
336 '''
336 Storing data from databuffer to dataOut object
337 Storing data from databuffer to dataOut object
337 '''
338 '''
338 if self.flagNoMoreFiles:
339 if self.flagNoMoreFiles:
339 self.dataOut.flagNoData = True
340 self.dataOut.flagNoData = True
340 log.error('No file left to process', 'PXReader')
341 log.error('No file left to process', 'PXReader')
341 return 0
342 return 0
342
343
343 if not self.readNextFile():
344 if not self.readNextFile():
344 self.dataOut.flagNoData = True
345 self.dataOut.flagNoData = True
345 return 0
346 return 0
346
347
347 self.set_output()
348 self.set_output()
348
349
349 return 1
350 return 1
350
351
@@ -1,864 +1,869
1 '''
1 '''
2 @author: Juan C. Espinoza
2 @author: Juan C. Espinoza
3 '''
3 '''
4
4
5 import os
5 import os
6 import glob
6 import glob
7 import time
7 import time
8 import json
8 import json
9 import numpy
9 import numpy
10 import paho.mqtt.client as mqtt
10 import paho.mqtt.client as mqtt
11 import zmq
11 import zmq
12 import datetime
12 import datetime
13 import ftplib
13 import ftplib
14 from zmq.utils.monitor import recv_monitor_message
14 from zmq.utils.monitor import recv_monitor_message
15 from functools import wraps
15 from functools import wraps
16 from threading import Thread
16 from threading import Thread
17 from multiprocessing import Process
17 from multiprocessing import Process
18
18
19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
19 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
20 from schainpy.model.data.jrodata import JROData
20 from schainpy.model.data.jrodata import JROData
21 from schainpy.utils import log
21 from schainpy.utils import log
22
22
23 MAXNUMX = 500
23 MAXNUMX = 500
24 MAXNUMY = 500
24 MAXNUMY = 500
25
25
26 PLOT_CODES = {
26 PLOT_CODES = {
27 'rti': 0, # Range time intensity (RTI).
27 'rti': 0, # Range time intensity (RTI).
28 'spc': 1, # Spectra (and Cross-spectra) information.
28 'spc': 1, # Spectra (and Cross-spectra) information.
29 'cspc': 2, # Cross-Correlation information.
29 'cspc': 2, # Cross-Correlation information.
30 'coh': 3, # Coherence map.
30 'coh': 3, # Coherence map.
31 'base': 4, # Base lines graphic.
31 'base': 4, # Base lines graphic.
32 'row': 5, # Row Spectra.
32 'row': 5, # Row Spectra.
33 'total': 6, # Total Power.
33 'total': 6, # Total Power.
34 'drift': 7, # Drifts graphics.
34 'drift': 7, # Drifts graphics.
35 'height': 8, # Height profile.
35 'height': 8, # Height profile.
36 'phase': 9, # Signal Phase.
36 'phase': 9, # Signal Phase.
37 'power': 16,
37 'power': 16,
38 'noise': 17,
38 'noise': 17,
39 'beacon': 18,
39 'beacon': 18,
40 'wind': 22,
40 'wind': 22,
41 'skymap': 23,
41 'skymap': 23,
42 'Unknown': 24,
42 'Unknown': 24,
43 'V-E': 25, # PIP Velocity.
43 'V-E': 25, # PIP Velocity.
44 'Z-E': 26, # PIP Reflectivity.
44 'Z-E': 26, # PIP Reflectivity.
45 'V-A': 27, # RHI Velocity.
45 'V-A': 27, # RHI Velocity.
46 'Z-A': 28, # RHI Reflectivity.
46 'Z-A': 28, # RHI Reflectivity.
47 }
47 }
48
48
49 def get_plot_code(s):
49 def get_plot_code(s):
50 label = s.split('_')[0]
50 label = s.split('_')[0]
51 codes = [key for key in PLOT_CODES if key in label]
51 codes = [key for key in PLOT_CODES if key in label]
52 if codes:
52 if codes:
53 return PLOT_CODES[codes[0]]
53 return PLOT_CODES[codes[0]]
54 else:
54 else:
55 return 24
55 return 24
56
56
57 def roundFloats(obj):
57 def roundFloats(obj):
58 if isinstance(obj, list):
58 if isinstance(obj, list):
59 return map(roundFloats, obj)
59 return map(roundFloats, obj)
60 elif isinstance(obj, float):
60 elif isinstance(obj, float):
61 return round(obj, 2)
61 return round(obj, 2)
62
62
63 def decimate(z, MAXNUMY):
63 def decimate(z, MAXNUMY):
64 dy = int(len(z[0])/MAXNUMY) + 1
64 dy = int(len(z[0])/MAXNUMY) + 1
65
65
66 return z[::, ::dy]
66 return z[::, ::dy]
67
67
68 class throttle(object):
68 class throttle(object):
69 '''
69 '''
70 Decorator that prevents a function from being called more than once every
70 Decorator that prevents a function from being called more than once every
71 time period.
71 time period.
72 To create a function that cannot be called more than once a minute, but
72 To create a function that cannot be called more than once a minute, but
73 will sleep until it can be called:
73 will sleep until it can be called:
74 @throttle(minutes=1)
74 @throttle(minutes=1)
75 def foo():
75 def foo():
76 pass
76 pass
77
77
78 for i in range(10):
78 for i in range(10):
79 foo()
79 foo()
80 print "This function has run %s times." % i
80 print "This function has run %s times." % i
81 '''
81 '''
82
82
83 def __init__(self, seconds=0, minutes=0, hours=0):
83 def __init__(self, seconds=0, minutes=0, hours=0):
84 self.throttle_period = datetime.timedelta(
84 self.throttle_period = datetime.timedelta(
85 seconds=seconds, minutes=minutes, hours=hours
85 seconds=seconds, minutes=minutes, hours=hours
86 )
86 )
87
87
88 self.time_of_last_call = datetime.datetime.min
88 self.time_of_last_call = datetime.datetime.min
89
89
90 def __call__(self, fn):
90 def __call__(self, fn):
91 @wraps(fn)
91 @wraps(fn)
92 def wrapper(*args, **kwargs):
92 def wrapper(*args, **kwargs):
93 coerce = kwargs.pop('coerce', None)
93 coerce = kwargs.pop('coerce', None)
94 if coerce:
94 if coerce:
95 self.time_of_last_call = datetime.datetime.now()
95 self.time_of_last_call = datetime.datetime.now()
96 return fn(*args, **kwargs)
96 return fn(*args, **kwargs)
97 else:
97 else:
98 now = datetime.datetime.now()
98 now = datetime.datetime.now()
99 time_since_last_call = now - self.time_of_last_call
99 time_since_last_call = now - self.time_of_last_call
100 time_left = self.throttle_period - time_since_last_call
100 time_left = self.throttle_period - time_since_last_call
101
101
102 if time_left > datetime.timedelta(seconds=0):
102 if time_left > datetime.timedelta(seconds=0):
103 return
103 return
104
104
105 self.time_of_last_call = datetime.datetime.now()
105 self.time_of_last_call = datetime.datetime.now()
106 return fn(*args, **kwargs)
106 return fn(*args, **kwargs)
107
107
108 return wrapper
108 return wrapper
109
109
110 class Data(object):
110 class Data(object):
111 '''
111 '''
112 Object to hold data to be plotted
112 Object to hold data to be plotted
113 '''
113 '''
114
114
115 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
115 def __init__(self, plottypes, throttle_value, exp_code, buffering=True):
116 self.plottypes = plottypes
116 self.plottypes = plottypes
117 self.throttle = throttle_value
117 self.throttle = throttle_value
118 self.exp_code = exp_code
118 self.exp_code = exp_code
119 self.buffering = buffering
119 self.buffering = buffering
120 self.ended = False
120 self.ended = False
121 self.localtime = False
121 self.localtime = False
122 self.meta = {}
122 self.meta = {}
123 self.__times = []
123 self.__times = []
124 self.__heights = []
124 self.__heights = []
125
125
126 def __str__(self):
126 def __str__(self):
127 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
127 dum = ['{}{}'.format(key, self.shape(key)) for key in self.data]
128 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
128 return 'Data[{}][{}]'.format(';'.join(dum), len(self.__times))
129
129
130 def __len__(self):
130 def __len__(self):
131 return len(self.__times)
131 return len(self.__times)
132
132
133 def __getitem__(self, key):
133 def __getitem__(self, key):
134 if key not in self.data:
134 if key not in self.data:
135 raise KeyError(log.error('Missing key: {}'.format(key)))
135 raise KeyError(log.error('Missing key: {}'.format(key)))
136
136
137 if 'spc' in key or not self.buffering:
137 if 'spc' in key or not self.buffering:
138 ret = self.data[key]
138 ret = self.data[key]
139 else:
139 else:
140 ret = numpy.array([self.data[key][x] for x in self.times])
140 ret = numpy.array([self.data[key][x] for x in self.times])
141 if ret.ndim > 1:
141 if ret.ndim > 1:
142 ret = numpy.swapaxes(ret, 0, 1)
142 ret = numpy.swapaxes(ret, 0, 1)
143 return ret
143 return ret
144
144
145 def __contains__(self, key):
145 def __contains__(self, key):
146 return key in self.data
146 return key in self.data
147
147
148 def setup(self):
148 def setup(self):
149 '''
149 '''
150 Configure object
150 Configure object
151 '''
151 '''
152
152
153 self.type = ''
153 self.type = ''
154 self.ended = False
154 self.ended = False
155 self.data = {}
155 self.data = {}
156 self.__times = []
156 self.__times = []
157 self.__heights = []
157 self.__heights = []
158 self.__all_heights = set()
158 self.__all_heights = set()
159 for plot in self.plottypes:
159 for plot in self.plottypes:
160 if 'snr' in plot:
160 if 'snr' in plot:
161 plot = 'snr'
161 plot = 'snr'
162 self.data[plot] = {}
162 self.data[plot] = {}
163
163
164 def shape(self, key):
164 def shape(self, key):
165 '''
165 '''
166 Get the shape of the one-element data for the given key
166 Get the shape of the one-element data for the given key
167 '''
167 '''
168
168
169 if len(self.data[key]):
169 if len(self.data[key]):
170 if 'spc' in key or not self.buffering:
170 if 'spc' in key or not self.buffering:
171 return self.data[key].shape
171 return self.data[key].shape
172 return self.data[key][self.__times[0]].shape
172 return self.data[key][self.__times[0]].shape
173 return (0,)
173 return (0,)
174
174
175 def update(self, dataOut, tm):
175 def update(self, dataOut, tm):
176 '''
176 '''
177 Update data object with new dataOut
177 Update data object with new dataOut
178 '''
178 '''
179
179
180 if tm in self.__times:
180 if tm in self.__times:
181 return
181 return
182
182
183 self.type = dataOut.type
183 self.type = dataOut.type
184 self.parameters = getattr(dataOut, 'parameters', [])
184 self.parameters = getattr(dataOut, 'parameters', [])
185 if hasattr(dataOut, 'pairsList'):
185 if hasattr(dataOut, 'pairsList'):
186 self.pairs = dataOut.pairsList
186 self.pairs = dataOut.pairsList
187 if hasattr(dataOut, 'meta'):
187 if hasattr(dataOut, 'meta'):
188 self.meta = dataOut.meta
188 self.meta = dataOut.meta
189 self.channels = dataOut.channelList
189 self.channels = dataOut.channelList
190 self.interval = dataOut.getTimeInterval()
190 self.interval = dataOut.getTimeInterval()
191 self.localtime = dataOut.useLocalTime
191 self.localtime = dataOut.useLocalTime
192 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
192 if 'spc' in self.plottypes or 'cspc' in self.plottypes:
193 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
193 self.xrange = (dataOut.getFreqRange(1)/1000., dataOut.getAcfRange(1), dataOut.getVelRange(1))
194 self.__heights.append(dataOut.heightList)
194 self.__heights.append(dataOut.heightList)
195 self.__all_heights.update(dataOut.heightList)
195 self.__all_heights.update(dataOut.heightList)
196 self.__times.append(tm)
196 self.__times.append(tm)
197
197
198 for plot in self.plottypes:
198 for plot in self.plottypes:
199 if plot == 'spc':
199 if plot == 'spc':
200 z = dataOut.data_spc/dataOut.normFactor
200 z = dataOut.data_spc/dataOut.normFactor
201 buffer = 10*numpy.log10(z)
201 buffer = 10*numpy.log10(z)
202 if plot == 'cspc':
202 if plot == 'cspc':
203 buffer = dataOut.data_cspc
203 buffer = dataOut.data_cspc
204 if plot == 'noise':
204 if plot == 'noise':
205 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
205 buffer = 10*numpy.log10(dataOut.getNoise()/dataOut.normFactor)
206 if plot == 'rti':
206 if plot == 'rti':
207 buffer = dataOut.getPower()
207 buffer = dataOut.getPower()
208 if plot == 'snr_db':
208 if plot == 'snr_db':
209 buffer = dataOut.data_SNR
209 buffer = dataOut.data_SNR
210 if plot == 'snr':
210 if plot == 'snr':
211 buffer = 10*numpy.log10(dataOut.data_SNR)
211 buffer = 10*numpy.log10(dataOut.data_SNR)
212 if plot == 'dop':
212 if plot == 'dop':
213 buffer = 10*numpy.log10(dataOut.data_DOP)
213 buffer = 10*numpy.log10(dataOut.data_DOP)
214 if plot == 'mean':
214 if plot == 'mean':
215 buffer = dataOut.data_MEAN
215 buffer = dataOut.data_MEAN
216 if plot == 'std':
216 if plot == 'std':
217 buffer = dataOut.data_STD
217 buffer = dataOut.data_STD
218 if plot == 'coh':
218 if plot == 'coh':
219 buffer = dataOut.getCoherence()
219 buffer = dataOut.getCoherence()
220 if plot == 'phase':
220 if plot == 'phase':
221 buffer = dataOut.getCoherence(phase=True)
221 buffer = dataOut.getCoherence(phase=True)
222 if plot == 'output':
222 if plot == 'output':
223 buffer = dataOut.data_output
223 buffer = dataOut.data_output
224 if plot == 'param':
224 if plot == 'param':
225 buffer = dataOut.data_param
225 buffer = dataOut.data_param
226
226
227 if 'spc' in plot:
227 if 'spc' in plot:
228 self.data[plot] = buffer
228 self.data[plot] = buffer
229 else:
229 else:
230 if self.buffering:
230 if self.buffering:
231 self.data[plot][tm] = buffer
231 self.data[plot][tm] = buffer
232 else:
232 else:
233 self.data[plot] = buffer
233 self.data[plot] = buffer
234
234
235 def normalize_heights(self):
235 def normalize_heights(self):
236 '''
236 '''
237 Ensure same-dimension of the data for different heighList
237 Ensure same-dimension of the data for different heighList
238 '''
238 '''
239
239
240 H = numpy.array(list(self.__all_heights))
240 H = numpy.array(list(self.__all_heights))
241 H.sort()
241 H.sort()
242 for key in self.data:
242 for key in self.data:
243 shape = self.shape(key)[:-1] + H.shape
243 shape = self.shape(key)[:-1] + H.shape
244 for tm, obj in self.data[key].items():
244 for tm, obj in self.data[key].items():
245 h = self.__heights[self.__times.index(tm)]
245 h = self.__heights[self.__times.index(tm)]
246 if H.size == h.size:
246 if H.size == h.size:
247 continue
247 continue
248 index = numpy.where(numpy.in1d(H, h))[0]
248 index = numpy.where(numpy.in1d(H, h))[0]
249 dummy = numpy.zeros(shape) + numpy.nan
249 dummy = numpy.zeros(shape) + numpy.nan
250 if len(shape) == 2:
250 if len(shape) == 2:
251 dummy[:, index] = obj
251 dummy[:, index] = obj
252 else:
252 else:
253 dummy[index] = obj
253 dummy[index] = obj
254 self.data[key][tm] = dummy
254 self.data[key][tm] = dummy
255
255
256 self.__heights = [H for tm in self.__times]
256 self.__heights = [H for tm in self.__times]
257
257
258 def jsonify(self, decimate=False):
258 def jsonify(self, decimate=False):
259 '''
259 '''
260 Convert data to json
260 Convert data to json
261 '''
261 '''
262
262
263 data = {}
263 data = {}
264 tm = self.times[-1]
264 tm = self.times[-1]
265 dy = int(self.heights.size/MAXNUMY) + 1
265 dy = int(self.heights.size/MAXNUMY) + 1
266 for key in self.data:
266 for key in self.data:
267 if key in ('spc', 'cspc') or not self.buffering:
267 if key in ('spc', 'cspc') or not self.buffering:
268 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
268 dx = int(self.data[key].shape[1]/MAXNUMX) + 1
269 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
269 data[key] = roundFloats(self.data[key][::, ::dx, ::dy].tolist())
270 else:
270 else:
271 data[key] = roundFloats(self.data[key][tm].tolist())
271 data[key] = roundFloats(self.data[key][tm].tolist())
272
272
273 ret = {'data': data}
273 ret = {'data': data}
274 ret['exp_code'] = self.exp_code
274 ret['exp_code'] = self.exp_code
275 ret['time'] = tm
275 ret['time'] = tm
276 ret['interval'] = self.interval
276 ret['interval'] = self.interval
277 ret['localtime'] = self.localtime
277 ret['localtime'] = self.localtime
278 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
278 ret['yrange'] = roundFloats(self.heights[::dy].tolist())
279 if 'spc' in self.data or 'cspc' in self.data:
279 if 'spc' in self.data or 'cspc' in self.data:
280 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
280 ret['xrange'] = roundFloats(self.xrange[2][::dx].tolist())
281 else:
281 else:
282 ret['xrange'] = []
282 ret['xrange'] = []
283 if hasattr(self, 'pairs'):
283 if hasattr(self, 'pairs'):
284 ret['pairs'] = self.pairs
284 ret['pairs'] = self.pairs
285 else:
285 else:
286 ret['pairs'] = []
286 ret['pairs'] = []
287
287
288 for key, value in self.meta.items():
288 for key, value in self.meta.items():
289 ret[key] = value
289 ret[key] = value
290
290
291 return json.dumps(ret)
291 return json.dumps(ret)
292
292
293 @property
293 @property
294 def times(self):
294 def times(self):
295 '''
295 '''
296 Return the list of times of the current data
296 Return the list of times of the current data
297 '''
297 '''
298
298
299 ret = numpy.array(self.__times)
299 ret = numpy.array(self.__times)
300 ret.sort()
300 ret.sort()
301 return ret
301 return ret
302
302
303 @property
303 @property
304 def heights(self):
304 def heights(self):
305 '''
305 '''
306 Return the list of heights of the current data
306 Return the list of heights of the current data
307 '''
307 '''
308
308
309 return numpy.array(self.__heights[-1])
309 return numpy.array(self.__heights[-1])
310
310
311 class PublishData(Operation):
311 class PublishData(Operation):
312 '''
312 '''
313 Operation to send data over zmq.
313 Operation to send data over zmq.
314 '''
314 '''
315
315
316 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
316 __attrs__ = ['host', 'port', 'delay', 'zeromq', 'mqtt', 'verbose']
317
317
318 def __init__(self, **kwargs):
318 def __init__(self, **kwargs):
319 """Inicio."""
319 """Inicio."""
320 Operation.__init__(self, **kwargs)
320 Operation.__init__(self, **kwargs)
321 self.isConfig = False
321 self.isConfig = False
322 self.client = None
322 self.client = None
323 self.zeromq = None
323 self.zeromq = None
324 self.mqtt = None
324 self.mqtt = None
325
325
326 def on_disconnect(self, client, userdata, rc):
326 def on_disconnect(self, client, userdata, rc):
327 if rc != 0:
327 if rc != 0:
328 log.warning('Unexpected disconnection.')
328 log.warning('Unexpected disconnection.')
329 self.connect()
329 self.connect()
330
330
331 def connect(self):
331 def connect(self):
332 log.warning('trying to connect')
332 log.warning('trying to connect')
333 try:
333 try:
334 self.client.connect(
334 self.client.connect(
335 host=self.host,
335 host=self.host,
336 port=self.port,
336 port=self.port,
337 keepalive=60*10,
337 keepalive=60*10,
338 bind_address='')
338 bind_address='')
339 self.client.loop_start()
339 self.client.loop_start()
340 # self.client.publish(
340 # self.client.publish(
341 # self.topic + 'SETUP',
341 # self.topic + 'SETUP',
342 # json.dumps(setup),
342 # json.dumps(setup),
343 # retain=True
343 # retain=True
344 # )
344 # )
345 except:
345 except:
346 log.error('MQTT Conection error.')
346 log.error('MQTT Conection error.')
347 self.client = False
347 self.client = False
348
348
349 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
349 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, verbose=True, **kwargs):
350 self.counter = 0
350 self.counter = 0
351 self.topic = kwargs.get('topic', 'schain')
351 self.topic = kwargs.get('topic', 'schain')
352 self.delay = kwargs.get('delay', 0)
352 self.delay = kwargs.get('delay', 0)
353 self.plottype = kwargs.get('plottype', 'spectra')
353 self.plottype = kwargs.get('plottype', 'spectra')
354 self.host = kwargs.get('host', "10.10.10.82")
354 self.host = kwargs.get('host', "10.10.10.82")
355 self.port = kwargs.get('port', 3000)
355 self.port = kwargs.get('port', 3000)
356 self.clientId = clientId
356 self.clientId = clientId
357 self.cnt = 0
357 self.cnt = 0
358 self.zeromq = zeromq
358 self.zeromq = zeromq
359 self.mqtt = kwargs.get('plottype', 0)
359 self.mqtt = kwargs.get('plottype', 0)
360 self.client = None
360 self.client = None
361 self.verbose = verbose
361 self.verbose = verbose
362 setup = []
362 setup = []
363 if mqtt is 1:
363 if mqtt is 1:
364 self.client = mqtt.Client(
364 self.client = mqtt.Client(
365 client_id=self.clientId + self.topic + 'SCHAIN',
365 client_id=self.clientId + self.topic + 'SCHAIN',
366 clean_session=True)
366 clean_session=True)
367 self.client.on_disconnect = self.on_disconnect
367 self.client.on_disconnect = self.on_disconnect
368 self.connect()
368 self.connect()
369 for plot in self.plottype:
369 for plot in self.plottype:
370 setup.append({
370 setup.append({
371 'plot': plot,
371 'plot': plot,
372 'topic': self.topic + plot,
372 'topic': self.topic + plot,
373 'title': getattr(self, plot + '_' + 'title', False),
373 'title': getattr(self, plot + '_' + 'title', False),
374 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
374 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
375 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
375 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
376 'xrange': getattr(self, plot + '_' + 'xrange', False),
376 'xrange': getattr(self, plot + '_' + 'xrange', False),
377 'yrange': getattr(self, plot + '_' + 'yrange', False),
377 'yrange': getattr(self, plot + '_' + 'yrange', False),
378 'zrange': getattr(self, plot + '_' + 'zrange', False),
378 'zrange': getattr(self, plot + '_' + 'zrange', False),
379 })
379 })
380 if zeromq is 1:
380 if zeromq is 1:
381 context = zmq.Context()
381 context = zmq.Context()
382 self.zmq_socket = context.socket(zmq.PUSH)
382 self.zmq_socket = context.socket(zmq.PUSH)
383 server = kwargs.get('server', 'zmq.pipe')
383 server = kwargs.get('server', 'zmq.pipe')
384
384
385 if 'tcp://' in server:
385 if 'tcp://' in server:
386 address = server
386 address = server
387 else:
387 else:
388 address = 'ipc:///tmp/%s' % server
388 address = 'ipc:///tmp/%s' % server
389
389
390 self.zmq_socket.connect(address)
390 self.zmq_socket.connect(address)
391 time.sleep(1)
391 time.sleep(1)
392
392
393
393
394 def publish_data(self):
394 def publish_data(self):
395 self.dataOut.finished = False
395 self.dataOut.finished = False
396 if self.mqtt is 1:
396 if self.mqtt is 1:
397 yData = self.dataOut.heightList[:2].tolist()
397 yData = self.dataOut.heightList[:2].tolist()
398 if self.plottype == 'spectra':
398 if self.plottype == 'spectra':
399 data = getattr(self.dataOut, 'data_spc')
399 data = getattr(self.dataOut, 'data_spc')
400 z = data/self.dataOut.normFactor
400 z = data/self.dataOut.normFactor
401 zdB = 10*numpy.log10(z)
401 zdB = 10*numpy.log10(z)
402 xlen, ylen = zdB[0].shape
402 xlen, ylen = zdB[0].shape
403 dx = int(xlen/MAXNUMX) + 1
403 dx = int(xlen/MAXNUMX) + 1
404 dy = int(ylen/MAXNUMY) + 1
404 dy = int(ylen/MAXNUMY) + 1
405 Z = [0 for i in self.dataOut.channelList]
405 Z = [0 for i in self.dataOut.channelList]
406 for i in self.dataOut.channelList:
406 for i in self.dataOut.channelList:
407 Z[i] = zdB[i][::dx, ::dy].tolist()
407 Z[i] = zdB[i][::dx, ::dy].tolist()
408 payload = {
408 payload = {
409 'timestamp': self.dataOut.utctime,
409 'timestamp': self.dataOut.utctime,
410 'data': roundFloats(Z),
410 'data': roundFloats(Z),
411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
411 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
412 'interval': self.dataOut.getTimeInterval(),
412 'interval': self.dataOut.getTimeInterval(),
413 'type': self.plottype,
413 'type': self.plottype,
414 'yData': yData
414 'yData': yData
415 }
415 }
416
416
417 elif self.plottype in ('rti', 'power'):
417 elif self.plottype in ('rti', 'power'):
418 data = getattr(self.dataOut, 'data_spc')
418 data = getattr(self.dataOut, 'data_spc')
419 z = data/self.dataOut.normFactor
419 z = data/self.dataOut.normFactor
420 avg = numpy.average(z, axis=1)
420 avg = numpy.average(z, axis=1)
421 avgdB = 10*numpy.log10(avg)
421 avgdB = 10*numpy.log10(avg)
422 xlen, ylen = z[0].shape
422 xlen, ylen = z[0].shape
423 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
423 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
424 AVG = [0 for i in self.dataOut.channelList]
424 AVG = [0 for i in self.dataOut.channelList]
425 for i in self.dataOut.channelList:
425 for i in self.dataOut.channelList:
426 AVG[i] = avgdB[i][::dy].tolist()
426 AVG[i] = avgdB[i][::dy].tolist()
427 payload = {
427 payload = {
428 'timestamp': self.dataOut.utctime,
428 'timestamp': self.dataOut.utctime,
429 'data': roundFloats(AVG),
429 'data': roundFloats(AVG),
430 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
430 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
431 'interval': self.dataOut.getTimeInterval(),
431 'interval': self.dataOut.getTimeInterval(),
432 'type': self.plottype,
432 'type': self.plottype,
433 'yData': yData
433 'yData': yData
434 }
434 }
435 elif self.plottype == 'noise':
435 elif self.plottype == 'noise':
436 noise = self.dataOut.getNoise()/self.dataOut.normFactor
436 noise = self.dataOut.getNoise()/self.dataOut.normFactor
437 noisedB = 10*numpy.log10(noise)
437 noisedB = 10*numpy.log10(noise)
438 payload = {
438 payload = {
439 'timestamp': self.dataOut.utctime,
439 'timestamp': self.dataOut.utctime,
440 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
440 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
441 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
441 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
442 'interval': self.dataOut.getTimeInterval(),
442 'interval': self.dataOut.getTimeInterval(),
443 'type': self.plottype,
443 'type': self.plottype,
444 'yData': yData
444 'yData': yData
445 }
445 }
446 elif self.plottype == 'snr':
446 elif self.plottype == 'snr':
447 data = getattr(self.dataOut, 'data_SNR')
447 data = getattr(self.dataOut, 'data_SNR')
448 avgdB = 10*numpy.log10(data)
448 avgdB = 10*numpy.log10(data)
449
449
450 ylen = data[0].size
450 ylen = data[0].size
451 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
451 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
452 AVG = [0 for i in self.dataOut.channelList]
452 AVG = [0 for i in self.dataOut.channelList]
453 for i in self.dataOut.channelList:
453 for i in self.dataOut.channelList:
454 AVG[i] = avgdB[i][::dy].tolist()
454 AVG[i] = avgdB[i][::dy].tolist()
455 payload = {
455 payload = {
456 'timestamp': self.dataOut.utctime,
456 'timestamp': self.dataOut.utctime,
457 'data': roundFloats(AVG),
457 'data': roundFloats(AVG),
458 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
458 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
459 'type': self.plottype,
459 'type': self.plottype,
460 'yData': yData
460 'yData': yData
461 }
461 }
462 else:
462 else:
463 print "Tipo de grafico invalido"
463 print "Tipo de grafico invalido"
464 payload = {
464 payload = {
465 'data': 'None',
465 'data': 'None',
466 'timestamp': 'None',
466 'timestamp': 'None',
467 'type': None
467 'type': None
468 }
468 }
469
469
470 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
470 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
471
471
472 if self.zeromq is 1:
472 if self.zeromq is 1:
473 if self.verbose:
473 if self.verbose:
474 log.log(
474 log.log(
475 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
475 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
476 self.name
476 self.name
477 )
477 )
478 self.zmq_socket.send_pyobj(self.dataOut)
478 self.zmq_socket.send_pyobj(self.dataOut)
479
479
480 def run(self, dataOut, **kwargs):
480 def run(self, dataOut, **kwargs):
481 self.dataOut = dataOut
481 self.dataOut = dataOut
482 if not self.isConfig:
482 if not self.isConfig:
483 self.setup(**kwargs)
483 self.setup(**kwargs)
484 self.isConfig = True
484 self.isConfig = True
485
485
486 self.publish_data()
486 self.publish_data()
487 time.sleep(self.delay)
487 time.sleep(self.delay)
488
488
489 def close(self):
489 def close(self):
490 if self.zeromq is 1:
490 if self.zeromq is 1:
491 self.dataOut.finished = True
491 self.dataOut.finished = True
492 self.zmq_socket.send_pyobj(self.dataOut)
492 self.zmq_socket.send_pyobj(self.dataOut)
493 time.sleep(0.1)
493 time.sleep(0.1)
494 self.zmq_socket.close()
494 self.zmq_socket.close()
495 if self.client:
495 if self.client:
496 self.client.loop_stop()
496 self.client.loop_stop()
497 self.client.disconnect()
497 self.client.disconnect()
498
498
499
499
500 class ReceiverData(ProcessingUnit):
500 class ReceiverData(ProcessingUnit):
501
501
502 __attrs__ = ['server']
502 __attrs__ = ['server']
503
503
504 def __init__(self, **kwargs):
504 def __init__(self, **kwargs):
505
505
506 ProcessingUnit.__init__(self, **kwargs)
506 ProcessingUnit.__init__(self, **kwargs)
507
507
508 self.isConfig = False
508 self.isConfig = False
509 server = kwargs.get('server', 'zmq.pipe')
509 server = kwargs.get('server', 'zmq.pipe')
510 if 'tcp://' in server:
510 if 'tcp://' in server:
511 address = server
511 address = server
512 else:
512 else:
513 address = 'ipc:///tmp/%s' % server
513 address = 'ipc:///tmp/%s' % server
514
514
515 self.address = address
515 self.address = address
516 self.dataOut = JROData()
516 self.dataOut = JROData()
517
517
518 def setup(self):
518 def setup(self):
519
519
520 self.context = zmq.Context()
520 self.context = zmq.Context()
521 self.receiver = self.context.socket(zmq.PULL)
521 self.receiver = self.context.socket(zmq.PULL)
522 self.receiver.bind(self.address)
522 self.receiver.bind(self.address)
523 time.sleep(0.5)
523 time.sleep(0.5)
524 log.success('ReceiverData from {}'.format(self.address))
524 log.success('ReceiverData from {}'.format(self.address))
525
525
526
526
527 def run(self):
527 def run(self):
528
528
529 if not self.isConfig:
529 if not self.isConfig:
530 self.setup()
530 self.setup()
531 self.isConfig = True
531 self.isConfig = True
532
532
533 self.dataOut = self.receiver.recv_pyobj()
533 self.dataOut = self.receiver.recv_pyobj()
534 log.log('{} - {}'.format(self.dataOut.type,
534 log.log('{} - {}'.format(self.dataOut.type,
535 self.dataOut.datatime.ctime(),),
535 self.dataOut.datatime.ctime(),),
536 'Receiving')
536 'Receiving')
537
537
538
538
539 class PlotterReceiver(ProcessingUnit, Process):
539 class PlotterReceiver(ProcessingUnit, Process):
540
540
541 throttle_value = 5
541 throttle_value = 5
542 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
542 __attrs__ = ['server', 'plottypes', 'realtime', 'localtime', 'throttle',
543 'exp_code', 'web_server', 'buffering']
543 'exp_code', 'web_server', 'buffering']
544
544
545 def __init__(self, **kwargs):
545 def __init__(self, **kwargs):
546
546
547 ProcessingUnit.__init__(self, **kwargs)
547 ProcessingUnit.__init__(self, **kwargs)
548 Process.__init__(self)
548 Process.__init__(self)
549 self.mp = False
549 self.mp = False
550 self.isConfig = False
550 self.isConfig = False
551 self.isWebConfig = False
551 self.isWebConfig = False
552 self.connections = 0
552 self.connections = 0
553 server = kwargs.get('server', 'zmq.pipe')
553 server = kwargs.get('server', 'zmq.pipe')
554 web_server = kwargs.get('web_server', None)
554 web_server = kwargs.get('web_server', None)
555 if 'tcp://' in server:
555 if 'tcp://' in server:
556 address = server
556 address = server
557 else:
557 else:
558 address = 'ipc:///tmp/%s' % server
558 address = 'ipc:///tmp/%s' % server
559 self.address = address
559 self.address = address
560 self.web_address = web_server
560 self.web_address = web_server
561 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
561 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
562 self.realtime = kwargs.get('realtime', False)
562 self.realtime = kwargs.get('realtime', False)
563 self.localtime = kwargs.get('localtime', True)
563 self.localtime = kwargs.get('localtime', True)
564 self.buffering = kwargs.get('buffering', True)
564 self.buffering = kwargs.get('buffering', True)
565 self.throttle_value = kwargs.get('throttle', 5)
565 self.throttle_value = kwargs.get('throttle', 5)
566 self.exp_code = kwargs.get('exp_code', None)
566 self.exp_code = kwargs.get('exp_code', None)
567 self.sendData = self.initThrottle(self.throttle_value)
567 self.sendData = self.initThrottle(self.throttle_value)
568 self.dates = []
568 self.dates = []
569 self.setup()
569 self.setup()
570
570
571 def setup(self):
571 def setup(self):
572
572
573 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
573 self.data = Data(self.plottypes, self.throttle_value, self.exp_code, self.buffering)
574 self.isConfig = True
574 self.isConfig = True
575
575
576 def event_monitor(self, monitor):
576 def event_monitor(self, monitor):
577
577
578 events = {}
578 events = {}
579
579
580 for name in dir(zmq):
580 for name in dir(zmq):
581 if name.startswith('EVENT_'):
581 if name.startswith('EVENT_'):
582 value = getattr(zmq, name)
582 value = getattr(zmq, name)
583 events[value] = name
583 events[value] = name
584
584
585 while monitor.poll():
585 while monitor.poll():
586 evt = recv_monitor_message(monitor)
586 evt = recv_monitor_message(monitor)
587 if evt['event'] == 32:
587 if evt['event'] == 32:
588 self.connections += 1
588 self.connections += 1
589 if evt['event'] == 512:
589 if evt['event'] == 512:
590 pass
590 pass
591
591
592 evt.update({'description': events[evt['event']]})
592 evt.update({'description': events[evt['event']]})
593
593
594 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
594 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
595 break
595 break
596 monitor.close()
596 monitor.close()
597 print('event monitor thread done!')
597 print('event monitor thread done!')
598
598
599 def initThrottle(self, throttle_value):
599 def initThrottle(self, throttle_value):
600
600
601 @throttle(seconds=throttle_value)
601 @throttle(seconds=throttle_value)
602 def sendDataThrottled(fn_sender, data):
602 def sendDataThrottled(fn_sender, data):
603 fn_sender(data)
603 fn_sender(data)
604
604
605 return sendDataThrottled
605 return sendDataThrottled
606
606
607 def send(self, data):
607 def send(self, data):
608 log.log('Sending {}'.format(data), self.name)
608 log.log('Sending {}'.format(data), self.name)
609 self.sender.send_pyobj(data)
609 self.sender.send_pyobj(data)
610
610
611 def run(self):
611 def run(self):
612
612
613 log.log(
613 log.log(
614 'Starting from {}'.format(self.address),
614 'Starting from {}'.format(self.address),
615 self.name
615 self.name
616 )
616 )
617
617
618 self.context = zmq.Context()
618 self.context = zmq.Context()
619 self.receiver = self.context.socket(zmq.PULL)
619 self.receiver = self.context.socket(zmq.PULL)
620 self.receiver.bind(self.address)
620 self.receiver.bind(self.address)
621 monitor = self.receiver.get_monitor_socket()
621 monitor = self.receiver.get_monitor_socket()
622 self.sender = self.context.socket(zmq.PUB)
622 self.sender = self.context.socket(zmq.PUB)
623 if self.web_address:
623 if self.web_address:
624 log.success(
624 log.success(
625 'Sending to web: {}'.format(self.web_address),
625 'Sending to web: {}'.format(self.web_address),
626 self.name
626 self.name
627 )
627 )
628 self.sender_web = self.context.socket(zmq.REQ)
628 self.sender_web = self.context.socket(zmq.REQ)
629 self.sender_web.connect(self.web_address)
629 self.sender_web.connect(self.web_address)
630 self.poll = zmq.Poller()
630 self.poll = zmq.Poller()
631 self.poll.register(self.sender_web, zmq.POLLIN)
631 self.poll.register(self.sender_web, zmq.POLLIN)
632 time.sleep(1)
632 time.sleep(1)
633
633
634 if 'server' in self.kwargs:
634 if 'server' in self.kwargs:
635 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
635 self.sender.bind("ipc:///tmp/{}.plots".format(self.kwargs['server']))
636 else:
636 else:
637 self.sender.bind("ipc:///tmp/zmq.plots")
637 self.sender.bind("ipc:///tmp/zmq.plots")
638
638
639 time.sleep(2)
639 time.sleep(2)
640
640
641 t = Thread(target=self.event_monitor, args=(monitor,))
641 t = Thread(target=self.event_monitor, args=(monitor,))
642 t.start()
642 t.start()
643
643
644 while True:
644 while True:
645 dataOut = self.receiver.recv_pyobj()
645 dataOut = self.receiver.recv_pyobj()
646 if not dataOut.flagNoData:
646 if not dataOut.flagNoData:
647 if dataOut.type == 'Parameters':
647 if dataOut.type == 'Parameters':
648 tm = dataOut.utctimeInit
648 tm = dataOut.utctimeInit
649 else:
649 else:
650 tm = dataOut.utctime
650 tm = dataOut.utctime
651 if dataOut.useLocalTime:
651 if dataOut.useLocalTime:
652 if not self.localtime:
652 if not self.localtime:
653 tm += time.timezone
653 tm += time.timezone
654 dt = datetime.datetime.fromtimestamp(tm).date()
654 dt = datetime.datetime.fromtimestamp(tm).date()
655 else:
655 else:
656 if self.localtime:
656 if self.localtime:
657 tm -= time.timezone
657 tm -= time.timezone
658 dt = datetime.datetime.utcfromtimestamp(tm).date()
658 dt = datetime.datetime.utcfromtimestamp(tm).date()
659 coerce = False
659 coerce = False
660 if dt not in self.dates:
660 if dt not in self.dates:
661 if self.data:
661 if self.data:
662 self.data.ended = True
662 self.data.ended = True
663 self.send(self.data)
663 self.send(self.data)
664 coerce = True
664 coerce = True
665 self.data.setup()
665 self.data.setup()
666 self.dates.append(dt)
666 self.dates.append(dt)
667
667
668 self.data.update(dataOut, tm)
668 self.data.update(dataOut, tm)
669
669
670 if dataOut.finished is True:
670 if dataOut.finished is True:
671 self.connections -= 1
671 self.connections -= 1
672 if self.connections == 0 and dt in self.dates:
672 if self.connections == 0 and dt in self.dates:
673 self.data.ended = True
673 self.data.ended = True
674 self.send(self.data)
674 self.send(self.data)
675 # self.data.setup()
675 # self.data.setup()
676 time.sleep(1)
676 time.sleep(1)
677 break
677 break
678 else:
678 else:
679 if self.realtime:
679 if self.realtime:
680 self.send(self.data)
680 self.send(self.data)
681 if self.web_address:
681 if self.web_address:
682 retries = 5
682 retries = 5
683 while True:
683 while True:
684 self.sender_web.send(self.data.jsonify())
684 self.sender_web.send(self.data.jsonify())
685 socks = dict(self.poll.poll(5000))
685 socks = dict(self.poll.poll(5000))
686 if socks.get(self.sender_web) == zmq.POLLIN:
686 if socks.get(self.sender_web) == zmq.POLLIN:
687 reply = self.sender_web.recv_string()
687 reply = self.sender_web.recv_string()
688 if reply == 'ok':
688 if reply == 'ok':
689 log.log("Response from server ok", self.name)
689 break
690 break
690 else:
691 else:
691 print("Malformed reply from server: %s" % reply)
692 log.warning("Malformed reply from server: {}".format(reply), self.name)
692
693
693 else:
694 else:
694 print("No response from server, retrying...")
695 log.warning("No response from server, retrying...", self.name)
695 self.sender_web.setsockopt(zmq.LINGER, 0)
696 self.sender_web.setsockopt(zmq.LINGER, 0)
696 self.sender_web.close()
697 self.sender_web.close()
697 self.poll.unregister(self.sender_web)
698 self.poll.unregister(self.sender_web)
698 retries -= 1
699 retries -= 1
699 if retries == 0:
700 if retries == 0:
700 print("Server seems to be offline, abandoning")
701 log.error("Server seems to be offline, abandoning", self.name)
702 self.sender_web = self.context.socket(zmq.REQ)
703 self.sender_web.connect(self.web_address)
704 self.poll.register(self.sender_web, zmq.POLLIN)
705 time.sleep(1)
701 break
706 break
702 self.sender_web = self.context.socket(zmq.REQ)
707 self.sender_web = self.context.socket(zmq.REQ)
703 self.sender_web.connect(self.web_address)
708 self.sender_web.connect(self.web_address)
704 self.poll.register(self.sender_web, zmq.POLLIN)
709 self.poll.register(self.sender_web, zmq.POLLIN)
705 time.sleep(1)
710 time.sleep(1)
706 else:
711 else:
707 self.sendData(self.send, self.data, coerce=coerce)
712 self.sendData(self.send, self.data, coerce=coerce)
708 coerce = False
713 coerce = False
709
714
710 return
715 return
711
716
712
717
713 class SendToFTP(Operation, Process):
718 class SendToFTP(Operation, Process):
714
719
715 '''
720 '''
716 Operation to send data over FTP.
721 Operation to send data over FTP.
717 '''
722 '''
718
723
719 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
724 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
720
725
721 def __init__(self, **kwargs):
726 def __init__(self, **kwargs):
722 '''
727 '''
723 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
728 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
724 '''
729 '''
725 Operation.__init__(self, **kwargs)
730 Operation.__init__(self, **kwargs)
726 Process.__init__(self)
731 Process.__init__(self)
727 self.server = kwargs.get('server')
732 self.server = kwargs.get('server')
728 self.username = kwargs.get('username')
733 self.username = kwargs.get('username')
729 self.password = kwargs.get('password')
734 self.password = kwargs.get('password')
730 self.patterns = kwargs.get('patterns')
735 self.patterns = kwargs.get('patterns')
731 self.timeout = kwargs.get('timeout', 30)
736 self.timeout = kwargs.get('timeout', 30)
732 self.times = [time.time() for p in self.patterns]
737 self.times = [time.time() for p in self.patterns]
733 self.latest = ['' for p in self.patterns]
738 self.latest = ['' for p in self.patterns]
734 self.mp = False
739 self.mp = False
735 self.ftp = None
740 self.ftp = None
736
741
737 def setup(self):
742 def setup(self):
738
743
739 log.log('Connecting to ftp://{}'.format(self.server), self.name)
744 log.log('Connecting to ftp://{}'.format(self.server), self.name)
740 try:
745 try:
741 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
746 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
742 except ftplib.all_errors:
747 except ftplib.all_errors:
743 log.error('Server connection fail: {}'.format(self.server), self.name)
748 log.error('Server connection fail: {}'.format(self.server), self.name)
744 if self.ftp is not None:
749 if self.ftp is not None:
745 self.ftp.close()
750 self.ftp.close()
746 self.ftp = None
751 self.ftp = None
747 self.isConfig = False
752 self.isConfig = False
748 return
753 return
749
754
750 try:
755 try:
751 self.ftp.login(self.username, self.password)
756 self.ftp.login(self.username, self.password)
752 except ftplib.all_errors:
757 except ftplib.all_errors:
753 log.error('The given username y/o password are incorrect', self.name)
758 log.error('The given username y/o password are incorrect', self.name)
754 if self.ftp is not None:
759 if self.ftp is not None:
755 self.ftp.close()
760 self.ftp.close()
756 self.ftp = None
761 self.ftp = None
757 self.isConfig = False
762 self.isConfig = False
758 return
763 return
759
764
760 log.success('Connection success', self.name)
765 log.success('Connection success', self.name)
761 self.isConfig = True
766 self.isConfig = True
762 return
767 return
763
768
764 def check(self):
769 def check(self):
765
770
766 try:
771 try:
767 self.ftp.voidcmd("NOOP")
772 self.ftp.voidcmd("NOOP")
768 except:
773 except:
769 log.warning('Connection lost... trying to reconnect', self.name)
774 log.warning('Connection lost... trying to reconnect', self.name)
770 if self.ftp is not None:
775 if self.ftp is not None:
771 self.ftp.close()
776 self.ftp.close()
772 self.ftp = None
777 self.ftp = None
773 self.setup()
778 self.setup()
774
779
775 def find_files(self, path, ext):
780 def find_files(self, path, ext):
776
781
777 files = glob.glob1(path, '*{}'.format(ext))
782 files = glob.glob1(path, '*{}'.format(ext))
778 files.sort()
783 files.sort()
779 if files:
784 if files:
780 return files[-1]
785 return files[-1]
781 return None
786 return None
782
787
783 def getftpname(self, filename, exp_code, sub_exp_code):
788 def getftpname(self, filename, exp_code, sub_exp_code):
784
789
785 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
790 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
786 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
791 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
787 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
792 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
788 exp_code = '%3.3d'%exp_code
793 exp_code = '%3.3d'%exp_code
789 sub_exp_code = '%2.2d'%sub_exp_code
794 sub_exp_code = '%2.2d'%sub_exp_code
790 plot_code = '%2.2d'% get_plot_code(filename)
795 plot_code = '%2.2d'% get_plot_code(filename)
791 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
796 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
792 return name
797 return name
793
798
794 def upload(self, src, dst):
799 def upload(self, src, dst):
795
800
796 log.log('Uploading {} '.format(src), self.name, nl=False)
801 log.log('Uploading {} '.format(src), self.name, nl=False)
797
802
798 fp = open(src, 'rb')
803 fp = open(src, 'rb')
799 command = 'STOR {}'.format(dst)
804 command = 'STOR {}'.format(dst)
800
805
801 try:
806 try:
802 self.ftp.storbinary(command, fp, blocksize=1024)
807 self.ftp.storbinary(command, fp, blocksize=1024)
803 except Exception, e:
808 except Exception, e:
804 log.error('{}'.format(e), self.name)
809 log.error('{}'.format(e), self.name)
805 if self.ftp is not None:
810 if self.ftp is not None:
806 self.ftp.close()
811 self.ftp.close()
807 self.ftp = None
812 self.ftp = None
808 return 0
813 return 0
809
814
810 try:
815 try:
811 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
816 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
812 except Exception, e:
817 except Exception, e:
813 log.error('{}'.format(e), self.name)
818 log.error('{}'.format(e), self.name)
814 if self.ftp is not None:
819 if self.ftp is not None:
815 self.ftp.close()
820 self.ftp.close()
816 self.ftp = None
821 self.ftp = None
817 return 0
822 return 0
818
823
819 fp.close()
824 fp.close()
820 log.success('OK', tag='')
825 log.success('OK', tag='')
821 return 1
826 return 1
822
827
823 def send_files(self):
828 def send_files(self):
824
829
825 for x, pattern in enumerate(self.patterns):
830 for x, pattern in enumerate(self.patterns):
826 local, remote, ext, delay, exp_code, sub_exp_code = pattern
831 local, remote, ext, delay, exp_code, sub_exp_code = pattern
827 if time.time()-self.times[x] >= delay:
832 if time.time()-self.times[x] >= delay:
828 srcname = self.find_files(local, ext)
833 srcname = self.find_files(local, ext)
829 src = os.path.join(local, srcname)
834 src = os.path.join(local, srcname)
830 if os.path.getmtime(src) < time.time() - 30*60:
835 if os.path.getmtime(src) < time.time() - 30*60:
831 continue
836 continue
832
837
833 if srcname is None or srcname == self.latest[x]:
838 if srcname is None or srcname == self.latest[x]:
834 continue
839 continue
835
840
836 if 'png' in ext:
841 if 'png' in ext:
837 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
842 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
838 else:
843 else:
839 dstname = srcname
844 dstname = srcname
840
845
841 dst = os.path.join(remote, dstname)
846 dst = os.path.join(remote, dstname)
842
847
843 if self.upload(src, dst):
848 if self.upload(src, dst):
844 self.times[x] = time.time()
849 self.times[x] = time.time()
845 self.latest[x] = srcname
850 self.latest[x] = srcname
846 else:
851 else:
847 self.isConfig = False
852 self.isConfig = False
848 break
853 break
849
854
850 def run(self):
855 def run(self):
851
856
852 while True:
857 while True:
853 if not self.isConfig:
858 if not self.isConfig:
854 self.setup()
859 self.setup()
855 if self.ftp is not None:
860 if self.ftp is not None:
856 self.check()
861 self.check()
857 self.send_files()
862 self.send_files()
858 time.sleep(10)
863 time.sleep(10)
859
864
860 def close():
865 def close():
861
866
862 if self.ftp is not None:
867 if self.ftp is not None:
863 self.ftp.close()
868 self.ftp.close()
864 self.terminate()
869 self.terminate()
General Comments 0
You need to be logged in to leave comments. Login now