##// END OF EJS Templates
Better test client & server
Juan C. Espinoza -
r17:9875b8044b93
parent child
Show More
@@ -0,0 +1,101
1 #!/usr/bin/python
2 # -*- coding: UTF-8 -*-
3
4 import time
5 from datetime import datetime, timedelta
6 import zmq
7 import json
8 import numpy as np
9 from threading import Thread
10 import argparse
11
12 REQUEST_TIMEOUT = 15000
13 REQUEST_RETRIES = 10
14 SERVER_ENDPOINT = 'tcp://localhost:4444'
15
16 def send(context, socket, poll, dato):
17
18 retries_left = REQUEST_RETRIES
19
20 while True:
21 socket.send_json(dato)
22 socks = dict(poll.poll(REQUEST_TIMEOUT))
23 if socks.get(socket) == zmq.POLLIN:
24 reply = socket.recv_string()
25 if not reply:
26 break
27 if reply == 'ok':
28 print('I: Server replied OK (%s)' % reply)
29 break
30 else:
31 print('E: Malformed reply from server: %s' % reply)
32 break
33 else:
34 print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left))
35 time.sleep(2)
36 socket.setsockopt(zmq.LINGER, 0)
37 socket.close()
38 poll.unregister(socket)
39 retries_left -= 1
40 if retries_left == 0:
41 print('E: Server seems to be offline, abandoning')
42 break
43 # Create new connection
44 socket = context.socket(zmq.REQ)
45 socket.connect(SERVER_ENDPOINT)
46 poll.register(socket, zmq.POLLIN)
47
48 def main(realtime, code, date=None, interval=30):
49
50 context = zmq.Context()
51 socket = context.socket(zmq.REQ)
52 socket.connect (SERVER_ENDPOINT)
53 poll = zmq.Poller()
54 poll.register(socket, zmq.POLLIN)
55
56 if realtime:
57 dt = datetime.now()
58 else:
59 dt = date
60
61 while True:
62
63 print('Sending {} - {}'.format(code, dt))
64
65 dato = {
66 'time': time.mktime(dt.timetuple()),
67 'yrange': np.arange(100).tolist(),
68 'xrange': np.arange(-30, 30).tolist(),
69 'localtime': True,
70 'interval': interval,
71 'exp_code': code,
72 'data': {
73 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(),
74 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(),
75 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(),
76 }
77 }
78
79 dt = dt + timedelta(seconds=interval)
80
81 t = Thread(target=send, args=(context, socket, poll, dato))
82 t.start()
83 if realtime:
84 time.sleep(interval)
85 else:
86 time.sleep(5)
87
88 if __name__=='__main__':
89 parser = argparse.ArgumentParser(description='This is a Client for realtime app')
90 parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00')
91 parser.add_argument('-r', action='store_true', dest='realtime', default=None)
92 parser.add_argument('-c', action='store', dest='code', default='170')
93 parser.add_argument('-i', action='store', dest='interval', type=int, default=30)
94
95 results = parser.parse_args()
96 if not results.realtime:
97 try:
98 results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S')
99 except:
100 raise(NameError('You must specify a date (--date) for non-realtime experiment'))
101 main(results.realtime, results.code, results.date, results.interval)
@@ -1,151 +1,148
1 #!/usr/bin/python
1 #!/usr/bin/python
2 # -*- coding: UTF-8 -*-
2 # -*- coding: UTF-8 -*-
3
3
4 import os
4 import os
5 import sys
5 import sys
6 import json
6 import json
7 import simplejson
7 import simplejson
8 from datetime import datetime
8 from datetime import datetime
9 import time
9 import time
10 import zmq
10 import zmq
11 import redis
11 import redis
12 import asgi_redis
12 import asgi_redis
13 import mongoengine
13 import mongoengine
14 from threading import Thread
14 from threading import Thread
15
15
16 sys.path.append('../')
16 sys.path.append(os.environ.get('APP_DIR', '../'))
17 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
17 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
18
18
19 from plotter.models import Experiment, ExpMeta, ExpData
19 from plotter.models import Experiment, ExpMeta, ExpData
20
20
21 host_mongo = os.environ.get('HOST_MONGO', 'localhost')
21 host_mongo = os.environ.get('HOST_MONGO', 'localhost')
22 mongoengine.connect('dbplots', host=host_mongo, port=27017)
22 mongoengine.connect('dbplots', host=host_mongo, port=27017)
23
23
24
25 host_redis = os.environ.get('HOST_REDIS', 'localhost')
24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
26 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
27 #========================== Conf. ZeroMQ ===================
26 #========================== Conf. ZeroMQ ===================
28 context = zmq.Context()
27 context = zmq.Context()
29 receiver = context.socket(zmq.REP)
28 receiver = context.socket(zmq.REP)
30 receiver.bind("tcp://0.0.0.0:4444")
29 receiver.bind("tcp://0.0.0.0:4444")
31 #============================== end ========================
30 #============================== end ========================
32
31
33 TIMES = {}
34
35 def loaddata():
32 def loaddata():
36 print('Loading Experiments...')
33 print('Loading Experiments...')
37 for tup in json.load(open('./experiments.json')):
34 if os.environ.get('APP_DIR', None) is not None:
35 file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json')
36 else:
37 file_exp = './experiments.json'
38 for tup in json.load(open(file_exp)):
38 print(tup['name'])
39 print(tup['name'])
39 exp = Experiment.objects(code=tup['code']).modify(
40 exp = Experiment.objects(code=tup['code']).modify(
40 upsert=True, # To add a new row
41 upsert=True, # To add a new row
41 new=True,
42 new=True,
42 set__code=tup['code'],
43 set__code=tup['code'],
43 set__name=tup['name'],
44 set__name=tup['name'],
44 )
45 )
45 exp.save()
46 exp.save()
46
47
47 #============== funcion para modificar datos en la tabla ==============
48 #============== funcion para modificar datos en la tabla ==============
48 def update(buffer):
49 def update(buffer):
49 dt = datetime.utcfromtimestamp(buffer['time'])
50 dt = datetime.utcfromtimestamp(buffer['time'])
50 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
51 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
51 upsert=True, # To add a new row
52 upsert=True, # To add a new row
52 new=True,
53 new=True,
53 set__code=buffer['exp_code'],
54 set__code=buffer['exp_code'],
54 set__date=dt.date(),
55 set__date=dt.date(),
55 set__yrange = buffer['yrange'],
56 set__yrange = buffer['yrange'],
56 set__xrange = buffer['xrange'],
57 set__xrange = buffer['xrange'],
57 set__interval = buffer['interval'],
58 set__interval = buffer['interval'],
58 set__localtime = buffer['localtime'],
59 set__localtime = buffer['localtime'],
59 set__plots = buffer['data'].keys()
60 set__plots = buffer['data'].keys()
60 )
61 )
61 exp.save()
62 exp.save()
62
63
63 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
64 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
64 upsert=True, # To add a new row
65 upsert=True, # To add a new row
65 new=True,
66 new=True,
66 set__expmeta = exp,
67 set__expmeta = exp,
67 set__time = buffer['time'],
68 set__time = buffer['time'],
68 set__data = buffer['data']
69 set__data = buffer['data']
69 )
70 )
70
71
71 data.save()
72 data.save()
72
73
73 return exp.id
74 return exp.id
74 # Function that is checking the state of my clients every 20s
75 # Function that is checking the state of my clients every 20s
75 def check_times():
76 def check_times():
76
77
77 while True:
78 while True:
78 dt = datetime.now()
79 dt = datetime.utcnow()
79 exps = ExpMeta.objects(date=dt.date())
80 exps = ExpMeta.objects(date=dt.date())
80
81
81 for exp in exps:
82 for exp in exps:
82 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
83 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
83
84
84 t = time.mktime(dt.timetuple())
85 t = time.mktime(dt.timetuple())
85
86
86 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
87 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
87 t -= 5*60*60
88 t -= 5*60*60
88
89
89
90
91 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
90 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
92 data_time = data['time'] + 5*60*60
91 data_time = data['time'] + 5*60*60
93
92
94 if (t-data['time']) > 6*exp['interval']:
93 if (t-data['time']) > 6*exp['interval']:
95 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})})
94 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})})
96 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
95 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
97 elif (t-data['time']) > 3*exp['interval']:
96 elif (t-data['time']) > 3*exp['interval']:
98 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})})
97 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})})
99 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
98 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
100 else:
99 else:
101 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
102 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
103 time.sleep(20)
102 time.sleep(20)
104
103
105 def main():
104 def main():
106 print('Starting ZMQ server...')
105 print('Starting ZMQ server...')
107
106
108 t = Thread(target=check_times)
107 t = Thread(target=check_times)
109 t.start()
108 t.start()
110
109
111 while True:
110 while True:
112
111
113 buffer = receiver.recv_json()
112 buffer = receiver.recv_json()
114
113
115
116
117 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
114 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
118 buffer['time'] -= 5*60*60
115 buffer['time'] -= 5*60*60
119
116
120 if not isinstance(buffer, dict):
117 if not isinstance(buffer, dict):
121 print('*******************')
118 print('*******************')
122 print(buffer)
119 print(buffer)
123 continue
120 continue
124 code = update(buffer)
121 code = update(buffer)
125 print("==================================")
122 print("==================================")
126 for plot in buffer['data']:
123 for plot in buffer['data']:
127 dum = buffer.copy()
124 dum = buffer.copy()
128 dum['time'] = [buffer['time']]
125 dum['time'] = [buffer['time']]
129 if plot=='noise':
126 if plot=='noise':
130 dum[plot] = [[x] for x in buffer['data'][plot]]
127 dum[plot] = [[x] for x in buffer['data'][plot]]
131 elif plot=='spc':
128 elif plot=='spc':
132 dum['noise'] = [[x] for x in buffer['data']['noise']]
129 dum['noise'] = [[x] for x in buffer['data']['noise']]
133 dum['spc'] = buffer['data']['spc']
130 dum['spc'] = buffer['data']['spc']
134 dum['rti'] = buffer['data']['rti']
131 dum['rti'] = buffer['data']['rti']
135 else:
132 else:
136 dum[plot] = buffer['data'][plot]
133 dum[plot] = buffer['data'][plot]
137 dum.pop('data')
134 dum.pop('data')
138 exp_code = dum.pop('exp_code')
135 exp_code = dum.pop('exp_code')
139 #code = dum.pop('exp_code')
136 #code = dum.pop('exp_code')
140
137
141 channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
138 channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
142 print('Sending...{}:{} - {} bytes'.format(exp_code, plot, len(str(dum))))
139 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
143
140
144 receiver.send_string('recibido')
141 receiver.send_string('ok')
145
142
146 receiver.close()
143 receiver.close()
147 context.term()
144 context.term()
148
145
149 if __name__=='__main__':
146 if __name__=='__main__':
150 loaddata()
147 loaddata()
151 main()
148 main()
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now