##// END OF EJS Templates
Add input queues for processing units and external operations
Juan C. Espinoza -
r1235:fff8599e0346
parent child
Show More
@@ -231,7 +231,6 class BLTRParamReader(JRODataReader, ProcessingUnit):
231 self.counter_records = 0
231 self.counter_records = 0
232 self.flagIsNewFile = 0
232 self.flagIsNewFile = 0
233 self.fileIndex += 1
233 self.fileIndex += 1
234 time.sleep(2)
235
234
236 return 1
235 return 1
237
236
@@ -1599,10 +1599,8 class JRODataWriter(JRODataIO):
1599 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1599 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1600 self.basicHeaderObj.version = self.versionFile
1600 self.basicHeaderObj.version = self.versionFile
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1602 log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime))
1603 utc = numpy.floor(self.dataOut.utctime)
1602 utc = numpy.floor(self.dataOut.utctime)
1604 milisecond = (self.dataOut.utctime - utc) * 1000.0
1603 milisecond = (self.dataOut.utctime - utc) * 1000.0
1605 log.warning(milisecond)
1606 self.basicHeaderObj.utc = utc
1604 self.basicHeaderObj.utc = utc
1607 self.basicHeaderObj.miliSecond = milisecond
1605 self.basicHeaderObj.miliSecond = milisecond
1608 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1606 self.basicHeaderObj.timeZone = self.dataOut.timeZone
@@ -1503,7 +1503,7 class ParameterWriter(Operation):
1503 data.append((dsInfo['variable'], i))
1503 data.append((dsInfo['variable'], i))
1504 fp.flush()
1504 fp.flush()
1505
1505
1506 log.log('creating file: {}'.format(fp.filename), 'Writing')
1506 log.log('Creating file: {}'.format(fp.filename), self.name)
1507
1507
1508 self.ds = dtsets
1508 self.ds = dtsets
1509 self.data = data
1509 self.data = data
@@ -1526,6 +1526,7 class ParameterWriter(Operation):
1526
1526
1527 self.fp.flush()
1527 self.fp.flush()
1528 self.blockIndex += 1
1528 self.blockIndex += 1
1529 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1529
1530
1530 return
1531 return
1531
1532
@@ -17,6 +17,8 import inspect
17 import zmq
17 import zmq
18 import time
18 import time
19 import pickle
19 import pickle
20 from queue import Queue
21 from threading import Thread
20 from multiprocessing import Process
22 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
23 from zmq.utils.monitor import recv_monitor_message
22
24
@@ -170,6 +172,32 class Operation(object):
170
172
171 return
173 return
172
174
175 class InputQueue(Thread):
176 '''
177 Class to hold input data for Proccessing Units and external Operations,
178 '''
179
180 def __init__(self, project_id, inputId):
181 Thread.__init__(self)
182 self.queue = Queue()
183 self.project_id = project_id
184 self.inputId = inputId
185
186 def run(self):
187
188 c = zmq.Context()
189 self.receiver = c.socket(zmq.SUB)
190 self.receiver.connect(
191 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
192 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
193
194 while True:
195 self.queue.put(self.receiver.recv_multipart()[1])
196
197 def get(self):
198
199 return pickle.loads(self.queue.get())
200
173
201
174 def MPDecorator(BaseClass):
202 def MPDecorator(BaseClass):
175 """
203 """
@@ -206,32 +234,22 def MPDecorator(BaseClass):
206 self.project_id = args[1]
234 self.project_id = args[1]
207 self.typeProc = "Operation"
235 self.typeProc = "Operation"
208
236
209 def fix_publish(self,valor,multiple1):
237 self.queue = InputQueue(self.project_id, self.inputId)
210 return True if valor%multiple1 ==0 else False
211
238
212 def subscribe(self):
239 def subscribe(self):
213 '''
240 '''
214 This function create a socket to receive objects from the
241 This function start the input queue.
215 topic `inputId`.
216 '''
242 '''
217
243
218 c = zmq.Context()
244 self.queue.start()
219 self.receiver = c.socket(zmq.SUB)
245
220 self.receiver.connect(
221 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
222 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
223
246
224 def listen(self):
247 def listen(self):
225 '''
248 '''
226 This function waits for objects and deserialize using pickle
249 This function waits for objects
227 '''
250 '''
228 try:
229 data = pickle.loads(self.receiver.recv_multipart()[1])
230 except zmq.ZMQError as e:
231 if e.errno == zmq.ETERM:
232 print (e.errno)
233
251
234 return data
252 return self.queue.get()
235
253
236 def set_publisher(self):
254 def set_publisher(self):
237 '''
255 '''
@@ -247,12 +265,14 def MPDecorator(BaseClass):
247 def publish(self, data, id):
265 def publish(self, data, id):
248 '''
266 '''
249 This function publish an object, to a specific topic.
267 This function publish an object, to a specific topic.
250 The fix method only affect inputId None which is Read Unit
268 For Read Units (inputId == None) adds a little delay
251 Use value between 64 80, you should notice a little retard in processing
269 to avoid data loss
252 '''
270 '''
271
253 if self.inputId is None:
272 if self.inputId is None:
254 self.i+=1
273 self.i += 1
255 if self.fix_publish(self.i,80) == True:# value n
274 if self.i % 100 == 0:
275 self.i = 0
256 time.sleep(0.01)
276 time.sleep(0.01)
257
277
258 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
278 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
General Comments 0
You need to be logged in to leave comments. Login now