##// END OF EJS Templates
Fix SendToFTP
jespinoza -
r1337:34eae8e48391
parent child
Show More
@@ -1,351 +1,358
1 1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 2 # All rights reserved.
3 3 #
4 4 # Distributed under the terms of the BSD 3-clause license.
5 5 """Utilities for publish/send data, files & plots over different protocols
6 6 """
7 7
8 8 import os
9 9 import glob
10 10 import time
11 11 import json
12 12 import numpy
13 13 import zmq
14 14 import datetime
15 15 import ftplib
16 16 from functools import wraps
17 17 from threading import Thread
18 18 from multiprocessing import Process
19 19
20 20 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
21 21 from schainpy.model.data.jrodata import JROData
22 22 from schainpy.utils import log
23 23
24 24
25 25 PLOT_CODES = {
26 26 'rti': 0, # Range time intensity (RTI).
27 27 'spc': 1, # Spectra (and Cross-spectra) information.
28 28 'cspc': 2, # Cross-Correlation information.
29 29 'coh': 3, # Coherence map.
30 30 'base': 4, # Base lines graphic.
31 31 'row': 5, # Row Spectra.
32 32 'total': 6, # Total Power.
33 33 'drift': 7, # Drifts graphics.
34 34 'height': 8, # Height profile.
35 35 'phase': 9, # Signal Phase.
36 36 'power': 16,
37 37 'noise': 17,
38 38 'beacon': 18,
39 39 'wind': 22,
40 40 'skymap': 23,
41 41 'Unknown': 24,
42 42 'V-E': 25, # PIP Velocity.
43 43 'Z-E': 26, # PIP Reflectivity.
44 44 'V-A': 27, # RHI Velocity.
45 45 'Z-A': 28, # RHI Reflectivity.
46 46 }
47 47
48 48 def get_plot_code(s):
49 49 label = s.split('_')[0]
50 50 codes = [key for key in PLOT_CODES if key in label]
51 51 if codes:
52 52 return PLOT_CODES[codes[0]]
53 53 else:
54 54 return 24
55 55
56 56
57 57 class PublishData(Operation):
58 58 '''
59 59 Operation to send data over zmq.
60 60 '''
61 61
62 62 __attrs__ = ['host', 'port', 'delay', 'verbose']
63 63
64 64 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
65 65 self.counter = 0
66 66 self.delay = kwargs.get('delay', 0)
67 67 self.cnt = 0
68 68 self.verbose = verbose
69 69 context = zmq.Context()
70 70 self.zmq_socket = context.socket(zmq.PUSH)
71 71 server = kwargs.get('server', 'zmq.pipe')
72 72
73 73 if 'tcp://' in server:
74 74 address = server
75 75 else:
76 76 address = 'ipc:///tmp/%s' % server
77 77
78 78 self.zmq_socket.connect(address)
79 79 time.sleep(1)
80 80
81 81
82 82 def publish_data(self):
83 83 self.dataOut.finished = False
84 84
85 85 if self.verbose:
86 86 log.log(
87 87 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
88 88 self.name
89 89 )
90 90 self.zmq_socket.send_pyobj(self.dataOut)
91 91
92 92 def run(self, dataOut, **kwargs):
93 93 self.dataOut = dataOut
94 94 if not self.isConfig:
95 95 self.setup(**kwargs)
96 96 self.isConfig = True
97 97
98 98 self.publish_data()
99 99 time.sleep(self.delay)
100 100
101 101 def close(self):
102 102
103 103 self.dataOut.finished = True
104 104 self.zmq_socket.send_pyobj(self.dataOut)
105 105 time.sleep(0.1)
106 106 self.zmq_socket.close()
107 107
108 108
109 109 class ReceiverData(ProcessingUnit):
110 110
111 111 __attrs__ = ['server']
112 112
113 113 def __init__(self, **kwargs):
114 114
115 115 ProcessingUnit.__init__(self, **kwargs)
116 116
117 117 self.isConfig = False
118 118 server = kwargs.get('server', 'zmq.pipe')
119 119 if 'tcp://' in server:
120 120 address = server
121 121 else:
122 122 address = 'ipc:///tmp/%s' % server
123 123
124 124 self.address = address
125 125 self.dataOut = JROData()
126 126
127 127 def setup(self):
128 128
129 129 self.context = zmq.Context()
130 130 self.receiver = self.context.socket(zmq.PULL)
131 131 self.receiver.bind(self.address)
132 132 time.sleep(0.5)
133 133 log.success('ReceiverData from {}'.format(self.address))
134 134
135 135
136 136 def run(self):
137 137
138 138 if not self.isConfig:
139 139 self.setup()
140 140 self.isConfig = True
141 141
142 142 self.dataOut = self.receiver.recv_pyobj()
143 143 log.log('{} - {}'.format(self.dataOut.type,
144 144 self.dataOut.datatime.ctime(),),
145 145 'Receiving')
146 146
147 147 @MPDecorator
148 148 class SendToFTP(Operation):
149 149 """Operation for send files over FTP
150 150
151 151 This operation is used to send files over FTP, you can send different files
152 152 from different folders by adding as many `pattern` as you wish.
153 153
154 154 Parameters:
155 155 -----------
156 156 server : str
157 157 FTP server address.
158 158 username : str
159 159 FTP username
160 160 password : str
161 161 FTP password
162 162 timeout : int
163 163 timeout to restart the connection
164 164 patternX : list
165 165 detail of files to be send must have the following order: local, remote
166 166 ext, period, exp_code, sub_exp_code
167 167
168 168 Example:
169 169 --------
170 170
171 171 ftp = proc_unit.addOperation(name='SendToFTP', optype='external')
172 172 ftp.addParameter(name='server', value='jro-app.igp.gob.pe')
173 173 ftp.addParameter(name='username', value='wmaster')
174 174 ftp.addParameter(name='password', value='mst2010vhf')
175 175 ftp.addParameter(
176 176 name='pattern1',
177 177 value='/local/path/rti,/remote/path,png,300,11,0'
178 178 )
179 179 ftp.addParameter(
180 180 name='pattern2',
181 181 value='/local/path/spc,/remote/path,png,300,11,0'
182 182 )
183 183 ftp.addParameter(
184 184 name='pattern3',
185 185 value='/local/path/param,/remote/path,hdf5,300,,'
186 186 )
187 187
188 188 """
189 189
190 190 __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']
191 191
192 192 def __init__(self):
193 193 '''
194 194 '''
195 195 Operation.__init__(self)
196 196 self.ftp = None
197 197 self.ready = False
198 self.current_time = time.time()
198 199
199 200 def setup(self, server, username, password, timeout, **kwargs):
200 201 '''
201 202 '''
202 203
203 204 self.server = server
204 205 self.username = username
205 206 self.password = password
206 207 self.timeout = timeout
207 208 self.patterns = []
208 209 self.times = []
209 210 self.latest = []
210 211 for arg, value in kwargs.items():
211 212 if 'pattern' in arg:
212 213 self.patterns.append(value)
213 214 self.times.append(0)
214 215 self.latest.append('')
215 216
216 217 def connect(self):
217 218 '''
218 219 '''
219 220
220 221 log.log('Connecting to ftp://{}'.format(self.server), self.name)
221 222 try:
222 self.ftp = ftplib.FTP(self.server, timeout=self.timeout)
223 self.ftp = ftplib.FTP(self.server, timeout=1)
223 224 except ftplib.all_errors:
224 225 log.error('Server connection fail: {}'.format(self.server), self.name)
225 226 if self.ftp is not None:
226 227 self.ftp.close()
227 228 self.ftp = None
228 229 self.ready = False
229 230 return
230 231
231 232 try:
232 233 self.ftp.login(self.username, self.password)
233 234 except ftplib.all_errors:
234 235 log.error('The given username y/o password are incorrect', self.name)
235 236 if self.ftp is not None:
236 237 self.ftp.close()
237 238 self.ftp = None
238 239 self.ready = False
239 240 return
240 241
241 242 log.success('Connection success', self.name)
242 243 self.ready = True
243 244 return
244 245
245 246 def check(self):
246 247
247 248 try:
249 if not self.ready:
250 if time.time()-self.current_time < self.timeout:
251 return
252 else:
253 self.current_time = time.time()
248 254 self.ftp.voidcmd("NOOP")
249 255 except:
250 256 log.warning('Connection lost... trying to reconnect', self.name)
251 257 if self.ftp is not None:
252 258 self.ftp.close()
253 259 self.ftp = None
254 260 self.connect()
255 261
256 262 def find_files(self, path, ext):
257 263
258 264 files = glob.glob1(path.strip(), '*{}'.format(ext.strip()))
259 265 files.sort()
260 266 if files:
261 267 return files[-1]
262 268 return None
263 269
264 270 def getftpname(self, filename, exp_code, sub_exp_code):
265 271
266 272 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
267 273 YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year
268 274 DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday
269 275 exp_code = '%3.3d' % exp_code
270 276 sub_exp_code = '%2.2d' % sub_exp_code
271 277 plot_code = '%2.2d' % get_plot_code(filename)
272 278 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
273 279 return name
274 280
275 281 def upload(self, src, dst):
276 282
277 283 log.log('Uploading {} -> {} '.format(
278 284 src.split('/')[-1], dst.split('/')[-1]),
279 285 self.name,
280 286 nl=False
281 287 )
282 288
283 289 fp = open(src, 'rb')
284 290 command = 'STOR {}'.format(dst)
285 291
286 292 try:
287 293 self.ftp.storbinary(command, fp, blocksize=1024)
288 294 except Exception as e:
289 295 log.error('{}'.format(e), self.name)
290 296 return 0
291 297
292 298 try:
293 299 self.ftp.sendcmd('SITE CHMOD 755 {}'.format(dst))
294 300 except Exception as e:
295 301 log.error('{}'.format(e), self.name)
296 302 return 0
297 303
298 304 fp.close()
299 305 log.success('OK', tag='')
300 306 return 1
301 307
302 308 def send_files(self):
303 309
304 310 for x, pattern in enumerate(self.patterns):
305 311 local, remote, ext, period, exp_code, sub_exp_code = pattern
306 312
307 313 if (self.dataOut.utctime - self.times[x]) < int(period):
308 314 continue
309 315
310 316 srcname = self.find_files(local, ext)
311 317
312 318 if srcname is None:
313 319 continue
314 320
315 321 if srcname == self.latest[x]:
316 322 log.warning('File alreday uploaded {}'.format(srcname))
317 323 continue
318 324
319 325 if exp_code.strip():
320 326 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
321 327 else:
322 328 dstname = srcname
323 329
324 330 src = os.path.join(local, srcname)
325 331 dst = os.path.join(remote.strip(), dstname)
326 332
327 333 if self.upload(src, dst):
328 334 self.times[x] = self.dataOut.utctime
329 335 self.latest[x] = srcname
330 336
331 def run(self, dataOut, server, username, password, timeout=10, **kwargs):
337 def run(self, dataOut, server, username, password, timeout=60, **kwargs):
332 338
333 339 if not self.isConfig:
334 340 self.setup(
335 341 server=server,
336 342 username=username,
337 343 password=password,
338 344 timeout=timeout,
339 345 **kwargs
340 346 )
341 347 self.isConfig = True
342 348 self.connect()
343 349
344 350 self.dataOut = dataOut
345 351 self.check()
352 if self.ready:
346 353 self.send_files()
347 354
348 355 def close(self):
349 356
350 357 if self.ftp is not None:
351 358 self.ftp.close()
General Comments 0
You need to be logged in to leave comments. Login now