@@ -14,8 +14,17 REQUEST_RETRIES = 10 | |||||
14 | SERVER_ENDPOINT = 'tcp://localhost:4444' |
|
14 | SERVER_ENDPOINT = 'tcp://localhost:4444' | |
15 |
|
15 | |||
16 | def send(context, socket, poll, dato): |
|
16 | def send(context, socket, poll, dato): | |
17 |
|
17 | ''' | ||
|
18 | Function to send data to server | |||
|
19 | ''' | |||
18 | retries_left = REQUEST_RETRIES |
|
20 | retries_left = REQUEST_RETRIES | |
|
21 | if context.closed: | |||
|
22 | context = zmq.Context() | |||
|
23 | if socket.closed: | |||
|
24 | socket = context.socket(zmq.REQ) | |||
|
25 | socket.connect (SERVER_ENDPOINT) | |||
|
26 | poll = zmq.Poller() | |||
|
27 | poll.register(socket, zmq.POLLIN) | |||
19 |
|
28 | |||
20 | while True: |
|
29 | while True: | |
21 | socket.send_json(dato) |
|
30 | socket.send_json(dato) | |
@@ -25,20 +34,20 def send(context, socket, poll, dato): | |||||
25 | if not reply: |
|
34 | if not reply: | |
26 | break |
|
35 | break | |
27 | if reply == 'ok': |
|
36 | if reply == 'ok': | |
28 |
print(' |
|
37 | print('Server replied (%s)' % reply) | |
29 | break |
|
38 | break | |
30 | else: |
|
39 | else: | |
31 |
print(' |
|
40 | print('Malformed reply from server: %s' % reply) | |
32 | break |
|
41 | break | |
33 | else: |
|
42 | else: | |
34 |
print(' |
|
43 | print('No response from server, retry {}'.format(REQUEST_RETRIES-retries_left+1)) | |
35 |
time.sleep( |
|
44 | time.sleep(1) | |
36 | socket.setsockopt(zmq.LINGER, 0) |
|
45 | socket.setsockopt(zmq.LINGER, 0) | |
37 | socket.close() |
|
46 | socket.close() | |
38 | poll.unregister(socket) |
|
47 | poll.unregister(socket) | |
39 | retries_left -= 1 |
|
48 | retries_left -= 1 | |
40 | if retries_left == 0: |
|
49 | if retries_left == 0: | |
41 |
print(' |
|
50 | print('Server seems to be offline...') | |
42 | break |
|
51 | break | |
43 | # Create new connection |
|
52 | # Create new connection | |
44 | socket = context.socket(zmq.REQ) |
|
53 | socket = context.socket(zmq.REQ) | |
@@ -46,7 +55,9 def send(context, socket, poll, dato): | |||||
46 | poll.register(socket, zmq.POLLIN) |
|
55 | poll.register(socket, zmq.POLLIN) | |
47 |
|
56 | |||
48 | def main(realtime, code, date=None, interval=30): |
|
57 | def main(realtime, code, date=None, interval=30): | |
49 |
|
58 | ''' | ||
|
59 | Simulate data to be sended to server | |||
|
60 | ''' | |||
50 | context = zmq.Context() |
|
61 | context = zmq.Context() | |
51 | socket = context.socket(zmq.REQ) |
|
62 | socket = context.socket(zmq.REQ) | |
52 | socket.connect (SERVER_ENDPOINT) |
|
63 | socket.connect (SERVER_ENDPOINT) |
@@ -23,11 +23,6 mongoengine.connect('dbplots', host=host_mongo, port=27017) | |||||
23 |
|
23 | |||
24 | host_redis = os.environ.get('HOST_REDIS', 'localhost') |
|
24 | host_redis = os.environ.get('HOST_REDIS', 'localhost') | |
25 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) |
|
25 | channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)]) | |
26 | #========================== Conf. ZeroMQ =================== |
|
|||
27 | context = zmq.Context() |
|
|||
28 | receiver = context.socket(zmq.REP) |
|
|||
29 | receiver.bind("tcp://0.0.0.0:4444") |
|
|||
30 | #============================== end ======================== |
|
|||
31 |
|
26 | |||
32 | def loaddata(): |
|
27 | def loaddata(): | |
33 | print('Loading Experiments...') |
|
28 | print('Loading Experiments...') | |
@@ -48,6 +43,7 def loaddata(): | |||||
48 | #============== funcion para modificar datos en la tabla ============== |
|
43 | #============== funcion para modificar datos en la tabla ============== | |
49 | def update(buffer): |
|
44 | def update(buffer): | |
50 | dt = datetime.utcfromtimestamp(buffer['time']) |
|
45 | dt = datetime.utcfromtimestamp(buffer['time']) | |
|
46 | print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now())) | |||
51 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( |
|
47 | exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify( | |
52 | upsert=True, # To add a new row |
|
48 | upsert=True, # To add a new row | |
53 | new=True, |
|
49 | new=True, | |
@@ -71,12 +67,16 def update(buffer): | |||||
71 |
|
67 | |||
72 | data.save() |
|
68 | data.save() | |
73 |
|
69 | |||
74 | return exp.id |
|
70 | if datetime.now().date() == dt.date(): | |
75 | # Function that is checking the state of my clients every 20s |
|
71 | return True | |
|
72 | ||||
|
73 | return False | |||
|
74 | ||||
|
75 | # Function that is checking the state of my clients every 30s | |||
76 | def check_times(): |
|
76 | def check_times(): | |
77 |
|
77 | |||
78 | while True: |
|
78 | while True: | |
79 |
dt = datetime. |
|
79 | dt = datetime.now() | |
80 | exps = ExpMeta.objects(date=dt.date()) |
|
80 | exps = ExpMeta.objects(date=dt.date()) | |
81 |
|
81 | |||
82 | for exp in exps: |
|
82 | for exp in exps: | |
@@ -99,11 +99,13 def check_times(): | |||||
99 | else: |
|
99 | else: | |
100 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) |
|
100 | channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})}) | |
101 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) |
|
101 | print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online')) | |
102 |
time.sleep( |
|
102 | time.sleep(30) | |
103 |
|
103 | |||
104 | def main(): |
|
104 | def main(): | |
105 | print('Starting ZMQ server...') |
|
105 | print('Starting ZMQ server...') | |
106 |
|
106 | context = zmq.Context() | ||
|
107 | receiver = context.socket(zmq.REP) | |||
|
108 | receiver.bind("tcp://0.0.0.0:4444") | |||
107 | t = Thread(target=check_times) |
|
109 | t = Thread(target=check_times) | |
108 | t.start() |
|
110 | t.start() | |
109 |
|
111 | |||
@@ -118,7 +120,9 def main(): | |||||
118 | print('*******************') |
|
120 | print('*******************') | |
119 | print(buffer) |
|
121 | print(buffer) | |
120 | continue |
|
122 | continue | |
121 |
|
|
123 | if not update(buffer): | |
|
124 | receiver.send_string('ok') | |||
|
125 | continue | |||
122 | print("==================================") |
|
126 | print("==================================") | |
123 | for plot in buffer['data']: |
|
127 | for plot in buffer['data']: | |
124 | dum = buffer.copy() |
|
128 | dum = buffer.copy() | |
@@ -133,9 +137,10 def main(): | |||||
133 | dum[plot] = buffer['data'][plot] |
|
137 | dum[plot] = buffer['data'][plot] | |
134 | dum.pop('data') |
|
138 | dum.pop('data') | |
135 | exp_code = dum.pop('exp_code') |
|
139 | exp_code = dum.pop('exp_code') | |
136 | #code = dum.pop('exp_code') |
|
140 | channel.send_group( | |
137 |
|
141 | u'{}_{}'.format(exp_code, plot), | ||
138 |
|
|
142 | {'text': simplejson.dumps(dum, ignore_nan=True)} | |
|
143 | ) | |||
139 | print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) |
|
144 | print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum)))) | |
140 |
|
145 | |||
141 | receiver.send_string('ok') |
|
146 | receiver.send_string('ok') |
General Comments 0
You need to be logged in to leave comments.
Login now