@@ -0,0 +1,101 | |||
|
1 | #!/usr/bin/python | |
|
2 | # -*- coding: UTF-8 -*- | |
|
3 | ||
|
4 | import time | |
|
5 | from datetime import datetime, timedelta | |
|
6 | import zmq | |
|
7 | import json | |
|
8 | import numpy as np | |
|
9 | from threading import Thread | |
|
10 | import argparse | |
|
11 | ||
|
12 | REQUEST_TIMEOUT = 15000 | |
|
13 | REQUEST_RETRIES = 10 | |
|
14 | SERVER_ENDPOINT = 'tcp://localhost:4444' | |
|
15 | ||
|
16 | def send(context, socket, poll, dato): | |
|
17 | ||
|
18 | retries_left = REQUEST_RETRIES | |
|
19 | ||
|
20 | while True: | |
|
21 | socket.send_json(dato) | |
|
22 | socks = dict(poll.poll(REQUEST_TIMEOUT)) | |
|
23 | if socks.get(socket) == zmq.POLLIN: | |
|
24 | reply = socket.recv_string() | |
|
25 | if not reply: | |
|
26 | break | |
|
27 | if reply == 'ok': | |
|
28 | print('I: Server replied OK (%s)' % reply) | |
|
29 | break | |
|
30 | else: | |
|
31 | print('E: Malformed reply from server: %s' % reply) | |
|
32 | break | |
|
33 | else: | |
|
34 | print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left)) | |
|
35 | time.sleep(2) | |
|
36 | socket.setsockopt(zmq.LINGER, 0) | |
|
37 | socket.close() | |
|
38 | poll.unregister(socket) | |
|
39 | retries_left -= 1 | |
|
40 | if retries_left == 0: | |
|
41 | print('E: Server seems to be offline, abandoning') | |
|
42 | break | |
|
43 | # Create new connection | |
|
44 | socket = context.socket(zmq.REQ) | |
|
45 | socket.connect(SERVER_ENDPOINT) | |
|
46 | poll.register(socket, zmq.POLLIN) | |
|
47 | ||
|
48 | def main(realtime, code, date=None, interval=30): | |
|
49 | ||
|
50 | context = zmq.Context() | |
|
51 | socket = context.socket(zmq.REQ) | |
|
52 | socket.connect (SERVER_ENDPOINT) | |
|
53 | poll = zmq.Poller() | |
|
54 | poll.register(socket, zmq.POLLIN) | |
|
55 | ||
|
56 | if realtime: | |
|
57 | dt = datetime.now() | |
|
58 | else: | |
|
59 | dt = date | |
|
60 | ||
|
61 | while True: | |
|
62 | ||
|
63 | print('Sending {} - {}'.format(code, dt)) | |
|
64 | ||
|
65 | dato = { | |
|
66 | 'time': time.mktime(dt.timetuple()), | |
|
67 | 'yrange': np.arange(100).tolist(), | |
|
68 | 'xrange': np.arange(-30, 30).tolist(), | |
|
69 | 'localtime': True, | |
|
70 | 'interval': interval, | |
|
71 | 'exp_code': code, | |
|
72 | 'data': { | |
|
73 | 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(), | |
|
74 | 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(), | |
|
75 | 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(), | |
|
76 | } | |
|
77 | } | |
|
78 | ||
|
79 | dt = dt + timedelta(seconds=interval) | |
|
80 | ||
|
81 | t = Thread(target=send, args=(context, socket, poll, dato)) | |
|
82 | t.start() | |
|
83 | if realtime: | |
|
84 | time.sleep(interval) | |
|
85 | else: | |
|
86 | time.sleep(5) | |
|
87 | ||
|
88 | if __name__=='__main__': | |
|
89 | parser = argparse.ArgumentParser(description='This is a Client for realtime app') | |
|
90 | parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00') | |
|
91 | parser.add_argument('-r', action='store_true', dest='realtime', default=None) | |
|
92 | parser.add_argument('-c', action='store', dest='code', default='170') | |
|
93 | parser.add_argument('-i', action='store', dest='interval', type=int, default=30) | |
|
94 | ||
|
95 | results = parser.parse_args() | |
|
96 | if not results.realtime: | |
|
97 | try: | |
|
98 | results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S') | |
|
99 | except: | |
|
100 | raise(NameError('You must specify a date (--date) for non-realtime experiment')) | |
|
101 | main(results.realtime, results.code, results.date, results.interval) |
@@ -13,7 +13,7 import asgi_redis | |||
|
13 | 13 | import mongoengine |
|
14 | 14 | from threading import Thread |
|
15 | 15 | |
|
16 | sys.path.append('../') | |
|
16 | sys.path.append(os.environ.get('APP_DIR', '../')) | |
|
17 | 17 | os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings") |
|
18 | 18 | |
|
19 | 19 | from plotter.models import Experiment, ExpMeta, ExpData |
@@ -21,7 +21,6 from plotter.models import Experiment, ExpMeta, ExpData | |||
|
21 | 21 | host_mongo = os.environ.get('HOST_MONGO', 'localhost') |
|
22 | 22 | mongoengine.connect('dbplots', host=host_mongo, port=27017) |
|
23 | 23 | |
|
24 | ||
|
25 | 24 | host_redis = os.environ.get('HOST_REDIS', 'localhost') |
|
26 | 25 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) |
|
27 | 26 | #========================== Conf. ZeroMQ =================== |
@@ -30,11 +29,13 receiver = context.socket(zmq.REP) | |||
|
30 | 29 | receiver.bind("tcp://0.0.0.0:4444") |
|
31 | 30 | #============================== end ======================== |
|
32 | 31 | |
|
33 | TIMES = {} | |
|
34 | ||
|
35 | 32 | def loaddata(): |
|
36 | 33 | print('Loading Experiments...') |
|
37 | for tup in json.load(open('./experiments.json')): | |
|
34 | if os.environ.get('APP_DIR', None) is not None: | |
|
35 | file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json') | |
|
36 | else: | |
|
37 | file_exp = './experiments.json' | |
|
38 | for tup in json.load(open(file_exp)): | |
|
38 | 39 | print(tup['name']) |
|
39 | 40 | exp = Experiment.objects(code=tup['code']).modify( |
|
40 | 41 | upsert=True, # To add a new row |
@@ -75,7 +76,7 def update(buffer): | |||
|
75 | 76 | def check_times(): |
|
76 | 77 | |
|
77 | 78 | while True: |
|
78 | dt = datetime.now() | |
|
79 | dt = datetime.utcnow() | |
|
79 | 80 | exps = ExpMeta.objects(date=dt.date()) |
|
80 | 81 | |
|
81 | 82 | for exp in exps: |
@@ -86,8 +87,6 def check_times(): | |||
|
86 | 87 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
87 | 88 | t -= 5*60*60 |
|
88 | 89 | |
|
89 | ||
|
90 | ||
|
91 | 90 | if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC |
|
92 | 91 | data_time = data['time'] + 5*60*60 |
|
93 | 92 | |
@@ -108,12 +107,10 def main(): | |||
|
108 | 107 | t = Thread(target=check_times) |
|
109 | 108 | t.start() |
|
110 | 109 | |
|
111 |
while True: |
|
|
110 | while True: | |
|
112 | 111 | |
|
113 | 112 | buffer = receiver.recv_json() |
|
114 | 113 | |
|
115 | ||
|
116 | ||
|
117 | 114 | if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC |
|
118 | 115 | buffer['time'] -= 5*60*60 |
|
119 | 116 | |
@@ -139,9 +136,9 def main(): | |||
|
139 | 136 | #code = dum.pop('exp_code') |
|
140 | 137 | |
|
141 | 138 | channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)}) |
|
142 |
print('Sending |
|
|
139 | print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) | |
|
143 | 140 | |
|
144 |
receiver.send_string(' |
|
|
141 | receiver.send_string('ok') | |
|
145 | 142 | |
|
146 | 143 | receiver.close() |
|
147 | 144 | context.term() |
|
1 | NO CONTENT: file was removed |
|
1 | NO CONTENT: file was removed |
General Comments 0
You need to be logged in to leave comments.
Login now