@@ -613,6 +613,7 class PolarMapPlot(Plot): | |||||
613 | self.titles = ['{} {}'.format( |
|
613 | self.titles = ['{} {}'.format( | |
614 | self.data.parameters[x], title) for x in self.channels] |
|
614 | self.data.parameters[x], title) for x in self.channels] | |
615 |
|
615 | |||
|
616 | ||||
616 | class ScopePlot(Plot): |
|
617 | class ScopePlot(Plot): | |
617 |
|
618 | |||
618 | ''' |
|
619 | ''' |
@@ -14,7 +14,7 from functools import wraps | |||||
14 | from threading import Thread |
|
14 | from threading import Thread | |
15 | from multiprocessing import Process |
|
15 | from multiprocessing import Process | |
16 |
|
16 | |||
17 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit |
|
17 | from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator | |
18 | from schainpy.model.data.jrodata import JROData |
|
18 | from schainpy.model.data.jrodata import JROData | |
19 | from schainpy.utils import log |
|
19 | from schainpy.utils import log | |
20 |
|
20 | |||
@@ -65,17 +65,11 class PublishData(Operation): | |||||
65 |
|
65 | |||
66 | __attrs__ = ['host', 'port', 'delay', 'verbose'] |
|
66 | __attrs__ = ['host', 'port', 'delay', 'verbose'] | |
67 |
|
67 | |||
68 | def __init__(self, **kwargs): |
|
|||
69 | """Inicio.""" |
|
|||
70 | Operation.__init__(self, **kwargs) |
|
|||
71 | self.isConfig = False |
|
|||
72 |
|
||||
73 | def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): |
|
68 | def setup(self, server='zmq.pipe', delay=0, verbose=True, **kwargs): | |
74 | self.counter = 0 |
|
69 | self.counter = 0 | |
75 | self.delay = kwargs.get('delay', 0) |
|
70 | self.delay = kwargs.get('delay', 0) | |
76 | self.cnt = 0 |
|
71 | self.cnt = 0 | |
77 | self.verbose = verbose |
|
72 | self.verbose = verbose | |
78 | setup = [] |
|
|||
79 | context = zmq.Context() |
|
73 | context = zmq.Context() | |
80 | self.zmq_socket = context.socket(zmq.PUSH) |
|
74 | self.zmq_socket = context.socket(zmq.PUSH) | |
81 | server = kwargs.get('server', 'zmq.pipe') |
|
75 | server = kwargs.get('server', 'zmq.pipe') | |
@@ -154,32 +148,43 class ReceiverData(ProcessingUnit): | |||||
154 | self.dataOut.datatime.ctime(),), |
|
148 | self.dataOut.datatime.ctime(),), | |
155 | 'Receiving') |
|
149 | 'Receiving') | |
156 |
|
150 | |||
157 |
|
151 | @MPDecorator | ||
158 |
class SendToFTP(Operation |
|
152 | class SendToFTP(Operation): | |
159 |
|
153 | |||
160 | ''' |
|
154 | ''' | |
161 | Operation to send data over FTP. |
|
155 | Operation to send data over FTP. | |
|
156 | patternX = 'local, remote, ext, period, exp_code, sub_exp_code' | |||
162 | ''' |
|
157 | ''' | |
163 |
|
158 | |||
164 |
__attrs__ = ['server', 'username', 'password', ' |
|
159 | __attrs__ = ['server', 'username', 'password', 'timeout', 'patternX'] | |
165 |
|
160 | |||
166 |
def __init__(self |
|
161 | def __init__(self): | |
167 | ''' |
|
162 | ''' | |
168 | patterns = [(local1, remote1, ext, delay, exp_code, sub_exp_code), ...] |
|
|||
169 | ''' |
|
163 | ''' | |
170 |
Operation.__init__(self |
|
164 | Operation.__init__(self) | |
171 | Process.__init__(self) |
|
|||
172 | self.server = kwargs.get('server') |
|
|||
173 | self.username = kwargs.get('username') |
|
|||
174 | self.password = kwargs.get('password') |
|
|||
175 | self.patterns = kwargs.get('patterns') |
|
|||
176 | self.timeout = kwargs.get('timeout', 30) |
|
|||
177 | self.times = [time.time() for p in self.patterns] |
|
|||
178 | self.latest = ['' for p in self.patterns] |
|
|||
179 | self.mp = False |
|
|||
180 | self.ftp = None |
|
165 | self.ftp = None | |
|
166 | self.ready = False | |||
181 |
|
167 | |||
182 | def setup(self): |
|
168 | def setup(self, server, username, password, timeout, **kwargs): | |
|
169 | ''' | |||
|
170 | ''' | |||
|
171 | ||||
|
172 | self.server = server | |||
|
173 | self.username = username | |||
|
174 | self.password = password | |||
|
175 | self.timeout = timeout | |||
|
176 | self.patterns = [] | |||
|
177 | self.times = [] | |||
|
178 | self.latest = [] | |||
|
179 | for arg, value in kwargs.items(): | |||
|
180 | if 'pattern' in arg: | |||
|
181 | self.patterns.append(value) | |||
|
182 | self.times.append(time.time()) | |||
|
183 | self.latest.append('') | |||
|
184 | ||||
|
185 | def connect(self): | |||
|
186 | ''' | |||
|
187 | ''' | |||
183 |
|
188 | |||
184 | log.log('Connecting to ftp://{}'.format(self.server), self.name) |
|
189 | log.log('Connecting to ftp://{}'.format(self.server), self.name) | |
185 | try: |
|
190 | try: | |
@@ -189,7 +194,7 class SendToFTP(Operation, Process): | |||||
189 | if self.ftp is not None: |
|
194 | if self.ftp is not None: | |
190 | self.ftp.close() |
|
195 | self.ftp.close() | |
191 | self.ftp = None |
|
196 | self.ftp = None | |
192 |
self. |
|
197 | self.ready = False | |
193 | return |
|
198 | return | |
194 |
|
199 | |||
195 | try: |
|
200 | try: | |
@@ -199,11 +204,11 class SendToFTP(Operation, Process): | |||||
199 | if self.ftp is not None: |
|
204 | if self.ftp is not None: | |
200 | self.ftp.close() |
|
205 | self.ftp.close() | |
201 | self.ftp = None |
|
206 | self.ftp = None | |
202 |
self. |
|
207 | self.ready = False | |
203 | return |
|
208 | return | |
204 |
|
209 | |||
205 | log.success('Connection success', self.name) |
|
210 | log.success('Connection success', self.name) | |
206 |
self. |
|
211 | self.ready = True | |
207 | return |
|
212 | return | |
208 |
|
213 | |||
209 | def check(self): |
|
214 | def check(self): | |
@@ -215,7 +220,7 class SendToFTP(Operation, Process): | |||||
215 | if self.ftp is not None: |
|
220 | if self.ftp is not None: | |
216 | self.ftp.close() |
|
221 | self.ftp.close() | |
217 | self.ftp = None |
|
222 | self.ftp = None | |
218 |
self. |
|
223 | self.connect() | |
219 |
|
224 | |||
220 | def find_files(self, path, ext): |
|
225 | def find_files(self, path, ext): | |
221 |
|
226 | |||
@@ -228,17 +233,21 class SendToFTP(Operation, Process): | |||||
228 | def getftpname(self, filename, exp_code, sub_exp_code): |
|
233 | def getftpname(self, filename, exp_code, sub_exp_code): | |
229 |
|
234 | |||
230 | thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') |
|
235 | thisDatetime = datetime.datetime.strptime(filename.split('_')[1], '%Y%m%d') | |
231 | YEAR_STR = '%4.4d'%thisDatetime.timetuple().tm_year |
|
236 | YEAR_STR = '%4.4d' % thisDatetime.timetuple().tm_year | |
232 | DOY_STR = '%3.3d'%thisDatetime.timetuple().tm_yday |
|
237 | DOY_STR = '%3.3d' % thisDatetime.timetuple().tm_yday | |
233 | exp_code = '%3.3d'%exp_code |
|
238 | exp_code = '%3.3d' % exp_code | |
234 | sub_exp_code = '%2.2d'%sub_exp_code |
|
239 | sub_exp_code = '%2.2d' % sub_exp_code | |
235 | plot_code = '%2.2d'% get_plot_code(filename) |
|
240 | plot_code = '%2.2d' % get_plot_code(filename) | |
236 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' |
|
241 | name = YEAR_STR + DOY_STR + '00' + exp_code + sub_exp_code + plot_code + '00.png' | |
237 | return name |
|
242 | return name | |
238 |
|
243 | |||
239 | def upload(self, src, dst): |
|
244 | def upload(self, src, dst): | |
240 |
|
245 | |||
241 |
log.log('Uploading {} '.format( |
|
246 | log.log('Uploading {} -> {} '.format( | |
|
247 | src.split('/')[-1], dst.split('/')[-1]), | |||
|
248 | self.name, | |||
|
249 | nl=False | |||
|
250 | ) | |||
242 |
|
251 | |||
243 | fp = open(src, 'rb') |
|
252 | fp = open(src, 'rb') | |
244 | command = 'STOR {}'.format(dst) |
|
253 | command = 'STOR {}'.format(dst) | |
@@ -268,18 +277,20 class SendToFTP(Operation, Process): | |||||
268 | def send_files(self): |
|
277 | def send_files(self): | |
269 |
|
278 | |||
270 | for x, pattern in enumerate(self.patterns): |
|
279 | for x, pattern in enumerate(self.patterns): | |
271 |
local, remote, ext, d |
|
280 | local, remote, ext, period, exp_code, sub_exp_code = pattern | |
272 |
if time.time()-self.times[x] >= |
|
281 | if time.time()-self.times[x] >= int(period): | |
273 |
srcname = self.find_files(local, ext) |
|
282 | srcname = self.find_files(local, ext) | |
274 |
src = os.path.join(local, srcname) |
|
283 | src = os.path.join(local, srcname) | |
275 |
if os.path.getmtime(src) < time.time() - 30*60: |
|
284 | if os.path.getmtime(src) < time.time() - 30*60: | |
|
285 | log.warning('Skipping old file {}'.format(srcname)) | |||
276 | continue |
|
286 | continue | |
277 |
|
287 | |||
278 | if srcname is None or srcname == self.latest[x]: |
|
288 | if srcname is None or srcname == self.latest[x]: | |
|
289 | log.warning('File alreday uploaded {}'.format(srcname)) | |||
279 | continue |
|
290 | continue | |
280 |
|
291 | |||
281 | if 'png' in ext: |
|
292 | if 'png' in ext: | |
282 | dstname = self.getftpname(srcname, exp_code, sub_exp_code) |
|
293 | dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code)) | |
283 | else: |
|
294 | else: | |
284 | dstname = srcname |
|
295 | dstname = srcname | |
285 |
|
296 | |||
@@ -289,21 +300,27 class SendToFTP(Operation, Process): | |||||
289 | self.times[x] = time.time() |
|
300 | self.times[x] = time.time() | |
290 | self.latest[x] = srcname |
|
301 | self.latest[x] = srcname | |
291 | else: |
|
302 | else: | |
292 |
self. |
|
303 | self.ready = False | |
293 | break |
|
304 | break | |
294 |
|
305 | |||
295 | def run(self): |
|
306 | def run(self, dataOut, server, username, password, timeout=10, **kwargs): | |
296 |
|
307 | |||
297 | while True: |
|
308 | if not self.isConfig: | |
298 |
|
|
309 | self.setup( | |
299 |
se |
|
310 | server=server, | |
300 | if self.ftp is not None: |
|
311 | username=username, | |
301 | self.check() |
|
312 | password=password, | |
302 | self.send_files() |
|
313 | timeout=timeout, | |
303 | time.sleep(10) |
|
314 | **kwargs | |
|
315 | ) | |||
|
316 | self.isConfig = True | |||
|
317 | if not self.ready: | |||
|
318 | self.connect() | |||
|
319 | if self.ftp is not None: | |||
|
320 | self.check() | |||
|
321 | self.send_files() | |||
304 |
|
322 | |||
305 | def close(): |
|
323 | def close(self): | |
306 |
|
324 | |||
307 | if self.ftp is not None: |
|
325 | if self.ftp is not None: | |
308 | self.ftp.close() |
|
326 | self.ftp.close() | |
309 | self.terminate() No newline at end of file |
|
General Comments 0
You need to be logged in to leave comments.
Login now