##// END OF EJS Templates
FTP modificado, para usarse libremente (no schain) y subir rti de AMISR a jro-app, ventaja de usar esta clase antigua solo se sube un archivo una vez finalizado el procesamiento offline.
joabAM -
r1422:161fd8f163b6
parent child
Show More
@@ -1,659 +1,660
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
1 # Copyright (c) 2012-2020 Jicamarca Radio Observatory
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Distributed under the terms of the BSD 3-clause license.
4 # Distributed under the terms of the BSD 3-clause license.
5 """API to create signal chain projects
5 """API to create signal chain projects
6
6
7 The API is provide through class: Project
7 The API is provide through class: Project
8 """
8 """
9
9
10 import re
10 import re
11 import sys
11 import sys
12 import ast
12 import ast
13 import datetime
13 import datetime
14 import traceback
14 import traceback
15 import time
15 import time
16 import multiprocessing
16 import multiprocessing
17 from multiprocessing import Process, Queue
17 from multiprocessing import Process, Queue
18 from threading import Thread
18 from threading import Thread
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
19 from xml.etree.ElementTree import ElementTree, Element, SubElement
20
20
21 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.admin import Alarm, SchainWarning
22 from schainpy.model import *
22 from schainpy.model import *
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
25 if 'darwin' in sys.platform and sys.version_info[0] == 3 and sys.version_info[1] > 7:
26 multiprocessing.set_start_method('fork')
26 multiprocessing.set_start_method('fork')
27
27
28 class ConfBase():
28 class ConfBase():
29
29
30 def __init__(self):
30 def __init__(self):
31
31
32 self.id = '0'
32 self.id = '0'
33 self.name = None
33 self.name = None
34 self.priority = None
34 self.priority = None
35 self.parameters = {}
35 self.parameters = {}
36 self.object = None
36 self.object = None
37 self.operations = []
37 self.operations = []
38
38
39 def getId(self):
39 def getId(self):
40
40
41 return self.id
41 return self.id
42
42
43 def getNewId(self):
43 def getNewId(self):
44
44
45 return int(self.id) * 10 + len(self.operations) + 1
45 return int(self.id) * 10 + len(self.operations) + 1
46
46
47 def updateId(self, new_id):
47 def updateId(self, new_id):
48
48
49 self.id = str(new_id)
49 self.id = str(new_id)
50
50
51 n = 1
51 n = 1
52 for conf in self.operations:
52 for conf in self.operations:
53 conf_id = str(int(new_id) * 10 + n)
53 conf_id = str(int(new_id) * 10 + n)
54 conf.updateId(conf_id)
54 conf.updateId(conf_id)
55 n += 1
55 n += 1
56
56
57 def getKwargs(self):
57 def getKwargs(self):
58
58
59 params = {}
59 params = {}
60
60
61 for key, value in self.parameters.items():
61 for key, value in self.parameters.items():
62 if value not in (None, '', ' '):
62 if value not in (None, '', ' '):
63 params[key] = value
63 params[key] = value
64
64
65 return params
65 return params
66
66
67 def update(self, **kwargs):
67 def update(self, **kwargs):
68
68
69 for key, value in kwargs.items():
69 for key, value in kwargs.items():
70 self.addParameter(name=key, value=value)
70 self.addParameter(name=key, value=value)
71
71
72 def addParameter(self, name, value, format=None):
72 def addParameter(self, name, value, format=None):
73 '''
73 '''
74 '''
74 '''
75
75
76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
76 if isinstance(value, str) and re.search(r'(\d+/\d+/\d+)', value):
77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
77 self.parameters[name] = datetime.date(*[int(x) for x in value.split('/')])
78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
78 elif isinstance(value, str) and re.search(r'(\d+:\d+:\d+)', value):
79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
79 self.parameters[name] = datetime.time(*[int(x) for x in value.split(':')])
80 else:
80 else:
81 try:
81 try:
82 self.parameters[name] = ast.literal_eval(value)
82 self.parameters[name] = ast.literal_eval(value)
83 except:
83 except:
84 if isinstance(value, str) and ',' in value:
84 if isinstance(value, str) and ',' in value:
85 self.parameters[name] = value.split(',')
85 self.parameters[name] = value.split(',')
86 else:
86 else:
87 self.parameters[name] = value
87 self.parameters[name] = value
88
88
89 def getParameters(self):
89 def getParameters(self):
90
90
91 params = {}
91 params = {}
92 for key, value in self.parameters.items():
92 for key, value in self.parameters.items():
93 s = type(value).__name__
93 s = type(value).__name__
94 if s == 'date':
94 if s == 'date':
95 params[key] = value.strftime('%Y/%m/%d')
95 params[key] = value.strftime('%Y/%m/%d')
96 elif s == 'time':
96 elif s == 'time':
97 params[key] = value.strftime('%H:%M:%S')
97 params[key] = value.strftime('%H:%M:%S')
98 else:
98 else:
99 params[key] = str(value)
99 params[key] = str(value)
100
100
101 return params
101 return params
102
102
103 def makeXml(self, element):
103 def makeXml(self, element):
104
104
105 xml = SubElement(element, self.ELEMENTNAME)
105 xml = SubElement(element, self.ELEMENTNAME)
106 for label in self.xml_labels:
106 for label in self.xml_labels:
107 xml.set(label, str(getattr(self, label)))
107 xml.set(label, str(getattr(self, label)))
108
108
109 for key, value in self.getParameters().items():
109 for key, value in self.getParameters().items():
110 xml_param = SubElement(xml, 'Parameter')
110 xml_param = SubElement(xml, 'Parameter')
111 xml_param.set('name', key)
111 xml_param.set('name', key)
112 xml_param.set('value', value)
112 xml_param.set('value', value)
113
113
114 for conf in self.operations:
114 for conf in self.operations:
115 conf.makeXml(xml)
115 conf.makeXml(xml)
116
116
117 def __str__(self):
117 def __str__(self):
118
118
119 if self.ELEMENTNAME == 'Operation':
119 if self.ELEMENTNAME == 'Operation':
120 s = ' {}[id={}]\n'.format(self.name, self.id)
120 s = ' {}[id={}]\n'.format(self.name, self.id)
121 else:
121 else:
122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
122 s = '{}[id={}, inputId={}]\n'.format(self.name, self.id, self.inputId)
123
123
124 for key, value in self.parameters.items():
124 for key, value in self.parameters.items():
125 if self.ELEMENTNAME == 'Operation':
125 if self.ELEMENTNAME == 'Operation':
126 s += ' {}: {}\n'.format(key, value)
126 s += ' {}: {}\n'.format(key, value)
127 else:
127 else:
128 s += ' {}: {}\n'.format(key, value)
128 s += ' {}: {}\n'.format(key, value)
129
129
130 for conf in self.operations:
130 for conf in self.operations:
131 s += str(conf)
131 s += str(conf)
132
132
133 return s
133 return s
134
134
135 class OperationConf(ConfBase):
135 class OperationConf(ConfBase):
136
136
137 ELEMENTNAME = 'Operation'
137 ELEMENTNAME = 'Operation'
138 xml_labels = ['id', 'name']
138 xml_labels = ['id', 'name']
139
139
140 def setup(self, id, name, priority, project_id, err_queue):
140 def setup(self, id, name, priority, project_id, err_queue):
141
141
142 self.id = str(id)
142 self.id = str(id)
143 self.project_id = project_id
143 self.project_id = project_id
144 self.name = name
144 self.name = name
145 self.type = 'other'
145 self.type = 'other'
146 self.err_queue = err_queue
146 self.err_queue = err_queue
147
147
148 def readXml(self, element, project_id, err_queue):
148 def readXml(self, element, project_id, err_queue):
149
149
150 self.id = element.get('id')
150 self.id = element.get('id')
151 self.name = element.get('name')
151 self.name = element.get('name')
152 self.type = 'other'
152 self.type = 'other'
153 self.project_id = str(project_id)
153 self.project_id = str(project_id)
154 self.err_queue = err_queue
154 self.err_queue = err_queue
155
155
156 for elm in element.iter('Parameter'):
156 for elm in element.iter('Parameter'):
157 self.addParameter(elm.get('name'), elm.get('value'))
157 self.addParameter(elm.get('name'), elm.get('value'))
158
158
159 def createObject(self):
159 def createObject(self):
160
160
161 className = eval(self.name)
161 className = eval(self.name)
162
162
163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
163 if 'Plot' in self.name or 'Writer' in self.name or 'Send' in self.name or 'print' in self.name:
164 kwargs = self.getKwargs()
164 kwargs = self.getKwargs()
165 opObj = className(self.name, self.id, self.project_id, self.err_queue, **kwargs)
165 opObj = className(self.name, self.id, self.project_id, self.err_queue, **kwargs)
166 opObj.start()
166 opObj.start()
167 self.type = 'external'
167 self.type = 'external'
168 else:
168 else:
169 opObj = className()
169 opObj = className()
170
170
171 self.object = opObj
171 self.object = opObj
172 return opObj
172 return opObj
173
173
174 class ProcUnitConf(ConfBase):
174 class ProcUnitConf(ConfBase):
175
175
176 ELEMENTNAME = 'ProcUnit'
176 ELEMENTNAME = 'ProcUnit'
177 xml_labels = ['id', 'inputId', 'name']
177 xml_labels = ['id', 'inputId', 'name']
178
178
179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
179 def setup(self, project_id, id, name, datatype, inputId, err_queue):
180 '''
180 '''
181 '''
181 '''
182
182
183 if datatype == None and name == None:
183 if datatype == None and name == None:
184 raise ValueError('datatype or name should be defined')
184 raise ValueError('datatype or name should be defined')
185
185
186 if name == None:
186 if name == None:
187 if 'Proc' in datatype:
187 if 'Proc' in datatype:
188 name = datatype
188 name = datatype
189 else:
189 else:
190 name = '%sProc' % (datatype)
190 name = '%sProc' % (datatype)
191
191
192 if datatype == None:
192 if datatype == None:
193 datatype = name.replace('Proc', '')
193 datatype = name.replace('Proc', '')
194
194
195 self.id = str(id)
195 self.id = str(id)
196 self.project_id = project_id
196 self.project_id = project_id
197 self.name = name
197 self.name = name
198 self.datatype = datatype
198 self.datatype = datatype
199 self.inputId = inputId
199 self.inputId = inputId
200 self.err_queue = err_queue
200 self.err_queue = err_queue
201 self.operations = []
201 self.operations = []
202 self.parameters = {}
202 self.parameters = {}
203
203
204 def removeOperation(self, id):
204 def removeOperation(self, id):
205
205
206 i = [1 if x.id==id else 0 for x in self.operations]
206 i = [1 if x.id==id else 0 for x in self.operations]
207 self.operations.pop(i.index(1))
207 self.operations.pop(i.index(1))
208
208
209 def getOperation(self, id):
209 def getOperation(self, id):
210
210
211 for conf in self.operations:
211 for conf in self.operations:
212 if conf.id == id:
212 if conf.id == id:
213 return conf
213 return conf
214
214
215 def addOperation(self, name, optype='self'):
215 def addOperation(self, name, optype='self'):
216 '''
216 '''
217 '''
217 '''
218
218
219 id = self.getNewId()
219 id = self.getNewId()
220 conf = OperationConf()
220 conf = OperationConf()
221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
221 conf.setup(id, name=name, priority='0', project_id=self.project_id, err_queue=self.err_queue)
222 self.operations.append(conf)
222 self.operations.append(conf)
223
223
224 return conf
224 return conf
225
225
226 def readXml(self, element, project_id, err_queue):
226 def readXml(self, element, project_id, err_queue):
227
227
228 self.id = element.get('id')
228 self.id = element.get('id')
229 self.name = element.get('name')
229 self.name = element.get('name')
230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
230 self.inputId = None if element.get('inputId') == 'None' else element.get('inputId')
231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
231 self.datatype = element.get('datatype', self.name.replace(self.ELEMENTNAME.replace('Unit', ''), ''))
232 self.project_id = str(project_id)
232 self.project_id = str(project_id)
233 self.err_queue = err_queue
233 self.err_queue = err_queue
234 self.operations = []
234 self.operations = []
235 self.parameters = {}
235 self.parameters = {}
236
236
237 for elm in element:
237 for elm in element:
238 if elm.tag == 'Parameter':
238 if elm.tag == 'Parameter':
239 self.addParameter(elm.get('name'), elm.get('value'))
239 self.addParameter(elm.get('name'), elm.get('value'))
240 elif elm.tag == 'Operation':
240 elif elm.tag == 'Operation':
241 conf = OperationConf()
241 conf = OperationConf()
242 conf.readXml(elm, project_id, err_queue)
242 conf.readXml(elm, project_id, err_queue)
243 self.operations.append(conf)
243 self.operations.append(conf)
244
244
245 def createObjects(self):
245 def createObjects(self):
246 '''
246 '''
247 Instancia de unidades de procesamiento.
247 Instancia de unidades de procesamiento.
248 '''
248 '''
249
249
250 className = eval(self.name)
250 className = eval(self.name)
251 kwargs = self.getKwargs()
251 kwargs = self.getKwargs()
252 procUnitObj = className()
252 procUnitObj = className()
253 procUnitObj.name = self.name
253 procUnitObj.name = self.name
254 log.success('creating process... id: {}, inputId: {}'.format(self.id,self.inputId), self.name)
254 log.success('creating process... id: {}, inputId: {}'.format(self.id,self.inputId), self.name)
255
255
256 for conf in self.operations:
256 for conf in self.operations:
257
257
258 opObj = conf.createObject()
258 opObj = conf.createObject()
259
259
260 log.success('adding operation: {}, type:{}'.format(conf.name,conf.type), self.name)
260 log.success('adding operation: {}, type:{}'.format(conf.name,conf.type), self.name)
261
261
262 procUnitObj.addOperation(conf, opObj)
262 procUnitObj.addOperation(conf, opObj)
263
263
264 self.object = procUnitObj
264 self.object = procUnitObj
265
265
266 def run(self):
266 def run(self):
267 '''
267 '''
268 '''
268 '''
269
269
270 return self.object.call(**self.getKwargs())
270 return self.object.call(**self.getKwargs())
271
271
272
272
273 class ReadUnitConf(ProcUnitConf):
273 class ReadUnitConf(ProcUnitConf):
274
274
275 ELEMENTNAME = 'ReadUnit'
275 ELEMENTNAME = 'ReadUnit'
276
276
277 def __init__(self):
277 def __init__(self):
278
278
279 self.id = None
279 self.id = None
280 self.datatype = None
280 self.datatype = None
281 self.name = None
281 self.name = None
282 self.inputId = None
282 self.inputId = None
283 self.operations = []
283 self.operations = []
284 self.parameters = {}
284 self.parameters = {}
285
285
286 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
286 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
287 startTime='', endTime='', server=None, **kwargs):
287 startTime='', endTime='', server=None, **kwargs):
288
288
289 if datatype == None and name == None:
289 if datatype == None and name == None:
290 raise ValueError('datatype or name should be defined')
290 raise ValueError('datatype or name should be defined')
291 if name == None:
291 if name == None:
292 if 'Reader' in datatype:
292 if 'Reader' in datatype:
293 name = datatype
293 name = datatype
294 datatype = name.replace('Reader','')
294 datatype = name.replace('Reader','')
295 else:
295 else:
296 name = '{}Reader'.format(datatype)
296 name = '{}Reader'.format(datatype)
297 if datatype == None:
297 if datatype == None:
298 if 'Reader' in name:
298 if 'Reader' in name:
299 datatype = name.replace('Reader','')
299 datatype = name.replace('Reader','')
300 else:
300 else:
301 datatype = name
301 datatype = name
302 name = '{}Reader'.format(name)
302 name = '{}Reader'.format(name)
303
303
304 self.id = id
304 self.id = id
305 self.project_id = project_id
305 self.project_id = project_id
306 self.name = name
306 self.name = name
307 self.datatype = datatype
307 self.datatype = datatype
308 self.err_queue = err_queue
308 self.err_queue = err_queue
309
309
310 self.addParameter(name='path', value=path)
310 self.addParameter(name='path', value=path)
311 self.addParameter(name='startDate', value=startDate)
311 self.addParameter(name='startDate', value=startDate)
312 self.addParameter(name='endDate', value=endDate)
312 self.addParameter(name='endDate', value=endDate)
313 self.addParameter(name='startTime', value=startTime)
313 self.addParameter(name='startTime', value=startTime)
314 self.addParameter(name='endTime', value=endTime)
314 self.addParameter(name='endTime', value=endTime)
315
315
316 for key, value in kwargs.items():
316 for key, value in kwargs.items():
317 self.addParameter(name=key, value=value)
317 self.addParameter(name=key, value=value)
318
318
319
319
320 class Project(Process):
320 class Project(Process):
321 """API to create signal chain projects"""
321 """API to create signal chain projects"""
322
322
323 ELEMENTNAME = 'Project'
323 ELEMENTNAME = 'Project'
324
324
325 def __init__(self, name=''):
325 def __init__(self, name=''):
326
326
327 Process.__init__(self)
327 Process.__init__(self)
328 self.id = '1'
328 self.id = '1'
329 if name:
329 if name:
330 self.name = '{} ({})'.format(Process.__name__, name)
330 self.name = '{} ({})'.format(Process.__name__, name)
331 self.filename = None
331 self.filename = None
332 self.description = None
332 self.description = None
333 self.email = None
333 self.email = None
334 self.alarm = []
334 self.alarm = []
335 self.configurations = {}
335 self.configurations = {}
336 # self.err_queue = Queue()
336 # self.err_queue = Queue()
337 self.err_queue = None
337 self.err_queue = None
338 self.started = False
338 self.started = False
339
339
340 def getNewId(self):
340 def getNewId(self):
341
341
342 idList = list(self.configurations.keys())
342 idList = list(self.configurations.keys())
343 id = int(self.id) * 10
343 id = int(self.id) * 10
344
344
345 while True:
345 while True:
346 id += 1
346 id += 1
347
347
348 if str(id) in idList:
348 if str(id) in idList:
349 continue
349 continue
350
350
351 break
351 break
352
352
353 return str(id)
353 return str(id)
354
354
355 def updateId(self, new_id):
355 def updateId(self, new_id):
356
356
357 self.id = str(new_id)
357 self.id = str(new_id)
358
358
359 keyList = list(self.configurations.keys())
359 keyList = list(self.configurations.keys())
360 keyList.sort()
360 keyList.sort()
361
361
362 n = 1
362 n = 1
363 new_confs = {}
363 new_confs = {}
364
364
365 for procKey in keyList:
365 for procKey in keyList:
366
366
367 conf = self.configurations[procKey]
367 conf = self.configurations[procKey]
368 idProcUnit = str(int(self.id) * 10 + n)
368 idProcUnit = str(int(self.id) * 10 + n)
369 conf.updateId(idProcUnit)
369 conf.updateId(idProcUnit)
370 new_confs[idProcUnit] = conf
370 new_confs[idProcUnit] = conf
371 n += 1
371 n += 1
372
372
373 self.configurations = new_confs
373 self.configurations = new_confs
374
374
375 def setup(self, id=1, name='', description='', email=None, alarm=[]):
375 def setup(self, id=1, name='', description='', email=None, alarm=[]):
376
376
377 self.id = str(id)
377 self.id = str(id)
378 self.description = description
378 self.description = description
379 self.email = email
379 self.email = email
380 self.alarm = alarm
380 self.alarm = alarm
381 if name:
381 if name:
382 self.name = '{} ({})'.format(Process.__name__, name)
382 self.name = '{} ({})'.format(Process.__name__, name)
383
383
384 def update(self, **kwargs):
384 def update(self, **kwargs):
385
385
386 for key, value in kwargs.items():
386 for key, value in kwargs.items():
387 setattr(self, key, value)
387 setattr(self, key, value)
388
388
389 def clone(self):
389 def clone(self):
390
390
391 p = Project()
391 p = Project()
392 p.id = self.id
392 p.id = self.id
393 p.name = self.name
393 p.name = self.name
394 p.description = self.description
394 p.description = self.description
395 p.configurations = self.configurations.copy()
395 p.configurations = self.configurations.copy()
396
396
397 return p
397 return p
398
398
399 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
399 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
400
400
401 '''
401 '''
402 '''
402 '''
403
403
404 if id is None:
404 if id is None:
405 idReadUnit = self.getNewId()
405 idReadUnit = self.getNewId()
406 else:
406 else:
407 idReadUnit = str(id)
407 idReadUnit = str(id)
408
408
409 conf = ReadUnitConf()
409 conf = ReadUnitConf()
410 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
410 conf.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
411 self.configurations[conf.id] = conf
411 self.configurations[conf.id] = conf
412
412
413 return conf
413 return conf
414
414
415 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
415 def addProcUnit(self, id=None, inputId='0', datatype=None, name=None):
416
416
417 '''
417 '''
418 '''
418 '''
419
419
420 if id is None:
420 if id is None:
421 idProcUnit = self.getNewId()
421 idProcUnit = self.getNewId()
422 else:
422 else:
423 idProcUnit = id
423 idProcUnit = id
424
424
425 conf = ProcUnitConf()
425 conf = ProcUnitConf()
426 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
426 conf.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
427 self.configurations[conf.id] = conf
427 self.configurations[conf.id] = conf
428
428
429 return conf
429 return conf
430
430
431 def removeProcUnit(self, id):
431 def removeProcUnit(self, id):
432
432
433 if id in self.configurations:
433 if id in self.configurations:
434 self.configurations.pop(id)
434 self.configurations.pop(id)
435
435
436 def getReadUnit(self):
436 def getReadUnit(self):
437
437
438 for obj in list(self.configurations.values()):
438 for obj in list(self.configurations.values()):
439 if obj.ELEMENTNAME == 'ReadUnit':
439 if obj.ELEMENTNAME == 'ReadUnit':
440 return obj
440 return obj
441
441
442 return None
442 return None
443
443
444 def getProcUnit(self, id):
444 def getProcUnit(self, id):
445
445
446 return self.configurations[id]
446 return self.configurations[id]
447
447
448 def getUnits(self):
448 def getUnits(self):
449
449
450 keys = list(self.configurations)
450 keys = list(self.configurations)
451 keys.sort()
451 keys.sort()
452
452
453 for key in keys:
453 for key in keys:
454 yield self.configurations[key]
454 yield self.configurations[key]
455
455
456 def updateUnit(self, id, **kwargs):
456 def updateUnit(self, id, **kwargs):
457
457
458 conf = self.configurations[id].update(**kwargs)
458 conf = self.configurations[id].update(**kwargs)
459
459
460 def makeXml(self):
460 def makeXml(self):
461
461
462 xml = Element('Project')
462 xml = Element('Project')
463 xml.set('id', str(self.id))
463 xml.set('id', str(self.id))
464 xml.set('name', self.name)
464 xml.set('name', self.name)
465 xml.set('description', self.description)
465 xml.set('description', self.description)
466
466
467 for conf in self.configurations.values():
467 for conf in self.configurations.values():
468 conf.makeXml(xml)
468 conf.makeXml(xml)
469
469
470 self.xml = xml
470 self.xml = xml
471
471
472 def writeXml(self, filename=None):
472 def writeXml(self, filename=None):
473
473
474 if filename == None:
474 if filename == None:
475 if self.filename:
475 if self.filename:
476 filename = self.filename
476 filename = self.filename
477 else:
477 else:
478 filename = 'schain.xml'
478 filename = 'schain.xml'
479
479
480 if not filename:
480 if not filename:
481 print('filename has not been defined. Use setFilename(filename) for do it.')
481 print('filename has not been defined. Use setFilename(filename) for do it.')
482 return 0
482 return 0
483
483
484 abs_file = os.path.abspath(filename)
484 abs_file = os.path.abspath(filename)
485
485
486 if not os.access(os.path.dirname(abs_file), os.W_OK):
486 if not os.access(os.path.dirname(abs_file), os.W_OK):
487 print('No write permission on %s' % os.path.dirname(abs_file))
487 print('No write permission on %s' % os.path.dirname(abs_file))
488 return 0
488 return 0
489
489
490 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
490 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
491 print('File %s already exists and it could not be overwriten' % abs_file)
491 print('File %s already exists and it could not be overwriten' % abs_file)
492 return 0
492 return 0
493
493
494 self.makeXml()
494 self.makeXml()
495
495
496 ElementTree(self.xml).write(abs_file, method='xml')
496 ElementTree(self.xml).write(abs_file, method='xml')
497
497
498 self.filename = abs_file
498 self.filename = abs_file
499
499
500 return 1
500 return 1
501
501
502 def readXml(self, filename):
502 def readXml(self, filename):
503
503
504 abs_file = os.path.abspath(filename)
504 abs_file = os.path.abspath(filename)
505
505
506 self.configurations = {}
506 self.configurations = {}
507
507
508 try:
508 try:
509 self.xml = ElementTree().parse(abs_file)
509 self.xml = ElementTree().parse(abs_file)
510 except:
510 except:
511 log.error('Error reading %s, verify file format' % filename)
511 log.error('Error reading %s, verify file format' % filename)
512 return 0
512 return 0
513
513
514 self.id = self.xml.get('id')
514 self.id = self.xml.get('id')
515 self.name = self.xml.get('name')
515 self.name = self.xml.get('name')
516 self.description = self.xml.get('description')
516 self.description = self.xml.get('description')
517
517
518 for element in self.xml:
518 for element in self.xml:
519 if element.tag == 'ReadUnit':
519 if element.tag == 'ReadUnit':
520 conf = ReadUnitConf()
520 conf = ReadUnitConf()
521 conf.readXml(element, self.id, self.err_queue)
521 conf.readXml(element, self.id, self.err_queue)
522 self.configurations[conf.id] = conf
522 self.configurations[conf.id] = conf
523 elif element.tag == 'ProcUnit':
523 elif element.tag == 'ProcUnit':
524 conf = ProcUnitConf()
524 conf = ProcUnitConf()
525 input_proc = self.configurations[element.get('inputId')]
525 input_proc = self.configurations[element.get('inputId')]
526 conf.readXml(element, self.id, self.err_queue)
526 conf.readXml(element, self.id, self.err_queue)
527 self.configurations[conf.id] = conf
527 self.configurations[conf.id] = conf
528
528
529 self.filename = abs_file
529 self.filename = abs_file
530
530
531 return 1
531 return 1
532
532
533 def __str__(self):
533 def __str__(self):
534
534
535 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
535 text = '\nProject[id=%s, name=%s, description=%s]\n\n' % (
536 self.id,
536 self.id,
537 self.name,
537 self.name,
538 self.description,
538 self.description,
539 )
539 )
540
540
541 for conf in self.configurations.values():
541 for conf in self.configurations.values():
542 text += '{}'.format(conf)
542 text += '{}'.format(conf)
543
543
544 return text
544 return text
545
545
546 def createObjects(self):
546 def createObjects(self):
547
547
548 keys = list(self.configurations.keys())
548 keys = list(self.configurations.keys())
549 keys.sort()
549 keys.sort()
550 for key in keys:
550 for key in keys:
551 conf = self.configurations[key]
551 conf = self.configurations[key]
552 conf.createObjects()
552 conf.createObjects()
553 if conf.inputId is not None:
553 if conf.inputId is not None:
554 conf.object.setInput(self.configurations[conf.inputId].object)
554 conf.object.setInput(self.configurations[conf.inputId].object)
555
555
556 def monitor(self):
556 def monitor(self):
557
557
558 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
558 t = Thread(target=self._monitor, args=(self.err_queue, self.ctx))
559 t.start()
559 t.start()
560
560
561 def _monitor(self, queue, ctx):
561 def _monitor(self, queue, ctx):
562
562
563 import socket
563 import socket
564
564
565 procs = 0
565 procs = 0
566 err_msg = ''
566 err_msg = ''
567
567
568 while True:
568 while True:
569 msg = queue.get()
569 msg = queue.get()
570 if '#_start_#' in msg:
570 if '#_start_#' in msg:
571 procs += 1
571 procs += 1
572 elif '#_end_#' in msg:
572 elif '#_end_#' in msg:
573 procs -=1
573 procs -=1
574 else:
574 else:
575 err_msg = msg
575 err_msg = msg
576
576
577 if procs == 0 or 'Traceback' in err_msg:
577 if procs == 0 or 'Traceback' in err_msg:
578 break
578 break
579 time.sleep(0.1)
579 time.sleep(0.1)
580
580
581 if '|' in err_msg:
581 if '|' in err_msg:
582 name, err = err_msg.split('|')
582 name, err = err_msg.split('|')
583 if 'SchainWarning' in err:
583 if 'SchainWarning' in err:
584 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
584 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
585 elif 'SchainError' in err:
585 elif 'SchainError' in err:
586 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
586 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
587 else:
587 else:
588 log.error(err, name)
588 log.error(err, name)
589 else:
589 else:
590 name, err = self.name, err_msg
590 name, err = self.name, err_msg
591
591
592 time.sleep(1)
592 time.sleep(1)
593
593
594 ctx.term()
594 ctx.term()
595
595
596 message = ''.join(err)
596 message = ''.join(err)
597
597
598 if err_msg:
598 if err_msg:
599 subject = 'SChain v%s: Error running %s\n' % (
599 subject = 'SChain v%s: Error running %s\n' % (
600 schainpy.__version__, self.name)
600 schainpy.__version__, self.name)
601
601
602 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
602 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
603 socket.gethostname())
603 socket.gethostname())
604 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
604 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
605 subtitle += 'Configuration file: %s\n' % self.filename
605 subtitle += 'Configuration file: %s\n' % self.filename
606 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
606 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
607
607
608 readUnitConfObj = self.getReadUnit()
608 readUnitConfObj = self.getReadUnit()
609 if readUnitConfObj:
609 if readUnitConfObj:
610 subtitle += '\nInput parameters:\n'
610 subtitle += '\nInput parameters:\n'
611 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
611 subtitle += '[Data path = %s]\n' % readUnitConfObj.parameters['path']
612 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
612 subtitle += '[Start date = %s]\n' % readUnitConfObj.parameters['startDate']
613 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
613 subtitle += '[End date = %s]\n' % readUnitConfObj.parameters['endDate']
614 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
614 subtitle += '[Start time = %s]\n' % readUnitConfObj.parameters['startTime']
615 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
615 subtitle += '[End time = %s]\n' % readUnitConfObj.parameters['endTime']
616
616
617 a = Alarm(
617 a = Alarm(
618 modes=self.alarm,
618 modes=self.alarm,
619 email=self.email,
619 email=self.email,
620 message=message,
620 message=message,
621 subject=subject,
621 subject=subject,
622 subtitle=subtitle,
622 subtitle=subtitle,
623 filename=self.filename
623 filename=self.filename
624 )
624 )
625
625
626 a.start()
626 a.start()
627
627
628 def setFilename(self, filename):
628 def setFilename(self, filename):
629
629
630 self.filename = filename
630 self.filename = filename
631
631
632 def runProcs(self):
632 def runProcs(self):
633
633
634 err = False
634 err = False
635 n = len(self.configurations)
635 n = len(self.configurations)
636
636
637 while not err:
637 while not err:
638 #print("STAR")
638 #print("STAR")
639 for conf in self.getUnits():
639 for conf in self.getUnits():
640 #print("CONF: ",conf)
640 #print("CONF: ",conf)
641 ok = conf.run()
641 ok = conf.run()
642 if ok == 'Error':
642 if ok == 'Error':
643 #self.removeProcUnit(conf.id) #remove proc Unit
643 n -= 1
644 n -= 1
644 continue
645 continue
645 elif not ok:
646 elif not ok:
646 break
647 break
647 if n == 0:
648 if n == 0:
648 err = True
649 err = True
649
650
650 def run(self):
651 def run(self):
651
652
652 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 log.success('\nStarting Project {} [id={}]'.format(self.name, self.id), tag='')
653 self.started = True
654 self.started = True
654 self.start_time = time.time()
655 self.start_time = time.time()
655 self.createObjects()
656 self.createObjects()
656 self.runProcs()
657 self.runProcs()
657 log.success('{} Done (Time: {:4.2f}s)'.format(
658 log.success('{} Done (Time: {:4.2f}s)'.format(
658 self.name,
659 self.name,
659 time.time()-self.start_time), '')
660 time.time()-self.start_time), '')
@@ -1,683 +1,685
1 import os
1 import os
2 import time
2 import time
3 import datetime
3 import datetime
4
4
5 import numpy
5 import numpy
6 import h5py
6 import h5py
7
7
8 import schainpy.admin
8 import schainpy.admin
9 from schainpy.model.data.jrodata import *
9 from schainpy.model.data.jrodata import *
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
10 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
11 from schainpy.model.io.jroIO_base import *
11 from schainpy.model.io.jroIO_base import *
12 from schainpy.utils import log
12 from schainpy.utils import log
13
13
14
14
15 class HDFReader(Reader, ProcessingUnit):
15 class HDFReader(Reader, ProcessingUnit):
16 """Processing unit to read HDF5 format files
16 """Processing unit to read HDF5 format files
17
17
18 This unit reads HDF5 files created with `HDFWriter` operation contains
18 This unit reads HDF5 files created with `HDFWriter` operation contains
19 by default two groups Data and Metadata all variables would be saved as `dataOut`
19 by default two groups Data and Metadata all variables would be saved as `dataOut`
20 attributes.
20 attributes.
21 It is possible to read any HDF5 file by given the structure in the `description`
21 It is possible to read any HDF5 file by given the structure in the `description`
22 parameter, also you can add extra values to metadata with the parameter `extras`.
22 parameter, also you can add extra values to metadata with the parameter `extras`.
23
23
24 Parameters:
24 Parameters:
25 -----------
25 -----------
26 path : str
26 path : str
27 Path where files are located.
27 Path where files are located.
28 startDate : date
28 startDate : date
29 Start date of the files
29 Start date of the files
30 endDate : list
30 endDate : list
31 End date of the files
31 End date of the files
32 startTime : time
32 startTime : time
33 Start time of the files
33 Start time of the files
34 endTime : time
34 endTime : time
35 End time of the files
35 End time of the files
36 description : dict, optional
36 description : dict, optional
37 Dictionary with the description of the HDF5 file
37 Dictionary with the description of the HDF5 file
38 extras : dict, optional
38 extras : dict, optional
39 Dictionary with extra metadata to be be added to `dataOut`
39 Dictionary with extra metadata to be be added to `dataOut`
40
40
41 Examples
41 Examples
42 --------
42 --------
43
43
44 desc = {
44 desc = {
45 'Data': {
45 'Data': {
46 'data_output': ['u', 'v', 'w'],
46 'data_output': ['u', 'v', 'w'],
47 'utctime': 'timestamps',
47 'utctime': 'timestamps',
48 } ,
48 } ,
49 'Metadata': {
49 'Metadata': {
50 'heightList': 'heights'
50 'heightList': 'heights'
51 }
51 }
52 }
52 }
53
53
54 desc = {
54 desc = {
55 'Data': {
55 'Data': {
56 'data_output': 'winds',
56 'data_output': 'winds',
57 'utctime': 'timestamps'
57 'utctime': 'timestamps'
58 },
58 },
59 'Metadata': {
59 'Metadata': {
60 'heightList': 'heights'
60 'heightList': 'heights'
61 }
61 }
62 }
62 }
63
63
64 extras = {
64 extras = {
65 'timeZone': 300
65 'timeZone': 300
66 }
66 }
67
67
68 reader = project.addReadUnit(
68 reader = project.addReadUnit(
69 name='HDFReader',
69 name='HDFReader',
70 path='/path/to/files',
70 path='/path/to/files',
71 startDate='2019/01/01',
71 startDate='2019/01/01',
72 endDate='2019/01/31',
72 endDate='2019/01/31',
73 startTime='00:00:00',
73 startTime='00:00:00',
74 endTime='23:59:59',
74 endTime='23:59:59',
75 # description=json.dumps(desc),
75 # description=json.dumps(desc),
76 # extras=json.dumps(extras),
76 # extras=json.dumps(extras),
77 )
77 )
78
78
79 """
79 """
80
80
81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
81 __attrs__ = ['path', 'startDate', 'endDate', 'startTime', 'endTime', 'description', 'extras']
82
82
83 def __init__(self):
83 def __init__(self):
84 ProcessingUnit.__init__(self)
84 ProcessingUnit.__init__(self)
85
85
86 self.ext = ".hdf5"
86 self.ext = ".hdf5"
87 self.optchar = "D"
87 self.optchar = "D"
88 self.meta = {}
88 self.meta = {}
89 self.data = {}
89 self.data = {}
90 self.open_file = h5py.File
90 self.open_file = h5py.File
91 self.open_mode = 'r'
91 self.open_mode = 'r'
92 self.description = {}
92 self.description = {}
93 self.extras = {}
93 self.extras = {}
94 self.filefmt = "*%Y%j***"
94 self.filefmt = "*%Y%j***"
95 self.folderfmt = "*%Y%j"
95 self.folderfmt = "*%Y%j"
96 self.utcoffset = 0
96 self.utcoffset = 0
97
97
98 self.dataOut = Parameters()
98 self.dataOut = Parameters()
99 self.dataOut.error=False ## NOTE: Importante definir esto antes inicio
99 self.dataOut.error=False ## NOTE: Importante definir esto antes inicio
100 self.dataOut.flagNoData = True
100 self.dataOut.flagNoData = True
101
101
102 def setup(self, **kwargs):
102 def setup(self, **kwargs):
103
103
104 self.set_kwargs(**kwargs)
104 self.set_kwargs(**kwargs)
105 if not self.ext.startswith('.'):
105 if not self.ext.startswith('.'):
106 self.ext = '.{}'.format(self.ext)
106 self.ext = '.{}'.format(self.ext)
107
107
108 if self.online:
108 if self.online:
109 log.log("Searching files in online mode...", self.name)
109 log.log("Searching files in online mode...", self.name)
110
110
111 for nTries in range(self.nTries):
111 for nTries in range(self.nTries):
112 fullpath = self.searchFilesOnLine(self.path, self.startDate,
112 fullpath = self.searchFilesOnLine(self.path, self.startDate,
113 self.endDate, self.expLabel, self.ext, self.walk,
113 self.endDate, self.expLabel, self.ext, self.walk,
114 self.filefmt, self.folderfmt)
114 self.filefmt, self.folderfmt)
115 pathname, filename = os.path.split(fullpath)
115 pathname, filename = os.path.split(fullpath)
116
116
117 try:
117 try:
118 fullpath = next(fullpath)
118 fullpath = next(fullpath)
119
119
120 except:
120 except:
121 fullpath = None
121 fullpath = None
122
122
123 if fullpath:
123 if fullpath:
124 break
124 break
125
125
126 log.warning(
126 log.warning(
127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
127 'Waiting {} sec for a valid file in {}: try {} ...'.format(
128 self.delay, self.path, nTries + 1),
128 self.delay, self.path, nTries + 1),
129 self.name)
129 self.name)
130 time.sleep(self.delay)
130 time.sleep(self.delay)
131
131
132 if not(fullpath):
132 if not(fullpath):
133 raise schainpy.admin.SchainError(
133 raise schainpy.admin.SchainError(
134 'There isn\'t any valid file in {}'.format(self.path))
134 'There isn\'t any valid file in {}'.format(self.path))
135
135
136 pathname, filename = os.path.split(fullpath)
136 pathname, filename = os.path.split(fullpath)
137 self.year = int(filename[1:5])
137 self.year = int(filename[1:5])
138 self.doy = int(filename[5:8])
138 self.doy = int(filename[5:8])
139 self.set = int(filename[8:11]) - 1
139 self.set = int(filename[8:11]) - 1
140 else:
140 else:
141 log.log("Searching files in {}".format(self.path), self.name)
141 log.log("Searching files in {}".format(self.path), self.name)
142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
142 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
143 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
144
144
145 self.setNextFile()
145 self.setNextFile()
146
146
147
147
148
148
149
149
150 def readFirstHeader(self):
150 def readFirstHeader(self):
151 '''Read metadata and data'''
151 '''Read metadata and data'''
152
152
153 self.__readMetadata()
153 self.__readMetadata()
154 self.__readData()
154 self.__readData()
155 self.__setBlockList()
155 self.__setBlockList()
156
156
157 for attr in self.meta:
157 for attr in self.meta:
158 setattr(self.dataOut, attr, self.meta[attr])
158 setattr(self.dataOut, attr, self.meta[attr])
159 self.blockIndex = 0
159 self.blockIndex = 0
160
160
161 return
161 return
162
162
163 def __setBlockList(self):
163 def __setBlockList(self):
164 '''
164 '''
165 Selects the data within the times defined
165 Selects the data within the times defined
166
166
167 self.fp
167 self.fp
168 self.startTime
168 self.startTime
169 self.endTime
169 self.endTime
170 self.blockList
170 self.blockList
171 self.blocksPerFile
171 self.blocksPerFile
172
172
173 '''
173 '''
174
174
175 startTime = self.startTime
175 startTime = self.startTime
176 endTime = self.endTime
176 endTime = self.endTime
177 thisUtcTime = self.data['utctime'] + self.utcoffset
177 thisUtcTime = self.data['utctime'] + self.utcoffset
178 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
178 self.interval = numpy.min(thisUtcTime[1:] - thisUtcTime[:-1])
179 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
179 thisDatetime = datetime.datetime.utcfromtimestamp(thisUtcTime[0])
180 self.startFileDatetime = thisDatetime
180 self.startFileDatetime = thisDatetime
181 thisDate = thisDatetime.date()
181 thisDate = thisDatetime.date()
182 thisTime = thisDatetime.time()
182 thisTime = thisDatetime.time()
183
183
184 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
184 startUtcTime = (datetime.datetime.combine(thisDate, startTime) - datetime.datetime(1970, 1, 1)).total_seconds()
185 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
185 endUtcTime = (datetime.datetime.combine(thisDate, endTime) - datetime.datetime(1970, 1, 1)).total_seconds()
186
186
187 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
187 ind = numpy.where(numpy.logical_and(thisUtcTime >= startUtcTime, thisUtcTime < endUtcTime))[0]
188
188
189 self.blockList = ind
189 self.blockList = ind
190 self.blocksPerFile = len(ind)
190 self.blocksPerFile = len(ind)
191 self.blocksPerFile = len(thisUtcTime)
191 self.blocksPerFile = len(thisUtcTime)
192 return
192 return
193
193
194 def __readMetadata(self):
194 def __readMetadata(self):
195 '''
195 '''
196 Reads Metadata
196 Reads Metadata
197 '''
197 '''
198
198
199 meta = {}
199 meta = {}
200
200
201 if self.description:
201 if self.description:
202 for key, value in self.description['Metadata'].items():
202 for key, value in self.description['Metadata'].items():
203 meta[key] = self.fp[value][()]
203 meta[key] = self.fp[value][()]
204 else:
204 else:
205 grp = self.fp['Metadata']
205 grp = self.fp['Metadata']
206 for name in grp:
206 for name in grp:
207 meta[name] = grp[name][()]
207 meta[name] = grp[name][()]
208
208
209 if self.extras:
209 if self.extras:
210 for key, value in self.extras.items():
210 for key, value in self.extras.items():
211 meta[key] = value
211 meta[key] = value
212 self.meta = meta
212 self.meta = meta
213
213
214 return
214 return
215
215
216
216
217
217
218 def checkForRealPath(self, nextFile, nextDay):
218 def checkForRealPath(self, nextFile, nextDay):
219
219
220 # print("check FRP")
220 # print("check FRP")
221 # dt = self.startFileDatetime + datetime.timedelta(1)
221 # dt = self.startFileDatetime + datetime.timedelta(1)
222 # filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext)
222 # filename = '{}.{}{}'.format(self.path, dt.strftime('%Y%m%d'), self.ext)
223 # fullfilename = os.path.join(self.path, filename)
223 # fullfilename = os.path.join(self.path, filename)
224 # print("check Path ",fullfilename,filename)
224 # print("check Path ",fullfilename,filename)
225 # if os.path.exists(fullfilename):
225 # if os.path.exists(fullfilename):
226 # return fullfilename, filename
226 # return fullfilename, filename
227 # return None, filename
227 # return None, filename
228 return None,None
228 return None,None
229
229
230 def __readData(self):
230 def __readData(self):
231
231
232 data = {}
232 data = {}
233
233
234 if self.description:
234 if self.description:
235 for key, value in self.description['Data'].items():
235 for key, value in self.description['Data'].items():
236 if isinstance(value, str):
236 if isinstance(value, str):
237 if isinstance(self.fp[value], h5py.Dataset):
237 if isinstance(self.fp[value], h5py.Dataset):
238 data[key] = self.fp[value][()]
238 data[key] = self.fp[value][()]
239 elif isinstance(self.fp[value], h5py.Group):
239 elif isinstance(self.fp[value], h5py.Group):
240 array = []
240 array = []
241 for ch in self.fp[value]:
241 for ch in self.fp[value]:
242 array.append(self.fp[value][ch][()])
242 array.append(self.fp[value][ch][()])
243 data[key] = numpy.array(array)
243 data[key] = numpy.array(array)
244 elif isinstance(value, list):
244 elif isinstance(value, list):
245 array = []
245 array = []
246 for ch in value:
246 for ch in value:
247 array.append(self.fp[ch][()])
247 array.append(self.fp[ch][()])
248 data[key] = numpy.array(array)
248 data[key] = numpy.array(array)
249 else:
249 else:
250 grp = self.fp['Data']
250 grp = self.fp['Data']
251 for name in grp:
251 for name in grp:
252 if isinstance(grp[name], h5py.Dataset):
252 if isinstance(grp[name], h5py.Dataset):
253 array = grp[name][()]
253 array = grp[name][()]
254 elif isinstance(grp[name], h5py.Group):
254 elif isinstance(grp[name], h5py.Group):
255 array = []
255 array = []
256 for ch in grp[name]:
256 for ch in grp[name]:
257 array.append(grp[name][ch][()])
257 array.append(grp[name][ch][()])
258 array = numpy.array(array)
258 array = numpy.array(array)
259 else:
259 else:
260 log.warning('Unknown type: {}'.format(name))
260 log.warning('Unknown type: {}'.format(name))
261
261
262 if name in self.description:
262 if name in self.description:
263 key = self.description[name]
263 key = self.description[name]
264 else:
264 else:
265 key = name
265 key = name
266 data[key] = array
266 data[key] = array
267
267
268 self.data = data
268 self.data = data
269 return
269 return
270
270
271 def getData(self):
271 def getData(self):
272 if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
272 if not self.isDateTimeInRange(self.startFileDatetime, self.startDate, self.endDate, self.startTime, self.endTime):
273 self.dataOut.flagNoData = True
273 self.dataOut.flagNoData = True
274 self.blockIndex = self.blocksPerFile
274 self.blockIndex = self.blocksPerFile
275 self.dataOut.error = True # TERMINA EL PROGRAMA
275 self.dataOut.error = True # TERMINA EL PROGRAMA
276 return
276 return
277 for attr in self.data:
277 for attr in self.data:
278
278
279 if self.data[attr].ndim == 1:
279 if self.data[attr].ndim == 1:
280 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
280 setattr(self.dataOut, attr, self.data[attr][self.blockIndex])
281 else:
281 else:
282 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
282 setattr(self.dataOut, attr, self.data[attr][:, self.blockIndex])
283
283
284
284
285 self.blockIndex += 1
285 self.blockIndex += 1
286
286
287 if self.blockIndex == 1:
287 if self.blockIndex == 1:
288 log.log("Block No. {}/{} -> {}".format(
288 log.log("Block No. {}/{} -> {}".format(
289 self.blockIndex,
289 self.blockIndex,
290 self.blocksPerFile,
290 self.blocksPerFile,
291 self.dataOut.datatime.ctime()), self.name)
291 self.dataOut.datatime.ctime()), self.name)
292 else:
292 else:
293 log.log("Block No. {}/{} ".format(
293 log.log("Block No. {}/{} ".format(
294 self.blockIndex,
294 self.blockIndex,
295 self.blocksPerFile),self.name)
295 self.blocksPerFile),self.name)
296
296
297 if self.blockIndex == self.blocksPerFile:
297 if self.blockIndex == self.blocksPerFile:
298 self.setNextFile()
298 self.setNextFile()
299
299
300 self.dataOut.flagNoData = False
300 self.dataOut.flagNoData = False
301
301
302
302
303 def run(self, **kwargs):
303 def run(self, **kwargs):
304
304
305 if not(self.isConfig):
305 if not(self.isConfig):
306 self.setup(**kwargs)
306 self.setup(**kwargs)
307 self.isConfig = True
307 self.isConfig = True
308
308
309 self.getData()
309 self.getData()
310
310
311 #@MPDecorator
311 #@MPDecorator
312 class HDFWrite(Operation):
312 class HDFWrite(Operation):
313 """Operation to write HDF5 files.
313 """Operation to write HDF5 files.
314
314
315 The HDF5 file contains by default two groups Data and Metadata where
315 The HDF5 file contains by default two groups Data and Metadata where
316 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
316 you can save any `dataOut` attribute specified by `dataList` and `metadataList`
317 parameters, data attributes are normaly time dependent where the metadata
317 parameters, data attributes are normaly time dependent where the metadata
318 are not.
318 are not.
319 It is possible to customize the structure of the HDF5 file with the
319 It is possible to customize the structure of the HDF5 file with the
320 optional description parameter see the examples.
320 optional description parameter see the examples.
321
321
322 Parameters:
322 Parameters:
323 -----------
323 -----------
324 path : str
324 path : str
325 Path where files will be saved.
325 Path where files will be saved.
326 blocksPerFile : int
326 blocksPerFile : int
327 Number of blocks per file
327 Number of blocks per file
328 metadataList : list
328 metadataList : list
329 List of the dataOut attributes that will be saved as metadata
329 List of the dataOut attributes that will be saved as metadata
330 dataList : int
330 dataList : int
331 List of the dataOut attributes that will be saved as data
331 List of the dataOut attributes that will be saved as data
332 setType : bool
332 setType : bool
333 If True the name of the files corresponds to the timestamp of the data
333 If True the name of the files corresponds to the timestamp of the data
334 description : dict, optional
334 description : dict, optional
335 Dictionary with the desired description of the HDF5 file
335 Dictionary with the desired description of the HDF5 file
336
336
337 Examples
337 Examples
338 --------
338 --------
339
339
340 desc = {
340 desc = {
341 'data_output': {'winds': ['z', 'w', 'v']},
341 'data_output': {'winds': ['z', 'w', 'v']},
342 'utctime': 'timestamps',
342 'utctime': 'timestamps',
343 'heightList': 'heights'
343 'heightList': 'heights'
344 }
344 }
345 desc = {
345 desc = {
346 'data_output': ['z', 'w', 'v'],
346 'data_output': ['z', 'w', 'v'],
347 'utctime': 'timestamps',
347 'utctime': 'timestamps',
348 'heightList': 'heights'
348 'heightList': 'heights'
349 }
349 }
350 desc = {
350 desc = {
351 'Data': {
351 'Data': {
352 'data_output': 'winds',
352 'data_output': 'winds',
353 'utctime': 'timestamps'
353 'utctime': 'timestamps'
354 },
354 },
355 'Metadata': {
355 'Metadata': {
356 'heightList': 'heights'
356 'heightList': 'heights'
357 }
357 }
358 }
358 }
359
359
360 writer = proc_unit.addOperation(name='HDFWriter')
360 writer = proc_unit.addOperation(name='HDFWriter')
361 writer.addParameter(name='path', value='/path/to/file')
361 writer.addParameter(name='path', value='/path/to/file')
362 writer.addParameter(name='blocksPerFile', value='32')
362 writer.addParameter(name='blocksPerFile', value='32')
363 writer.addParameter(name='metadataList', value='heightList,timeZone')
363 writer.addParameter(name='metadataList', value='heightList,timeZone')
364 writer.addParameter(name='dataList',value='data_output,utctime')
364 writer.addParameter(name='dataList',value='data_output,utctime')
365 # writer.addParameter(name='description',value=json.dumps(desc))
365 # writer.addParameter(name='description',value=json.dumps(desc))
366
366
367 """
367 """
368
368
369 ext = ".hdf5"
369 ext = ".hdf5"
370 optchar = "D"
370 optchar = "D"
371 filename = None
371 filename = None
372 path = None
372 path = None
373 setFile = None
373 setFile = None
374 fp = None
374 fp = None
375 firsttime = True
375 firsttime = True
376 #Configurations
376 #Configurations
377 blocksPerFile = None
377 blocksPerFile = None
378 blockIndex = None
378 blockIndex = None
379 dataOut = None #eval ??????
379 dataOut = None #eval ??????
380 #Data Arrays
380 #Data Arrays
381 dataList = None
381 dataList = None
382 metadataList = None
382 metadataList = None
383 currentDay = None
383 currentDay = None
384 lastTime = None
384 lastTime = None
385 timeZone = "ut"
385 timeZone = "ut"
386 hourLimit = 3
386 hourLimit = 3
387 breakDays = True
387 breakDays = True
388
388
389 def __init__(self):
389 def __init__(self):
390
390
391 Operation.__init__(self)
391 Operation.__init__(self)
392
392
393
393
394 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
394 def setup(self, path=None, blocksPerFile=10, metadataList=None, dataList=None, setType=None,
395 description={},timeZone = "ut",hourLimit = 3, breakDays=True):
395 description={},timeZone = "ut",hourLimit = 3, breakDays=True):
396 self.path = path
396 self.path = path
397 self.blocksPerFile = blocksPerFile
397 self.blocksPerFile = blocksPerFile
398 self.metadataList = metadataList
398 self.metadataList = metadataList
399 self.dataList = [s.strip() for s in dataList]
399 self.dataList = [s.strip() for s in dataList]
400 self.setType = setType
400 self.setType = setType
401 self.description = description
401 self.description = description
402 self.timeZone = timeZone
402 self.timeZone = timeZone
403 self.hourLimit = hourLimit
403 self.hourLimit = hourLimit
404 self.breakDays = breakDays
404 self.breakDays = breakDays
405
405
406 if self.metadataList is None:
406 if self.metadataList is None:
407 self.metadataList = self.dataOut.metadata_list
407 self.metadataList = self.dataOut.metadata_list
408
408
409 tableList = []
409 tableList = []
410 dsList = []
410 dsList = []
411
411
412 for i in range(len(self.dataList)):
412 for i in range(len(self.dataList)):
413 dsDict = {}
413 dsDict = {}
414 if hasattr(self.dataOut, self.dataList[i]):
414 if hasattr(self.dataOut, self.dataList[i]):
415 dataAux = getattr(self.dataOut, self.dataList[i])
415 dataAux = getattr(self.dataOut, self.dataList[i])
416 dsDict['variable'] = self.dataList[i]
416 dsDict['variable'] = self.dataList[i]
417 else:
417 else:
418 log.warning('Attribute {} not found in dataOut', self.name)
418 log.warning('Attribute {} not found in dataOut', self.name)
419 continue
419 continue
420
420
421 if dataAux is None:
421 if dataAux is None:
422 continue
422 continue
423 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
423 elif isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
424 dsDict['nDim'] = 0
424 dsDict['nDim'] = 0
425 else:
425 else:
426 dsDict['nDim'] = len(dataAux.shape)
426 dsDict['nDim'] = len(dataAux.shape)
427 dsDict['shape'] = dataAux.shape
427 dsDict['shape'] = dataAux.shape
428 dsDict['dsNumber'] = dataAux.shape[0]
428 dsDict['dsNumber'] = dataAux.shape[0]
429 dsDict['dtype'] = dataAux.dtype
429 dsDict['dtype'] = dataAux.dtype
430
430
431 dsList.append(dsDict)
431 dsList.append(dsDict)
432
432
433 self.blockIndex = 0
433 self.blockIndex = 0
434 self.dsList = dsList
434 self.dsList = dsList
435 self.currentDay = self.dataOut.datatime.date()
435 self.currentDay = self.dataOut.datatime.date()
436
436
437
437
438 def timeFlag(self):
438 def timeFlag(self):
439 currentTime = self.dataOut.utctime
439 currentTime = self.dataOut.utctime
440 timeTuple = None
440 timeTuple = None
441 if self.timeZone == "lt":
441 if self.timeZone == "lt":
442 timeTuple = time.localtime(currentTime)
442 timeTuple = time.localtime(currentTime)
443 else :
443 else :
444 timeTuple = time.gmtime(currentTime)
444 timeTuple = time.gmtime(currentTime)
445
445
446 dataDay = timeTuple.tm_yday
446 dataDay = timeTuple.tm_yday
447
447
448 if self.lastTime is None:
448 if self.lastTime is None:
449 self.lastTime = currentTime
449 self.lastTime = currentTime
450 self.currentDay = dataDay
450 self.currentDay = dataDay
451 return False
451 return False
452
452
453 timeDiff = currentTime - self.lastTime
453 timeDiff = currentTime - self.lastTime
454
454
455 #Si el dia es diferente o si la diferencia entre un dato y otro supera self.hourLimit
455 #Si el dia es diferente o si la diferencia entre un dato y otro supera self.hourLimit
456 if (dataDay != self.currentDay) and self.breakDays:
456 if (dataDay != self.currentDay) and self.breakDays:
457 self.currentDay = dataDay
457 self.currentDay = dataDay
458 return True
458 return True
459 elif timeDiff > self.hourLimit*60*60:
459 elif timeDiff > self.hourLimit*60*60:
460 self.lastTime = currentTime
460 self.lastTime = currentTime
461 return True
461 return True
462 else:
462 else:
463 self.lastTime = currentTime
463 self.lastTime = currentTime
464 return False
464 return False
465
465
466 def run(self, dataOut,**kwargs):
466 def run(self, dataOut,**kwargs):
467
467
468 self.dataOut = dataOut
468 self.dataOut = dataOut
469 if not(self.isConfig):
469 if not(self.isConfig):
470 self.setup(**kwargs)
470 self.setup(**kwargs)
471
471
472 self.isConfig = True
472 self.isConfig = True
473 self.setNextFile()
473 self.setNextFile()
474
474
475 self.putData()
475 self.putData()
476
476
477 return self.dataOut
477 return self.dataOut
478
478
479 def setNextFile(self):
479 def setNextFile(self):
480
480
481 ext = self.ext
481 ext = self.ext
482 path = self.path
482 path = self.path
483 setFile = self.setFile
483 setFile = self.setFile
484 timeTuple = None
484 timeTuple = None
485 if self.timeZone == "lt":
485 if self.timeZone == "lt":
486 timeTuple = time.localtime(self.dataOut.utctime)
486 timeTuple = time.localtime(self.dataOut.utctime)
487 elif self.timeZone == "ut":
487 elif self.timeZone == "ut":
488 timeTuple = time.gmtime(self.dataOut.utctime)
488 timeTuple = time.gmtime(self.dataOut.utctime)
489 #print("path: ",timeTuple)
489 #print("path: ",timeTuple)
490 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
490 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
491 fullpath = os.path.join(path, subfolder)
491 fullpath = os.path.join(path, subfolder)
492
492
493 if os.path.exists(fullpath):
493 if os.path.exists(fullpath):
494 filesList = os.listdir(fullpath)
494 filesList = os.listdir(fullpath)
495 filesList = [k for k in filesList if k.startswith(self.optchar)]
495 filesList = [k for k in filesList if k.startswith(self.optchar)]
496 if len( filesList ) > 0:
496 if len( filesList ) > 0:
497 filesList = sorted(filesList, key=str.lower)
497 filesList = sorted(filesList, key=str.lower)
498 filen = filesList[-1]
498 filen = filesList[-1]
499 # el filename debera tener el siguiente formato
499 # el filename debera tener el siguiente formato
500 # 0 1234 567 89A BCDE (hex)
500 # 0 1234 567 89A BCDE (hex)
501 # x YYYY DDD SSS .ext
501 # x YYYY DDD SSS .ext
502 if isNumber(filen[8:11]):
502 if isNumber(filen[8:11]):
503 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
503 setFile = int(filen[8:11]) #inicializo mi contador de seteo al seteo del ultimo file
504 else:
504 else:
505 setFile = -1
505 setFile = -1
506 else:
506 else:
507 setFile = -1 #inicializo mi contador de seteo
507 setFile = -1 #inicializo mi contador de seteo
508 else:
508 else:
509 os.makedirs(fullpath)
509 os.makedirs(fullpath)
510 setFile = -1 #inicializo mi contador de seteo
510 setFile = -1 #inicializo mi contador de seteo
511
511
512 if self.setType is None:
512 if self.setType is None:
513 setFile += 1
513 setFile += 1
514 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
514 file = '%s%4.4d%3.3d%03d%s' % (self.optchar,
515 timeTuple.tm_year,
515 timeTuple.tm_year,
516 timeTuple.tm_yday,
516 timeTuple.tm_yday,
517 setFile,
517 setFile,
518 ext )
518 ext )
519 else:
519 else:
520 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
520 setFile = timeTuple.tm_hour*60+timeTuple.tm_min
521 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
521 file = '%s%4.4d%3.3d%04d%s' % (self.optchar,
522 timeTuple.tm_year,
522 timeTuple.tm_year,
523 timeTuple.tm_yday,
523 timeTuple.tm_yday,
524 setFile,
524 setFile,
525 ext )
525 ext )
526
526
527 self.filename = os.path.join( path, subfolder, file )
527 self.filename = os.path.join( path, subfolder, file )
528
528
529
529
530
530
531 def getLabel(self, name, x=None):
531 def getLabel(self, name, x=None):
532
532
533 if x is None:
533 if x is None:
534 if 'Data' in self.description:
534 if 'Data' in self.description:
535 data = self.description['Data']
535 data = self.description['Data']
536 if 'Metadata' in self.description:
536 if 'Metadata' in self.description:
537 data.update(self.description['Metadata'])
537 data.update(self.description['Metadata'])
538 else:
538 else:
539 data = self.description
539 data = self.description
540 if name in data:
540 if name in data:
541 if isinstance(data[name], str):
541 if isinstance(data[name], str):
542 return data[name]
542 return data[name]
543 elif isinstance(data[name], list):
543 elif isinstance(data[name], list):
544 return None
544 return None
545 elif isinstance(data[name], dict):
545 elif isinstance(data[name], dict):
546 for key, value in data[name].items():
546 for key, value in data[name].items():
547 return key
547 return key
548 return name
548 return name
549 else:
549 else:
550 if 'Metadata' in self.description:
550 if 'Metadata' in self.description:
551 meta = self.description['Metadata']
551 meta = self.description['Metadata']
552 else:
552 else:
553 meta = self.description
553 meta = self.description
554 if name in meta:
554 if name in meta:
555 if isinstance(meta[name], list):
555 if isinstance(meta[name], list):
556 return meta[name][x]
556 return meta[name][x]
557 elif isinstance(meta[name], dict):
557 elif isinstance(meta[name], dict):
558 for key, value in meta[name].items():
558 for key, value in meta[name].items():
559 return value[x]
559 return value[x]
560 if 'cspc' in name:
560 if 'cspc' in name:
561 return 'pair{:02d}'.format(x)
561 return 'pair{:02d}'.format(x)
562 else:
562 else:
563 return 'channel{:02d}'.format(x)
563 return 'channel{:02d}'.format(x)
564
564
565 def writeMetadata(self, fp):
565 def writeMetadata(self, fp):
566
566
567 if self.description:
567 if self.description:
568 if 'Metadata' in self.description:
568 if 'Metadata' in self.description:
569 grp = fp.create_group('Metadata')
569 grp = fp.create_group('Metadata')
570 else:
570 else:
571 grp = fp
571 grp = fp
572 else:
572 else:
573 grp = fp.create_group('Metadata')
573 grp = fp.create_group('Metadata')
574
574
575 for i in range(len(self.metadataList)):
575 for i in range(len(self.metadataList)):
576 if not hasattr(self.dataOut, self.metadataList[i]):
576 if not hasattr(self.dataOut, self.metadataList[i]):
577 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
577 log.warning('Metadata: `{}` not found'.format(self.metadataList[i]), self.name)
578 continue
578 continue
579 value = getattr(self.dataOut, self.metadataList[i])
579 value = getattr(self.dataOut, self.metadataList[i])
580 if isinstance(value, bool):
580 if isinstance(value, bool):
581 if value is True:
581 if value is True:
582 value = 1
582 value = 1
583 else:
583 else:
584 value = 0
584 value = 0
585 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
585 grp.create_dataset(self.getLabel(self.metadataList[i]), data=value)
586 return
586 return
587
587
588 def writeData(self, fp):
588 def writeData(self, fp):
589
589
590 if self.description:
590 if self.description:
591 if 'Data' in self.description:
591 if 'Data' in self.description:
592 grp = fp.create_group('Data')
592 grp = fp.create_group('Data')
593 else:
593 else:
594 grp = fp
594 grp = fp
595 else:
595 else:
596 grp = fp.create_group('Data')
596 grp = fp.create_group('Data')
597
597
598 dtsets = []
598 dtsets = []
599 data = []
599 data = []
600
600
601 for dsInfo in self.dsList:
601 for dsInfo in self.dsList:
602 if dsInfo['nDim'] == 0:
602 if dsInfo['nDim'] == 0:
603 ds = grp.create_dataset(
603 ds = grp.create_dataset(
604 self.getLabel(dsInfo['variable']),
604 self.getLabel(dsInfo['variable']),
605 (self.blocksPerFile, ),
605 (self.blocksPerFile, ),
606 chunks=True,
606 chunks=True,
607 dtype=numpy.float64)
607 dtype=numpy.float64)
608 dtsets.append(ds)
608 dtsets.append(ds)
609 data.append((dsInfo['variable'], -1))
609 data.append((dsInfo['variable'], -1))
610 else:
610 else:
611 label = self.getLabel(dsInfo['variable'])
611 label = self.getLabel(dsInfo['variable'])
612 if label is not None:
612 if label is not None:
613 sgrp = grp.create_group(label)
613 sgrp = grp.create_group(label)
614 else:
614 else:
615 sgrp = grp
615 sgrp = grp
616 for i in range(dsInfo['dsNumber']):
616 for i in range(dsInfo['dsNumber']):
617 ds = sgrp.create_dataset(
617 ds = sgrp.create_dataset(
618 self.getLabel(dsInfo['variable'], i),
618 self.getLabel(dsInfo['variable'], i),
619 (self.blocksPerFile, ) + dsInfo['shape'][1:],
619 (self.blocksPerFile, ) + dsInfo['shape'][1:],
620 chunks=True,
620 chunks=True,
621 dtype=dsInfo['dtype'])
621 dtype=dsInfo['dtype'])
622 dtsets.append(ds)
622 dtsets.append(ds)
623 data.append((dsInfo['variable'], i))
623 data.append((dsInfo['variable'], i))
624 fp.flush()
624 fp.flush()
625
625
626 log.log('Creating file: {}'.format(fp.filename), self.name)
626 log.log('Creating file: {}'.format(fp.filename), self.name)
627
627
628 self.ds = dtsets
628 self.ds = dtsets
629 self.data = data
629 self.data = data
630 self.firsttime = True
630 self.firsttime = True
631
631
632 return
632 return
633
633
634 def putData(self):
634 def putData(self):
635
635
636 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
636 if (self.blockIndex == self.blocksPerFile) or self.timeFlag():
637 self.closeFile()
637 self.closeFile()
638 self.setNextFile()
638 self.setNextFile()
639 self.dataOut.flagNoData = False
639 self.dataOut.flagNoData = False
640 self.blockIndex = 0
640 self.blockIndex = 0
641 return
641 return
642
642
643
643
644
644
645 if self.blockIndex == 0:
645 if self.blockIndex == 0:
646 #Escribir metadata Aqui???
646 #Escribir metadata Aqui???
647 #Setting HDF5 File
647 #Setting HDF5 File
648 self.fp = h5py.File(self.filename, 'w')
648 self.fp = h5py.File(self.filename, 'w')
649 #write metadata
649 #write metadata
650 self.writeMetadata(self.fp)
650 self.writeMetadata(self.fp)
651 #Write data
651 #Write data
652 self.writeData(self.fp)
652 self.writeData(self.fp)
653 log.log('Block No. {}/{} --> {}'.format(self.blockIndex+1, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
653 log.log('Block No. {}/{} --> {}'.format(self.blockIndex+1, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
654 elif (self.blockIndex % 10 ==0):
655 log.log('Block No. {}/{} --> {}'.format(self.blockIndex+1, self.blocksPerFile,self.dataOut.datatime.ctime()), self.name)
654 else:
656 else:
655
657
656 log.log('Block No. {}/{}'.format(self.blockIndex+1, self.blocksPerFile), self.name)
658 log.log('Block No. {}/{}'.format(self.blockIndex+1, self.blocksPerFile), self.name)
657
659
658 for i, ds in enumerate(self.ds):
660 for i, ds in enumerate(self.ds):
659 attr, ch = self.data[i]
661 attr, ch = self.data[i]
660 if ch == -1:
662 if ch == -1:
661 ds[self.blockIndex] = getattr(self.dataOut, attr)
663 ds[self.blockIndex] = getattr(self.dataOut, attr)
662 else:
664 else:
663 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
665 ds[self.blockIndex] = getattr(self.dataOut, attr)[ch]
664
666
665 self.blockIndex += 1
667 self.blockIndex += 1
666
668
667 self.fp.flush()
669 self.fp.flush()
668 self.dataOut.flagNoData = True
670 self.dataOut.flagNoData = True
669
671
670
672
671 def closeFile(self):
673 def closeFile(self):
672
674
673 if self.blockIndex != self.blocksPerFile:
675 if self.blockIndex != self.blocksPerFile:
674 for ds in self.ds:
676 for ds in self.ds:
675 ds.resize(self.blockIndex, axis=0)
677 ds.resize(self.blockIndex, axis=0)
676
678
677 if self.fp:
679 if self.fp:
678 self.fp.flush()
680 self.fp.flush()
679 self.fp.close()
681 self.fp.close()
680
682
681 def close(self):
683 def close(self):
682
684
683 self.closeFile()
685 self.closeFile()
@@ -1,1112 +1,1201
1 '''
1 '''
2 @author: Daniel Suarez
2 @author: Daniel Suarez
3 '''
3 '''
4 import os
4 import os
5 import glob
5 import glob
6 import ftplib
6 import ftplib
7
7
8 try:
8 try:
9 import paramiko
9 import paramiko
10 import scp
10 import scp
11 except:
11 except:
12 pass
12 pass
13
13
14 import time
14 import time
15
15
16 import threading
16 import threading
17 Thread = threading.Thread
17 Thread = threading.Thread
18
18
19 # try:
19 # try:
20 # from gevent import sleep
20 # from gevent import sleep
21 # except:
21 # except:
22 from time import sleep
22 from time import sleep
23 from schainpy.model.data.jrodata import *
23 from schainpy.model.data.jrodata import *
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
25 #@MPDecorator
25 #@MPDecorator
26 class Remote(Thread):
26 class Remote(Thread):
27 """
27 """
28 Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
28 Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
29 used to upload or download files remotely.
29 used to upload or download files remotely.
30
30
31 Non-standard Python modules used:
31 Non-standard Python modules used:
32 None
32 None
33
33
34 Written by:
34 Written by:
35 "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Jun. 03, 2015
35 "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe Jun. 03, 2015
36 Modified by:
36 Modified by:
37 -
37 -
38 """
38 """
39
39
40 server = None
40 server = None
41 username = None
41 username = None
42 password = None
42 password = None
43 remotefolder = None
43 remotefolder = None
44 key_filename=None
44 key_filename=None
45
45
46 period = 60
46 period = 60
47 fileList = []
47 fileList = []
48 bussy = False
48 bussy = False
49
49
50 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
50 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
51
51
52 Thread.__init__(self)
52 Thread.__init__(self)
53
53
54 self.setDaemon(True)
54 self.setDaemon(True)
55
55
56 self.status = 0
56 self.status = 0
57
57
58 self.__server = server
58 self.__server = server
59 self.__username = username
59 self.__username = username
60 self.__password = password
60 self.__password = password
61 self.__remotefolder = remotefolder
61 self.__remotefolder = remotefolder
62
62
63 self.period = period
63 self.period = period
64 self.key_filename = key_filename
64 self.key_filename = key_filename
65 self.fileList = []
65 self.fileList = []
66 self.bussy = False
66 self.bussy = False
67
67
68 self.stopFlag = False
68 self.stopFlag = False
69
69
70 print("[Remote Server] Opening server: %s" %self.__server)
70 print("[Remote Server] Opening server: %s" %self.__server)
71 if self.open(self.__server, self.__username, self.__password, self.__remotefolder,key_filename=self.key_filename):
71 if self.open(self.__server, self.__username, self.__password, self.__remotefolder,key_filename=self.key_filename):
72 print("[Remote Server] %s server was opened successfully" %self.__server)
72 print("[Remote Server] %s server was opened successfully" %self.__server)
73
73
74 #self.close()
74 #self.close()
75
75
76 self.mutex = threading.Lock()
76 self.mutex = threading.Lock()
77
77
78 def stop(self):
78 def stop(self):
79
79
80 self.stopFlag = True
80 self.stopFlag = True
81 self.join(10)
81 self.join(10)
82
82
83 def open(self):
83 def open(self):
84 """
84 """
85 Connect to server and create a connection class (FTP or SSH) to remote server.
85 Connect to server and create a connection class (FTP or SSH) to remote server.
86 """
86 """
87 raise NotImplementedError("Implement this method in child class")
87 raise NotImplementedError("Implement this method in child class")
88
88
89 def close(self):
89 def close(self):
90 """
90 """
91 Close connection to server
91 Close connection to server
92 """
92 """
93 raise NotImplementedError("Implement this method in child class")
93 raise NotImplementedError("Implement this method in child class")
94
94
95 def mkdir(self, remotefolder):
95 def mkdir(self, remotefolder):
96 """
96 """
97 Create a folder remotely
97 Create a folder remotely
98 """
98 """
99 raise NotImplementedError("Implement this method in child class")
99 raise NotImplementedError("Implement this method in child class")
100
100
101 def cd(self, remotefolder):
101 def cd(self, remotefolder):
102 """
102 """
103 Change working directory in remote server
103 Change working directory in remote server
104 """
104 """
105 raise NotImplementedError("Implement this method in child class")
105 raise NotImplementedError("Implement this method in child class")
106
106
107 def download(self, filename, localfolder=None):
107 def download(self, filename, localfolder=None):
108 """
108 """
109 Download a file from server to local host
109 Download a file from server to local host
110 """
110 """
111 raise NotImplementedError("Implement this method in child class")
111 raise NotImplementedError("Implement this method in child class")
112
112
113 def sendFile(self, fullfilename):
113 def sendFile(self, fullfilename):
114 """
114 """
115 sendFile method is used to upload a local file to the current directory in remote server
115 sendFile method is used to upload a local file to the current directory in remote server
116
116
117 Inputs:
117 Inputs:
118 fullfilename - full path name of local file to store in remote directory
118 fullfilename - full path name of local file to store in remote directory
119
119
120 Returns:
120 Returns:
121 0 in error case else 1
121 0 in error case else 1
122 """
122 """
123 raise NotImplementedError("Implement this method in child class")
123 raise NotImplementedError("Implement this method in child class")
124
124
125 def upload(self, fullfilename, remotefolder=None):
125 def upload(self, fullfilename, remotefolder=None):
126 """
126 """
127 upload method is used to upload a local file to remote directory. This method changes
127 upload method is used to upload a local file to remote directory. This method changes
128 working directory before sending a file.
128 working directory before sending a file.
129
129
130 Inputs:
130 Inputs:
131 fullfilename - full path name of local file to store in remote directory
131 fullfilename - full path name of local file to store in remote directory
132
132
133 remotefolder - remote directory
133 remotefolder - remote directory
134
134
135 Returns:
135 Returns:
136 0 in error case else 1
136 0 in error case else 1
137 """
137 """
138 print("[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder))
138 print("[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder))
139
139
140 if not self.status:
140 if not self.status:
141 return 0
141 return 0
142
142
143 if remotefolder == None:
143 if remotefolder == None:
144 remotefolder = self.remotefolder
144 remotefolder = self.remotefolder
145
145
146 if not self.cd(remotefolder):
146 if not self.cd(remotefolder):
147 return 0
147 return 0
148
148
149 if not self.sendFile(fullfilename):
149 if not self.sendFile(fullfilename):
150 print("[Remote Server] Error uploading file %s" %fullfilename)
150 print("[Remote Server] Error uploading file %s" %fullfilename)
151 return 0
151 return 0
152
152
153
153
154
154
155 return 1
155 return 1
156
156
157 def delete(self, filename):
157 def delete(self, filename):
158 """
158 """
159 Remove a file from remote server
159 Remove a file from remote server
160 """
160 """
161 pass
161 pass
162
162
163 def updateFileList(self, fileList):
163 def updateFileList(self, fileList):
164 """
164 """
165 Remove a file from remote server
165 Remove a file from remote server
166 """
166 """
167
167
168 if fileList == self.fileList:
168 if fileList == self.fileList:
169 return 0
169 return 0
170
170
171 self.mutex.acquire()
171 self.mutex.acquire()
172 # init = time.time()
172 # init = time.time()
173 #
173 #
174 # while(self.bussy):
174 # while(self.bussy):
175 # sleep(0.1)
175 # sleep(0.1)
176 # if time.time() - init > 2*self.period:
176 # if time.time() - init > 2*self.period:
177 # return 0
177 # return 0
178
178
179 self.fileList = fileList
179 self.fileList = fileList
180 self.mutex.release()
180 self.mutex.release()
181 return 1
181 return 1
182
182
183 def run(self):
183 def run(self):
184
184
185 if not self.status:
185 if not self.status:
186 print("Finishing FTP service")
186 print("Finishing FTP service")
187 return
187 return
188
188
189 if not self.cd(self.remotefolder):
189 if not self.cd(self.remotefolder):
190 raise ValueError("Could not access to the new remote directory: %s" %self.remotefolder)
190 raise ValueError("Could not access to the new remote directory: %s" %self.remotefolder)
191
191
192 while True:
192 while True:
193
193
194 for i in range(self.period):
194 for i in range(self.period):
195 if self.stopFlag:
195 if self.stopFlag:
196 break
196 break
197 sleep(1)
197 sleep(1)
198
198
199 if self.stopFlag:
199 if self.stopFlag:
200 break
200 break
201
201
202 # self.bussy = True
202 # self.bussy = True
203 self.mutex.acquire()
203 self.mutex.acquire()
204
204
205 print("[Remote Server] Opening %s" %self.__server)
205 print("[Remote Server] Opening %s" %self.__server)
206 if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
206 if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
207 self.mutex.release()
207 self.mutex.release()
208 continue
208 continue
209
209
210 for thisFile in self.fileList:
210 for thisFile in self.fileList:
211 self.upload(thisFile, self.remotefolder)
211 self.upload(thisFile, self.remotefolder)
212
212
213 print("[Remote Server] Closing %s" %self.__server)
213 print("[Remote Server] Closing %s" %self.__server)
214 self.close()
214 self.close()
215
215
216 self.mutex.release()
216 self.mutex.release()
217 # self.bussy = False
217 # self.bussy = False
218
218
219 print("[Remote Server] Thread stopped successfully")
219 print("[Remote Server] Thread stopped successfully")
220
220
221 class FTPClient(Remote):
221 class FTPClient(Remote):
222
222
223 __ftpClientObj = None
223 __ftpClientObj = None
224
224
225 def __init__(self, server, username, password, remotefolder, period=60):
225 def __init__(self, server, username, password, remotefolder, period=60):
226 """
226 """
227 """
227 """
228 Remote.__init__(self, server, username, password, remotefolder, period)
228 Remote.__init__(self, server, username, password, remotefolder, period)
229
229
230 def open(self, server, username, password, remotefolder):
230 def open(self, server, username, password, remotefolder):
231
231
232 """
232 """
233 This method is used to set FTP parameters and establish a connection to remote server
233 This method is used to set FTP parameters and establish a connection to remote server
234
234
235 Inputs:
235 Inputs:
236 server - remote server IP Address
236 server - remote server IP Address
237
237
238 username - remote server Username
238 username - remote server Username
239
239
240 password - remote server password
240 password - remote server password
241
241
242 remotefolder - remote server current working directory
242 remotefolder - remote server current working directory
243
243
244 Return:
244 Return:
245 Boolean - Returns 1 if a connection has been established, 0 otherwise
245 Boolean - Returns 1 if a connection has been established, 0 otherwise
246
246
247 Affects:
247 Affects:
248 self.status - in case of error or fail connection this parameter is set to 0 else 1
248 self.status - in case of error or fail connection this parameter is set to 0 else 1
249
249
250 """
250 """
251
251
252 if server == None:
252 if server == None:
253 raise ValueError("FTP server should be defined")
253 raise ValueError("FTP server should be defined")
254
254
255 if username == None:
255 if username == None:
256 raise ValueError("FTP username should be defined")
256 raise ValueError("FTP username should be defined")
257
257
258 if password == None:
258 if password == None:
259 raise ValueError("FTP password should be defined")
259 raise ValueError("FTP password should be defined")
260
260
261 if remotefolder == None:
261 if remotefolder == None:
262 raise ValueError("FTP remote folder should be defined")
262 raise ValueError("FTP remote folder should be defined")
263
263
264 try:
264 try:
265 ftpClientObj = ftplib.FTP(server)
265 ftpClientObj = ftplib.FTP(server)
266 except ftplib.all_errors as e:
266 except ftplib.all_errors as e:
267 print("[FTP Server]: FTP server connection fail: %s" %server)
267 print("[FTP Server]: FTP server connection fail: %s" %server)
268 print("[FTP Server]:", e)
268 print("[FTP Server]:", e)
269 self.status = 0
269 self.status = 0
270 return 0
270 return 0
271
271
272 try:
272 try:
273 ftpClientObj.login(username, password)
273 ftpClientObj.login(username, password)
274 except ftplib.all_errors:
274 except ftplib.all_errors:
275 print("[FTP Server]: FTP username or password are incorrect")
275 print("[FTP Server]: FTP username or password are incorrect")
276 self.status = 0
276 self.status = 0
277 return 0
277 return 0
278
278
279 if remotefolder == None:
279 if remotefolder == None:
280 remotefolder = ftpClientObj.pwd()
280 remotefolder = ftpClientObj.pwd()
281 else:
281 else:
282 try:
282 try:
283 ftpClientObj.cwd(remotefolder)
283 ftpClientObj.cwd(remotefolder)
284 except ftplib.all_errors:
284 except ftplib.all_errors:
285 print("[FTP Server]: FTP remote folder is invalid: %s" %remotefolder)
285 print("[FTP Server]: FTP remote folder is invalid: %s" %remotefolder)
286 remotefolder = ftpClientObj.pwd()
286 remotefolder = ftpClientObj.pwd()
287
287
288 self.server = server
288 self.server = server
289 self.username = username
289 self.username = username
290 self.password = password
290 self.password = password
291 self.remotefolder = remotefolder
291 self.remotefolder = remotefolder
292 self.__ftpClientObj = ftpClientObj
292 self.__ftpClientObj = ftpClientObj
293 self.status = 1
293 self.status = 1
294
294
295 return 1
295 return 1
296
296
297 def close(self):
297 def close(self):
298 """
298 """
299 Close connection to remote server
299 Close connection to remote server
300 """
300 """
301 if not self.status:
301 if not self.status:
302 return 0
302 return 0
303
303
304 self.__ftpClientObj.close()
304 self.__ftpClientObj.close()
305
305
306 def mkdir(self, remotefolder):
306 def mkdir(self, remotefolder):
307 """
307 """
308 mkdir is used to make a new directory in remote server
308 mkdir is used to make a new directory in remote server
309
309
310 Input:
310 Input:
311 remotefolder - directory name
311 remotefolder - directory name
312
312
313 Return:
313 Return:
314 0 in error case else 1
314 0 in error case else 1
315 """
315 """
316 if not self.status:
316 if not self.status:
317 return 0
317 return 0
318
318
319 try:
319 try:
320 self.__ftpClientObj.mkd(dirname)
320 self.__ftpClientObj.mkd(dirname)
321 except ftplib.all_errors:
321 except ftplib.all_errors:
322 print("[FTP Server]: Error creating remote folder: %s" %remotefolder)
322 print("[FTP Server]: Error creating remote folder: %s" %remotefolder)
323 return 0
323 return 0
324
324
325 return 1
325 return 1
326
326
327 def cd(self, remotefolder):
327 def cd(self, remotefolder):
328 """
328 """
329 cd is used to change remote working directory on server
329 cd is used to change remote working directory on server
330
330
331 Input:
331 Input:
332 remotefolder - current working directory
332 remotefolder - current working directory
333
333
334 Affects:
334 Affects:
335 self.remotefolder
335 self.remotefolder
336
336
337 Return:
337 Return:
338 0 in case of error else 1
338 0 in case of error else 1
339 """
339 """
340 if not self.status:
340 if not self.status:
341 return 0
341 return 0
342
342
343 if remotefolder == self.remotefolder:
343 if remotefolder == self.remotefolder:
344 return 1
344 return 1
345
345
346 try:
346 try:
347 self.__ftpClientObj.cwd(remotefolder)
347 self.__ftpClientObj.cwd(remotefolder)
348 except ftplib.all_errors:
348 except ftplib.all_errors:
349 print('[FTP Server]: Error changing to %s' %remotefolder)
349 print('[FTP Server]: Error changing to %s' %remotefolder)
350 print('[FTP Server]: Trying to create remote folder')
350 print('[FTP Server]: Trying to create remote folder')
351
351
352 if not self.mkdir(remotefolder):
352 if not self.mkdir(remotefolder):
353 print('[FTP Server]: Remote folder could not be created')
353 print('[FTP Server]: Remote folder could not be created')
354 return 0
354 return 0
355
355
356 try:
356 try:
357 self.__ftpClientObj.cwd(remotefolder)
357 self.__ftpClientObj.cwd(remotefolder)
358 except ftplib.all_errors:
358 except ftplib.all_errors:
359 return 0
359 return 0
360
360
361 self.remotefolder = remotefolder
361 self.remotefolder = remotefolder
362
362
363 return 1
363 return 1
364
364
365 def sendFile(self, fullfilename):
365 def sendFile(self, fullfilename):
366
366
367 if not self.status:
367 if not self.status:
368 return 0
368 return 0
369
369
370 fp = open(fullfilename, 'rb')
370 fp = open(fullfilename, 'rb')
371
371
372 filename = os.path.basename(fullfilename)
372 filename = os.path.basename(fullfilename)
373
373
374 command = "STOR %s" %filename
374 command = "STOR %s" %filename
375
375
376 try:
376 try:
377 self.__ftpClientObj.storbinary(command, fp)
377 self.__ftpClientObj.storbinary(command, fp)
378 except ftplib.all_errors as e:
378 except ftplib.all_errors as e:
379 print("[FTP Server]:", e)
379 print("[FTP Server]:", e)
380 return 0
380 return 0
381
381
382 try:
382 try:
383 self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
383 self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
384 except ftplib.all_errors as e:
384 except ftplib.all_errors as e:
385 print("[FTP Server]:", e)
385 print("[FTP Server]:", e)
386
386
387 fp.close()
387 fp.close()
388
388
389 return 1
389 return 1
390
390
391 class SSHClient(Remote):
391 class SSHClient(Remote):
392
392
393 __sshClientObj = None
393 __sshClientObj = None
394 __scpClientObj = None
394 __scpClientObj = None
395
395
396
396
397 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
397 def __init__(self, server, username, password, remotefolder, period=60,key_filename=None):
398 """
398 """
399 """
399 """
400 Remote.__init__(self, server, username, password, remotefolder, period, key_filename)
400 Remote.__init__(self, server, username, password, remotefolder, period, key_filename)
401
401
402 def open(self, server, username, password, remotefolder, port=22, key_filename=None):
402 def open(self, server, username, password, remotefolder, port=22, key_filename=None):
403
403
404 """
404 """
405 This method is used to set SSH parameters and establish a connection to a remote server
405 This method is used to set SSH parameters and establish a connection to a remote server
406
406
407 Inputs:
407 Inputs:
408 server - remote server IP Address
408 server - remote server IP Address
409
409
410 username - remote server Username
410 username - remote server Username
411
411
412 password - remote server password
412 password - remote server password
413
413
414 remotefolder - remote server current working directory
414 remotefolder - remote server current working directory
415
415
416 key_filename - filename of the private key/optional
416 key_filename - filename of the private key/optional
417
417
418 Return: void
418 Return: void
419
419
420 Affects:
420 Affects:
421 self.status - in case of error or fail connection this parameter is set to 0 else 1
421 self.status - in case of error or fail connection this parameter is set to 0 else 1
422
422
423 """
423 """
424 #import socket
424 #import socket
425
425
426 if server == None:
426 if server == None:
427 raise ValueError("SSH server should be defined")
427 raise ValueError("SSH server should be defined")
428
428
429 if username == None:
429 if username == None:
430 raise ValueError("SSH username should be defined")
430 raise ValueError("SSH username should be defined")
431
431
432 if password == None:
432 if password == None:
433 raise ValueError("SSH password should be defined")
433 raise ValueError("SSH password should be defined")
434
434
435 if remotefolder == None:
435 if remotefolder == None:
436 raise ValueError("SSH remote folder should be defined")
436 raise ValueError("SSH remote folder should be defined")
437
437
438 self.__sshClientObj = paramiko.SSHClient()
438 self.__sshClientObj = paramiko.SSHClient()
439
439
440 self.__sshClientObj.load_system_host_keys()
440 self.__sshClientObj.load_system_host_keys()
441 self.__sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())
441 self.__sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())
442
442
443 self.status = 0
443 self.status = 0
444
444
445 try:
445 try:
446 if key_filename != None:
446 if key_filename != None:
447 self.__sshClientObj.connect(server, username=username, password=password, port=port, key_filename=key_filename)
447 self.__sshClientObj.connect(server, username=username, password=password, port=port, key_filename=key_filename)
448 else:
448 else:
449 self.__sshClientObj.connect(server, username=username, password=password, port=port)
449 self.__sshClientObj.connect(server, username=username, password=password, port=port)
450 except paramiko.AuthenticationException as e:
450 except paramiko.AuthenticationException as e:
451 # print "SSH username or password are incorrect: %s"
451 # print "SSH username or password are incorrect: %s"
452 print("[SSH Server]:", e)
452 print("[SSH Server]:", e)
453 return 0
453 return 0
454 # except SSHException as e:
454 # except SSHException as e:
455 # print("[SSH Server]:", e)
455 # print("[SSH Server]:", e)
456 # return 0
456 # return 0
457 # except socket.error:
457 # except socket.error:
458 # self.status = 0
458 # self.status = 0
459 # print("[SSH Server]:", e)
459 # print("[SSH Server]:", e)
460 # return 0
460 # return 0
461
461
462 self.status = 1
462 self.status = 1
463 #self.__scpClientObj = scp.SCPClient(self.__sshClientObj.get_transport(), socket_timeout=30)
463 #self.__scpClientObj = scp.SCPClient(self.__sshClientObj.get_transport(), socket_timeout=30)
464 self.__scpClientObj = self.__sshClientObj.open_sftp()
464 self.__scpClientObj = self.__sshClientObj.open_sftp()
465 if remotefolder == None:
465 if remotefolder == None:
466 remotefolder = self.pwd()
466 remotefolder = self.pwd()
467
467
468 self.server = server
468 self.server = server
469 self.username = username
469 self.username = username
470 self.password = password
470 self.password = password
471 # self.__sshClientObj = self.__sshClientObj
471 # self.__sshClientObj = self.__sshClientObj
472 # self.__scpClientObj = self.__scpClientObj
472 # self.__scpClientObj = self.__scpClientObj
473 self.status = 1
473 self.status = 1
474
474
475 if not self.cd(remotefolder):
475 if not self.cd(remotefolder):
476 raise ValueError("[SSH Server]: Could not access to remote folder: %s" %remotefolder)
476 raise ValueError("[SSH Server]: Could not access to remote folder: %s" %remotefolder)
477 return 0
477 return 0
478
478
479 self.remotefolder = remotefolder
479 self.remotefolder = remotefolder
480
480
481 return 1
481 return 1
482
482
483 def close(self):
483 def close(self):
484 """
484 """
485 Close connection to remote server
485 Close connection to remote server
486 """
486 """
487 if not self.status:
487 if not self.status:
488 return 0
488 return 0
489
489
490 self.__scpClientObj.close()
490 self.__scpClientObj.close()
491 self.__sshClientObj.close()
491 self.__sshClientObj.close()
492
492
493 def __execute(self, command):
493 def __execute(self, command):
494 """
494 """
495 __execute a command on remote server
495 __execute a command on remote server
496
496
497 Input:
497 Input:
498 command - Exmaple 'ls -l'
498 command - Exmaple 'ls -l'
499
499
500 Return:
500 Return:
501 0 in error case else 1
501 0 in error case else 1
502 """
502 """
503 if not self.status:
503 if not self.status:
504 return 0
504 return 0
505
505
506 stdin, stdout, stderr = self.__sshClientObj.exec_command(command)
506 stdin, stdout, stderr = self.__sshClientObj.exec_command(command)
507
507
508 result = stderr.readlines()
508 result = stderr.readlines()
509 if len(result) > 1:
509 if len(result) > 1:
510 return 0
510 return 0
511
511
512 result = stdout.readlines()
512 result = stdout.readlines()
513 if len(result) > 1:
513 if len(result) > 1:
514 return result[0][:-1]
514 return result[0][:-1]
515
515
516 return 1
516 return 1
517
517
518 def mkdir(self, remotefolder):
518 def mkdir(self, remotefolder):
519 """
519 """
520 mkdir is used to make a new directory in remote server
520 mkdir is used to make a new directory in remote server
521
521
522 Input:
522 Input:
523 remotefolder - directory name
523 remotefolder - directory name
524
524
525 Return:
525 Return:
526 0 in error case else 1
526 0 in error case else 1
527 """
527 """
528
528
529 command = 'mkdir %s' %remotefolder
529 command = 'mkdir %s' %remotefolder
530
530
531 return self.__execute(command)
531 return self.__execute(command)
532
532
533 def pwd(self):
533 def pwd(self):
534
534
535 command = 'pwd'
535 command = 'pwd'
536
536
537 return self.__execute(command)
537 return self.__execute(command)
538
538
539 def cd(self, remotefolder):
539 def cd(self, remotefolder):
540 """
540 """
541 cd is used to change remote working directory on server
541 cd is used to change remote working directory on server
542
542
543 Input:
543 Input:
544 remotefolder - current working directory
544 remotefolder - current working directory
545
545
546 Affects:
546 Affects:
547 self.remotefolder
547 self.remotefolder
548
548
549 Return:
549 Return:
550 0 in case of error else 1
550 0 in case of error else 1
551 """
551 """
552 if not self.status:
552 if not self.status:
553 return 0
553 return 0
554
554
555 if remotefolder == self.remotefolder:
555 if remotefolder == self.remotefolder:
556 return 1
556 return 1
557
557
558 chk_command = "cd %s; pwd" %remotefolder
558 chk_command = "cd %s; pwd" %remotefolder
559 mkdir_command = "mkdir %s" %remotefolder
559 mkdir_command = "mkdir %s" %remotefolder
560
560
561 if not self.__execute(chk_command):
561 if not self.__execute(chk_command):
562 if not self.__execute(mkdir_command):
562 if not self.__execute(mkdir_command):
563 self.remotefolder = None
563 self.remotefolder = None
564 return 0
564 return 0
565
565
566 self.remotefolder = remotefolder
566 self.remotefolder = remotefolder
567
567
568 return 1
568 return 1
569
569
570 def sendFile(self, fullfilename):
570 def sendFile(self, fullfilename):
571
571
572 if not self.status:
572 if not self.status:
573 return 0
573 return 0
574
574
575 remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
575 remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
576 print("remotefile",fullfilename, remotefile)
576 print("remotefile",fullfilename, remotefile)
577
577
578 try:
578 try:
579 self.__scpClientObj.put(fullfilename,remotefile)
579 self.__scpClientObj.put(fullfilename,remotefile)
580 except paramiko.SSHException as e:
580 except paramiko.SSHException as e:
581 print("[SSH Server]", str(e))
581 print("[SSH Server]", str(e))
582 print(fullfilename," to ",remotefile)
582 print(fullfilename," to ",remotefile)
583 return 0
583 return 0
584
584
585
585
586 #command = 'chmod 775 %s' %remotefile
586 #command = 'chmod 775 %s' %remotefile
587
587
588 return 1#self.__execute(command)
588 return 1#self.__execute(command)
589 #@MPDecorator
589 #@MPDecorator
590 class SendToServerProc(ProcessingUnit):
590 class SendToServerProc(ProcessingUnit):
591
591
592 sendByTrigger = False
592 sendByTrigger = False
593
593
594 def __init__(self, **kwargs):
594 def __init__(self, **kwargs):
595
595
596 ProcessingUnit.__init__(self)
596 ProcessingUnit.__init__(self)
597
597
598 self.isConfig = False
598 self.isConfig = False
599 self.clientObj = None
599 self.clientObj = None
600 self.dataOut = Parameters()
600 self.dataOut = Parameters()
601 self.dataOut.error=False
601 self.dataOut.error=False
602 self.dataOut.flagNoData=True
602 self.dataOut.flagNoData=True
603
603
604 def setup(self, server=None, username="", password="", remotefolder="", localfolder="",
604 def setup(self, server=None, username="", password="", remotefolder="", localfolder="",
605 ext='.png', period=60, protocol='ftp', sendByTrigger=False, key_filename=None):
605 ext='.png', period=60, protocol='ftp', sendByTrigger=False, key_filename=None):
606 self.server = server
606 self.server = server
607 self.username = username
607 self.username = username
608 self.password = password
608 self.password = password
609 self.remotefolder = remotefolder
609 self.remotefolder = remotefolder
610 self.clientObj = None
610 self.clientObj = None
611 self.localfolder = localfolder
611 self.localfolder = localfolder
612 self.ext = ext
612 self.ext = ext
613 self.sendByTrigger = sendByTrigger
613 self.sendByTrigger = sendByTrigger
614 self.period = period
614 self.period = period
615 self.key_filename = key_filename
615 self.key_filename = key_filename
616 if self.sendByTrigger:
616 if self.sendByTrigger:
617 self.period = 1000000000000 #para que no se ejecute por tiempo
617 self.period = 1000000000000 #para que no se ejecute por tiempo
618
618
619 if str.lower(protocol) == 'ftp':
619 if str.lower(protocol) == 'ftp':
620 self.clientObj = FTPClient(server, username, password, remotefolder, period)
620 self.clientObj = FTPClient(server, username, password, remotefolder, period)
621
621
622 if str.lower(protocol) == 'ssh':
622 if str.lower(protocol) == 'ssh':
623 self.clientObj = SSHClient(self.server, self.username, self.password,
623 self.clientObj = SSHClient(self.server, self.username, self.password,
624 self.remotefolder, period=600000,key_filename=self.key_filename)
624 self.remotefolder, period=600000,key_filename=self.key_filename)
625
625
626 if not self.clientObj:
626 if not self.clientObj:
627 raise ValueError("%s has been chosen as remote access protocol but it is not valid" %protocol)
627 raise ValueError("%s has been chosen as remote access protocol but it is not valid" %protocol)
628
628
629 print("Send to Server setup complete")
629 print("Send to Server setup complete")
630
630
631
631
632 def findFiles(self):
632 def findFiles(self):
633
633
634 if not type(self.localfolder) == list:
634 if not type(self.localfolder) == list:
635 folderList = [self.localfolder]
635 folderList = [self.localfolder]
636 else:
636 else:
637 folderList = self.localfolder
637 folderList = self.localfolder
638
638
639 #Remove duplicate items
639 #Remove duplicate items
640 folderList = list(set(folderList))
640 folderList = list(set(folderList))
641
641
642 fullfilenameList = []
642 fullfilenameList = []
643
643
644 for thisFolder in folderList:
644 for thisFolder in folderList:
645
645
646 print("[Remote Server]: Searching files on %s" %thisFolder)
646 print("[Remote Server]: Searching files on %s" %thisFolder)
647
647
648 filenameList = glob.glob1(thisFolder, '*%s' %self.ext)
648 filenameList = glob.glob1(thisFolder, '*%s' %self.ext)
649
649
650 if len(filenameList) < 1:
650 if len(filenameList) < 1:
651
651
652 continue
652 continue
653
653
654 for thisFile in filenameList:
654 for thisFile in filenameList:
655 fullfilename = os.path.join(thisFolder, thisFile)
655 fullfilename = os.path.join(thisFolder, thisFile)
656
656
657 if fullfilename in fullfilenameList:
657 if fullfilename in fullfilenameList:
658 continue
658 continue
659
659
660 #Only files modified in the last 30 minutes are considered
660 #Only files modified in the last 30 minutes are considered
661 if os.path.getmtime(fullfilename) < time.time() - 30*60:
661 if os.path.getmtime(fullfilename) < time.time() - 30*60:
662 continue
662 continue
663
663
664 fullfilenameList.append(fullfilename)
664 fullfilenameList.append(fullfilename)
665 fullfilenameList.sort()
665 fullfilenameList.sort()
666
666
667 return fullfilenameList
667 return fullfilenameList
668
668
669 def run(self, **kwargs):
669 def run(self, **kwargs):
670
670
671 if not self.isConfig:
671 if not self.isConfig:
672 self.init = time.time()
672 self.init = time.time()
673 self.setup(**kwargs)
673 self.setup(**kwargs)
674 self.isConfig = True
674 self.isConfig = True
675
675
676 if not self.clientObj.is_alive():
676 if not self.clientObj.is_alive():
677 print("[Remote Server]: Restarting connection ")
677 print("[Remote Server]: Restarting connection ")
678 self.setup( **kwargs)
678 self.setup( **kwargs)
679
679
680 if ((time.time() - self.init) >= self.period and not self.sendByTrigger) or (self.sendByTrigger and not self.dataIn.flagNoData):
680 if ((time.time() - self.init) >= self.period and not self.sendByTrigger) or (self.sendByTrigger and not self.dataIn.flagNoData):
681 fullfilenameList = self.findFiles()
681 fullfilenameList = self.findFiles()
682 if self.sendByTrigger:
682 if self.sendByTrigger:
683 if self.clientObj.upload(fullfilenameList[-1]): #last file to send
683 if self.clientObj.upload(fullfilenameList[-1]): #last file to send
684 print("[Remote Server] upload finished successfully")
684 print("[Remote Server] upload finished successfully")
685 else:
685 else:
686 for file in fullfilenameList:
686 for file in fullfilenameList:
687 self.clientObj.upload(file)
687 self.clientObj.upload(file)
688
688
689 # if self.clientObj.updateFileList(fullfilenameList):
689 # if self.clientObj.updateFileList(fullfilenameList):
690 # print("[Remote Server]: Sending the next files ", str(fullfilenameList))
690 # print("[Remote Server]: Sending the next files ", str(fullfilenameList))
691
691
692 self.init = time.time()
692 self.init = time.time()
693
693
694 def close(self):
694 def close(self):
695 print("[Remote Server] Stopping thread")
695 print("[Remote Server] Stopping thread")
696 self.clientObj.stop()
696 self.clientObj.stop()
697
697
698 class SendByRSYNCProc(ProcessingUnit):
698 class SendByRSYNCProc(ProcessingUnit):
699
699
700 sendByTrigger = False
700 sendByTrigger = False
701
701
702 def __init__(self, **kwargs):
702 def __init__(self, **kwargs):
703
703
704 ProcessingUnit.__init__(self)
704 ProcessingUnit.__init__(self)
705
705
706 self.isConfig = False
706 self.isConfig = False
707 self.dataOut = Parameters()
707 self.dataOut = Parameters()
708 self.dataOut.error=False
708 self.dataOut.error=False
709 self.dataOut.flagNoData=True
709 self.dataOut.flagNoData=True
710
710
711 def setup(self, server="", username="", remotefolder="", localfolder="",sendByTrigger=True,
711 def setup(self, server="", username="", remotefolder="", localfolder="",sendByTrigger=True,
712 period=60, key_filename=None, port=22 ,param1="", param2=""):
712 period=60, key_filename=None, port=22 ,param1="", param2=""):
713 self.server = server
713 self.server = server
714 self.username = username
714 self.username = username
715 self.remotefolder = remotefolder
715 self.remotefolder = remotefolder
716 self.localfolder = localfolder
716 self.localfolder = localfolder
717 self.period = period
717 self.period = period
718 self.key_filename = key_filename
718 self.key_filename = key_filename
719 if type(param1)==str:
719 if type(param1)==str:
720 self.param1 = list(param1.split(","))
720 self.param1 = list(param1.split(","))
721 else:
721 else:
722 self.param1 = param1
722 self.param1 = param1
723 if type(param2)==str:
723 if type(param2)==str:
724 self.param2 = list(param2.split(","))
724 self.param2 = list(param2.split(","))
725 else:
725 else:
726 self.param2 = param2
726 self.param2 = param2
727 self.port = port
727 self.port = port
728 self.sendByTrigger = sendByTrigger
728 self.sendByTrigger = sendByTrigger
729 if self.sendByTrigger:
729 if self.sendByTrigger:
730 self.period = 1000000000000 #para que no se ejecute por tiempo
730 self.period = 1000000000000 #para que no se ejecute por tiempo
731 self.command ="rsync "
731 self.command ="rsync "
732
732
733 def syncFolders(self):
733 def syncFolders(self):
734 self.command ="rsync "
734 self.command ="rsync "
735 for p1 in self.param1:
735 for p1 in self.param1:
736 self.command += " -"+str(p1)
736 self.command += " -"+str(p1)
737 for p2 in self.param2:
737 for p2 in self.param2:
738 self.command += " --"+str(p2)
738 self.command += " --"+str(p2)
739 if self.key_filename != None:
739 if self.key_filename != None:
740 self.command += """ "ssh -i {} -p {}" """.format(self.key_filename, self.port)
740 self.command += """ "ssh -i {} -p {}" """.format(self.key_filename, self.port)
741 self.command += " {} ".format(self.localfolder)
741 self.command += " {} ".format(self.localfolder)
742 self.command += " {}@{}:{}".format(self.username,self.server,self.remotefolder)
742 self.command += " {}@{}:{}".format(self.username,self.server,self.remotefolder)
743 print("CMD: ",self.command)
743 print("CMD: ",self.command)
744 #os.system(self.command)
744 #os.system(self.command)
745 return
745 return
746
746
747 def run(self, **kwargs):
747 def run(self, **kwargs):
748
748
749 if not self.isConfig:
749 if not self.isConfig:
750 self.init = time.time()
750 self.init = time.time()
751 self.setup(**kwargs)
751 self.setup(**kwargs)
752 self.isConfig = True
752 self.isConfig = True
753
753
754 if self.sendByTrigger and not self.dataIn.flagNoData:
754 if self.sendByTrigger and not self.dataIn.flagNoData:
755 self.syncFolders()
755 self.syncFolders()
756 else:
756 else:
757 if (time.time() - self.init) >= self.period:
757 if (time.time() - self.init) >= self.period:
758 self.syncFolders()
758 self.syncFolders()
759 self.init = time.time()
759 self.init = time.time()
760
760
761 return
761 return
762
762
763
763
764
764
765 class FTP(object):
765 class FTP(object):
766 """
766 """
767 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
767 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
768
768
769 Non-standard Python modules used: None
769 Non-standard Python modules used: None
770
770
771 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
771 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
772
773 Modified:
774 Joab Apaza Feb. 2022
772 """
775 """
773
776
774 def __init__(self,server = None, username=None, password=None, remotefolder=None):
777 def __init__(self,server = None, username=None, password=None, remotefolder=None):
775 """
778 """
776 This method is used to setting parameters for FTP and establishing connection to remote server
779 This method is used to setting parameters for FTP and establishing connection to remote server
777
780
778 Inputs:
781 Inputs:
779 server - remote server IP Address
782 server - remote server IP Address
780
783
781 username - remote server Username
784 username - remote server Username
782
785
783 password - remote server password
786 password - remote server password
784
787
785 remotefolder - remote server current working directory
788 remotefolder - remote server current working directory
786
789
787 Return: void
790 Return: void
788
791
789 Affects:
792 Affects:
790 self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
793 self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
791
794
792 self.folderList - sub-folder list of remote folder
795 self.folderList - sub-folder list of remote folder
793
796
794 self.fileList - file list of remote folder
797 self.fileList - file list of remote folder
795
798
796
799
797 """
800 """
798
801
799 if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
802 if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
800 server, username, password, remotefolder = self.parmsByDefault()
803 server, username, password, remotefolder = self.parmsByDefault()
801
804
802 self.server = server
805 self.server = server
803 self.username = username
806 self.username = username
804 self.password = password
807 self.password = password
805 self.remotefolder = remotefolder
808 self.remotefolder = remotefolder
806 self.file = None
809 self.file = None
807 self.ftp = None
810 self.ftp = None
808 self.status = 0
811 self.status = 0
809
812
810 try:
813 try:
811 self.ftp = ftplib.FTP(self.server)
814 self.ftp = ftplib.FTP(self.server)
812 self.ftp.login(self.username,self.password)
815 self.ftp.login(self.username,self.password)
813 self.ftp.cwd(self.remotefolder)
816 self.ftp.cwd(self.remotefolder)
814 # print 'Connect to FTP Server: Successfully'
817 # print 'Connect to FTP Server: Successfully'
815
818
816 except ftplib.all_errors:
819 except ftplib.all_errors:
817 print('Error FTP Service')
820 print('Error FTP Service')
818 self.status = 1
821 self.status = 1
819 return
822 return
820
823
821
824
822
825
823 self.dirList = []
826 self.dirList = []
824
827
825 try:
828 try:
826 self.dirList = self.ftp.nlst()
829 self.dirList = self.ftp.nlst()
827
830
828 except ftplib.error_perm as resp:
831 except ftplib.error_perm as resp:
829 if str(resp) == "550 No files found":
832 if str(resp) == "550 No files found":
830 print("no files in this directory")
833 print("no files in this directory")
831 self.status = 1
834 self.status = 1
832 return
835 return
833
836
834 except ftplib.all_errors:
837 except ftplib.all_errors:
835 print('Error Displaying Dir-Files')
838 print('Error Displaying Dir-Files')
836 self.status = 1
839 self.status = 1
837 return
840 return
838
841
839 self.fileList = []
842 self.fileList = []
840 self.folderList = []
843 self.folderList = []
841 #only for test
844 #only for test
842 for f in self.dirList:
845 for f in self.dirList:
843 name, ext = os.path.splitext(f)
846 name, ext = os.path.splitext(f)
844 if ext != '':
847 if ext != '':
845 self.fileList.append(f)
848 self.fileList.append(f)
846 # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
849 # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
847
850
848 def parmsByDefault(self):
851 def parmsByDefault(self):
849 server = 'jro-app.igp.gob.pe'
852 server = 'jro-app.igp.gob.pe'
850 username = 'wmaster'
853 username = 'wmaster'
851 password = 'mst2010vhf'
854 password = 'mst2010vhf'
852 remotefolder = '/home/wmaster/graficos'
855 remotefolder = '/home/wmaster/graficos'
853
856
854 return server, username, password, remotefolder
857 return server, username, password, remotefolder
855
858
856
859
857 def mkd(self,dirname):
860 def mkd(self,dirname):
858 """
861 """
859 mkd is used to make directory in remote server
862 mkd is used to make directory in remote server
860
863
861 Input:
864 Input:
862 dirname - directory name
865 dirname - directory name
863
866
864 Return:
867 Return:
865 1 in error case else 0
868 1 in error case else 0
866 """
869 """
867 try:
870 try:
868 self.ftp.mkd(dirname)
871 self.ftp.mkd(dirname)
869 except:
872 except:
870 print('Error creating remote folder:%s'%dirname)
873 print('Error creating remote folder:%s'%dirname)
871 return 1
874 return False
872
875
873 return 0
876 return True
874
877
875
878
876 def delete(self,filename):
879 def delete(self,filename):
877 """
880 """
878 delete is used to delete file in current working directory of remote server
881 delete is used to delete file in current working directory of remote server
879
882
880 Input:
883 Input:
881 filename - filename to delete in remote folder
884 filename - filename to delete in remote folder
882
885
883 Return:
886 Return:
884 1 in error case else 0
887 1 in error case else 0
885 """
888 """
886
889
887 try:
890 try:
888 self.ftp.delete(filename)
891 self.ftp.delete(filename)
889 except:
892 except:
890 print('Error deleting remote file:%s'%filename)
893 print('Error deleting remote file:%s'%filename)
891 return 1
894 return False
892
895
893 return 0
896 return True
894
897
895 def download(self,filename,localfolder):
898 def download(self,filename,localfolder):
896 """
899 """
897 download is used to downloading file from remote folder into local folder
900 download is used to downloading file from remote folder into local folder
898
901
899 Inputs:
902 Inputs:
900 filename - filename to donwload
903 filename - filename to donwload
901
904
902 localfolder - directory local to store filename
905 localfolder - directory local to store filename
903
906
904 Returns:
907 Returns:
905 self.status - 1 in error case else 0
908 self.status - 1 in error case else 0
906 """
909 """
907
910
908 self.status = 0
911 self.status = 0
909
912
910
913
911 if not(filename in self.fileList):
914 if not(filename in self.fileList):
912 print('filename:%s not exists'%filename)
915 print('filename:%s not exists'%filename)
913 self.status = 1
916 self.status = 1
914 return self.status
917 return self.status
915
918
916 newfilename = os.path.join(localfolder,filename)
919 newfilename = os.path.join(localfolder,filename)
917
920
918 self.file = open(newfilename, 'wb')
921 self.file = open(newfilename, 'wb')
919
922
920 try:
923 try:
921 print('Download: ' + filename)
924 print('Download: ' + filename)
922 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
925 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
923 print('Download Complete')
926 print('Download Complete')
924 except ftplib.all_errors:
927 except ftplib.all_errors:
925 print('Error Downloading ' + filename)
928 print('Error Downloading ' + filename)
926 self.status = 1
929 self.status = 1
927 return self.status
930 return self.status
928
931
929 self.file.close()
932 self.file.close()
930
933
931 return self.status
934 return self.status
932
935
933
936
934 def __handleDownload(self,block):
937 def __handleDownload(self,block):
935 """
938 """
936 __handleDownload is used to handle writing file
939 __handleDownload is used to handle writing file
937 """
940 """
938 self.file.write(block)
941 self.file.write(block)
939
942
940
943
941 def upload(self,filename,remotefolder=None):
944 def upload(self,filename,remotefolder=None, mkdir=False):
942 """
945 """
943 upload is used to uploading local file to remote directory
946 upload is used to uploading local file to remote directory, and change the permission of the remote file
944
947
945 Inputs:
948 Inputs:
946 filename - full path name of local file to store in remote directory
949 filename - full path name of local file to store in remote directory
947
950
948 remotefolder - remote directory
951 remotefolder - remote directory
949
952
953 mkdir - if the remote folder doesn't exist, it will created
954
950 Returns:
955 Returns:
951 self.status - 1 in error case else 0
956 self.status - 1 in error case else 0
952 """
957 """
953
958
954 if remotefolder == None:
959 if remotefolder == None:
955 remotefolder = self.remotefolder
960 remotefolder = self.remotefolder
956
961
962 if mkdir:
963 if self.if_dir_exist(remotefolder):
964 pass
965 else:
966 self.mkdir_r(remotefolder)
967
957 self.status = 0
968 self.status = 0
958
969
959 try:
970 try:
960 self.ftp.cwd(remotefolder)
971 self.ftp.cwd(remotefolder)
961
972
962 self.file = open(filename, 'rb')
973 self.file = open(filename, 'rb')
963
974
964 (head, tail) = os.path.split(filename)
975 (head, tail) = os.path.split(filename)
965
976
966 command = "STOR " + tail
977 command = "STOR " + tail
967
978
968 print('Uploading: ' + tail)
979 print('Uploading: ' + tail)
969 self.ftp.storbinary(command, self.file)
980 self.ftp.storbinary(command, self.file)
981 print(self.cmd('SITE CHMOD 755 {}'.format(tail)))
970 print('Upload Completed')
982 print('Upload Completed')
971
983
972 except ftplib.all_errors:
984 except ftplib.all_errors:
973 print('Error Uploading ' + tail)
985 print('Error Uploading ' + tail)
974 self.status = 1
986 self.status = 1
975 return self.status
987 return self.status
976
988
977 self.file.close()
989 self.file.close()
978
990
979 #back to initial directory in __init__()
991 #back to initial directory in __init__()
980 self.ftp.cwd(self.remotefolder)
992 self.ftp.cwd(self.remotefolder)
981
993
982 return self.status
994 return self.status
983
995
984
996
985 def dir(self,remotefolder):
997 def ch_dir(self,remotefolder):
986 """
998 """
987 dir is used to change working directory of remote server and get folder and file list
999 ch_dir is used to change working directory of remote server and get folder and file list
988
1000
989 Input:
1001 Input:
990 remotefolder - current working directory
1002 remotefolder - current working directory
991
1003
992 Affects:
1004 Affects:
993 self.fileList - file list of working directory
1005 self.fileList - file list of working directory
994
1006
995 Return:
1007 Return:
996 infoList - list with filenames and size of file in bytes
1008 infoList - list with filenames and size of file in bytes
997
1009
998 self.folderList - folder list
1010 self.folderList - folder list
999 """
1011 """
1000
1012
1001 self.remotefolder = remotefolder
1013 self.remotefolder = remotefolder
1002 print('Change to ' + self.remotefolder)
1014 print('Change to ' + self.remotefolder)
1003 try:
1015 try:
1004 self.ftp.cwd(remotefolder)
1016 self.ftp.cwd(remotefolder)
1005 except ftplib.all_errors:
1017 except ftplib.all_errors:
1006 print('Error Change to ' + self.remotefolder)
1018 print('Error Change to ' + self.remotefolder)
1007 infoList = None
1019 infoList = None
1008 self.folderList = None
1020 self.folderList = None
1009 return infoList,self.folderList
1021 return infoList,self.folderList
1010
1022
1011 self.dirList = []
1023 self.dirList = []
1012
1024
1013 try:
1025 try:
1014 self.dirList = self.ftp.nlst()
1026 self.dirList = self.ftp.nlst()
1015
1027
1016 except ftplib.error_perm as resp:
1028 except ftplib.error_perm as resp:
1017 if str(resp) == "550 No files found":
1029 if str(resp) == "550 No files found":
1018 print("no files in this directory")
1030 print("no files in this directory")
1019 infoList = None
1031 infoList = None
1020 self.folderList = None
1032 self.folderList = None
1021 return infoList,self.folderList
1033 return infoList,self.folderList
1022 except ftplib.all_errors:
1034 except ftplib.all_errors:
1023 print('Error Displaying Dir-Files')
1035 print('Error Displaying Dir-Files')
1024 infoList = None
1036 infoList = None
1025 self.folderList = None
1037 self.folderList = None
1026 return infoList,self.folderList
1038 return infoList,self.folderList
1027
1039
1028 infoList = []
1040 infoList = []
1029 self.fileList = []
1041 self.fileList = []
1030 self.folderList = []
1042 self.folderList = []
1031 for f in self.dirList:
1043 for f in self.dirList:
1032 name,ext = os.path.splitext(f)
1044 name,ext = os.path.splitext(f)
1033 if ext != '':
1045 if ext != '':
1034 self.fileList.append(f)
1046 self.fileList.append(f)
1035 value = (f,self.ftp.size(f))
1047 value = (f,self.ftp.size(f))
1036 infoList.append(value)
1048 infoList.append(value)
1037
1049
1038 if ext == '':
1050 if ext == '':
1039 self.folderList.append(f)
1051 self.folderList.append(f)
1040
1052
1041 return infoList,self.folderList
1053 return infoList,self.folderList
1042
1043
1044 def close(self):
1054 def close(self):
1045 """
1055 """
1046 close is used to close and end FTP connection
1056 close is used to close and end FTP connection
1047
1057
1048 Inputs: None
1058 Inputs: None
1049
1059
1050 Return: void
1060 Return: void
1051
1061
1052 """
1062 """
1053 self.ftp.close()
1063 self.ftp.close()
1064
1065 def get_sub_dirs(self, path):
1066 """
1067 used internal
1068
1069 Inputs:
1070 path - path to split in sub folders
1071
1072 Returns:
1073 sub_dirs - list of sub folders
1074 """
1075 sub_dirs = path.split("/")
1076 if sub_dirs[0]=="/":
1077 sub_dirs.pop(0)
1078 if sub_dirs[-1]=="/":
1079 sub_dirs.pop(-1)
1080 return sub_dirs
1081
1082 def if_dir_exist(self,path):
1083 """
1084 check if a the path folder exists in the ftp server
1085
1086 Inputs:
1087 path - path to check
1088
1089 Returns:
1090 status - True if exists and False if it doesn't
1091 """
1092 sub_dirs = self.get_sub_dirs(path)
1093 main = self.ftp.pwd()
1094 #print(main)
1095 for subdir in sub_dirs:
1096 folders = self.ftp.nlst(main)
1097 #print(folders)
1098 if (os.path.join(main,subdir) in folders):
1099 main = os.path.join(main,subdir)
1100 #print(main)
1101 continue
1102 else:
1103 return False
1104 return True
1105
1106 def cmd(self,command):
1107 """
1108 excecute a command in the FTP server
1109 """
1110 return self.ftp.sendcmd(command)
1111
1112 def mkdir_r(self,path):
1113 """
1114 create a remote folder and create sub folders if it is necessary
1115
1116 Inputs:
1117 path - path to create
1118
1119 Returns:
1120 status - True if succesfull else False
1121 """
1122 sub_dirs = self.get_sub_dirs(path)
1123 main = self.ftp.pwd()
1124 st = False
1125 #print(main)
1126 for subdir in sub_dirs:
1127 folders = self.ftp.nlst(main)
1128 #print(folders)
1129 folder = (os.path.join(main,subdir))
1130
1131 if (folder in folders):
1132 main = folder
1133 #print("new_main",main)
1134 continue
1135 else:
1136 print("creating...",folder)
1137 st = self.mkd(folder)
1138 print(self.cmd('SITE CHMOD 755 {}'.format(folder)))
1139 main = folder
1140
1141 return st
1142
1054 @MPDecorator
1143 @MPDecorator
1055 class SendByFTP(Operation):
1144 class SendByFTP(Operation):
1056
1145
1057 def __init__(self, **kwargs):
1146 def __init__(self, **kwargs):
1058 Operation.__init__(self, **kwargs)
1147 Operation.__init__(self, **kwargs)
1059 self.status = 1
1148 self.status = 1
1060 self.counter = 0
1149 self.counter = 0
1061
1150
1062 def error_print(self, ValueError):
1151 def error_print(self, ValueError):
1063
1152
1064 print(ValueError, 'Error FTP')
1153 print(ValueError, 'Error FTP')
1065 print("don't worry the program is running...")
1154 print("don't worry the program is running...")
1066
1155
1067 def worker_ftp(self, server, username, password, remotefolder, filenameList):
1156 def worker_ftp(self, server, username, password, remotefolder, filenameList):
1068
1157
1069 self.ftpClientObj = FTP(server, username, password, remotefolder)
1158 self.ftpClientObj = FTP(server, username, password, remotefolder)
1070 for filename in filenameList:
1159 for filename in filenameList:
1071 self.ftpClientObj.upload(filename)
1160 self.ftpClientObj.upload(filename)
1072 self.ftpClientObj.close()
1161 self.ftpClientObj.close()
1073
1162
1074 def ftp_thread(self, server, username, password, remotefolder):
1163 def ftp_thread(self, server, username, password, remotefolder):
1075 if not(self.status):
1164 if not(self.status):
1076 return
1165 return
1077
1166
1078 import multiprocessing
1167 import multiprocessing
1079
1168
1080 p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
1169 p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
1081 p.start()
1170 p.start()
1082
1171
1083 p.join(3)
1172 p.join(3)
1084
1173
1085 if p.is_alive():
1174 if p.is_alive():
1086 p.terminate()
1175 p.terminate()
1087 p.join()
1176 p.join()
1088 print('killing ftp process...')
1177 print('killing ftp process...')
1089 self.status = 0
1178 self.status = 0
1090 return
1179 return
1091
1180
1092 self.status = 1
1181 self.status = 1
1093 return
1182 return
1094
1183
1095 def filterByExt(self, ext, localfolder):
1184 def filterByExt(self, ext, localfolder):
1096 fnameList = glob.glob1(localfolder,ext)
1185 fnameList = glob.glob1(localfolder,ext)
1097 self.filenameList = [os.path.join(localfolder,x) for x in fnameList]
1186 self.filenameList = [os.path.join(localfolder,x) for x in fnameList]
1098
1187
1099 if len(self.filenameList) == 0:
1188 if len(self.filenameList) == 0:
1100 self.status = 0
1189 self.status = 0
1101
1190
1102 def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
1191 def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
1103
1192
1104 self.counter += 1
1193 self.counter += 1
1105 if self.counter >= period:
1194 if self.counter >= period:
1106 self.filterByExt(ext, localfolder)
1195 self.filterByExt(ext, localfolder)
1107
1196
1108 self.ftp_thread(server, username, password, remotefolder)
1197 self.ftp_thread(server, username, password, remotefolder)
1109
1198
1110 self.counter = 0
1199 self.counter = 0
1111
1200
1112 self.status = 1
1201 self.status = 1
General Comments 0
You need to be logged in to leave comments. Login now