##// END OF EJS Templates
Update schain parameters and add experiment_date (generator)
Update schain parameters and add experiment_date (generator)

File last commit:

r436:ecd30d9c3901
r437:002d9909e6f6
Show More
realtime.py
105 lines | 2.4 KiB | text/x-python | PythonLexer
import os
import sys
import time
import json
import base64
import logging
from datetime import datetime
import zmq
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Experiment name, taken from the first command-line argument (required;
# running the script without it raises IndexError here).
EXP = sys.argv[1]
# Directory holding the plot images of that experiment; this is the path
# handed to the watchdog observer at the bottom of the script.
main_path = '/DATA_RM/DATA/{}/plots'.format(EXP)
print(main_path)
# Maps the first character of a plot filename to the plot name that is
# placed in the outgoing message; files with any other prefix are ignored.
CODES = {'V': 'Velocity', 'P': 'Power'}
# Template of the JSON message sent for every plot. The handler works on a
# copy and fills in 'time', 'plot' and 'data' (base64 text of the image).
# The 'metadata' values are passed through unchanged.
# NOTE(review): the unit of 'interval' is not visible in this file - confirm
# against the receiving server.
DATA = {
    'time': 0,
    'metadata': {
        'localtime': True,
        'type': 'image',
        'interval': 150,
        'tag': 'Jicamarca',
    },
    'data': '',
    'plot': '',
}
class FileHandler(FileSystemEventHandler):
    """Forward modified plot images to the plot server over ZeroMQ.

    A connected ``zmq.REQ`` socket must be assigned to ``self.socket``
    before the handler receives events (the script entry point does this).
    """

    # Filename timestamp (epoch seconds) of the last plot sent per code.
    # Defined on the class, so the dict is shared by every instance.
    last = {
        'V': 0,
        'P': 0,
    }

    # Minimum difference, in seconds of filename time, between two sends
    # of the same plot code.
    min_interval = 10

    # Time to wait for the server's reply, in milliseconds (10 s).
    reply_timeout = 10 * 1000

    def on_modified(self, event):
        """Send the modified file to the server if it is a known plot image.

        The file name must contain exactly two underscores and look like
        ``<code>_<YYYYmmdd>_<HHMMSS>.png`` with ``<code>`` a key of
        ``CODES``; anything else is ignored silently. A plot is skipped
        when its filename time is less than ``min_interval`` seconds after
        the last one sent for the same code.

        Args:
            event: watchdog event; only ``event.src_path`` is used.
        """
        path = event.src_path
        filename = os.path.basename(path)

        if filename.count('_') != 2:
            return
        try:
            dt = datetime.strptime(filename.split('.png')[0][2:], '%Y%m%d_%H%M%S')
        except ValueError:
            # Name does not carry a parsable timestamp: not a plot file.
            return
        # Seconds since the epoch plus a fixed five hours.
        # NOTE(review): presumably converts local time (UTC-5) to UTC - confirm.
        tm = (dt - datetime(1970, 1, 1)).total_seconds() + 5*60*60

        code = filename[0]
        if code not in CODES:
            return

        if tm - self.last[code] < self.min_interval:
            print('Skipping {}'.format(path))
            return

        # Pause before reading; presumably lets the writer finish the file.
        time.sleep(1)

        data = DATA.copy()
        data['code'] = 400
        data['time'] = tm
        data['plot'] = CODES[code]
        print('sending {} {}'.format( data['plot'], path))
        try:
            # Context manager so the file handle is always released.
            with open(path, 'rb') as image:
                data['data'] = base64.b64encode(image.read()).decode()
        except (IOError, OSError) as err:
            # The file may have vanished or be unreadable; drop this event
            # rather than letting the error escape the handler.
            print('Cannot read {}: {}'.format(path, err))
            return

        try:
            self.socket.send_string(json.dumps(data))
            poller = zmq.Poller()
            poller.register(self.socket, zmq.POLLIN)
            if poller.poll(self.reply_timeout):
                print(self.socket.recv())
            else:
                print('Server Down...')
        except zmq.ZMQError as err:
            # NOTE: a strict REQ socket refuses to send again while a reply
            # is still outstanding; report it instead of raising out of the
            # handler. `last` is left untouched so a later event can retry.
            print('Send failed: {}'.format(err))
            return
        self.last[code] = tm
if __name__ == "__main__":
    # Entry point: connect a REQ socket to the plot server, attach it to the
    # handler and watch `main_path` (recursively) until interrupted.
    event = FileHandler()
    event.context = zmq.Context()
    print ("Connecting to server...")
    event.socket = event.context.socket(zmq.REQ)
    # Drop unsent requests on close so shutdown cannot block on a dead server.
    event.socket.setsockopt(zmq.LINGER, 0)
    # Allow a new request after a reply was missed; a strict REQ socket would
    # otherwise reject every send that follows a timeout.
    event.socket.setsockopt(zmq.REQ_RELAXED, 1)
    # Alternate server address kept for reference: tcp://10.10.20.128:4444
    event.socket.connect ("tcp://10.10.110.243:4444")
    observer = Observer()
    observer.schedule(event, main_path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the observer first so no handler is using the socket while
        # it is being closed.
        observer.stop()
        observer.join()
        event.socket.close()
        event.context.term()