##// END OF EJS Templates
server & client OK
Juan C. Espinoza -
r19:dce6ff120bf7
parent child
Show More
@@ -14,8 +14,17 REQUEST_RETRIES = 10
14 SERVER_ENDPOINT = 'tcp://localhost:4444'
14 SERVER_ENDPOINT = 'tcp://localhost:4444'
15
15
16 def send(context, socket, poll, dato):
16 def send(context, socket, poll, dato):
17
17 '''
18 Function to send data to server
19 '''
18 retries_left = REQUEST_RETRIES
20 retries_left = REQUEST_RETRIES
21 if context.closed:
22 context = zmq.Context()
23 if socket.closed:
24 socket = context.socket(zmq.REQ)
25 socket.connect (SERVER_ENDPOINT)
26 poll = zmq.Poller()
27 poll.register(socket, zmq.POLLIN)
19
28
20 while True:
29 while True:
21 socket.send_json(dato)
30 socket.send_json(dato)
@@ -25,20 +34,20 def send(context, socket, poll, dato):
25 if not reply:
34 if not reply:
26 break
35 break
27 if reply == 'ok':
36 if reply == 'ok':
28 print('I: Server replied OK (%s)' % reply)
37 print('Server replied (%s)' % reply)
29 break
38 break
30 else:
39 else:
31 print('E: Malformed reply from server: %s' % reply)
40 print('Malformed reply from server: %s' % reply)
32 break
41 break
33 else:
42 else:
34 print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left))
43 print('No response from server, retry {}'.format(REQUEST_RETRIES-retries_left+1))
35 time.sleep(2)
44 time.sleep(1)
36 socket.setsockopt(zmq.LINGER, 0)
45 socket.setsockopt(zmq.LINGER, 0)
37 socket.close()
46 socket.close()
38 poll.unregister(socket)
47 poll.unregister(socket)
39 retries_left -= 1
48 retries_left -= 1
40 if retries_left == 0:
49 if retries_left == 0:
41 print('E: Server seems to be offline, abandoning')
50 print('Server seems to be offline...')
42 break
51 break
43 # Create new connection
52 # Create new connection
44 socket = context.socket(zmq.REQ)
53 socket = context.socket(zmq.REQ)
@@ -46,7 +55,9 def send(context, socket, poll, dato):
46 poll.register(socket, zmq.POLLIN)
55 poll.register(socket, zmq.POLLIN)
47
56
48 def main(realtime, code, date=None, interval=30):
57 def main(realtime, code, date=None, interval=30):
49
58 '''
59 Simulate data to be sended to server
60 '''
50 context = zmq.Context()
61 context = zmq.Context()
51 socket = context.socket(zmq.REQ)
62 socket = context.socket(zmq.REQ)
52 socket.connect (SERVER_ENDPOINT)
63 socket.connect (SERVER_ENDPOINT)
@@ -23,11 +23,6 mongoengine.connect('dbplots', host=host_mongo, port=27017)
23
23
24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
26 #========================== Conf. ZeroMQ ===================
27 context = zmq.Context()
28 receiver = context.socket(zmq.REP)
29 receiver.bind("tcp://0.0.0.0:4444")
30 #============================== end ========================
31
26
32 def loaddata():
27 def loaddata():
33 print('Loading Experiments...')
28 print('Loading Experiments...')
@@ -48,6 +43,7 def loaddata():
48 #============== funcion para modificar datos en la tabla ==============
43 #============== funcion para modificar datos en la tabla ==============
49 def update(buffer):
44 def update(buffer):
50 dt = datetime.utcfromtimestamp(buffer['time'])
45 dt = datetime.utcfromtimestamp(buffer['time'])
46 print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now()))
51 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
47 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
52 upsert=True, # To add a new row
48 upsert=True, # To add a new row
53 new=True,
49 new=True,
@@ -71,12 +67,16 def update(buffer):
71
67
72 data.save()
68 data.save()
73
69
74 return exp.id
70 if datetime.now().date() == dt.date():
75 # Function that is checking the state of my clients every 20s
71 return True
72
73 return False
74
75 # Function that is checking the state of my clients every 30s
76 def check_times():
76 def check_times():
77
77
78 while True:
78 while True:
79 dt = datetime.utcnow()
79 dt = datetime.now()
80 exps = ExpMeta.objects(date=dt.date())
80 exps = ExpMeta.objects(date=dt.date())
81
81
82 for exp in exps:
82 for exp in exps:
@@ -99,11 +99,13 def check_times():
99 else:
99 else:
100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
102 time.sleep(20)
102 time.sleep(30)
103
103
104 def main():
104 def main():
105 print('Starting ZMQ server...')
105 print('Starting ZMQ server...')
106
106 context = zmq.Context()
107 receiver = context.socket(zmq.REP)
108 receiver.bind("tcp://0.0.0.0:4444")
107 t = Thread(target=check_times)
109 t = Thread(target=check_times)
108 t.start()
110 t.start()
109
111
@@ -118,7 +120,9 def main():
118 print('*******************')
120 print('*******************')
119 print(buffer)
121 print(buffer)
120 continue
122 continue
121 code = update(buffer)
123 if not update(buffer):
124 receiver.send_string('ok')
125 continue
122 print("==================================")
126 print("==================================")
123 for plot in buffer['data']:
127 for plot in buffer['data']:
124 dum = buffer.copy()
128 dum = buffer.copy()
@@ -133,9 +137,10 def main():
133 dum[plot] = buffer['data'][plot]
137 dum[plot] = buffer['data'][plot]
134 dum.pop('data')
138 dum.pop('data')
135 exp_code = dum.pop('exp_code')
139 exp_code = dum.pop('exp_code')
136 #code = dum.pop('exp_code')
140 channel.send_group(
137
141 u'{}_{}'.format(exp_code, plot),
138 channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
142 {'text': simplejson.dumps(dum, ignore_nan=True)}
143 )
139 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
144 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
140
145
141 receiver.send_string('ok')
146 receiver.send_string('ok')
General Comments 0
You need to be logged in to leave comments. Login now