##// END OF EJS Templates
New server with req-rep and thread for updating status
Developer -
r10:07b9b665c767
parent child
Show More
@@ -1,14 +1,19
1 #!/usr/bin/python
2 # -*- coding: UTF-8 -*-
3
1 import os
4 import os
2 import sys
5 import sys
3 import json
6 import json
4 import simplejson
7 import simplejson
5 from datetime import datetime
8 from datetime import datetime
9 import time
6 import zmq
10 import zmq
7 import redis
11 import redis
8 import asgi_redis
12 import asgi_redis
9 import mongoengine
13 import mongoengine
14 from threading import Thread
10
15
11 sys.path.append('/app')
16 sys.path.append('../')
12 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
17 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
13
18
14 from plotter.models import Experiment, ExpMeta, ExpData
19 from plotter.models import Experiment, ExpMeta, ExpData
@@ -19,29 +24,31 mongoengine.connect('dbplots', host=host_mongo, port=27017)
19
24
20 host_redis = os.environ.get('HOST_REDIS', 'localhost')
25 host_redis = os.environ.get('HOST_REDIS', 'localhost')
21 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
26 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
22
27 #========================== Conf. ZeroMQ ===================
23 context = zmq.Context()
28 context = zmq.Context()
24 receiver = context.socket(zmq.SUB)
29 receiver = context.socket(zmq.REP)
25
26 receiver.bind("tcp://0.0.0.0:4444")
30 receiver.bind("tcp://0.0.0.0:4444")
27 receiver.setsockopt(zmq.SUBSCRIBE, '')
31 #============================== end ========================
32
33 TIMES = {}
28
34
29 def loaddata():
35 def loaddata():
30 print('Loading Experiments...')
36 print('Loading Experiments...')
31 for tup in json.load(open('scripts/experiments.json')):
37 for tup in json.load(open('./experiments.json')):
32 print(tup['name'])
38 print(tup['name'])
33 exp = Experiment.objects(code=tup['code']).modify(
39 exp = Experiment.objects(code=tup['code']).modify(
34 upsert=True,
40 upsert=True, # To add a new row
35 new=True,
41 new=True,
36 set__code=tup['code'],
42 set__code=tup['code'],
37 set__name=tup['name'],
43 set__name=tup['name'],
38 )
44 )
39 exp.save()
45 exp.save()
40
46
47 #============== funcion para modificar datos en la tabla ==============
41 def update(buffer):
48 def update(buffer):
42 dt = datetime.utcfromtimestamp(buffer['time'])
49 dt = datetime.utcfromtimestamp(buffer['time'])
43 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
50 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
44 upsert=True,
51 upsert=True, # To add a new row
45 new=True,
52 new=True,
46 set__code=buffer['exp_code'],
53 set__code=buffer['exp_code'],
47 set__date=dt.date(),
54 set__date=dt.date(),
@@ -54,7 +61,7 def update(buffer):
54 exp.save()
61 exp.save()
55
62
56 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
63 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
57 upsert=True,
64 upsert=True, # To add a new row
58 new=True,
65 new=True,
59 set__expmeta = exp,
66 set__expmeta = exp,
60 set__time = buffer['time'],
67 set__time = buffer['time'],
@@ -64,12 +71,58 def update(buffer):
64 data.save()
71 data.save()
65
72
66 return exp.id
73 return exp.id
74 # Function that is checking the state of my clients every 20s
75 def check_times():
76
77 while True:
78 dt = datetime.now()
79 exps = ExpMeta.objects(date=dt.date())
80
81 for exp in exps:
82 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
83
84 t = time.mktime(dt.timetuple())
85
86 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
87 t -= 5*60*60
88
89
90
91 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
92 data_time = data['time'] + 5*60*60
93
94 if (t-data['time']) > 6*exp['interval']:
95 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})})
96 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
97 elif (t-data['time']) > 3*exp['interval']:
98 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})})
99 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
100 else:
101 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
102 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
103 time.sleep(20)
67
104
68 def main():
105 def main():
69 print('Starting ZMQ server...')
106 print('Starting ZMQ server...')
70 while True:
107
108 t = Thread(target=check_times)
109 t.start()
110
111 while True:
112
71 buffer = receiver.recv_json()
113 buffer = receiver.recv_json()
114
115
116
117 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
118 buffer['time'] -= 5*60*60
119
120 if not isinstance(buffer, dict):
121 print('*******************')
122 print(buffer)
123 continue
72 code = update(buffer)
124 code = update(buffer)
125 print("==================================")
73 for plot in buffer['data']:
126 for plot in buffer['data']:
74 dum = buffer.copy()
127 dum = buffer.copy()
75 dum['time'] = [buffer['time']]
128 dum['time'] = [buffer['time']]
@@ -82,9 +135,16 def main():
82 else:
135 else:
83 dum[plot] = buffer['data'][plot]
136 dum[plot] = buffer['data'][plot]
84 dum.pop('data')
137 dum.pop('data')
85 dum.pop('exp_code')
138 exp_code = dum.pop('exp_code')
139 #code = dum.pop('exp_code')
140
86 channel.send_group(u'{}_{}'.format(code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
141 channel.send_group(u'{}_{}'.format(code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
87 print('Sending...{} - {} bytes'.format(plot, len(str(dum))))
142 print('Sending...{}:{} - {} bytes'.format(exp_code, plot, len(str(dum))))
143
144 receiver.send_string('recibido')
145
146 receiver.close()
147 context.term()
88
148
89 if __name__=='__main__':
149 if __name__=='__main__':
90 loaddata()
150 loaddata()
General Comments 0
You need to be logged in to leave comments. Login now