@@ -63,7 +63,7 def getArgs(op): | |||
|
63 | 63 | def getDoc(obj): |
|
64 | 64 | module = locate('schainpy.model.{}'.format(obj)) |
|
65 | 65 | try: |
|
66 | obj = module(1,2,3,Queue(),5) | |
|
66 | obj = module(1,2,3,Queue(),5,6) | |
|
67 | 67 | except: |
|
68 | 68 | obj = module() |
|
69 | 69 | return obj.__doc__ |
@@ -148,8 +148,7 def search(nextcommand): | |||
|
148 | 148 | if nextcommand is None: |
|
149 | 149 | log.error('There is no Operation/ProcessingUnit to search', '') |
|
150 | 150 | else: |
|
151 |
|
|
|
152 | if True: | |
|
151 | try: | |
|
153 | 152 | args = getArgs(nextcommand) |
|
154 | 153 | doc = getDoc(nextcommand) |
|
155 | 154 | if len(args) == 0: |
@@ -157,11 +156,11 def search(nextcommand): | |||
|
157 | 156 | else: |
|
158 | 157 | log.success('{}\n{}\n\narguments:\n {}'.format( |
|
159 | 158 | nextcommand, doc, ', '.join(args)), '') |
|
160 |
|
|
|
161 |
|
|
|
162 |
|
|
|
163 |
|
|
|
164 |
|
|
|
159 | except Exception as e: | |
|
160 | log.error('Module `{}` does not exists'.format(nextcommand), '') | |
|
161 | allModules = getAll() | |
|
162 | similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80] | |
|
163 | log.success('Possible modules are: {}'.format(', '.join(similar)), '') | |
|
165 | 164 | |
|
166 | 165 | def runschain(nextcommand): |
|
167 | 166 | if nextcommand is None: |
@@ -11,7 +11,7 import traceback | |||
|
11 | 11 | import math |
|
12 | 12 | import time |
|
13 | 13 | import zmq |
|
14 | from multiprocessing import Process, Queue, cpu_count | |
|
14 | from multiprocessing import Process, Queue, Event, cpu_count | |
|
15 | 15 | from threading import Thread |
|
16 | 16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
17 | 17 | from xml.dom import minidom |
@@ -361,7 +361,7 class OperationConf(): | |||
|
361 | 361 | |
|
362 | 362 | return kwargs |
|
363 | 363 | |
|
364 | def setup(self, id, name, priority, type, project_id, err_queue): | |
|
364 | def setup(self, id, name, priority, type, project_id, err_queue, lock): | |
|
365 | 365 | |
|
366 | 366 | self.id = str(id) |
|
367 | 367 | self.project_id = project_id |
@@ -369,6 +369,7 class OperationConf(): | |||
|
369 | 369 | self.type = type |
|
370 | 370 | self.priority = priority |
|
371 | 371 | self.err_queue = err_queue |
|
372 | self.lock = lock | |
|
372 | 373 | self.parmConfObjList = [] |
|
373 | 374 | |
|
374 | 375 | def removeParameters(self): |
@@ -460,7 +461,7 class OperationConf(): | |||
|
460 | 461 | opObj = className() |
|
461 | 462 | elif self.type == 'external': |
|
462 | 463 | kwargs = self.getKwargs() |
|
463 | opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) | |
|
464 | opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs) | |
|
464 | 465 | opObj.start() |
|
465 | 466 | self.opObj = opObj |
|
466 | 467 | |
@@ -479,6 +480,7 class ProcUnitConf(): | |||
|
479 | 480 | self.opConfObjList = [] |
|
480 | 481 | self.procUnitObj = None |
|
481 | 482 | self.opObjDict = {} |
|
483 | self.mylock = Event() | |
|
482 | 484 | |
|
483 | 485 | def __getPriority(self): |
|
484 | 486 | |
@@ -550,7 +552,7 class ProcUnitConf(): | |||
|
550 | 552 | |
|
551 | 553 | return self.procUnitObj |
|
552 | 554 | |
|
553 | def setup(self, project_id, id, name, datatype, inputId, err_queue): | |
|
555 | def setup(self, project_id, id, name, datatype, inputId, err_queue, lock): | |
|
554 | 556 | ''' |
|
555 | 557 | id sera el topico a publicar |
|
556 | 558 | inputId sera el topico a subscribirse |
@@ -577,6 +579,7 class ProcUnitConf(): | |||
|
577 | 579 | self.datatype = datatype |
|
578 | 580 | self.inputId = inputId |
|
579 | 581 | self.err_queue = err_queue |
|
582 | self.lock = lock | |
|
580 | 583 | self.opConfObjList = [] |
|
581 | 584 | |
|
582 | 585 | self.addOperation(name='run', optype='self') |
@@ -610,7 +613,7 class ProcUnitConf(): | |||
|
610 | 613 | id = self.__getNewId() |
|
611 | 614 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
612 | 615 | opConfObj = OperationConf() |
|
613 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) | |
|
616 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock) | |
|
614 | 617 | self.opConfObjList.append(opConfObj) |
|
615 | 618 | |
|
616 | 619 | return opConfObj |
@@ -678,7 +681,7 class ProcUnitConf(): | |||
|
678 | 681 | |
|
679 | 682 | className = eval(self.name) |
|
680 | 683 | kwargs = self.getKwargs() |
|
681 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) | |
|
684 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs) | |
|
682 | 685 | log.success('creating process...', self.name) |
|
683 | 686 | |
|
684 | 687 | for opConfObj in self.opConfObjList: |
@@ -724,6 +727,7 class ReadUnitConf(ProcUnitConf): | |||
|
724 | 727 | self.name = None |
|
725 | 728 | self.inputId = None |
|
726 | 729 | self.opConfObjList = [] |
|
730 | self.mylock = Event() | |
|
727 | 731 | |
|
728 | 732 | def getElementName(self): |
|
729 | 733 | |
@@ -769,6 +773,7 class ReadUnitConf(ProcUnitConf): | |||
|
769 | 773 | self.endTime = endTime |
|
770 | 774 | self.server = server |
|
771 | 775 | self.err_queue = err_queue |
|
776 | self.lock = self.mylock | |
|
772 | 777 | self.addRunOperation(**kwargs) |
|
773 | 778 | |
|
774 | 779 | def update(self, **kwargs): |
@@ -989,7 +994,8 class Project(Process): | |||
|
989 | 994 | |
|
990 | 995 | idProcUnit = self.__getNewId() |
|
991 | 996 | procUnitConfObj = ProcUnitConf() |
|
992 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) | |
|
997 | input_proc = self.procUnitConfObjDict[inputId] | |
|
998 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock) | |
|
993 | 999 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
994 | 1000 | |
|
995 | 1001 | return procUnitConfObj |
@@ -13,6 +13,7 Based on: | |||
|
13 | 13 | ''' |
|
14 | 14 | |
|
15 | 15 | import os |
|
16 | import sys | |
|
16 | 17 | import inspect |
|
17 | 18 | import zmq |
|
18 | 19 | import time |
@@ -181,31 +182,40 class Operation(object): | |||
|
181 | 182 | |
|
182 | 183 | class InputQueue(Thread): |
|
183 | 184 | |
|
184 |
|
|
|
185 |
|
|
|
186 |
|
|
|
187 | ||
|
188 |
|
|
|
189 | ||
|
190 | Thread.__init__(self) | |
|
191 | self.queue = Queue() | |
|
192 | self.project_id = project_id | |
|
193 | self.inputId = inputId | |
|
194 | ||
|
195 | def run(self): | |
|
196 | ||
|
197 | c = zmq.Context() | |
|
198 | self.receiver = c.socket(zmq.SUB) | |
|
199 | self.receiver.connect( | |
|
200 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
201 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
202 | ||
|
203 | while True: | |
|
204 | self.queue.put(self.receiver.recv_multipart()[1]) | |
|
205 | ||
|
206 | def get(self): | |
|
185 | ''' | |
|
186 | Class to hold input data for Proccessing Units and external Operations, | |
|
187 | ''' | |
|
188 | ||
|
189 | def __init__(self, project_id, inputId, lock=None): | |
|
207 | 190 | |
|
208 | return pickle.loads(self.queue.get()) | |
|
191 | Thread.__init__(self) | |
|
192 | self.queue = Queue() | |
|
193 | self.project_id = project_id | |
|
194 | self.inputId = inputId | |
|
195 | self.lock = lock | |
|
196 | self.size = 0 | |
|
197 | ||
|
198 | def run(self): | |
|
199 | ||
|
200 | c = zmq.Context() | |
|
201 | self.receiver = c.socket(zmq.SUB) | |
|
202 | self.receiver.connect( | |
|
203 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |
|
204 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |
|
205 | ||
|
206 | while True: | |
|
207 | obj = self.receiver.recv_multipart()[1] | |
|
208 | self.size += sys.getsizeof(obj) | |
|
209 | self.queue.put(obj) | |
|
210 | ||
|
211 | def get(self): | |
|
212 | if self.size/1000000 > 2048: | |
|
213 | self.lock.clear() | |
|
214 | else: | |
|
215 | self.lock.set() | |
|
216 | obj = self.queue.get() | |
|
217 | self.size -= sys.getsizeof(obj) | |
|
218 | return pickle.loads(obj) | |
|
209 | 219 | |
|
210 | 220 | |
|
211 | 221 | def MPDecorator(BaseClass): |
@@ -239,9 +249,10 def MPDecorator(BaseClass): | |||
|
239 | 249 | self.inputId = args[1] |
|
240 | 250 | self.project_id = args[2] |
|
241 | 251 | self.err_queue = args[3] |
|
242 |
self. |
|
|
252 | self.lock = args[4] | |
|
253 | self.typeProc = args[5] | |
|
243 | 254 | self.err_queue.put('#_start_#') |
|
244 | self.queue = InputQueue(self.project_id, self.inputId) | |
|
255 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) | |
|
245 | 256 | |
|
246 | 257 | def subscribe(self): |
|
247 | 258 | ''' |
@@ -272,21 +283,11 def MPDecorator(BaseClass): | |||
|
272 | 283 | def publish(self, data, id): |
|
273 | 284 | ''' |
|
274 | 285 | This function publish an object, to an specific topic. |
|
275 | For Read Units (inputId == None) adds a little delay | |
|
276 | to avoid data loss | |
|
286 | It blocks publishing when receiver queue is full to avoid data loss | |
|
277 | 287 | ''' |
|
278 | 288 | |
|
279 | 289 | if self.inputId is None: |
|
280 |
self. |
|
|
281 | if self.i % 40 == 0 and time.time()-self.t > 0.1: | |
|
282 | self.i = 0 | |
|
283 | self.t = time.time() | |
|
284 | time.sleep(0.05) | |
|
285 | elif self.i % 40 == 0: | |
|
286 | self.i = 0 | |
|
287 | self.t = time.time() | |
|
288 | time.sleep(0.01) | |
|
289 | ||
|
290 | self.lock.wait() | |
|
290 | 291 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
|
291 | 292 | |
|
292 | 293 | def runReader(self): |
General Comments 0
You need to be logged in to leave comments.
Login now