@@ -0,0 +1,101 | |||
|
1 | #!/usr/bin/python | |
|
2 | # -*- coding: UTF-8 -*- | |
|
3 | ||
|
4 | import time | |
|
5 | from datetime import datetime, timedelta | |
|
6 | import zmq | |
|
7 | import json | |
|
8 | import numpy as np | |
|
9 | from threading import Thread | |
|
10 | import argparse | |
|
11 | ||
|
12 | REQUEST_TIMEOUT = 15000 | |
|
13 | REQUEST_RETRIES = 10 | |
|
14 | SERVER_ENDPOINT = 'tcp://localhost:4444' | |
|
15 | ||
|
16 | def send(context, socket, poll, dato): | |
|
17 | ||
|
18 | retries_left = REQUEST_RETRIES | |
|
19 | ||
|
20 | while True: | |
|
21 | socket.send_json(dato) | |
|
22 | socks = dict(poll.poll(REQUEST_TIMEOUT)) | |
|
23 | if socks.get(socket) == zmq.POLLIN: | |
|
24 | reply = socket.recv_string() | |
|
25 | if not reply: | |
|
26 | break | |
|
27 | if reply == 'ok': | |
|
28 | print('I: Server replied OK (%s)' % reply) | |
|
29 | break | |
|
30 | else: | |
|
31 | print('E: Malformed reply from server: %s' % reply) | |
|
32 | break | |
|
33 | else: | |
|
34 | print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left)) | |
|
35 | time.sleep(2) | |
|
36 | socket.setsockopt(zmq.LINGER, 0) | |
|
37 | socket.close() | |
|
38 | poll.unregister(socket) | |
|
39 | retries_left -= 1 | |
|
40 | if retries_left == 0: | |
|
41 | print('E: Server seems to be offline, abandoning') | |
|
42 | break | |
|
43 | # Create new connection | |
|
44 | socket = context.socket(zmq.REQ) | |
|
45 | socket.connect(SERVER_ENDPOINT) | |
|
46 | poll.register(socket, zmq.POLLIN) | |
|
47 | ||
|
48 | def main(realtime, code, date=None, interval=30): | |
|
49 | ||
|
50 | context = zmq.Context() | |
|
51 | socket = context.socket(zmq.REQ) | |
|
52 | socket.connect (SERVER_ENDPOINT) | |
|
53 | poll = zmq.Poller() | |
|
54 | poll.register(socket, zmq.POLLIN) | |
|
55 | ||
|
56 | if realtime: | |
|
57 | dt = datetime.now() | |
|
58 | else: | |
|
59 | dt = date | |
|
60 | ||
|
61 | while True: | |
|
62 | ||
|
63 | print('Sending {} - {}'.format(code, dt)) | |
|
64 | ||
|
65 | dato = { | |
|
66 | 'time': time.mktime(dt.timetuple()), | |
|
67 | 'yrange': np.arange(100).tolist(), | |
|
68 | 'xrange': np.arange(-30, 30).tolist(), | |
|
69 | 'localtime': True, | |
|
70 | 'interval': interval, | |
|
71 | 'exp_code': code, | |
|
72 | 'data': { | |
|
73 | 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(), | |
|
74 | 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(), | |
|
75 | 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(), | |
|
76 | } | |
|
77 | } | |
|
78 | ||
|
79 | dt = dt + timedelta(seconds=interval) | |
|
80 | ||
|
81 | t = Thread(target=send, args=(context, socket, poll, dato)) | |
|
82 | t.start() | |
|
83 | if realtime: | |
|
84 | time.sleep(interval) | |
|
85 | else: | |
|
86 | time.sleep(5) | |
|
87 | ||
|
88 | if __name__=='__main__': | |
|
89 | parser = argparse.ArgumentParser(description='This is a Client for realtime app') | |
|
90 | parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00') | |
|
91 | parser.add_argument('-r', action='store_true', dest='realtime', default=None) | |
|
92 | parser.add_argument('-c', action='store', dest='code', default='170') | |
|
93 | parser.add_argument('-i', action='store', dest='interval', type=int, default=30) | |
|
94 | ||
|
95 | results = parser.parse_args() | |
|
96 | if not results.realtime: | |
|
97 | try: | |
|
98 | results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S') | |
|
99 | except: | |
|
100 | raise(NameError('You must specify a date (--date) for non-realtime experiment')) | |
|
101 | main(results.realtime, results.code, results.date, results.interval) |
@@ -1,151 +1,148 | |||
|
1 | 1 | #!/usr/bin/python |
|
2 | 2 | # -*- coding: UTF-8 -*- |
|
3 | 3 | |
|
4 | 4 | import os |
|
5 | 5 | import sys |
|
6 | 6 | import json |
|
7 | 7 | import simplejson |
|
8 | 8 | from datetime import datetime |
|
9 | 9 | import time |
|
10 | 10 | import zmq |
|
11 | 11 | import redis |
|
12 | 12 | import asgi_redis |
|
13 | 13 | import mongoengine |
|
14 | 14 | from threading import Thread |
|
15 | 15 | |
|
16 | sys.path.append('../') | |
|
16 | sys.path.append(os.environ.get('APP_DIR', '../')) | |
|
17 | 17 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") |
|
18 | 18 | |
|
19 | 19 | from plotter.models import Experiment, ExpMeta, ExpData |
|
20 | 20 | |
|
21 | 21 | host_mongo = os.environ.get('HOST_MONGO', 'localhost') |
|
22 | 22 | mongoengine.connect('dbplots', host=host_mongo, port=27017) |
|
23 | 23 | |
|
24 | ||
|
25 | 24 | host_redis = os.environ.get('HOST_REDIS', 'localhost') |
|
26 | 25 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) |
|
27 | 26 | #========================== Conf. ZeroMQ =================== |
|
28 | 27 | context = zmq.Context() |
|
29 | 28 | receiver = context.socket(zmq.REP) |
|
30 | 29 | receiver.bind("tcp://0.0.0.0:4444") |
|
31 | 30 | #============================== end ======================== |
|
32 | 31 | |
|
33 | TIMES = {} | |
|
34 | ||
|
35 | 32 | def loaddata(): |
|
36 | 33 | print('Loading Experiments...') |
|
37 | for tup in json.load(open('./experiments.json')): | |
|
34 | if os.environ.get('APP_DIR', None) is not None: | |
|
35 | file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json') | |
|
36 | else: | |
|
37 | file_exp = './experiments.json' | |
|
38 | for tup in json.load(open(file_exp)): | |
|
38 | 39 | print(tup['name']) |
|
39 | 40 | exp = Experiment.objects(code=tup['code']).modify( |
|
40 | 41 | upsert=True, # To add a new row |
|
41 | 42 | new=True, |
|
42 | 43 | set__code=tup['code'], |
|
43 | 44 | set__name=tup['name'], |
|
44 | 45 | ) |
|
45 | 46 | exp.save() |
|
46 | 47 | |
|
47 | 48 | #============== funcion para modificar datos en la tabla ============== |
|
48 | 49 | def update(buffer): |
|
49 | 50 | dt = datetime.utcfromtimestamp(buffer['time']) |
|
50 | 51 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( |
|
51 | 52 | upsert=True, # To add a new row |
|
52 | 53 | new=True, |
|
53 | 54 | set__code=buffer['exp_code'], |
|
54 | 55 | set__date=dt.date(), |
|
55 | 56 | set__yrange = buffer['yrange'], |
|
56 | 57 | set__xrange = buffer['xrange'], |
|
57 | 58 | set__interval = buffer['interval'], |
|
58 | 59 | set__localtime = buffer['localtime'], |
|
59 | 60 | set__plots = buffer['data'].keys() |
|
60 | 61 | ) |
|
61 | 62 | exp.save() |
|
62 | 63 | |
|
63 | 64 | data = ExpData.objects(expmeta=exp, time=buffer['time']).modify( |
|
64 | 65 | upsert=True, # To add a new row |
|
65 | 66 | new=True, |
|
66 | 67 | set__expmeta = exp, |
|
67 | 68 | set__time = buffer['time'], |
|
68 | 69 | set__data = buffer['data'] |
|
69 | 70 | ) |
|
70 | 71 | |
|
71 | 72 | data.save() |
|
72 | 73 | |
|
73 | 74 | return exp.id |
|
74 | 75 | # Function that is checking the state of my clients every 20s |
|
75 | 76 | def check_times(): |
|
76 | 77 | |
|
77 | 78 | while True: |
|
78 | dt = datetime.now() | |
|
79 | dt = datetime.utcnow() | |
|
79 | 80 | exps = ExpMeta.objects(date=dt.date()) |
|
80 | 81 | |
|
81 | 82 | for exp in exps: |
|
82 | 83 | data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor |
|
83 | 84 | |
|
84 | 85 | t = time.mktime(dt.timetuple()) |
|
85 | 86 | |
|
86 | 87 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
87 | 88 | t -= 5*60*60 |
|
88 | 89 | |
|
89 | ||
|
90 | ||
|
91 | 90 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
92 | 91 | data_time = data['time'] + 5*60*60 |
|
93 | 92 | |
|
94 | 93 | if (t-data['time']) > 6*exp['interval']: |
|
95 | 94 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})}) |
|
96 | 95 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline')) |
|
97 | 96 | elif (t-data['time']) > 3*exp['interval']: |
|
98 | 97 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})}) |
|
99 | 98 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed')) |
|
100 | 99 | else: |
|
101 | 100 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) |
|
102 | 101 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) |
|
103 | 102 | time.sleep(20) |
|
104 | 103 | |
|
105 | 104 | def main(): |
|
106 | 105 | print('Starting ZMQ server...') |
|
107 | 106 | |
|
108 | 107 | t = Thread(target=check_times) |
|
109 | 108 | t.start() |
|
110 | 109 | |
|
111 |
while True: |
|
|
110 | while True: | |
|
112 | 111 | |
|
113 | 112 | buffer = receiver.recv_json() |
|
114 | 113 | |
|
115 | ||
|
116 | ||
|
117 | 114 | if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC |
|
118 | 115 | buffer['time'] -= 5*60*60 |
|
119 | 116 | |
|
120 | 117 | if not isinstance(buffer, dict): |
|
121 | 118 | print('*******************') |
|
122 | 119 | print(buffer) |
|
123 | 120 | continue |
|
124 | 121 | code = update(buffer) |
|
125 | 122 | print("==================================") |
|
126 | 123 | for plot in buffer['data']: |
|
127 | 124 | dum = buffer.copy() |
|
128 | 125 | dum['time'] = [buffer['time']] |
|
129 | 126 | if plot=='noise': |
|
130 | 127 | dum[plot] = [[x] for x in buffer['data'][plot]] |
|
131 | 128 | elif plot=='spc': |
|
132 | 129 | dum['noise'] = [[x] for x in buffer['data']['noise']] |
|
133 | 130 | dum['spc'] = buffer['data']['spc'] |
|
134 | 131 | dum['rti'] = buffer['data']['rti'] |
|
135 | 132 | else: |
|
136 | 133 | dum[plot] = buffer['data'][plot] |
|
137 | 134 | dum.pop('data') |
|
138 | 135 | exp_code = dum.pop('exp_code') |
|
139 | 136 | #code = dum.pop('exp_code') |
|
140 | 137 | |
|
141 | 138 | channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) |
|
142 |
print('Sending |
|
|
139 | print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) | |
|
143 | 140 | |
|
144 |
receiver.send_string(' |
|
|
141 | receiver.send_string('ok') | |
|
145 | 142 | |
|
146 | 143 | receiver.close() |
|
147 | 144 | context.term() |
|
148 | 145 | |
|
149 | 146 | if __name__=='__main__': |
|
150 | 147 | loaddata() |
|
151 | 148 | main() |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now