diff --git a/schainpy/model/utils/jroutils_ftp.py b/schainpy/model/utils/jroutils_ftp.py index dc89300..ec1a221 100644 --- a/schainpy/model/utils/jroutils_ftp.py +++ b/schainpy/model/utils/jroutils_ftp.py @@ -11,20 +11,19 @@ try: except: print "You should install paramiko if you will use SSH protocol to upload files to a server" -import multiprocessing - import time -import threading - -try: - from gevent import sleep -except: - from time import sleep +import threading +Thread = threading.Thread + +# try: +# from gevent import sleep +# except: +from time import sleep from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation -class Remote(threading.Thread): +class Remote(Thread): """ Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are used to upload or download files remotely. @@ -49,8 +48,7 @@ class Remote(threading.Thread): def __init__(self, server, username, password, remotefolder, period=60): - threading.Thread.__init__(self) - self._stop = threading.Event() + Thread.__init__(self) self.setDaemon(True) @@ -66,9 +64,12 @@ class Remote(threading.Thread): if self.open(server, username, password, remotefolder): print "[Remote Server] %s server was opened successfully" %server + self.mutex = threading.Lock() + def stop(self): self.stopFlag = True + self.join(10) def open(self): """ @@ -158,15 +159,16 @@ class Remote(threading.Thread): if fileList == self.fileList: return 0 - init = time.time() - - while(self.bussy): - sleep(0.1) - if time.time() - init > 2*self.period: - return 0 + self.mutex.acquire() +# init = time.time() +# +# while(self.bussy): +# sleep(0.1) +# if time.time() - init > 2*self.period: +# return 0 self.fileList = fileList - + self.mutex.release() return 1 def run(self): @@ -182,19 +184,25 @@ class Remote(threading.Thread): while True: - sleep(self.period) + for i in range(self.period): + if self.stopFlag: + break + sleep(1) - self.bussy = True + if self.stopFlag: + break + +# self.bussy = True + self.mutex.acquire() for thisFile in self.fileList: sts = self.upload(thisFile, self.remotefolder) if not sts: break - self.bussy = False - - if not sts: break + self.mutex.release() +# self.bussy = False - if self.stopFlag: + if not sts: break print "[Remote Server] Thread stopped successfully" @@ -581,10 +589,15 @@ class SendToServer(ProcessingUnit): else: folderList = self.localfolder + #Remove duplicate items + folderList = list(set(folderList)) + fullfilenameList = [] for thisFolder in folderList: - + + print "[Remote Server]: Searching files on %s" %thisFolder + filenameList = glob.glob1(thisFolder, '*%s' %self.ext) if len(filenameList) < 1: @@ -592,6 +605,14 @@ class SendToServer(ProcessingUnit): for thisFile in filenameList: fullfilename = os.path.join(thisFolder, thisFile) + + if fullfilename in fullfilenameList: + continue + + #Only files modified in the last 30 minutes are considered + if os.path.getmtime(fullfilename) < time.time() - 30*60: + continue + fullfilenameList.append(fullfilename) return fullfilenameList @@ -929,6 +950,8 @@ class SendByFTP(Operation): if not(self.status): return + import multiprocessing + p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,)) p.start()