@@ -1,14 +1,19 | |||||
|
1 | #!/usr/bin/python | |||
|
2 | # -*- coding: UTF-8 -*- | |||
|
3 | ||||
1 | import os |
|
4 | import os | |
2 | import sys |
|
5 | import sys | |
3 | import json |
|
6 | import json | |
4 | import simplejson |
|
7 | import simplejson | |
5 | from datetime import datetime |
|
8 | from datetime import datetime | |
|
9 | import time | |||
6 | import zmq |
|
10 | import zmq | |
7 | import redis |
|
11 | import redis | |
8 | import asgi_redis |
|
12 | import asgi_redis | |
9 | import mongoengine |
|
13 | import mongoengine | |
|
14 | from threading import Thread | |||
10 |
|
15 | |||
11 |
sys.path.append('/ |
|
16 | sys.path.append('../') | |
12 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") |
|
17 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") | |
13 |
|
18 | |||
14 | from plotter.models import Experiment, ExpMeta, ExpData |
|
19 | from plotter.models import Experiment, ExpMeta, ExpData | |
@@ -19,29 +24,31 mongoengine.connect('dbplots', host=host_mongo, port=27017) | |||||
19 |
|
24 | |||
20 | host_redis = os.environ.get('HOST_REDIS', 'localhost') |
|
25 | host_redis = os.environ.get('HOST_REDIS', 'localhost') | |
21 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) |
|
26 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) | |
22 |
|
27 | #========================== Conf. ZeroMQ =================== | ||
23 | context = zmq.Context() |
|
28 | context = zmq.Context() | |
24 |
receiver = context.socket(zmq. |
|
29 | receiver = context.socket(zmq.REP) | |
25 |
|
||||
26 | receiver.bind("tcp://0.0.0.0:4444") |
|
30 | receiver.bind("tcp://0.0.0.0:4444") | |
27 | receiver.setsockopt(zmq.SUBSCRIBE, '') |
|
31 | #============================== end ======================== | |
|
32 | ||||
|
33 | TIMES = {} | |||
28 |
|
34 | |||
29 | def loaddata(): |
|
35 | def loaddata(): | |
30 | print('Loading Experiments...') |
|
36 | print('Loading Experiments...') | |
31 |
for tup in json.load(open(' |
|
37 | for tup in json.load(open('./experiments.json')): | |
32 | print(tup['name']) |
|
38 | print(tup['name']) | |
33 | exp = Experiment.objects(code=tup['code']).modify( |
|
39 | exp = Experiment.objects(code=tup['code']).modify( | |
34 | upsert=True, |
|
40 | upsert=True, # To add a new row | |
35 | new=True, |
|
41 | new=True, | |
36 | set__code=tup['code'], |
|
42 | set__code=tup['code'], | |
37 | set__name=tup['name'], |
|
43 | set__name=tup['name'], | |
38 | ) |
|
44 | ) | |
39 | exp.save() |
|
45 | exp.save() | |
40 |
|
46 | |||
|
47 | #============== funcion para modificar datos en la tabla ============== | |||
41 | def update(buffer): |
|
48 | def update(buffer): | |
42 | dt = datetime.utcfromtimestamp(buffer['time']) |
|
49 | dt = datetime.utcfromtimestamp(buffer['time']) | |
43 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( |
|
50 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( | |
44 | upsert=True, |
|
51 | upsert=True, # To add a new row | |
45 | new=True, |
|
52 | new=True, | |
46 | set__code=buffer['exp_code'], |
|
53 | set__code=buffer['exp_code'], | |
47 | set__date=dt.date(), |
|
54 | set__date=dt.date(), | |
@@ -54,7 +61,7 def update(buffer): | |||||
54 | exp.save() |
|
61 | exp.save() | |
55 |
|
62 | |||
56 | data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( |
|
63 | data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( | |
57 | upsert=True, |
|
64 | upsert=True, # To add a new row | |
58 | new=True, |
|
65 | new=True, | |
59 | set__expmeta = exp, |
|
66 | set__expmeta = exp, | |
60 | set__time = buffer['time'], |
|
67 | set__time = buffer['time'], | |
@@ -64,12 +71,58 def update(buffer): | |||||
64 | data.save() |
|
71 | data.save() | |
65 |
|
72 | |||
66 | return exp.id |
|
73 | return exp.id | |
|
74 | # Function that is checking the state of my clients every 20s | |||
|
75 | def check_times(): | |||
|
76 | ||||
|
77 | while True: | |||
|
78 | dt = datetime.now() | |||
|
79 | exps = ExpMeta.objects(date=dt.date()) | |||
|
80 | ||||
|
81 | for exp in exps: | |||
|
82 | data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor | |||
|
83 | ||||
|
84 | t = time.mktime(dt.timetuple()) | |||
|
85 | ||||
|
86 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC | |||
|
87 | t -= 5*60*60 | |||
|
88 | ||||
|
89 | ||||
|
90 | ||||
|
91 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC | |||
|
92 | data_time = data['time'] + 5*60*60 | |||
|
93 | ||||
|
94 | if (t-data['time']) > 6*exp['interval']: | |||
|
95 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})}) | |||
|
96 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline')) | |||
|
97 | elif (t-data['time']) > 3*exp['interval']: | |||
|
98 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})}) | |||
|
99 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed')) | |||
|
100 | else: | |||
|
101 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) | |||
|
102 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) | |||
|
103 | time.sleep(20) | |||
67 |
|
104 | |||
68 | def main(): |
|
105 | def main(): | |
69 | print('Starting ZMQ server...') |
|
106 | print('Starting ZMQ server...') | |
70 | while True: |
|
107 | ||
|
108 | t = Thread(target=check_times) | |||
|
109 | t.start() | |||
|
110 | ||||
|
111 | while True: | |||
|
112 | ||||
71 | buffer = receiver.recv_json() |
|
113 | buffer = receiver.recv_json() | |
|
114 | ||||
|
115 | ||||
|
116 | ||||
|
117 | if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC | |||
|
118 | buffer['time'] -= 5*60*60 | |||
|
119 | ||||
|
120 | if not isinstance(buffer, dict): | |||
|
121 | print('*******************') | |||
|
122 | print(buffer) | |||
|
123 | continue | |||
72 | code = update(buffer) |
|
124 | code = update(buffer) | |
|
125 | print("==================================") | |||
73 | for plot in buffer['data']: |
|
126 | for plot in buffer['data']: | |
74 | dum = buffer.copy() |
|
127 | dum = buffer.copy() | |
75 | dum['time'] = [buffer['time']] |
|
128 | dum['time'] = [buffer['time']] | |
@@ -82,9 +135,16 def main(): | |||||
82 | else: |
|
135 | else: | |
83 | dum[plot] = buffer['data'][plot] |
|
136 | dum[plot] = buffer['data'][plot] | |
84 | dum.pop('data') |
|
137 | dum.pop('data') | |
85 | dum.pop('exp_code') |
|
138 | exp_code = dum.pop('exp_code') | |
|
139 | #code = dum.pop('exp_code') | |||
|
140 | ||||
86 | channel.send_group(u'{}_{}'.format(code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) |
|
141 | channel.send_group(u'{}_{}'.format(code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) | |
87 | print('Sending...{} - {} bytes'.format(plot, len(str(dum)))) |
|
142 | print('Sending...{}:{} - {} bytes'.format(exp_code, plot, len(str(dum)))) | |
|
143 | ||||
|
144 | receiver.send_string('recibido') | |||
|
145 | ||||
|
146 | receiver.close() | |||
|
147 | context.term() | |||
88 |
|
148 | |||
89 | if __name__=='__main__': |
|
149 | if __name__=='__main__': | |
90 | loaddata() |
|
150 | loaddata() |
General Comments 0
You need to be logged in to leave comments.
Login now