##// END OF EJS Templates
Fix SendToFTP
jespinoza -
r1337:34eae8e48391
parent child
Show More
@@ -1,351 +1,358
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Distributed under the terms of the BSD 3-clause license.
4 # Distributed under the terms of the BSD 3-clause license.
5 """Utilities for publish/send data, files & plots over different protocols
5 """Utilities for publish/send data, files & plots over different protocols
6 """
6 """
7
7
8 import os
8 import os
9 import glob
9 import glob
10 import time
10 import time
11 import json
11 import json
12 import numpy
12 import numpy
13 import zmq
13 import zmq
14 import datetime
14 import datetime
15 import ftplib
15 import ftplib
16 from functools import wraps
16 from functools import wraps
17 from threading import Thread
17 from threading import Thread
18 from multiprocessing import Process
18 from multiprocessing import Process
19
19
20 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
20 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
21 from schainpy.model.data.jrodata import JROData
21 from schainpy.model.data.jrodata import JROData
22 from schainpy.utils import log
22 from schainpy.utils import log
23
23
24
24
25 PLOT_CODES = {
25 PLOT_CODES = {
26 'rti': 0, # Range time intensity (RTI).
26 'rti': 0, # Range time intensity (RTI).
27 'spc': 1, # Spectra (and Cross-spectra) information.
27 'spc': 1, # Spectra (and Cross-spectra) information.
28 'cspc': 2, # Cross-Correlation information.
28 'cspc': 2, # Cross-Correlation information.
29 'coh': 3, # Coherence map.
29 'coh': 3, # Coherence map.
30 'base': 4, # Base lines graphic.
30 'base': 4, # Base lines graphic.
31 'row': 5, # Row Spectra.
31 'row': 5, # Row Spectra.
32 'total': 6, # Total Power.
32 'total': 6, # Total Power.
33 'drift': 7, # Drifts graphics.
33 'drift': 7, # Drifts graphics.
34 'height': 8, # Height profile.
34 'height': 8, # Height profile.
35 'phase': 9, # Signal Phase.
35 'phase': 9, # Signal Phase.
36 'power': 16,
36 'power': 16,
37 'noise': 17,
37 'noise': 17,
38 'beacon': 18,
38 'beacon': 18,
39 'wind': 22,
39 'wind': 22,
40 'skymap': 23,
40 'skymap': 23,
41 'Unknown': 24,
41 'Unknown': 24,
42 'V-E': 25, # PIP Velocity.
42 'V-E': 25, # PIP Velocity.
43 'Z-E': 26, # PIP Reflectivity.
43 'Z-E': 26, # PIP Reflectivity.
44 'V-A': 27, # RHI Velocity.
44 'V-A': 27, # RHI Velocity.
45 'Z-A': 28, # RHI Reflectivity.
45 'Z-A': 28, # RHI Reflectivity.
46 }
46 }
47
47
48 def get_plot_code(s):
48 def get_plot_code(s):
49 label = s.split('_')[0]
49 label = s.split('_')[0]
50 codes = [key for key in PLOT_CODES if key in label]
50 codes = [key for key in PLOT_CODES if key in label]
51 if codes:
51 if codes:
52 return PLOT_CODES[codes[0]]
52 return PLOT_CODES[codes[0]]
53 else:
53 else:
54 return 24
54 return 24
55
55
56
56
57 class PublishData(Operation):
57 class PublishData(Operation):
58 '''
58 '''
59 Operation to send data over zmq.
59 Operation to send data over zmq.
60 '''
60 '''
61
61
62 __attrs__ = ['host', 'port', 'delay', 'verbose']
62 __attrs__ = ['host', 'port', 'delay', 'verbose']
63
63
64 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
64 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
65 self.counter = 0
65 self.counter = 0
66 self.delay = kwargs.get('delay', 0)
66 self.delay = kwargs.get('delay', 0)
67 self.cnt = 0
67 self.cnt = 0
68 self.verbose = verbose
68 self.verbose = verbose
69 context = zmq.Context()
69 context = zmq.Context()
70 self.zmq_socket = context.socket(zmq.PUSH)
70 self.zmq_socket = context.socket(zmq.PUSH)
71 server = kwargs.get('server', 'zmq.pipe')
71 server = kwargs.get('server', 'zmq.pipe')
72
72
73 if 'tcp://' in server:
73 if 'tcp://' in server:
74 address = server
74 address = server
75 else:
75 else:
76 address = 'ipc:///tmp/%s' % server
76 address = 'ipc:///tmp/%s' % server
77
77
78 self.zmq_socket.connect(address)
78 self.zmq_socket.connect(address)
79 time.sleep(1)
79 time.sleep(1)
80
80
81
81
82 def publish_data(self):
82 def publish_data(self):
83 self.dataOut.finished = False
83 self.dataOut.finished = False
84
84
85 if self.verbose:
85 if self.verbose:
86 log.log(
86 log.log(
87 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
87 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
88 self.name
88 self.name
89 )
89 )
90 self.zmq_socket.send_pyobj(self.dataOut)
90 self.zmq_socket.send_pyobj(self.dataOut)
91
91
92 def run(self, dataOut, **kwargs):
92 def run(self, dataOut, **kwargs):
93 self.dataOut = dataOut
93 self.dataOut = dataOut
94 if not self.isConfig:
94 if not self.isConfig:
95 self.setup(**kwargs)
95 self.setup(**kwargs)
96 self.isConfig = True
96 self.isConfig = True
97
97
98 self.publish_data()
98 self.publish_data()
99 time.sleep(self.delay)
99 time.sleep(self.delay)
100
100
101 def close(self):
101 def close(self):
102
102
103 self.dataOut.finished = True
103 self.dataOut.finished = True
104 self.zmq_socket.send_pyobj(self.dataOut)
104 self.zmq_socket.send_pyobj(self.dataOut)
105 time.sleep(0.1)
105 time.sleep(0.1)
106 self.zmq_socket.close()
106 self.zmq_socket.close()
107
107
108
108
109 class ReceiverData(ProcessingUnit):
109 class ReceiverData(ProcessingUnit):
110
110
111 __attrs__ = ['server']
111 __attrs__ = ['server']
112
112
113 def __init__(self, **kwargs):
113 def __init__(self, **kwargs):
114
114
115 ProcessingUnit.__init__(self, **kwargs)
115 ProcessingUnit.__init__(self, **kwargs)
116
116
117 self.isConfig = False
117 self.isConfig = False
118 server = kwargs.get('server', 'zmq.pipe')
118 server = kwargs.get('server', 'zmq.pipe')
119 if 'tcp://' in server:
119 if 'tcp://' in server:
120 address = server
120 address = server
121 else:
121 else:
122 address = 'ipc:///tmp/%s' % server
122 address = 'ipc:///tmp/%s' % server
123
123
124 self.address = address
124 self.address = address
125 self.dataOut = JROData()
125 self.dataOut = JROData()
126
126
127 def setup(self):
127 def setup(self):
128
128
129 self.context = zmq.Context()
129 self.context = zmq.Context()
130 self.receiver = self.context.socket(zmq.PULL)
130 self.receiver = self.context.socket(zmq.PULL)
131 self.receiver.bind(self.address)
131 self.receiver.bind(self.address)
132 time.sleep(0.5)
132 time.sleep(0.5)
133 log.success('ReceiverData from {}'.format(self.address))
133 log.success('ReceiverData from {}'.format(self.address))
134
134
135
135
136 def run(self):
136 def run(self):
137
137
138 if not self.isConfig:
138 if not self.isConfig:
139 self.setup()
139 self.setup()
140 self.isConfig = True
140 self.isConfig = True
141
141
142 self.dataOut = self.receiver.recv_pyobj()
142 self.dataOut = self.receiver.recv_pyobj()
143 log.log('{} - {}'.format(self.dataOut.type,
143 log.log('{} - {}'.format(self.dataOut.type,
144 self.dataOut.datatime.ctime(),),
144 self.dataOut.datatime.ctime(),),
145 'Receiving')
145 'Receiving')
146
146
147 @MPDecorator
147 @MPDecorator
148 class SendToFTP(Operation):
148 class SendToFTP(Operation):
149 """Operation for send files over FTP
149 """Operation for send files over FTP
150
150
151 This operation is used to send files over FTP, you can send different files
151 This operation is used to send files over FTP, you can send different files
152 from different folders by adding as many `pattern` as you wish.
152 from different folders by adding as many `pattern` as you wish.
153
153
154 Parameters:
154 Parameters:
155 -----------
155 -----------
156 server : str
156 server : str
157 FTP server address.
157 FTP server address.
158 username : str
158 username : str
159 FTP username
159 FTP username
160 password : str
160 password : str
161 FTP password
161 FTP password
162 timeout : int
162 timeout : int
163 timeout to restart the connection
163 timeout to restart the connection
164 patternX : list
164 patternX : list
165 detail of files to be send must have the following order: local, remote
165 detail of files to be send must have the following order: local, remote
166 ext, period, exp_code, sub_exp_code
166 ext, period, exp_code, sub_exp_code
167
167
168 Example:
168 Example:
169 --------
169 --------
170
170
171 ftp = proc_unit.addOperation(name='SendToFTP', optype='external')
171 ftp = proc_unit.addOperation(name='SendToFTP', optype='external')
172 ftp.addParameter(name='server', value='jro-app.igp.gob.pe')
172 ftp.addParameter(name='server', value='jro-app.igp.gob.pe')
173 ftp.addParameter(name='username', value='wmaster')
173 ftp.addParameter(name='username', value='wmaster')
174 ftp.addParameter(name='password', value='mst2010vhf')
174 ftp.addParameter(name='password', value='mst2010vhf')
175 ftp.addParameter(
175 ftp.addParameter(
176 name='pattern1',
176 name='pattern1',
177 value='/local/path/rti,/remote/path,png,300,11,0'
177 value='/local/path/rti,/remote/path,png,300,11,0'
178 )
178 )
179 ftp.addParameter(
179 ftp.addParameter(
180 name='pattern2',
180 name='pattern2',
181 value='/local/path/spc,/remote/path,png,300,11,0'
181 value='/local/path/spc,/remote/path,png,300,11,0'
182 )
182 )
183 ftp.addParameter(
183 ftp.addParameter(
184 name='pattern3',
184 name='pattern3',
185 value='/local/path/param,/remote/path,hdf5,300,,'
185 value='/local/path/param,/remote/path,hdf5,300,,'
186 )
186 )
187
187
188 """
188 """
189
189
190 __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']
190 __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']
191
191
192 def __init__(self):
192 def __init__(self):
193 '''
193 '''
194 '''
194 '''
195 Operation.__init__(self)
195 Operation.__init__(self)
196 self.ftp = None
196 self.ftp = None
197 self.ready = False
197 self.ready = False
198 self.current_time = time.time()
198
199
199 def setup(self, server, username, password, timeout, **kwargs):
200 def setup(self, server, username, password, timeout, **kwargs):
200 '''
201 '''
201 '''
202 '''
202
203
203 self.server = server
204 self.server = server
204 self.username = username
205 self.username = username
205 self.password = password
206 self.password = password
206 self.timeout = timeout
207 self.timeout = timeout
207 self.patterns = []
208 self.patterns = []
208 self.times = []
209 self.times = []
209 self.latest = []
210 self.latest = []
210 for arg, value in kwargs.items():
211 for arg, value in kwargs.items():
211 if 'pattern' in arg:
212 if 'pattern' in arg:
212 self.patterns.append(value)
213 self.patterns.append(value)
213 self.times.append(0)
214 self.times.append(0)
214 self.latest.append('')
215 self.latest.append('')
215
216
216 def connect(self):
217 def connect(self):
217 '''
218 '''
218 '''
219 '''
219
220
220 log.log('Connecting to ftp://{}'.format(self.server), self.name)
221 log.log('Connecting to ftp://{}'.format(self.server), self.name)
221 try:
222 try:
222 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
223 self.ftp = ftplib.FTP(self.server, timeout=1)
223 except ftplib.all_errors:
224 except ftplib.all_errors:
224 log.error('Server connection fail: {}'.format(self.server), self.name)
225 log.error('Server connection fail: {}'.format(self.server), self.name)
225 if self.ftp is not None:
226 if self.ftp is not None:
226 self.ftp.close()
227 self.ftp.close()
227 self.ftp = None
228 self.ftp = None
228 self.ready = False
229 self.ready = False
229 return
230 return
230
231
231 try:
232 try:
232 self.ftp.login(self.username, self.password)
233 self.ftp.login(self.username, self.password)
233 except ftplib.all_errors:
234 except ftplib.all_errors:
234 log.error('The given username y/o password are incorrect', self.name)
235 log.error('The given username y/o password are incorrect', self.name)
235 if self.ftp is not None:
236 if self.ftp is not None:
236 self.ftp.close()
237 self.ftp.close()
237 self.ftp = None
238 self.ftp = None
238 self.ready = False
239 self.ready = False
239 return
240 return
240
241
241 log.success('Connection success', self.name)
242 log.success('Connection success', self.name)
242 self.ready = True
243 self.ready = True
243 return
244 return
244
245
245 def check(self):
246 def check(self):
246
247
247 try:
248 try:
249 if not self.ready:
250 if time.time()-self.current_time < self.timeout:
251 return
252 else:
253 self.current_time = time.time()
248 self.ftp.voidcmd("NOOP")
254 self.ftp.voidcmd("NOOP")
249 except:
255 except:
250 log.warning('Connection lost... trying to reconnect', self.name)
256 log.warning('Connection lost... trying to reconnect', self.name)
251 if self.ftp is not None:
257 if self.ftp is not None:
252 self.ftp.close()
258 self.ftp.close()
253 self.ftp = None
259 self.ftp = None
254 self.connect()
260 self.connect()
255
261
256 def find_files(self, path, ext):
262 def find_files(self, path, ext):
257
263
258 files = glob.glob1(path.strip(), '*{}'.format(ext.strip()))
264 files = glob.glob1(path.strip(), '*{}'.format(ext.strip()))
259 files.sort()
265 files.sort()
260 if files:
266 if files:
261 return files[-1]
267 return files[-1]
262 return None
268 return None
263
269
264 def getftpname(self, filename, exp_code, sub_exp_code):
270 def getftpname(self, filename, exp_code, sub_exp_code):
265
271
266 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
272 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
267 YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year
273 YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year
268 DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday
274 DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday
269 exp_code = '%3.3d' % exp_code
275 exp_code = '%3.3d' % exp_code
270 sub_exp_code = '%2.2d' % sub_exp_code
276 sub_exp_code = '%2.2d' % sub_exp_code
271 plot_code = '%2.2d' % get_plot_code(filename)
277 plot_code = '%2.2d' % get_plot_code(filename)
272 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
278 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
273 return name
279 return name
274
280
275 def upload(self, src, dst):
281 def upload(self, src, dst):
276
282
277 log.log('Uploading {} -> {} '.format(
283 log.log('Uploading {} -> {} '.format(
278 src.split('/')[-1], dst.split('/')[-1]),
284 src.split('/')[-1], dst.split('/')[-1]),
279 self.name,
285 self.name,
280 nl=False
286 nl=False
281 )
287 )
282
288
283 fp = open(src, 'rb')
289 fp = open(src, 'rb')
284 command = 'STOR {}'.format(dst)
290 command = 'STOR {}'.format(dst)
285
291
286 try:
292 try:
287 self.ftp.storbinary(command, fp, blocksize=1024)
293 self.ftp.storbinary(command, fp, blocksize=1024)
288 except Exception as e:
294 except Exception as e:
289 log.error('{}'.format(e), self.name)
295 log.error('{}'.format(e), self.name)
290 return 0
296 return 0
291
297
292 try:
298 try:
293 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
299 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
294 except Exception as e:
300 except Exception as e:
295 log.error('{}'.format(e), self.name)
301 log.error('{}'.format(e), self.name)
296 return 0
302 return 0
297
303
298 fp.close()
304 fp.close()
299 log.success('OK', tag='')
305 log.success('OK', tag='')
300 return 1
306 return 1
301
307
302 def send_files(self):
308 def send_files(self):
303
309
304 for x, pattern in enumerate(self.patterns):
310 for x, pattern in enumerate(self.patterns):
305 local, remote, ext, period, exp_code, sub_exp_code = pattern
311 local, remote, ext, period, exp_code, sub_exp_code = pattern
306
312
307 if (self.dataOut.utctime - self.times[x]) < int(period):
313 if (self.dataOut.utctime - self.times[x]) < int(period):
308 continue
314 continue
309
315
310 srcname = self.find_files(local, ext)
316 srcname = self.find_files(local, ext)
311
317
312 if srcname is None:
318 if srcname is None:
313 continue
319 continue
314
320
315 if srcname == self.latest[x]:
321 if srcname == self.latest[x]:
316 log.warning('File alreday uploaded {}'.format(srcname))
322 log.warning('File alreday uploaded {}'.format(srcname))
317 continue
323 continue
318
324
319 if exp_code.strip():
325 if exp_code.strip():
320 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
326 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
321 else:
327 else:
322 dstname = srcname
328 dstname = srcname
323
329
324 src = os.path.join(local, srcname)
330 src = os.path.join(local, srcname)
325 dst = os.path.join(remote.strip(), dstname)
331 dst = os.path.join(remote.strip(), dstname)
326
332
327 if self.upload(src, dst):
333 if self.upload(src, dst):
328 self.times[x] = self.dataOut.utctime
334 self.times[x] = self.dataOut.utctime
329 self.latest[x] = srcname
335 self.latest[x] = srcname
330
336
331 def run(self, dataOut, server, username, password, timeout=10, **kwargs):
337 def run(self, dataOut, server, username, password, timeout=60, **kwargs):
332
338
333 if not self.isConfig:
339 if not self.isConfig:
334 self.setup(
340 self.setup(
335 server=server,
341 server=server,
336 username=username,
342 username=username,
337 password=password,
343 password=password,
338 timeout=timeout,
344 timeout=timeout,
339 **kwargs
345 **kwargs
340 )
346 )
341 self.isConfig = True
347 self.isConfig = True
342 self.connect()
348 self.connect()
343
349
344 self.dataOut = dataOut
350 self.dataOut = dataOut
345 self.check()
351 self.check()
346 self.send_files()
352 if self.ready:
353 self.send_files()
347
354
348 def close(self):
355 def close(self):
349
356
350 if self.ftp is not None:
357 if self.ftp is not None:
351 self.ftp.close()
358 self.ftp.close()
General Comments 0
You need to be logged in to leave comments. Login now