@@ -0,0 +1,101 | |||||
|
1 | #!/usr/bin/python | |||
|
2 | # -*- coding: UTF-8 -*- | |||
|
3 | ||||
|
4 | import time | |||
|
5 | from datetime import datetime, timedelta | |||
|
6 | import zmq | |||
|
7 | import json | |||
|
8 | import numpy as np | |||
|
9 | from threading import Thread | |||
|
10 | import argparse | |||
|
11 | ||||
|
12 | REQUEST_TIMEOUT = 15000 | |||
|
13 | REQUEST_RETRIES = 10 | |||
|
14 | SERVER_ENDPOINT = 'tcp://localhost:4444' | |||
|
15 | ||||
|
16 | def send(context, socket, poll, dato): | |||
|
17 | ||||
|
18 | retries_left = REQUEST_RETRIES | |||
|
19 | ||||
|
20 | while True: | |||
|
21 | socket.send_json(dato) | |||
|
22 | socks = dict(poll.poll(REQUEST_TIMEOUT)) | |||
|
23 | if socks.get(socket) == zmq.POLLIN: | |||
|
24 | reply = socket.recv_string() | |||
|
25 | if not reply: | |||
|
26 | break | |||
|
27 | if reply == 'ok': | |||
|
28 | print('I: Server replied OK (%s)' % reply) | |||
|
29 | break | |||
|
30 | else: | |||
|
31 | print('E: Malformed reply from server: %s' % reply) | |||
|
32 | break | |||
|
33 | else: | |||
|
34 | print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left)) | |||
|
35 | time.sleep(2) | |||
|
36 | socket.setsockopt(zmq.LINGER, 0) | |||
|
37 | socket.close() | |||
|
38 | poll.unregister(socket) | |||
|
39 | retries_left -= 1 | |||
|
40 | if retries_left == 0: | |||
|
41 | print('E: Server seems to be offline, abandoning') | |||
|
42 | break | |||
|
43 | # Create new connection | |||
|
44 | socket = context.socket(zmq.REQ) | |||
|
45 | socket.connect(SERVER_ENDPOINT) | |||
|
46 | poll.register(socket, zmq.POLLIN) | |||
|
47 | ||||
|
48 | def main(realtime, code, date=None, interval=30): | |||
|
49 | ||||
|
50 | context = zmq.Context() | |||
|
51 | socket = context.socket(zmq.REQ) | |||
|
52 | socket.connect (SERVER_ENDPOINT) | |||
|
53 | poll = zmq.Poller() | |||
|
54 | poll.register(socket, zmq.POLLIN) | |||
|
55 | ||||
|
56 | if realtime: | |||
|
57 | dt = datetime.now() | |||
|
58 | else: | |||
|
59 | dt = date | |||
|
60 | ||||
|
61 | while True: | |||
|
62 | ||||
|
63 | print('Sending {} - {}'.format(code, dt)) | |||
|
64 | ||||
|
65 | dato = { | |||
|
66 | 'time': time.mktime(dt.timetuple()), | |||
|
67 | 'yrange': np.arange(100).tolist(), | |||
|
68 | 'xrange': np.arange(-30, 30).tolist(), | |||
|
69 | 'localtime': True, | |||
|
70 | 'interval': interval, | |||
|
71 | 'exp_code': code, | |||
|
72 | 'data': { | |||
|
73 | 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(), | |||
|
74 | 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(), | |||
|
75 | 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(), | |||
|
76 | } | |||
|
77 | } | |||
|
78 | ||||
|
79 | dt = dt + timedelta(seconds=interval) | |||
|
80 | ||||
|
81 | t = Thread(target=send, args=(context, socket, poll, dato)) | |||
|
82 | t.start() | |||
|
83 | if realtime: | |||
|
84 | time.sleep(interval) | |||
|
85 | else: | |||
|
86 | time.sleep(5) | |||
|
87 | ||||
|
88 | if __name__=='__main__': | |||
|
89 | parser = argparse.ArgumentParser(description='This is a Client for realtime app') | |||
|
90 | parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00') | |||
|
91 | parser.add_argument('-r', action='store_true', dest='realtime', default=None) | |||
|
92 | parser.add_argument('-c', action='store', dest='code', default='170') | |||
|
93 | parser.add_argument('-i', action='store', dest='interval', type=int, default=30) | |||
|
94 | ||||
|
95 | results = parser.parse_args() | |||
|
96 | if not results.realtime: | |||
|
97 | try: | |||
|
98 | results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S') | |||
|
99 | except: | |||
|
100 | raise(NameError('You must specify a date (--date) for non-realtime experiment')) | |||
|
101 | main(results.realtime, results.code, results.date, results.interval) |
@@ -1,151 +1,148 | |||||
1 | #!/usr/bin/python |
|
1 | #!/usr/bin/python | |
2 | # -*- coding: UTF-8 -*- |
|
2 | # -*- coding: UTF-8 -*- | |
3 |
|
3 | |||
4 | import os |
|
4 | import os | |
5 | import sys |
|
5 | import sys | |
6 | import json |
|
6 | import json | |
7 | import simplejson |
|
7 | import simplejson | |
8 | from datetime import datetime |
|
8 | from datetime import datetime | |
9 | import time |
|
9 | import time | |
10 | import zmq |
|
10 | import zmq | |
11 | import redis |
|
11 | import redis | |
12 | import asgi_redis |
|
12 | import asgi_redis | |
13 | import mongoengine |
|
13 | import mongoengine | |
14 | from threading import Thread |
|
14 | from threading import Thread | |
15 |
|
15 | |||
16 | sys.path.append('../') |
|
16 | sys.path.append(os.environ.get('APP_DIR', '../')) | |
17 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") |
|
17 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") | |
18 |
|
18 | |||
19 | from plotter.models import Experiment, ExpMeta, ExpData |
|
19 | from plotter.models import Experiment, ExpMeta, ExpData | |
20 |
|
20 | |||
21 | host_mongo = os.environ.get('HOST_MONGO', 'localhost') |
|
21 | host_mongo = os.environ.get('HOST_MONGO', 'localhost') | |
22 | mongoengine.connect('dbplots', host=host_mongo, port=27017) |
|
22 | mongoengine.connect('dbplots', host=host_mongo, port=27017) | |
23 |
|
23 | |||
24 |
|
||||
25 | host_redis = os.environ.get('HOST_REDIS', 'localhost') |
|
24 | host_redis = os.environ.get('HOST_REDIS', 'localhost') | |
26 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) |
|
25 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) | |
27 | #========================== Conf. ZeroMQ =================== |
|
26 | #========================== Conf. ZeroMQ =================== | |
28 | context = zmq.Context() |
|
27 | context = zmq.Context() | |
29 | receiver = context.socket(zmq.REP) |
|
28 | receiver = context.socket(zmq.REP) | |
30 | receiver.bind("tcp://0.0.0.0:4444") |
|
29 | receiver.bind("tcp://0.0.0.0:4444") | |
31 | #============================== end ======================== |
|
30 | #============================== end ======================== | |
32 |
|
31 | |||
33 | TIMES = {} |
|
|||
34 |
|
||||
35 | def loaddata(): |
|
32 | def loaddata(): | |
36 | print('Loading Experiments...') |
|
33 | print('Loading Experiments...') | |
37 | for tup in json.load(open('./experiments.json')): |
|
34 | if os.environ.get('APP_DIR', None) is not None: | |
|
35 | file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json') | |||
|
36 | else: | |||
|
37 | file_exp = './experiments.json' | |||
|
38 | for tup in json.load(open(file_exp)): | |||
38 | print(tup['name']) |
|
39 | print(tup['name']) | |
39 | exp = Experiment.objects(code=tup['code']).modify( |
|
40 | exp = Experiment.objects(code=tup['code']).modify( | |
40 | upsert=True, # To add a new row |
|
41 | upsert=True, # To add a new row | |
41 | new=True, |
|
42 | new=True, | |
42 | set__code=tup['code'], |
|
43 | set__code=tup['code'], | |
43 | set__name=tup['name'], |
|
44 | set__name=tup['name'], | |
44 | ) |
|
45 | ) | |
45 | exp.save() |
|
46 | exp.save() | |
46 |
|
47 | |||
47 | #============== funcion para modificar datos en la tabla ============== |
|
48 | #============== funcion para modificar datos en la tabla ============== | |
48 | def update(buffer): |
|
49 | def update(buffer): | |
49 | dt = datetime.utcfromtimestamp(buffer['time']) |
|
50 | dt = datetime.utcfromtimestamp(buffer['time']) | |
50 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( |
|
51 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( | |
51 | upsert=True, # To add a new row |
|
52 | upsert=True, # To add a new row | |
52 | new=True, |
|
53 | new=True, | |
53 | set__code=buffer['exp_code'], |
|
54 | set__code=buffer['exp_code'], | |
54 | set__date=dt.date(), |
|
55 | set__date=dt.date(), | |
55 | set__yrange = buffer['yrange'], |
|
56 | set__yrange = buffer['yrange'], | |
56 | set__xrange = buffer['xrange'], |
|
57 | set__xrange = buffer['xrange'], | |
57 | set__interval = buffer['interval'], |
|
58 | set__interval = buffer['interval'], | |
58 | set__localtime = buffer['localtime'], |
|
59 | set__localtime = buffer['localtime'], | |
59 | set__plots = buffer['data'].keys() |
|
60 | set__plots = buffer['data'].keys() | |
60 | ) |
|
61 | ) | |
61 | exp.save() |
|
62 | exp.save() | |
62 |
|
63 | |||
63 | data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( |
|
64 | data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( | |
64 | upsert=True, # To add a new row |
|
65 | upsert=True, # To add a new row | |
65 | new=True, |
|
66 | new=True, | |
66 | set__expmeta = exp, |
|
67 | set__expmeta = exp, | |
67 | set__time = buffer['time'], |
|
68 | set__time = buffer['time'], | |
68 | set__data = buffer['data'] |
|
69 | set__data = buffer['data'] | |
69 | ) |
|
70 | ) | |
70 |
|
71 | |||
71 | data.save() |
|
72 | data.save() | |
72 |
|
73 | |||
73 | return exp.id |
|
74 | return exp.id | |
74 | # Function that is checking the state of my clients every 20s |
|
75 | # Function that is checking the state of my clients every 20s | |
75 | def check_times(): |
|
76 | def check_times(): | |
76 |
|
77 | |||
77 | while True: |
|
78 | while True: | |
78 | dt = datetime.now() |
|
79 | dt = datetime.utcnow() | |
79 | exps = ExpMeta.objects(date=dt.date()) |
|
80 | exps = ExpMeta.objects(date=dt.date()) | |
80 |
|
81 | |||
81 | for exp in exps: |
|
82 | for exp in exps: | |
82 | data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor |
|
83 | data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor | |
83 |
|
84 | |||
84 | t = time.mktime(dt.timetuple()) |
|
85 | t = time.mktime(dt.timetuple()) | |
85 |
|
86 | |||
86 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
87 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC | |
87 | t -= 5*60*60 |
|
88 | t -= 5*60*60 | |
88 |
|
89 | |||
89 |
|
||||
90 |
|
||||
91 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
90 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC | |
92 | data_time = data['time'] + 5*60*60 |
|
91 | data_time = data['time'] + 5*60*60 | |
93 |
|
92 | |||
94 | if (t-data['time']) > 6*exp['interval']: |
|
93 | if (t-data['time']) > 6*exp['interval']: | |
95 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})}) |
|
94 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})}) | |
96 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline')) |
|
95 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline')) | |
97 | elif (t-data['time']) > 3*exp['interval']: |
|
96 | elif (t-data['time']) > 3*exp['interval']: | |
98 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})}) |
|
97 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})}) | |
99 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed')) |
|
98 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed')) | |
100 | else: |
|
99 | else: | |
101 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) |
|
100 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) | |
102 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) |
|
101 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) | |
103 | time.sleep(20) |
|
102 | time.sleep(20) | |
104 |
|
103 | |||
105 | def main(): |
|
104 | def main(): | |
106 | print('Starting ZMQ server...') |
|
105 | print('Starting ZMQ server...') | |
107 |
|
106 | |||
108 | t = Thread(target=check_times) |
|
107 | t = Thread(target=check_times) | |
109 | t.start() |
|
108 | t.start() | |
110 |
|
109 | |||
111 |
while True: |
|
110 | while True: | |
112 |
|
111 | |||
113 | buffer = receiver.recv_json() |
|
112 | buffer = receiver.recv_json() | |
114 |
|
113 | |||
115 |
|
||||
116 |
|
||||
117 | if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC |
|
114 | if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC | |
118 | buffer['time'] -= 5*60*60 |
|
115 | buffer['time'] -= 5*60*60 | |
119 |
|
116 | |||
120 | if not isinstance(buffer, dict): |
|
117 | if not isinstance(buffer, dict): | |
121 | print('*******************') |
|
118 | print('*******************') | |
122 | print(buffer) |
|
119 | print(buffer) | |
123 | continue |
|
120 | continue | |
124 | code = update(buffer) |
|
121 | code = update(buffer) | |
125 | print("==================================") |
|
122 | print("==================================") | |
126 | for plot in buffer['data']: |
|
123 | for plot in buffer['data']: | |
127 | dum = buffer.copy() |
|
124 | dum = buffer.copy() | |
128 | dum['time'] = [buffer['time']] |
|
125 | dum['time'] = [buffer['time']] | |
129 | if plot=='noise': |
|
126 | if plot=='noise': | |
130 | dum[plot] = [[x] for x in buffer['data'][plot]] |
|
127 | dum[plot] = [[x] for x in buffer['data'][plot]] | |
131 | elif plot=='spc': |
|
128 | elif plot=='spc': | |
132 | dum['noise'] = [[x] for x in buffer['data']['noise']] |
|
129 | dum['noise'] = [[x] for x in buffer['data']['noise']] | |
133 | dum['spc'] = buffer['data']['spc'] |
|
130 | dum['spc'] = buffer['data']['spc'] | |
134 | dum['rti'] = buffer['data']['rti'] |
|
131 | dum['rti'] = buffer['data']['rti'] | |
135 | else: |
|
132 | else: | |
136 | dum[plot] = buffer['data'][plot] |
|
133 | dum[plot] = buffer['data'][plot] | |
137 | dum.pop('data') |
|
134 | dum.pop('data') | |
138 | exp_code = dum.pop('exp_code') |
|
135 | exp_code = dum.pop('exp_code') | |
139 | #code = dum.pop('exp_code') |
|
136 | #code = dum.pop('exp_code') | |
140 |
|
137 | |||
141 | channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) |
|
138 | channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) | |
142 |
print('Sending |
|
139 | print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) | |
143 |
|
140 | |||
144 |
receiver.send_string(' |
|
141 | receiver.send_string('ok') | |
145 |
|
142 | |||
146 | receiver.close() |
|
143 | receiver.close() | |
147 | context.term() |
|
144 | context.term() | |
148 |
|
145 | |||
149 | if __name__=='__main__': |
|
146 | if __name__=='__main__': | |
150 | loaddata() |
|
147 | loaddata() | |
151 | main() |
|
148 | main() |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
1 | NO CONTENT: file was removed |
|
NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now