##// END OF EJS Templates
Better test client & server
Juan C. Espinoza -
r17:9875b8044b93
parent child
Show More
@@ -0,0 +1,101
1 #!/usr/bin/python
2 # -*- coding: UTF-8 -*-
3
4 import time
5 from datetime import datetime, timedelta
6 import zmq
7 import json
8 import numpy as np
9 from threading import Thread
10 import argparse
11
12 REQUEST_TIMEOUT = 15000
13 REQUEST_RETRIES = 10
14 SERVER_ENDPOINT = 'tcp://localhost:4444'
15
16 def send(context, socket, poll, dato):
17
18 retries_left = REQUEST_RETRIES
19
20 while True:
21 socket.send_json(dato)
22 socks = dict(poll.poll(REQUEST_TIMEOUT))
23 if socks.get(socket) == zmq.POLLIN:
24 reply = socket.recv_string()
25 if not reply:
26 break
27 if reply == 'ok':
28 print('I: Server replied OK (%s)' % reply)
29 break
30 else:
31 print('E: Malformed reply from server: %s' % reply)
32 break
33 else:
34 print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left))
35 time.sleep(2)
36 socket.setsockopt(zmq.LINGER, 0)
37 socket.close()
38 poll.unregister(socket)
39 retries_left -= 1
40 if retries_left == 0:
41 print('E: Server seems to be offline, abandoning')
42 break
43 # Create new connection
44 socket = context.socket(zmq.REQ)
45 socket.connect(SERVER_ENDPOINT)
46 poll.register(socket, zmq.POLLIN)
47
48 def main(realtime, code, date=None, interval=30):
49
50 context = zmq.Context()
51 socket = context.socket(zmq.REQ)
52 socket.connect (SERVER_ENDPOINT)
53 poll = zmq.Poller()
54 poll.register(socket, zmq.POLLIN)
55
56 if realtime:
57 dt = datetime.now()
58 else:
59 dt = date
60
61 while True:
62
63 print('Sending {} - {}'.format(code, dt))
64
65 dato = {
66 'time': time.mktime(dt.timetuple()),
67 'yrange': np.arange(100).tolist(),
68 'xrange': np.arange(-30, 30).tolist(),
69 'localtime': True,
70 'interval': interval,
71 'exp_code': code,
72 'data': {
73 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(),
74 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(),
75 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(),
76 }
77 }
78
79 dt = dt + timedelta(seconds=interval)
80
81 t = Thread(target=send, args=(context, socket, poll, dato))
82 t.start()
83 if realtime:
84 time.sleep(interval)
85 else:
86 time.sleep(5)
87
88 if __name__=='__main__':
89 parser = argparse.ArgumentParser(description='This is a Client for realtime app')
90 parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00')
91 parser.add_argument('-r', action='store_true', dest='realtime', default=None)
92 parser.add_argument('-c', action='store', dest='code', default='170')
93 parser.add_argument('-i', action='store', dest='interval', type=int, default=30)
94
95 results = parser.parse_args()
96 if not results.realtime:
97 try:
98 results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S')
99 except:
100 raise(NameError('You must specify a date (--date) for non-realtime experiment'))
101 main(results.realtime, results.code, results.date, results.interval)
@@ -1,151 +1,148
1 1 #!/usr/bin/python
2 2 # -*- coding: UTF-8 -*-
3 3
4 4 import os
5 5 import sys
6 6 import json
7 7 import simplejson
8 8 from datetime import datetime
9 9 import time
10 10 import zmq
11 11 import redis
12 12 import asgi_redis
13 13 import mongoengine
14 14 from threading import Thread
15 15
16 sys.path.append('../')
16 sys.path.append(os.environ.get('APP_DIR', '../'))
17 17 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
18 18
19 19 from plotter.models import Experiment, ExpMeta, ExpData
20 20
21 21 host_mongo = os.environ.get('HOST_MONGO', 'localhost')
22 22 mongoengine.connect('dbplots', host=host_mongo, port=27017)
23 23
24
25 24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
26 25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
27 26 #========================== Conf. ZeroMQ ===================
28 27 context = zmq.Context()
29 28 receiver = context.socket(zmq.REP)
30 29 receiver.bind("tcp://0.0.0.0:4444")
31 30 #============================== end ========================
32 31
33 TIMES = {}
34
35 32 def loaddata():
36 33 print('Loading Experiments...')
37 for tup in json.load(open('./experiments.json')):
34 if os.environ.get('APP_DIR', None) is not None:
35 file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json')
36 else:
37 file_exp = './experiments.json'
38 for tup in json.load(open(file_exp)):
38 39 print(tup['name'])
39 40 exp = Experiment.objects(code=tup['code']).modify(
40 41 upsert=True, # To add a new row
41 42 new=True,
42 43 set__code=tup['code'],
43 44 set__name=tup['name'],
44 45 )
45 46 exp.save()
46 47
47 48 #============== funcion para modificar datos en la tabla ==============
48 49 def update(buffer):
49 50 dt = datetime.utcfromtimestamp(buffer['time'])
50 51 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
51 52 upsert=True, # To add a new row
52 53 new=True,
53 54 set__code=buffer['exp_code'],
54 55 set__date=dt.date(),
55 56 set__yrange = buffer['yrange'],
56 57 set__xrange = buffer['xrange'],
57 58 set__interval = buffer['interval'],
58 59 set__localtime = buffer['localtime'],
59 60 set__plots = buffer['data'].keys()
60 61 )
61 62 exp.save()
62 63
63 64 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
64 65 upsert=True, # To add a new row
65 66 new=True,
66 67 set__expmeta = exp,
67 68 set__time = buffer['time'],
68 69 set__data = buffer['data']
69 70 )
70 71
71 72 data.save()
72 73
73 74 return exp.id
74 75 # Function that is checking the state of my clients every 20s
75 76 def check_times():
76 77
77 78 while True:
78 dt = datetime.now()
79 dt = datetime.utcnow()
79 80 exps = ExpMeta.objects(date=dt.date())
80 81
81 82 for exp in exps:
82 83 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
83 84
84 85 t = time.mktime(dt.timetuple())
85 86
86 87 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
87 88 t -= 5*60*60
88 89
89
90
91 90 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
92 91 data_time = data['time'] + 5*60*60
93 92
94 93 if (t-data['time']) > 6*exp['interval']:
95 94 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})})
96 95 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
97 96 elif (t-data['time']) > 3*exp['interval']:
98 97 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})})
99 98 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
100 99 else:
101 100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
102 101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
103 102 time.sleep(20)
104 103
105 104 def main():
106 105 print('Starting ZMQ server...')
107 106
108 107 t = Thread(target=check_times)
109 108 t.start()
110 109
111 while True:
110 while True:
112 111
113 112 buffer = receiver.recv_json()
114 113
115
116
117 114 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
118 115 buffer['time'] -= 5*60*60
119 116
120 117 if not isinstance(buffer, dict):
121 118 print('*******************')
122 119 print(buffer)
123 120 continue
124 121 code = update(buffer)
125 122 print("==================================")
126 123 for plot in buffer['data']:
127 124 dum = buffer.copy()
128 125 dum['time'] = [buffer['time']]
129 126 if plot=='noise':
130 127 dum[plot] = [[x] for x in buffer['data'][plot]]
131 128 elif plot=='spc':
132 129 dum['noise'] = [[x] for x in buffer['data']['noise']]
133 130 dum['spc'] = buffer['data']['spc']
134 131 dum['rti'] = buffer['data']['rti']
135 132 else:
136 133 dum[plot] = buffer['data'][plot]
137 134 dum.pop('data')
138 135 exp_code = dum.pop('exp_code')
139 136 #code = dum.pop('exp_code')
140 137
141 138 channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
142 print('Sending...{}:{} - {} bytes'.format(exp_code, plot, len(str(dum))))
139 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
143 140
144 receiver.send_string('recibido')
141 receiver.send_string('ok')
145 142
146 143 receiver.close()
147 144 context.term()
148 145
149 146 if __name__=='__main__':
150 147 loaddata()
151 148 main()
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now