##// END OF EJS Templates
Fix SendToFTP operation for v3
Juan C. Espinoza -
r1209:53188d049ed7
parent child
Show More
@@ -613,6 +613,7 class PolarMapPlot(Plot):
613 613 self.titles = ['{} {}'.format(
614 614 self.data.parameters[x], title) for x in self.channels]
615 615
616
616 617 class ScopePlot(Plot):
617 618
618 619 '''
@@ -14,7 +14,7 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
18 18 from schainpy.model.data.jrodata import JROData
19 19 from schainpy.utils import log
20 20
@@ -65,17 +65,11 class PublishData(Operation):
65 65
66 66 __attrs__ = ['host', 'port', 'delay', 'verbose']
67 67
68 def __init__(self, **kwargs):
69 """Inicio."""
70 Operation.__init__(self, **kwargs)
71 self.isConfig = False
72
73 68 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
74 69 self.counter = 0
75 70 self.delay = kwargs.get('delay', 0)
76 71 self.cnt = 0
77 72 self.verbose = verbose
78 setup = []
79 73 context = zmq.Context()
80 74 self.zmq_socket = context.socket(zmq.PUSH)
81 75 server = kwargs.get('server', 'zmq.pipe')
@@ -154,32 +148,43 class ReceiverData(ProcessingUnit):
154 148 self.dataOut.datatime.ctime(),),
155 149 'Receiving')
156 150
157
158 class SendToFTP(Operation, Process):
151 @MPDecorator
152 class SendToFTP(Operation):
159 153
160 154 '''
161 155 Operation to send data over FTP.
156 patternX = 'local, remote, ext, period, exp_code, sub_exp_code'
162 157 '''
163 158
164 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
159 __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']
165 160
166 def __init__(self, **kwargs):
161 def __init__(self):
167 162 '''
168 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
169 163 '''
170 Operation.__init__(self, **kwargs)
171 Process.__init__(self)
172 self.server = kwargs.get('server')
173 self.username = kwargs.get('username')
174 self.password = kwargs.get('password')
175 self.patterns = kwargs.get('patterns')
176 self.timeout = kwargs.get('timeout', 30)
177 self.times = [time.time() for p in self.patterns]
178 self.latest = ['' for p in self.patterns]
179 self.mp = False
164 Operation.__init__(self)
180 165 self.ftp = None
166 self.ready = False
181 167
182 def setup(self):
168 def setup(self, server, username, password, timeout, **kwargs):
169 '''
170 '''
171
172 self.server = server
173 self.username = username
174 self.password = password
175 self.timeout = timeout
176 self.patterns = []
177 self.times = []
178 self.latest = []
179 for arg, value in kwargs.items():
180 if 'pattern' in arg:
181 self.patterns.append(value)
182 self.times.append(time.time())
183 self.latest.append('')
184
185 def connect(self):
186 '''
187 '''
183 188
184 189 log.log('Connecting to ftp://{}'.format(self.server), self.name)
185 190 try:
@@ -189,7 +194,7 class SendToFTP(Operation, Process):
189 194 if self.ftp is not None:
190 195 self.ftp.close()
191 196 self.ftp = None
192 self.isConfig = False
197 self.ready = False
193 198 return
194 199
195 200 try:
@@ -199,11 +204,11 class SendToFTP(Operation, Process):
199 204 if self.ftp is not None:
200 205 self.ftp.close()
201 206 self.ftp = None
202 self.isConfig = False
207 self.ready = False
203 208 return
204 209
205 210 log.success('Connection success', self.name)
206 self.isConfig = True
211 self.ready = True
207 212 return
208 213
209 214 def check(self):
@@ -215,7 +220,7 class SendToFTP(Operation, Process):
215 220 if self.ftp is not None:
216 221 self.ftp.close()
217 222 self.ftp = None
218 self.setup()
223 self.connect()
219 224
220 225 def find_files(self, path, ext):
221 226
@@ -228,17 +233,21 class SendToFTP(Operation, Process):
228 233 def getftpname(self, filename, exp_code, sub_exp_code):
229 234
230 235 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
231 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
232 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
233 exp_code = '%3.3d'%exp_code
234 sub_exp_code = '%2.2d'%sub_exp_code
235 plot_code = '%2.2d'% get_plot_code(filename)
236 YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year
237 DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday
238 exp_code = '%3.3d' % exp_code
239 sub_exp_code = '%2.2d' % sub_exp_code
240 plot_code = '%2.2d' % get_plot_code(filename)
236 241 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
237 242 return name
238 243
239 244 def upload(self, src, dst):
240 245
241 log.log('Uploading {} '.format(src), self.name, nl=False)
246 log.log('Uploading {} -> {} '.format(
247 src.split('/')[-1], dst.split('/')[-1]),
248 self.name,
249 nl=False
250 )
242 251
243 252 fp = open(src, 'rb')
244 253 command = 'STOR {}'.format(dst)
@@ -268,18 +277,20 class SendToFTP(Operation, Process):
268 277 def send_files(self):
269 278
270 279 for x, pattern in enumerate(self.patterns):
271 local, remote, ext, delay, exp_code, sub_exp_code = pattern
272 if time.time()-self.times[x] >= delay:
273 srcname = self.find_files(local, ext)
274 src = os.path.join(local, srcname)
275 if os.path.getmtime(src) < time.time() - 30*60:
280 local, remote, ext, period, exp_code, sub_exp_code = pattern
281 if time.time()-self.times[x] >= int(period):
282 srcname = self.find_files(local, ext)
283 src = os.path.join(local, srcname)
284 if os.path.getmtime(src) < time.time() - 30*60:
285 log.warning('Skipping old file {}'.format(srcname))
276 286 continue
277 287
278 288 if srcname is None or srcname == self.latest[x]:
289 log.warning('File alreday uploaded {}'.format(srcname))
279 290 continue
280 291
281 292 if 'png' in ext:
282 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
293 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
283 294 else:
284 295 dstname = srcname
285 296
@@ -289,21 +300,27 class SendToFTP(Operation, Process):
289 300 self.times[x] = time.time()
290 301 self.latest[x] = srcname
291 302 else:
292 self.isConfig = False
303 self.ready = False
293 304 break
294 305
295 def run(self):
306 def run(self, dataOut, server, username, password, timeout=10, **kwargs):
296 307
297 while True:
298 if not self.isConfig:
299 self.setup()
300 if self.ftp is not None:
301 self.check()
302 self.send_files()
303 time.sleep(10)
308 if not self.isConfig:
309 self.setup(
310 server=server,
311 username=username,
312 password=password,
313 timeout=timeout,
314 **kwargs
315 )
316 self.isConfig = True
317 if not self.ready:
318 self.connect()
319 if self.ftp is not None:
320 self.check()
321 self.send_files()
304 322
305 def close():
323 def close(self):
306 324
307 325 if self.ftp is not None:
308 326 self.ftp.close()
309 self.terminate() No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now