@@ -63,7 +63,7 def getArgs(op): | |||||
63 | def getDoc(obj): |
|
63 | def getDoc(obj): | |
64 | module = locate('schainpy.model.{}'.format(obj)) |
|
64 | module = locate('schainpy.model.{}'.format(obj)) | |
65 | try: |
|
65 | try: | |
66 | obj = module(1,2,3,Queue(),5) |
|
66 | obj = module(1,2,3,Queue(),5,6) | |
67 | except: |
|
67 | except: | |
68 | obj = module() |
|
68 | obj = module() | |
69 | return obj.__doc__ |
|
69 | return obj.__doc__ | |
@@ -148,8 +148,7 def search(nextcommand): | |||||
148 | if nextcommand is None: |
|
148 | if nextcommand is None: | |
149 | log.error('There is no Operation/ProcessingUnit to search', '') |
|
149 | log.error('There is no Operation/ProcessingUnit to search', '') | |
150 | else: |
|
150 | else: | |
151 |
|
|
151 | try: | |
152 | if True: |
|
|||
153 | args = getArgs(nextcommand) |
|
152 | args = getArgs(nextcommand) | |
154 | doc = getDoc(nextcommand) |
|
153 | doc = getDoc(nextcommand) | |
155 | if len(args) == 0: |
|
154 | if len(args) == 0: | |
@@ -157,11 +156,11 def search(nextcommand): | |||||
157 | else: |
|
156 | else: | |
158 | log.success('{}\n{}\n\narguments:\n {}'.format( |
|
157 | log.success('{}\n{}\n\narguments:\n {}'.format( | |
159 | nextcommand, doc, ', '.join(args)), '') |
|
158 | nextcommand, doc, ', '.join(args)), '') | |
160 |
|
|
159 | except Exception as e: | |
161 |
|
|
160 | log.error('Module `{}` does not exists'.format(nextcommand), '') | |
162 |
|
|
161 | allModules = getAll() | |
163 |
|
|
162 | similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80] | |
164 |
|
|
163 | log.success('Possible modules are: {}'.format(', '.join(similar)), '') | |
165 |
|
164 | |||
166 | def runschain(nextcommand): |
|
165 | def runschain(nextcommand): | |
167 | if nextcommand is None: |
|
166 | if nextcommand is None: |
@@ -11,7 +11,7 import traceback | |||||
11 | import math |
|
11 | import math | |
12 | import time |
|
12 | import time | |
13 | import zmq |
|
13 | import zmq | |
14 | from multiprocessing import Process, Queue, cpu_count |
|
14 | from multiprocessing import Process, Queue, Event, cpu_count | |
15 | from threading import Thread |
|
15 | from threading import Thread | |
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring |
|
16 | from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring | |
17 | from xml.dom import minidom |
|
17 | from xml.dom import minidom | |
@@ -361,7 +361,7 class OperationConf(): | |||||
361 |
|
361 | |||
362 | return kwargs |
|
362 | return kwargs | |
363 |
|
363 | |||
364 | def setup(self, id, name, priority, type, project_id, err_queue): |
|
364 | def setup(self, id, name, priority, type, project_id, err_queue, lock): | |
365 |
|
365 | |||
366 | self.id = str(id) |
|
366 | self.id = str(id) | |
367 | self.project_id = project_id |
|
367 | self.project_id = project_id | |
@@ -369,6 +369,7 class OperationConf(): | |||||
369 | self.type = type |
|
369 | self.type = type | |
370 | self.priority = priority |
|
370 | self.priority = priority | |
371 | self.err_queue = err_queue |
|
371 | self.err_queue = err_queue | |
|
372 | self.lock = lock | |||
372 | self.parmConfObjList = [] |
|
373 | self.parmConfObjList = [] | |
373 |
|
374 | |||
374 | def removeParameters(self): |
|
375 | def removeParameters(self): | |
@@ -460,7 +461,7 class OperationConf(): | |||||
460 | opObj = className() |
|
461 | opObj = className() | |
461 | elif self.type == 'external': |
|
462 | elif self.type == 'external': | |
462 | kwargs = self.getKwargs() |
|
463 | kwargs = self.getKwargs() | |
463 | opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs) |
|
464 | opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs) | |
464 | opObj.start() |
|
465 | opObj.start() | |
465 | self.opObj = opObj |
|
466 | self.opObj = opObj | |
466 |
|
467 | |||
@@ -479,6 +480,7 class ProcUnitConf(): | |||||
479 | self.opConfObjList = [] |
|
480 | self.opConfObjList = [] | |
480 | self.procUnitObj = None |
|
481 | self.procUnitObj = None | |
481 | self.opObjDict = {} |
|
482 | self.opObjDict = {} | |
|
483 | self.mylock = Event() | |||
482 |
|
484 | |||
483 | def __getPriority(self): |
|
485 | def __getPriority(self): | |
484 |
|
486 | |||
@@ -550,7 +552,7 class ProcUnitConf(): | |||||
550 |
|
552 | |||
551 | return self.procUnitObj |
|
553 | return self.procUnitObj | |
552 |
|
554 | |||
553 | def setup(self, project_id, id, name, datatype, inputId, err_queue): |
|
555 | def setup(self, project_id, id, name, datatype, inputId, err_queue, lock): | |
554 | ''' |
|
556 | ''' | |
555 | id sera el topico a publicar |
|
557 | id sera el topico a publicar | |
556 | inputId sera el topico a subscribirse |
|
558 | inputId sera el topico a subscribirse | |
@@ -577,6 +579,7 class ProcUnitConf(): | |||||
577 | self.datatype = datatype |
|
579 | self.datatype = datatype | |
578 | self.inputId = inputId |
|
580 | self.inputId = inputId | |
579 | self.err_queue = err_queue |
|
581 | self.err_queue = err_queue | |
|
582 | self.lock = lock | |||
580 | self.opConfObjList = [] |
|
583 | self.opConfObjList = [] | |
581 |
|
584 | |||
582 | self.addOperation(name='run', optype='self') |
|
585 | self.addOperation(name='run', optype='self') | |
@@ -610,7 +613,7 class ProcUnitConf(): | |||||
610 | id = self.__getNewId() |
|
613 | id = self.__getNewId() | |
611 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse |
|
614 | priority = self.__getPriority() # Sin mucho sentido, pero puede usarse | |
612 | opConfObj = OperationConf() |
|
615 | opConfObj = OperationConf() | |
613 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue) |
|
616 | opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock) | |
614 | self.opConfObjList.append(opConfObj) |
|
617 | self.opConfObjList.append(opConfObj) | |
615 |
|
618 | |||
616 | return opConfObj |
|
619 | return opConfObj | |
@@ -678,7 +681,7 class ProcUnitConf(): | |||||
678 |
|
681 | |||
679 | className = eval(self.name) |
|
682 | className = eval(self.name) | |
680 | kwargs = self.getKwargs() |
|
683 | kwargs = self.getKwargs() | |
681 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs) |
|
684 | procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs) | |
682 | log.success('creating process...', self.name) |
|
685 | log.success('creating process...', self.name) | |
683 |
|
686 | |||
684 | for opConfObj in self.opConfObjList: |
|
687 | for opConfObj in self.opConfObjList: | |
@@ -724,6 +727,7 class ReadUnitConf(ProcUnitConf): | |||||
724 | self.name = None |
|
727 | self.name = None | |
725 | self.inputId = None |
|
728 | self.inputId = None | |
726 | self.opConfObjList = [] |
|
729 | self.opConfObjList = [] | |
|
730 | self.mylock = Event() | |||
727 |
|
731 | |||
728 | def getElementName(self): |
|
732 | def getElementName(self): | |
729 |
|
733 | |||
@@ -769,6 +773,7 class ReadUnitConf(ProcUnitConf): | |||||
769 | self.endTime = endTime |
|
773 | self.endTime = endTime | |
770 | self.server = server |
|
774 | self.server = server | |
771 | self.err_queue = err_queue |
|
775 | self.err_queue = err_queue | |
|
776 | self.lock = self.mylock | |||
772 | self.addRunOperation(**kwargs) |
|
777 | self.addRunOperation(**kwargs) | |
773 |
|
778 | |||
774 | def update(self, **kwargs): |
|
779 | def update(self, **kwargs): | |
@@ -989,7 +994,8 class Project(Process): | |||||
989 |
|
994 | |||
990 | idProcUnit = self.__getNewId() |
|
995 | idProcUnit = self.__getNewId() | |
991 | procUnitConfObj = ProcUnitConf() |
|
996 | procUnitConfObj = ProcUnitConf() | |
992 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue) |
|
997 | input_proc = self.procUnitConfObjDict[inputId] | |
|
998 | procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock) | |||
993 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj |
|
999 | self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj | |
994 |
|
1000 | |||
995 | return procUnitConfObj |
|
1001 | return procUnitConfObj |
@@ -13,6 +13,7 Based on: | |||||
13 | ''' |
|
13 | ''' | |
14 |
|
14 | |||
15 | import os |
|
15 | import os | |
|
16 | import sys | |||
16 | import inspect |
|
17 | import inspect | |
17 | import zmq |
|
18 | import zmq | |
18 | import time |
|
19 | import time | |
@@ -181,31 +182,40 class Operation(object): | |||||
181 |
|
182 | |||
182 | class InputQueue(Thread): |
|
183 | class InputQueue(Thread): | |
183 |
|
184 | |||
184 |
|
|
185 | ''' | |
185 |
|
|
186 | Class to hold input data for Proccessing Units and external Operations, | |
186 |
|
|
187 | ''' | |
187 |
|
188 | |||
188 |
|
|
189 | def __init__(self, project_id, inputId, lock=None): | |
189 |
|
||||
190 | Thread.__init__(self) |
|
|||
191 | self.queue = Queue() |
|
|||
192 | self.project_id = project_id |
|
|||
193 | self.inputId = inputId |
|
|||
194 |
|
||||
195 | def run(self): |
|
|||
196 |
|
||||
197 | c = zmq.Context() |
|
|||
198 | self.receiver = c.socket(zmq.SUB) |
|
|||
199 | self.receiver.connect( |
|
|||
200 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) |
|
|||
201 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) |
|
|||
202 |
|
||||
203 | while True: |
|
|||
204 | self.queue.put(self.receiver.recv_multipart()[1]) |
|
|||
205 |
|
||||
206 | def get(self): |
|
|||
207 |
|
190 | |||
208 | return pickle.loads(self.queue.get()) |
|
191 | Thread.__init__(self) | |
|
192 | self.queue = Queue() | |||
|
193 | self.project_id = project_id | |||
|
194 | self.inputId = inputId | |||
|
195 | self.lock = lock | |||
|
196 | self.size = 0 | |||
|
197 | ||||
|
198 | def run(self): | |||
|
199 | ||||
|
200 | c = zmq.Context() | |||
|
201 | self.receiver = c.socket(zmq.SUB) | |||
|
202 | self.receiver.connect( | |||
|
203 | 'ipc:///tmp/schain/{}_pub'.format(self.project_id)) | |||
|
204 | self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode()) | |||
|
205 | ||||
|
206 | while True: | |||
|
207 | obj = self.receiver.recv_multipart()[1] | |||
|
208 | self.size += sys.getsizeof(obj) | |||
|
209 | self.queue.put(obj) | |||
|
210 | ||||
|
211 | def get(self): | |||
|
212 | if self.size/1000000 > 2048: | |||
|
213 | self.lock.clear() | |||
|
214 | else: | |||
|
215 | self.lock.set() | |||
|
216 | obj = self.queue.get() | |||
|
217 | self.size -= sys.getsizeof(obj) | |||
|
218 | return pickle.loads(obj) | |||
209 |
|
219 | |||
210 |
|
220 | |||
211 | def MPDecorator(BaseClass): |
|
221 | def MPDecorator(BaseClass): | |
@@ -239,9 +249,10 def MPDecorator(BaseClass): | |||||
239 | self.inputId = args[1] |
|
249 | self.inputId = args[1] | |
240 | self.project_id = args[2] |
|
250 | self.project_id = args[2] | |
241 | self.err_queue = args[3] |
|
251 | self.err_queue = args[3] | |
242 |
self. |
|
252 | self.lock = args[4] | |
|
253 | self.typeProc = args[5] | |||
243 | self.err_queue.put('#_start_#') |
|
254 | self.err_queue.put('#_start_#') | |
244 | self.queue = InputQueue(self.project_id, self.inputId) |
|
255 | self.queue = InputQueue(self.project_id, self.inputId, self.lock) | |
245 |
|
256 | |||
246 | def subscribe(self): |
|
257 | def subscribe(self): | |
247 | ''' |
|
258 | ''' | |
@@ -272,21 +283,11 def MPDecorator(BaseClass): | |||||
272 | def publish(self, data, id): |
|
283 | def publish(self, data, id): | |
273 | ''' |
|
284 | ''' | |
274 | This function publish an object, to an specific topic. |
|
285 | This function publish an object, to an specific topic. | |
275 | For Read Units (inputId == None) adds a little delay |
|
286 | It blocks publishing when receiver queue is full to avoid data loss | |
276 | to avoid data loss |
|
|||
277 | ''' |
|
287 | ''' | |
278 |
|
288 | |||
279 | if self.inputId is None: |
|
289 | if self.inputId is None: | |
280 |
self. |
|
290 | self.lock.wait() | |
281 | if self.i % 40 == 0 and time.time()-self.t > 0.1: |
|
|||
282 | self.i = 0 |
|
|||
283 | self.t = time.time() |
|
|||
284 | time.sleep(0.05) |
|
|||
285 | elif self.i % 40 == 0: |
|
|||
286 | self.i = 0 |
|
|||
287 | self.t = time.time() |
|
|||
288 | time.sleep(0.01) |
|
|||
289 |
|
||||
290 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) |
|
291 | self.sender.send_multipart([str(id).encode(), pickle.dumps(data)]) | |
291 |
|
292 | |||
292 | def runReader(self): |
|
293 | def runReader(self): |
General Comments 0
You need to be logged in to leave comments.
Login now