##// END OF EJS Templates
Fix SendToFTP operation for v3
Juan C. Espinoza -
r1209:53188d049ed7
parent child
Show More
@@ -613,6 +613,7 class PolarMapPlot(Plot):
613 self.titles = ['{} {}'.format(
613 self.titles = ['{} {}'.format(
614 self.data.parameters[x], title) for x in self.channels]
614 self.data.parameters[x], title) for x in self.channels]
615
615
616
616 class ScopePlot(Plot):
617 class ScopePlot(Plot):
617
618
618 '''
619 '''
@@ -14,7 +14,7 from functools import wraps
14 from threading import Thread
14 from threading import Thread
15 from multiprocessing import Process
15 from multiprocessing import Process
16
16
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
18 from schainpy.model.data.jrodata import JROData
18 from schainpy.model.data.jrodata import JROData
19 from schainpy.utils import log
19 from schainpy.utils import log
20
20
@@ -65,17 +65,11 class PublishData(Operation):
65
65
66 __attrs__ = ['host', 'port', 'delay', 'verbose']
66 __attrs__ = ['host', 'port', 'delay', 'verbose']
67
67
68 def __init__(self, **kwargs):
69 """Inicio."""
70 Operation.__init__(self, **kwargs)
71 self.isConfig = False
72
73 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
68 def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs):
74 self.counter = 0
69 self.counter = 0
75 self.delay = kwargs.get('delay', 0)
70 self.delay = kwargs.get('delay', 0)
76 self.cnt = 0
71 self.cnt = 0
77 self.verbose = verbose
72 self.verbose = verbose
78 setup = []
79 context = zmq.Context()
73 context = zmq.Context()
80 self.zmq_socket = context.socket(zmq.PUSH)
74 self.zmq_socket = context.socket(zmq.PUSH)
81 server = kwargs.get('server', 'zmq.pipe')
75 server = kwargs.get('server', 'zmq.pipe')
@@ -154,32 +148,43 class ReceiverData(ProcessingUnit):
154 self.dataOut.datatime.ctime(),),
148 self.dataOut.datatime.ctime(),),
155 'Receiving')
149 'Receiving')
156
150
157
151 @MPDecorator
158 class SendToFTP(Operation, Process):
152 class SendToFTP(Operation):
159
153
160 '''
154 '''
161 Operation to send data over FTP.
155 Operation to send data over FTP.
156 patternX = 'local, remote, ext, period, exp_code, sub_exp_code'
162 '''
157 '''
163
158
164 __attrs__ = ['server', 'username', 'password', 'patterns', 'timeout']
159 __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX']
165
160
166 def __init__(self, **kwargs):
161 def __init__(self):
167 '''
162 '''
168 patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...]
169 '''
163 '''
170 Operation.__init__(self, **kwargs)
164 Operation.__init__(self)
171 Process.__init__(self)
172 self.server = kwargs.get('server')
173 self.username = kwargs.get('username')
174 self.password = kwargs.get('password')
175 self.patterns = kwargs.get('patterns')
176 self.timeout = kwargs.get('timeout', 30)
177 self.times = [time.time() for p in self.patterns]
178 self.latest = ['' for p in self.patterns]
179 self.mp = False
180 self.ftp = None
165 self.ftp = None
166 self.ready = False
181
167
182 def setup(self):
168 def setup(self, server, username, password, timeout, **kwargs):
169 '''
170 '''
171
172 self.server = server
173 self.username = username
174 self.password = password
175 self.timeout = timeout
176 self.patterns = []
177 self.times = []
178 self.latest = []
179 for arg, value in kwargs.items():
180 if 'pattern' in arg:
181 self.patterns.append(value)
182 self.times.append(time.time())
183 self.latest.append('')
184
185 def connect(self):
186 '''
187 '''
183
188
184 log.log('Connecting to ftp://{}'.format(self.server), self.name)
189 log.log('Connecting to ftp://{}'.format(self.server), self.name)
185 try:
190 try:
@@ -189,7 +194,7 class SendToFTP(Operation, Process):
189 if self.ftp is not None:
194 if self.ftp is not None:
190 self.ftp.close()
195 self.ftp.close()
191 self.ftp = None
196 self.ftp = None
192 self.isConfig = False
197 self.ready = False
193 return
198 return
194
199
195 try:
200 try:
@@ -199,11 +204,11 class SendToFTP(Operation, Process):
199 if self.ftp is not None:
204 if self.ftp is not None:
200 self.ftp.close()
205 self.ftp.close()
201 self.ftp = None
206 self.ftp = None
202 self.isConfig = False
207 self.ready = False
203 return
208 return
204
209
205 log.success('Connection success', self.name)
210 log.success('Connection success', self.name)
206 self.isConfig = True
211 self.ready = True
207 return
212 return
208
213
209 def check(self):
214 def check(self):
@@ -215,7 +220,7 class SendToFTP(Operation, Process):
215 if self.ftp is not None:
220 if self.ftp is not None:
216 self.ftp.close()
221 self.ftp.close()
217 self.ftp = None
222 self.ftp = None
218 self.setup()
223 self.connect()
219
224
220 def find_files(self, path, ext):
225 def find_files(self, path, ext):
221
226
@@ -228,17 +233,21 class SendToFTP(Operation, Process):
228 def getftpname(self, filename, exp_code, sub_exp_code):
233 def getftpname(self, filename, exp_code, sub_exp_code):
229
234
230 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
235 thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d')
231 YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year
236 YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year
232 DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday
237 DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday
233 exp_code = '%3.3d'%exp_code
238 exp_code = '%3.3d' % exp_code
234 sub_exp_code = '%2.2d'%sub_exp_code
239 sub_exp_code = '%2.2d' % sub_exp_code
235 plot_code = '%2.2d'% get_plot_code(filename)
240 plot_code = '%2.2d' % get_plot_code(filename)
236 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
241 name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png'
237 return name
242 return name
238
243
239 def upload(self, src, dst):
244 def upload(self, src, dst):
240
245
241 log.log('Uploading {} '.format(src), self.name, nl=False)
246 log.log('Uploading {} -> {} '.format(
247 src.split('/')[-1], dst.split('/')[-1]),
248 self.name,
249 nl=False
250 )
242
251
243 fp = open(src, 'rb')
252 fp = open(src, 'rb')
244 command = 'STOR {}'.format(dst)
253 command = 'STOR {}'.format(dst)
@@ -268,18 +277,20 class SendToFTP(Operation, Process):
268 def send_files(self):
277 def send_files(self):
269
278
270 for x, pattern in enumerate(self.patterns):
279 for x, pattern in enumerate(self.patterns):
271 local, remote, ext, delay, exp_code, sub_exp_code = pattern
280 local, remote, ext, period, exp_code, sub_exp_code = pattern
272 if time.time()-self.times[x] >= delay:
281 if time.time()-self.times[x] >= int(period):
273 srcname = self.find_files(local, ext)
282 srcname = self.find_files(local, ext)
274 src = os.path.join(local, srcname)
283 src = os.path.join(local, srcname)
275 if os.path.getmtime(src) < time.time() - 30*60:
284 if os.path.getmtime(src) < time.time() - 30*60:
285 log.warning('Skipping old file {}'.format(srcname))
276 continue
286 continue
277
287
278 if srcname is None or srcname == self.latest[x]:
288 if srcname is None or srcname == self.latest[x]:
289 log.warning('File alreday uploaded {}'.format(srcname))
279 continue
290 continue
280
291
281 if 'png' in ext:
292 if 'png' in ext:
282 dstname = self.getftpname(srcname, exp_code, sub_exp_code)
293 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
283 else:
294 else:
284 dstname = srcname
295 dstname = srcname
285
296
@@ -289,21 +300,27 class SendToFTP(Operation, Process):
289 self.times[x] = time.time()
300 self.times[x] = time.time()
290 self.latest[x] = srcname
301 self.latest[x] = srcname
291 else:
302 else:
292 self.isConfig = False
303 self.ready = False
293 break
304 break
294
305
295 def run(self):
306 def run(self, dataOut, server, username, password, timeout=10, **kwargs):
296
307
297 while True:
308 if not self.isConfig:
298 if not self.isConfig:
309 self.setup(
299 self.setup()
310 server=server,
300 if self.ftp is not None:
311 username=username,
301 self.check()
312 password=password,
302 self.send_files()
313 timeout=timeout,
303 time.sleep(10)
314 **kwargs
315 )
316 self.isConfig = True
317 if not self.ready:
318 self.connect()
319 if self.ftp is not None:
320 self.check()
321 self.send_files()
304
322
305 def close():
323 def close(self):
306
324
307 if self.ftp is not None:
325 if self.ftp is not None:
308 self.ftp.close()
326 self.ftp.close()
309 self.terminate() No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now