@@ -230,8 +230,7 class BLTRParamReader(JRODataReader, ProcessingUnit): | |||||
230 | self.sizeOfFile = os.path.getsize(self.filename) |
|
230 | self.sizeOfFile = os.path.getsize(self.filename) | |
231 | self.counter_records = 0 |
|
231 | self.counter_records = 0 | |
232 | self.flagIsNewFile = 0 |
|
232 | self.flagIsNewFile = 0 | |
233 | self.fileIndex += 1 |
|
233 | self.fileIndex += 1 | |
234 | time.sleep(2) |
|
|||
235 |
|
234 | |||
236 | return 1 |
|
235 | return 1 | |
237 |
|
236 |
@@ -1598,11 +1598,9 class JRODataWriter(JRODataIO): | |||||
1598 |
|
1598 | |||
1599 | self.basicHeaderObj.size = self.basicHeaderSize # bytes |
|
1599 | self.basicHeaderObj.size = self.basicHeaderSize # bytes | |
1600 | self.basicHeaderObj.version = self.versionFile |
|
1600 | self.basicHeaderObj.version = self.versionFile | |
1601 | self.basicHeaderObj.dataBlock = self.nTotalBlocks |
|
1601 | self.basicHeaderObj.dataBlock = self.nTotalBlocks | |
1602 | log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime)) |
|
|||
1603 | utc = numpy.floor(self.dataOut.utctime) |
|
1602 | utc = numpy.floor(self.dataOut.utctime) | |
1604 | milisecond = (self.dataOut.utctime - utc) * 1000.0 |
|
1603 | milisecond = (self.dataOut.utctime - utc) * 1000.0 | |
1605 | log.warning(milisecond) |
|
|||
1606 | self.basicHeaderObj.utc = utc |
|
1604 | self.basicHeaderObj.utc = utc | |
1607 | self.basicHeaderObj.miliSecond = milisecond |
|
1605 | self.basicHeaderObj.miliSecond = milisecond | |
1608 | self.basicHeaderObj.timeZone = self.dataOut.timeZone |
|
1606 | self.basicHeaderObj.timeZone = self.dataOut.timeZone |
@@ -1503,7 +1503,7 class ParameterWriter(Operation): | |||||
1503 | data.append((dsInfo['variable'], i)) |
|
1503 | data.append((dsInfo['variable'], i)) | |
1504 | fp.flush() |
|
1504 | fp.flush() | |
1505 |
|
1505 | |||
1506 |
log.log(' |
|
1506 | log.log('Creating file: {}'.format(fp.filename), self.name) | |
1507 |
|
1507 | |||
1508 | self.ds = dtsets |
|
1508 | self.ds = dtsets | |
1509 | self.data = data |
|
1509 | self.data = data | |
@@ -1526,6 +1526,7 class ParameterWriter(Operation): | |||||
1526 |
|
1526 | |||
1527 | self.fp.flush() |
|
1527 | self.fp.flush() | |
1528 | self.blockIndex += 1 |
|
1528 | self.blockIndex += 1 | |
|
1529 | log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name) | |||
1529 |
|
1530 | |||
1530 | return |
|
1531 | return | |
1531 |
|
1532 |
@@ -17,6 +17,8 import inspect | |||||
17 | import zmq |
|
17 | import zmq | |
18 | import time |
|
18 | import time | |
19 | import pickle |
|
19 | import pickle | |
|
20 | from queue import Queue | |||
|
21 | from threading import Thread | |||
20 | from multiprocessing import Process |
|
22 | from multiprocessing import Process | |
21 | from zmq.utils.monitor import recv_monitor_message |
|
23 | from zmq.utils.monitor import recv_monitor_message | |
22 |
|
24 | |||
@@ -170,6 +172,32 class Operation(object): | |||||
170 |
|
172 | |||
171 | return |
|
173 | return | |
172 |
|
174 | |||
|
175 | class InputQueue(Thread): | |||
|
176 | ''' | |||
|
177 | Class to hold input data for Proccessing Units and external Operations, | |||
|
178 | ''' | |||
|
179 | ||||
|
180 | def __init__(self, project_id, inputId): | |||
|
181 | Thread.__init__(self) | |||
|
182 | self.queue = Queue() | |||
|
183 | self.project_id = project_id | |||
|
184 | self.inputId = inputId | |||
|
185 | ||||
|
186 | def run(self): | |||
|
187 | ||||
|
188 | c = zmq.Context() | |||
|
189 | self.receiver = c.socket(zmq.SUB) | |||
|
190 | self.receiver.connect( | |||
|
191 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |||
|
192 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |||
|
193 | ||||
|
194 | while True: | |||
|
195 | self.queue.put(self.receiver.recv_multipart()[1]) | |||
|
196 | ||||
|
197 | def get(self): | |||
|
198 | ||||
|
199 | return pickle.loads(self.queue.get()) | |||
|
200 | ||||
173 |
|
201 | |||
174 | def MPDecorator(BaseClass): |
|
202 | def MPDecorator(BaseClass): | |
175 | """ |
|
203 | """ | |
@@ -189,7 +217,7 def MPDecorator(BaseClass): | |||||
189 | self.kwargs = kwargs |
|
217 | self.kwargs = kwargs | |
190 | self.sender = None |
|
218 | self.sender = None | |
191 | self.receiver = None |
|
219 | self.receiver = None | |
192 |
self.i |
|
220 | self.i = 0 | |
193 | self.name = BaseClass.__name__ |
|
221 | self.name = BaseClass.__name__ | |
194 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
|
222 | if 'plot' in self.name.lower() and not self.name.endswith('_'): | |
195 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') |
|
223 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') | |
@@ -205,33 +233,23 def MPDecorator(BaseClass): | |||||
205 | self.inputId = args[0] |
|
233 | self.inputId = args[0] | |
206 | self.project_id = args[1] |
|
234 | self.project_id = args[1] | |
207 | self.typeProc = "Operation" |
|
235 | self.typeProc = "Operation" | |
208 |
|
|
236 | ||
209 | def fix_publish(self,valor,multiple1): |
|
237 | self.queue = InputQueue(self.project_id, self.inputId) | |
210 | return True if valor%multiple1 ==0 else False |
|
|||
211 |
|
238 | |||
212 | def subscribe(self): |
|
239 | def subscribe(self): | |
213 | ''' |
|
240 | ''' | |
214 | This function create a socket to receive objects from the |
|
241 | This function start the input queue. | |
215 | topic `inputId`. |
|
|||
216 | ''' |
|
242 | ''' | |
217 |
|
243 | |||
218 |
|
|
244 | self.queue.start() | |
219 | self.receiver = c.socket(zmq.SUB) |
|
245 | ||
220 | self.receiver.connect( |
|
|||
221 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) |
|
|||
222 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) |
|
|||
223 |
|
246 | |||
224 | def listen(self): |
|
247 | def listen(self): | |
225 | ''' |
|
248 | ''' | |
226 |
This function waits for objects |
|
249 | This function waits for objects | |
227 | ''' |
|
250 | ''' | |
228 | try: |
|
251 | ||
229 | data = pickle.loads(self.receiver.recv_multipart()[1]) |
|
252 | return self.queue.get() | |
230 | except zmq.ZMQError as e: |
|
|||
231 | if e.errno == zmq.ETERM: |
|
|||
232 | print (e.errno) |
|
|||
233 |
|
||||
234 | return data |
|
|||
235 |
|
253 | |||
236 | def set_publisher(self): |
|
254 | def set_publisher(self): | |
237 | ''' |
|
255 | ''' | |
@@ -247,13 +265,15 def MPDecorator(BaseClass): | |||||
247 | def publish(self, data, id): |
|
265 | def publish(self, data, id): | |
248 | ''' |
|
266 | ''' | |
249 | This function publish an object, to a specific topic. |
|
267 | This function publish an object, to a specific topic. | |
250 | The fix method only affect inputId None which is Read Unit |
|
268 | For Read Units (inputId == None) adds a little delay | |
251 | Use value between 64 80, you should notice a little retard in processing |
|
269 | to avoid data loss | |
252 | ''' |
|
270 | ''' | |
|
271 | ||||
253 | if self.inputId is None: |
|
272 | if self.inputId is None: | |
254 | self.i+=1 |
|
273 | self.i += 1 | |
255 |
if self. |
|
274 | if self.i % 100 == 0: | |
256 |
|
|
275 | self.i = 0 | |
|
276 | time.sleep(0.01) | |||
257 |
|
277 | |||
258 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
|
278 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) | |
259 |
|
279 |
General Comments 0
You need to be logged in to leave comments.
Login now