@@ -230,8 +230,7 class BLTRParamReader(JRODataReader, ProcessingUnit): | |||
|
230 | 230 | self.sizeOfFile = os.path.getsize(self.filename) |
|
231 | 231 | self.counter_records = 0 |
|
232 | 232 | self.flagIsNewFile = 0 |
|
233 | self.fileIndex += 1 | |
|
234 | time.sleep(2) | |
|
233 | self.fileIndex += 1 | |
|
235 | 234 | |
|
236 | 235 | return 1 |
|
237 | 236 |
@@ -1598,11 +1598,9 class JRODataWriter(JRODataIO): | |||
|
1598 | 1598 | |
|
1599 | 1599 | self.basicHeaderObj.size = self.basicHeaderSize # bytes |
|
1600 | 1600 | self.basicHeaderObj.version = self.versionFile |
|
1601 | self.basicHeaderObj.dataBlock = self.nTotalBlocks | |
|
1602 | log.warning(datetime.datetime.fromtimestamp(self.dataOut.utctime)) | |
|
1601 | self.basicHeaderObj.dataBlock = self.nTotalBlocks | |
|
1603 | 1602 | utc = numpy.floor(self.dataOut.utctime) |
|
1604 | milisecond = (self.dataOut.utctime - utc) * 1000.0 | |
|
1605 | log.warning(milisecond) | |
|
1603 | milisecond = (self.dataOut.utctime - utc) * 1000.0 | |
|
1606 | 1604 | self.basicHeaderObj.utc = utc |
|
1607 | 1605 | self.basicHeaderObj.miliSecond = milisecond |
|
1608 | 1606 | self.basicHeaderObj.timeZone = self.dataOut.timeZone |
@@ -1503,7 +1503,7 class ParameterWriter(Operation): | |||
|
1503 | 1503 | data.append((dsInfo['variable'], i)) |
|
1504 | 1504 | fp.flush() |
|
1505 | 1505 | |
|
1506 |
log.log(' |
|
|
1506 | log.log('Creating file: {}'.format(fp.filename), self.name) | |
|
1507 | 1507 | |
|
1508 | 1508 | self.ds = dtsets |
|
1509 | 1509 | self.data = data |
@@ -1526,6 +1526,7 class ParameterWriter(Operation): | |||
|
1526 | 1526 | |
|
1527 | 1527 | self.fp.flush() |
|
1528 | 1528 | self.blockIndex += 1 |
|
1529 | log.log('Block No. {}/{}'.format(self.blockIndex, self.blocksPerFile), self.name) | |
|
1529 | 1530 | |
|
1530 | 1531 | return |
|
1531 | 1532 |
@@ -17,6 +17,8 import inspect | |||
|
17 | 17 | import zmq |
|
18 | 18 | import time |
|
19 | 19 | import pickle |
|
20 | from queue import Queue | |
|
21 | from threading import Thread | |
|
20 | 22 | from multiprocessing import Process |
|
21 | 23 | from zmq.utils.monitor import recv_monitor_message |
|
22 | 24 | |
@@ -170,6 +172,32 class Operation(object): | |||
|
170 | 172 | |
|
171 | 173 | return |
|
172 | 174 | |
|
175 | class InputQueue(Thread): | |
|
176 | ''' | |
|
177 | Class to hold input data for Proccessing Units and external Operations, | |
|
178 | ''' | |
|
179 | ||
|
180 | def __init__(self, project_id, inputId): | |
|
181 | Thread.__init__(self) | |
|
182 | self.queue = Queue() | |
|
183 | self.project_id = project_id | |
|
184 | self.inputId = inputId | |
|
185 | ||
|
186 | def run(self): | |
|
187 | ||
|
188 | c = zmq.Context() | |
|
189 | self.receiver = c.socket(zmq.SUB) | |
|
190 | self.receiver.connect( | |
|
191 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
192 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
193 | ||
|
194 | while True: | |
|
195 | self.queue.put(self.receiver.recv_multipart()[1]) | |
|
196 | ||
|
197 | def get(self): | |
|
198 | ||
|
199 | return pickle.loads(self.queue.get()) | |
|
200 | ||
|
173 | 201 | |
|
174 | 202 | def MPDecorator(BaseClass): |
|
175 | 203 | """ |
@@ -189,7 +217,7 def MPDecorator(BaseClass): | |||
|
189 | 217 | self.kwargs = kwargs |
|
190 | 218 | self.sender = None |
|
191 | 219 | self.receiver = None |
|
192 |
self.i |
|
|
220 | self.i = 0 | |
|
193 | 221 | self.name = BaseClass.__name__ |
|
194 | 222 | if 'plot' in self.name.lower() and not self.name.endswith('_'): |
|
195 | 223 | self.name = '{}{}'.format(self.CODE.upper(), 'Plot') |
@@ -205,33 +233,23 def MPDecorator(BaseClass): | |||
|
205 | 233 | self.inputId = args[0] |
|
206 | 234 | self.project_id = args[1] |
|
207 | 235 | self.typeProc = "Operation" |
|
208 |
|
|
|
209 | def fix_publish(self,valor,multiple1): | |
|
210 | return True if valor%multiple1 ==0 else False | |
|
236 | ||
|
237 | self.queue = InputQueue(self.project_id, self.inputId) | |
|
211 | 238 | |
|
212 | 239 | def subscribe(self): |
|
213 | 240 | ''' |
|
214 | This function create a socket to receive objects from the | |
|
215 | topic `inputId`. | |
|
241 | This function start the input queue. | |
|
216 | 242 | ''' |
|
217 | ||
|
218 |
|
|
|
219 | self.receiver = c.socket(zmq.SUB) | |
|
220 | self.receiver.connect( | |
|
221 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
222 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
243 | ||
|
244 | self.queue.start() | |
|
245 | ||
|
223 | 246 | |
|
224 | 247 | def listen(self): |
|
225 | 248 | ''' |
|
226 |
This function waits for objects |
|
|
249 | This function waits for objects | |
|
227 | 250 | ''' |
|
228 | try: | |
|
229 | data = pickle.loads(self.receiver.recv_multipart()[1]) | |
|
230 | except zmq.ZMQError as e: | |
|
231 | if e.errno == zmq.ETERM: | |
|
232 | print (e.errno) | |
|
233 | ||
|
234 | return data | |
|
251 | ||
|
252 | return self.queue.get() | |
|
235 | 253 | |
|
236 | 254 | def set_publisher(self): |
|
237 | 255 | ''' |
@@ -247,13 +265,15 def MPDecorator(BaseClass): | |||
|
247 | 265 | def publish(self, data, id): |
|
248 | 266 | ''' |
|
249 | 267 | This function publish an object, to a specific topic. |
|
250 | The fix method only affect inputId None which is Read Unit | |
|
251 | Use value between 64 80, you should notice a little retard in processing | |
|
268 | For Read Units (inputId == None) adds a little delay | |
|
269 | to avoid data loss | |
|
252 | 270 | ''' |
|
271 | ||
|
253 | 272 | if self.inputId is None: |
|
254 | self.i+=1 | |
|
255 |
if self. |
|
|
256 |
|
|
|
273 | self.i += 1 | |
|
274 | if self.i % 100 == 0: | |
|
275 | self.i = 0 | |
|
276 | time.sleep(0.01) | |
|
257 | 277 | |
|
258 | 278 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
|
259 | 279 |
General Comments 0
You need to be logged in to leave comments.
Login now