##// END OF EJS Templates
server & client OK
Juan C. Espinoza -
r19:dce6ff120bf7
parent child
Show More
@@ -1,101 +1,112
1 1 #!/usr/bin/python
2 2 # -*- coding: UTF-8 -*-
3 3
4 4 import time
5 5 from datetime import datetime, timedelta
6 6 import zmq
7 7 import json
8 8 import numpy as np
9 9 from threading import Thread
10 10 import argparse
11 11
12 12 REQUEST_TIMEOUT = 15000
13 13 REQUEST_RETRIES = 10
14 14 SERVER_ENDPOINT = 'tcp://localhost:4444'
15 15
16 16 def send(context, socket, poll, dato):
17
17 '''
18 Function to send data to server
19 '''
18 20 retries_left = REQUEST_RETRIES
21 if context.closed:
22 context = zmq.Context()
23 if socket.closed:
24 socket = context.socket(zmq.REQ)
25 socket.connect (SERVER_ENDPOINT)
26 poll = zmq.Poller()
27 poll.register(socket, zmq.POLLIN)
19 28
20 29 while True:
21 30 socket.send_json(dato)
22 31 socks = dict(poll.poll(REQUEST_TIMEOUT))
23 32 if socks.get(socket) == zmq.POLLIN:
24 33 reply = socket.recv_string()
25 34 if not reply:
26 35 break
27 36 if reply == 'ok':
28 print('I: Server replied OK (%s)' % reply)
37 print('Server replied (%s)' % reply)
29 38 break
30 39 else:
31 print('E: Malformed reply from server: %s' % reply)
40 print('Malformed reply from server: %s' % reply)
32 41 break
33 42 else:
34 print('W: No response from server, retry {}'.format(REQUEST_RETRIES-retries_left))
35 time.sleep(2)
43 print('No response from server, retry {}'.format(REQUEST_RETRIES-retries_left+1))
44 time.sleep(1)
36 45 socket.setsockopt(zmq.LINGER, 0)
37 46 socket.close()
38 47 poll.unregister(socket)
39 48 retries_left -= 1
40 49 if retries_left == 0:
41 print('E: Server seems to be offline, abandoning')
50 print('Server seems to be offline...')
42 51 break
43 52 # Create new connection
44 53 socket = context.socket(zmq.REQ)
45 54 socket.connect(SERVER_ENDPOINT)
46 55 poll.register(socket, zmq.POLLIN)
47 56
48 57 def main(realtime, code, date=None, interval=30):
49
58 '''
59 Simulate data to be sended to server
60 '''
50 61 context = zmq.Context()
51 62 socket = context.socket(zmq.REQ)
52 63 socket.connect (SERVER_ENDPOINT)
53 64 poll = zmq.Poller()
54 65 poll.register(socket, zmq.POLLIN)
55 66
56 67 if realtime:
57 68 dt = datetime.now()
58 69 else:
59 70 dt = date
60 71
61 72 while True:
62 73
63 74 print('Sending {} - {}'.format(code, dt))
64 75
65 76 dato = {
66 77 'time': time.mktime(dt.timetuple()),
67 78 'yrange': np.arange(100).tolist(),
68 79 'xrange': np.arange(-30, 30).tolist(),
69 80 'localtime': True,
70 81 'interval': interval,
71 82 'exp_code': code,
72 83 'data': {
73 84 'noise': np.round(np.random.rand(8) + np.array([10,11,12,13,14,15,16,17]), 2).tolist(),
74 85 'rti': np.round(np.random.rand(8, 100)*5 + 10, 2).tolist(),
75 86 'spc': np.round(np.random.rand(8, 60, 100)*5 + 10, 2).tolist(),
76 87 }
77 88 }
78 89
79 90 dt = dt + timedelta(seconds=interval)
80 91
81 92 t = Thread(target=send, args=(context, socket, poll, dato))
82 93 t.start()
83 94 if realtime:
84 95 time.sleep(interval)
85 96 else:
86 97 time.sleep(5)
87 98
88 99 if __name__=='__main__':
89 100 parser = argparse.ArgumentParser(description='This is a Client for realtime app')
90 101 parser.add_argument('--date', action='store', default=None, help='format: 2018/02/13 12:23:00')
91 102 parser.add_argument('-r', action='store_true', dest='realtime', default=None)
92 103 parser.add_argument('-c', action='store', dest='code', default='170')
93 104 parser.add_argument('-i', action='store', dest='interval', type=int, default=30)
94 105
95 106 results = parser.parse_args()
96 107 if not results.realtime:
97 108 try:
98 109 results.date = datetime.strptime(results.date, '%Y/%m/%d %H:%M:%S')
99 110 except:
100 111 raise(NameError('You must specify a date (--date) for non-realtime experiment'))
101 112 main(results.realtime, results.code, results.date, results.interval)
@@ -1,148 +1,153
1 1 #!/usr/bin/python
2 2 # -*- coding: UTF-8 -*-
3 3
4 4 import os
5 5 import sys
6 6 import json
7 7 import simplejson
8 8 from datetime import datetime
9 9 import time
10 10 import zmq
11 11 import redis
12 12 import asgi_redis
13 13 import mongoengine
14 14 from threading import Thread
15 15
16 16 sys.path.append(os.environ.get('APP_DIR', '../'))
17 17 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime.settings")
18 18
19 19 from plotter.models import Experiment, ExpMeta, ExpData
20 20
21 21 host_mongo = os.environ.get('HOST_MONGO', 'localhost')
22 22 mongoengine.connect('dbplots', host=host_mongo, port=27017)
23 23
24 24 host_redis = os.environ.get('HOST_REDIS', 'localhost')
25 25 channel = asgi_redis.RedisChannelLayer(hosts=[(host_redis, 6379)])
26 #========================== Conf. ZeroMQ ===================
27 context = zmq.Context()
28 receiver = context.socket(zmq.REP)
29 receiver.bind("tcp://0.0.0.0:4444")
30 #============================== end ========================
31 26
32 27 def loaddata():
33 28 print('Loading Experiments...')
34 29 if os.environ.get('APP_DIR', None) is not None:
35 30 file_exp = os.path.join(os.environ.get('APP_DIR'), 'scripts', 'experiments.json')
36 31 else:
37 32 file_exp = './experiments.json'
38 33 for tup in json.load(open(file_exp)):
39 34 print(tup['name'])
40 35 exp = Experiment.objects(code=tup['code']).modify(
41 36 upsert=True, # To add a new row
42 37 new=True,
43 38 set__code=tup['code'],
44 39 set__name=tup['name'],
45 40 )
46 41 exp.save()
47 42
48 43 #============== funcion para modificar datos en la tabla ==============
49 44 def update(buffer):
50 45 dt = datetime.utcfromtimestamp(buffer['time'])
46 print('Updating code={} date={} {}'.format(buffer['exp_code'], dt, datetime.now()))
51 47 exp = ExpMeta.objects(code=buffer['exp_code'], date=dt.date()).modify(
52 48 upsert=True, # To add a new row
53 49 new=True,
54 50 set__code=buffer['exp_code'],
55 51 set__date=dt.date(),
56 52 set__yrange = buffer['yrange'],
57 53 set__xrange = buffer['xrange'],
58 54 set__interval = buffer['interval'],
59 55 set__localtime = buffer['localtime'],
60 56 set__plots = buffer['data'].keys()
61 57 )
62 58 exp.save()
63 59
64 60 data = ExpData.objects(expmeta=exp, time=buffer['time']).modify(
65 61 upsert=True, # To add a new row
66 62 new=True,
67 63 set__expmeta = exp,
68 64 set__time = buffer['time'],
69 65 set__data = buffer['data']
70 66 )
71 67
72 68 data.save()
73 69
74 return exp.id
75 # Function that is checking the state of my clients every 20s
70 if datetime.now().date() == dt.date():
71 return True
72
73 return False
74
75 # Function that is checking the state of my clients every 30s
76 76 def check_times():
77 77
78 78 while True:
79 dt = datetime.utcnow()
79 dt = datetime.now()
80 80 exps = ExpMeta.objects(date=dt.date())
81 81
82 82 for exp in exps:
83 83 data = ExpData.objects(expmeta=exp).order_by('-time')[0] #me quedo con el ultimo time del ultimo valor
84 84
85 85 t = time.mktime(dt.timetuple())
86 86
87 87 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
88 88 t -= 5*60*60
89 89
90 90 if exp['localtime'] == True: #Consulto que tipode time me esta llegando: LT o UTC
91 91 data_time = data['time'] + 5*60*60
92 92
93 93 if (t-data['time']) > 6*exp['interval']:
94 94 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Offline', 'time': data_time})})
95 95 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'offline'))
96 96 elif (t-data['time']) > 3*exp['interval']:
97 97 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Delayed', 'time': data_time})})
98 98 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'delayed'))
99 99 else:
100 100 channel.send_group(u'main', {'text': json.dumps({'code': exp['code'], 'value': 'Online', 'time': data_time})})
101 101 print ('{} {} {} {} {}'.format(exp.code, t, data['time'], (t-data['time']), 'online'))
102 time.sleep(20)
102 time.sleep(30)
103 103
104 104 def main():
105 105 print('Starting ZMQ server...')
106
106 context = zmq.Context()
107 receiver = context.socket(zmq.REP)
108 receiver.bind("tcp://0.0.0.0:4444")
107 109 t = Thread(target=check_times)
108 110 t.start()
109 111
110 112 while True:
111 113
112 114 buffer = receiver.recv_json()
113 115
114 116 if buffer['localtime'] == True: # Ask which type of time is coming: LT o UTC
115 117 buffer['time'] -= 5*60*60
116 118
117 119 if not isinstance(buffer, dict):
118 120 print('*******************')
119 121 print(buffer)
120 122 continue
121 code = update(buffer)
123 if not update(buffer):
124 receiver.send_string('ok')
125 continue
122 126 print("==================================")
123 127 for plot in buffer['data']:
124 128 dum = buffer.copy()
125 129 dum['time'] = [buffer['time']]
126 130 if plot=='noise':
127 131 dum[plot] = [[x] for x in buffer['data'][plot]]
128 132 elif plot=='spc':
129 133 dum['noise'] = [[x] for x in buffer['data']['noise']]
130 134 dum['spc'] = buffer['data']['spc']
131 135 dum['rti'] = buffer['data']['rti']
132 136 else:
133 137 dum[plot] = buffer['data'][plot]
134 138 dum.pop('data')
135 139 exp_code = dum.pop('exp_code')
136 #code = dum.pop('exp_code')
137
138 channel.send_group(u'{}_{}'.format(exp_code, plot), {'text': simplejson.dumps(dum, ignore_nan=True)})
140 channel.send_group(
141 u'{}_{}'.format(exp_code, plot),
142 {'text': simplejson.dumps(dum, ignore_nan=True)}
143 )
139 144 print('Sending to group {}_{} - {} bytes'.format(exp_code, plot, len(str(dum))))
140 145
141 146 receiver.send_string('ok')
142 147
143 148 receiver.close()
144 149 context.term()
145 150
146 151 if __name__=='__main__':
147 152 loaddata()
148 153 main()
General Comments 0
You need to be logged in to leave comments. Login now