##// END OF EJS Templates
Add input queues for processing units and external operations
Juan C. Espinoza -
r1235:fff8599e0346
parent child
Show More
@@ -230,8 +230,7 class BLTRParamReader(JRODataReader, ProcessingUnit):
230 230 self.sizeOfFile = os.path.getsize(self.filename)
231 231 self.counter_records = 0
232 232 self.flagIsNewFile = 0
233 self.fileIndex += 1
234 time.sleep(2)
233 self.fileIndex += 1
235 234
236 235 return 1
237 236
@@ -1598,11 +1598,9 class JRODataWriter(JRODataIO):
1598 1598
1599 1599 self.basicHeaderObj.size = self.basicHeaderSize # bytes
1600 1600 self.basicHeaderObj.version = self.versionFile
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1602 log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime))
1601 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1603 1602 utc = numpy.floor(self.dataOut.utctime)
1604 milisecond = (self.dataOut.utctime - utc) * 1000.0
1605 log.warning(milisecond)
1603 milisecond = (self.dataOut.utctime - utc) * 1000.0
1606 1604 self.basicHeaderObj.utc = utc
1607 1605 self.basicHeaderObj.miliSecond = milisecond
1608 1606 self.basicHeaderObj.timeZone = self.dataOut.timeZone
@@ -1503,7 +1503,7 class ParameterWriter(Operation):
1503 1503 data.append((dsInfo['variable'], i))
1504 1504 fp.flush()
1505 1505
1506 log.log('creating file: {}'.format(fp.filename), 'Writing')
1506 log.log('Creating file: {}'.format(fp.filename), self.name)
1507 1507
1508 1508 self.ds = dtsets
1509 1509 self.data = data
@@ -1526,6 +1526,7 class ParameterWriter(Operation):
1526 1526
1527 1527 self.fp.flush()
1528 1528 self.blockIndex += 1
1529 log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name)
1529 1530
1530 1531 return
1531 1532
@@ -17,6 +17,8 import inspect
17 17 import zmq
18 18 import time
19 19 import pickle
20 from queue import Queue
21 from threading import Thread
20 22 from multiprocessing import Process
21 23 from zmq.utils.monitor import recv_monitor_message
22 24
@@ -170,6 +172,32 class Operation(object):
170 172
171 173 return
172 174
175 class InputQueue(Thread):
176 '''
177 Class to hold input data for Proccessing Units and external Operations,
178 '''
179
180 def __init__(self, project_id, inputId):
181 Thread.__init__(self)
182 self.queue = Queue()
183 self.project_id = project_id
184 self.inputId = inputId
185
186 def run(self):
187
188 c = zmq.Context()
189 self.receiver = c.socket(zmq.SUB)
190 self.receiver.connect(
191 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
192 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
193
194 while True:
195 self.queue.put(self.receiver.recv_multipart()[1])
196
197 def get(self):
198
199 return pickle.loads(self.queue.get())
200
173 201
174 202 def MPDecorator(BaseClass):
175 203 """
@@ -189,7 +217,7 def MPDecorator(BaseClass):
189 217 self.kwargs = kwargs
190 218 self.sender = None
191 219 self.receiver = None
192 self.i = 0
220 self.i = 0
193 221 self.name = BaseClass.__name__
194 222 if 'plot' in self.name.lower() and not self.name.endswith('_'):
195 223 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
@@ -205,33 +233,23 def MPDecorator(BaseClass):
205 233 self.inputId = args[0]
206 234 self.project_id = args[1]
207 235 self.typeProc = "Operation"
208
209 def fix_publish(self,valor,multiple1):
210 return True if valor%multiple1 ==0 else False
236
237 self.queue = InputQueue(self.project_id, self.inputId)
211 238
212 239 def subscribe(self):
213 240 '''
214 This function create a socket to receive objects from the
215 topic `inputId`.
241 This function start the input queue.
216 242 '''
217
218 c = zmq.Context()
219 self.receiver = c.socket(zmq.SUB)
220 self.receiver.connect(
221 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
222 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
243
244 self.queue.start()
245
223 246
224 247 def listen(self):
225 248 '''
226 This function waits for objects and deserialize using pickle
249 This function waits for objects
227 250 '''
228 try:
229 data = pickle.loads(self.receiver.recv_multipart()[1])
230 except zmq.ZMQError as e:
231 if e.errno == zmq.ETERM:
232 print (e.errno)
233
234 return data
251
252 return self.queue.get()
235 253
236 254 def set_publisher(self):
237 255 '''
@@ -247,13 +265,15 def MPDecorator(BaseClass):
247 265 def publish(self, data, id):
248 266 '''
249 267 This function publish an object, to a specific topic.
250 The fix method only affect inputId None which is Read Unit
251 Use value between 64 80, you should notice a little retard in processing
268 For Read Units (inputId == None) adds a little delay
269 to avoid data loss
252 270 '''
271
253 272 if self.inputId is None:
254 self.i+=1
255 if self.fix_publish(self.i,80) == True:# value n
256 time.sleep(0.01)
273 self.i += 1
274 if self.i % 100 == 0:
275 self.i = 0
276 time.sleep(0.01)
257 277
258 278 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
259 279
General Comments 0
You need to be logged in to leave comments. Login now