##// END OF EJS Templates
Block publishing when input queues are full to avoid data loss
jespinoza -
r1256:7ed2e4983ab9
parent child
Show More
@@ -1,237 +1,236
1 import click
1 import click
2 import schainpy
2 import schainpy
3 import subprocess
3 import subprocess
4 import os
4 import os
5 import sys
5 import sys
6 import glob
6 import glob
7 save_stdout = sys.stdout
7 save_stdout = sys.stdout
8 sys.stdout = open('/dev/null', 'w')
8 sys.stdout = open('/dev/null', 'w')
9 from multiprocessing import cpu_count
9 from multiprocessing import cpu_count
10 from schainpy.controller import Project
10 from schainpy.controller import Project
11 from schainpy.model import Operation, ProcessingUnit
11 from schainpy.model import Operation, ProcessingUnit
12 from schainpy.utils import log
12 from schainpy.utils import log
13 from importlib import import_module
13 from importlib import import_module
14 from pydoc import locate
14 from pydoc import locate
15 from fuzzywuzzy import process
15 from fuzzywuzzy import process
16 from schainpy.cli import templates
16 from schainpy.cli import templates
17 import inspect
17 import inspect
18 try:
18 try:
19 from queue import Queue
19 from queue import Queue
20 except:
20 except:
21 from Queue import Queue
21 from Queue import Queue
22 sys.stdout = save_stdout
22 sys.stdout = save_stdout
23
23
24
24
25 def getProcs():
25 def getProcs():
26 modules = dir(schainpy.model)
26 modules = dir(schainpy.model)
27 procs = check_module(modules, 'processing')
27 procs = check_module(modules, 'processing')
28 try:
28 try:
29 procs.remove('ProcessingUnit')
29 procs.remove('ProcessingUnit')
30 except Exception as e:
30 except Exception as e:
31 pass
31 pass
32 return procs
32 return procs
33
33
34 def getOperations():
34 def getOperations():
35 module = dir(schainpy.model)
35 module = dir(schainpy.model)
36 noProcs = [x for x in module if not x.endswith('Proc')]
36 noProcs = [x for x in module if not x.endswith('Proc')]
37 operations = check_module(noProcs, 'operation')
37 operations = check_module(noProcs, 'operation')
38 try:
38 try:
39 operations.remove('Operation')
39 operations.remove('Operation')
40 operations.remove('Figure')
40 operations.remove('Figure')
41 operations.remove('Plot')
41 operations.remove('Plot')
42 except Exception as e:
42 except Exception as e:
43 pass
43 pass
44 return operations
44 return operations
45
45
46 def getArgs(op):
46 def getArgs(op):
47 module = locate('schainpy.model.{}'.format(op))
47 module = locate('schainpy.model.{}'.format(op))
48
48
49 if hasattr(module, '__attrs__'):
49 if hasattr(module, '__attrs__'):
50 args = module.__attrs__
50 args = module.__attrs__
51 else:
51 else:
52 args = inspect.getargspec(module.run).args
52 args = inspect.getargspec(module.run).args
53 try:
53 try:
54 args.remove('self')
54 args.remove('self')
55 except Exception as e:
55 except Exception as e:
56 pass
56 pass
57 try:
57 try:
58 args.remove('dataOut')
58 args.remove('dataOut')
59 except Exception as e:
59 except Exception as e:
60 pass
60 pass
61 return args
61 return args
62
62
63 def getDoc(obj):
63 def getDoc(obj):
64 module = locate('schainpy.model.{}'.format(obj))
64 module = locate('schainpy.model.{}'.format(obj))
65 try:
65 try:
66 obj = module(1,2,3,Queue(),5)
66 obj = module(1,2,3,Queue(),5,6)
67 except:
67 except:
68 obj = module()
68 obj = module()
69 return obj.__doc__
69 return obj.__doc__
70
70
71 def getAll():
71 def getAll():
72 modules = getOperations()
72 modules = getOperations()
73 modules.extend(getProcs())
73 modules.extend(getProcs())
74 return modules
74 return modules
75
75
76
76
77 def print_version(ctx, param, value):
77 def print_version(ctx, param, value):
78 if not value or ctx.resilient_parsing:
78 if not value or ctx.resilient_parsing:
79 return
79 return
80 click.echo(schainpy.__version__)
80 click.echo(schainpy.__version__)
81 ctx.exit()
81 ctx.exit()
82
82
83
83
84 PREFIX = 'experiment'
84 PREFIX = 'experiment'
85
85
86 @click.command()
86 @click.command()
87 @click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str)
87 @click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str)
88 @click.argument('command', default='run', required=True)
88 @click.argument('command', default='run', required=True)
89 @click.argument('nextcommand', default=None, required=False, type=str)
89 @click.argument('nextcommand', default=None, required=False, type=str)
90 def main(command, nextcommand, version):
90 def main(command, nextcommand, version):
91 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
91 """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
92 Available commands.\n
92 Available commands.\n
93 xml: runs a schain XML generated file\n
93 xml: runs a schain XML generated file\n
94 run: runs any python script starting 'experiment_'\n
94 run: runs any python script starting 'experiment_'\n
95 generate: generates a template schain script\n
95 generate: generates a template schain script\n
96 list: return a list of available procs and operations\n
96 list: return a list of available procs and operations\n
97 search: return avilable operations, procs or arguments of the given
97 search: return avilable operations, procs or arguments of the given
98 operation/proc\n"""
98 operation/proc\n"""
99 if command == 'xml':
99 if command == 'xml':
100 runFromXML(nextcommand)
100 runFromXML(nextcommand)
101 elif command == 'generate':
101 elif command == 'generate':
102 generate()
102 generate()
103 elif command == 'test':
103 elif command == 'test':
104 test()
104 test()
105 elif command == 'run':
105 elif command == 'run':
106 runschain(nextcommand)
106 runschain(nextcommand)
107 elif command == 'search':
107 elif command == 'search':
108 search(nextcommand)
108 search(nextcommand)
109 elif command == 'list':
109 elif command == 'list':
110 cmdlist(nextcommand)
110 cmdlist(nextcommand)
111 else:
111 else:
112 log.error('Command {} is not defined'.format(command))
112 log.error('Command {} is not defined'.format(command))
113
113
114
114
115 def check_module(possible, instance):
115 def check_module(possible, instance):
116 def check(x):
116 def check(x):
117 try:
117 try:
118 instancia = locate('schainpy.model.{}'.format(x))
118 instancia = locate('schainpy.model.{}'.format(x))
119 ret = instancia.proc_type == instance
119 ret = instancia.proc_type == instance
120 return ret
120 return ret
121 except Exception as e:
121 except Exception as e:
122 return False
122 return False
123 clean = clean_modules(possible)
123 clean = clean_modules(possible)
124 return [x for x in clean if check(x)]
124 return [x for x in clean if check(x)]
125
125
126
126
127 def clean_modules(module):
127 def clean_modules(module):
128 noEndsUnder = [x for x in module if not x.endswith('__')]
128 noEndsUnder = [x for x in module if not x.endswith('__')]
129 noStartUnder = [x for x in noEndsUnder if not x.startswith('__')]
129 noStartUnder = [x for x in noEndsUnder if not x.startswith('__')]
130 noFullUpper = [x for x in noStartUnder if not x.isupper()]
130 noFullUpper = [x for x in noStartUnder if not x.isupper()]
131 return noFullUpper
131 return noFullUpper
132
132
133 def cmdlist(nextcommand):
133 def cmdlist(nextcommand):
134 if nextcommand is None:
134 if nextcommand is None:
135 log.error('Missing argument, available arguments: procs, operations', '')
135 log.error('Missing argument, available arguments: procs, operations', '')
136 elif nextcommand == 'procs':
136 elif nextcommand == 'procs':
137 procs = getProcs()
137 procs = getProcs()
138 log.success(
138 log.success(
139 'Current ProcessingUnits are:\n {}'.format('\n '.join(procs)), '')
139 'Current ProcessingUnits are:\n {}'.format('\n '.join(procs)), '')
140 elif nextcommand == 'operations':
140 elif nextcommand == 'operations':
141 operations = getOperations()
141 operations = getOperations()
142 log.success('Current Operations are:\n {}'.format(
142 log.success('Current Operations are:\n {}'.format(
143 '\n '.join(operations)), '')
143 '\n '.join(operations)), '')
144 else:
144 else:
145 log.error('Wrong argument', '')
145 log.error('Wrong argument', '')
146
146
147 def search(nextcommand):
147 def search(nextcommand):
148 if nextcommand is None:
148 if nextcommand is None:
149 log.error('There is no Operation/ProcessingUnit to search', '')
149 log.error('There is no Operation/ProcessingUnit to search', '')
150 else:
150 else:
151 #try:
151 try:
152 if True:
153 args = getArgs(nextcommand)
152 args = getArgs(nextcommand)
154 doc = getDoc(nextcommand)
153 doc = getDoc(nextcommand)
155 if len(args) == 0:
154 if len(args) == 0:
156 log.success('\n{} has no arguments'.format(nextcommand), '')
155 log.success('\n{} has no arguments'.format(nextcommand), '')
157 else:
156 else:
158 log.success('{}\n{}\n\narguments:\n {}'.format(
157 log.success('{}\n{}\n\narguments:\n {}'.format(
159 nextcommand, doc, ', '.join(args)), '')
158 nextcommand, doc, ', '.join(args)), '')
160 # except Exception as e:
159 except Exception as e:
161 # log.error('Module `{}` does not exists'.format(nextcommand), '')
160 log.error('Module `{}` does not exists'.format(nextcommand), '')
162 # allModules = getAll()
161 allModules = getAll()
163 # similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
162 similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1]>80]
164 # log.success('Possible modules are: {}'.format(', '.join(similar)), '')
163 log.success('Possible modules are: {}'.format(', '.join(similar)), '')
165
164
166 def runschain(nextcommand):
165 def runschain(nextcommand):
167 if nextcommand is None:
166 if nextcommand is None:
168 currentfiles = glob.glob('./{}_*.py'.format(PREFIX))
167 currentfiles = glob.glob('./{}_*.py'.format(PREFIX))
169 numberfiles = len(currentfiles)
168 numberfiles = len(currentfiles)
170 if numberfiles > 1:
169 if numberfiles > 1:
171 log.error('There is more than one file to run')
170 log.error('There is more than one file to run')
172 elif numberfiles == 1:
171 elif numberfiles == 1:
173 subprocess.call(['python ' + currentfiles[0]], shell=True)
172 subprocess.call(['python ' + currentfiles[0]], shell=True)
174 else:
173 else:
175 log.error('There is no file to run')
174 log.error('There is no file to run')
176 else:
175 else:
177 try:
176 try:
178 subprocess.call(['python ' + nextcommand], shell=True)
177 subprocess.call(['python ' + nextcommand], shell=True)
179 except Exception as e:
178 except Exception as e:
180 log.error("I cannot run the file. Does it exists?")
179 log.error("I cannot run the file. Does it exists?")
181
180
182
181
183 def basicInputs():
182 def basicInputs():
184 inputs = {}
183 inputs = {}
185 inputs['name'] = click.prompt(
184 inputs['name'] = click.prompt(
186 'Name of the project', default="project", type=str)
185 'Name of the project', default="project", type=str)
187 inputs['desc'] = click.prompt(
186 inputs['desc'] = click.prompt(
188 'Enter a description', default="A schain project", type=str)
187 'Enter a description', default="A schain project", type=str)
189 inputs['multiprocess'] = click.prompt(
188 inputs['multiprocess'] = click.prompt(
190 '''Select data type:
189 '''Select data type:
191
190
192 - Voltage (*.r): [1]
191 - Voltage (*.r): [1]
193 - Spectra (*.pdata): [2]
192 - Spectra (*.pdata): [2]
194 - Voltage and Spectra (*.r): [3]
193 - Voltage and Spectra (*.r): [3]
195
194
196 -->''', type=int)
195 -->''', type=int)
197 inputs['path'] = click.prompt('Data path', default=os.getcwd(
196 inputs['path'] = click.prompt('Data path', default=os.getcwd(
198 ), type=click.Path(exists=True, resolve_path=True))
197 ), type=click.Path(exists=True, resolve_path=True))
199 inputs['startDate'] = click.prompt(
198 inputs['startDate'] = click.prompt(
200 'Start date', default='1970/01/01', type=str)
199 'Start date', default='1970/01/01', type=str)
201 inputs['endDate'] = click.prompt(
200 inputs['endDate'] = click.prompt(
202 'End date', default='2018/12/31', type=str)
201 'End date', default='2018/12/31', type=str)
203 inputs['startHour'] = click.prompt(
202 inputs['startHour'] = click.prompt(
204 'Start hour', default='00:00:00', type=str)
203 'Start hour', default='00:00:00', type=str)
205 inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str)
204 inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str)
206 inputs['figpath'] = inputs['path'] + '/figs'
205 inputs['figpath'] = inputs['path'] + '/figs'
207 return inputs
206 return inputs
208
207
209
208
210 def generate():
209 def generate():
211 inputs = basicInputs()
210 inputs = basicInputs()
212
211
213 if inputs['multiprocess'] == 1:
212 if inputs['multiprocess'] == 1:
214 current = templates.voltage.format(**inputs)
213 current = templates.voltage.format(**inputs)
215 elif inputs['multiprocess'] == 2:
214 elif inputs['multiprocess'] == 2:
216 current = templates.spectra.format(**inputs)
215 current = templates.spectra.format(**inputs)
217 elif inputs['multiprocess'] == 3:
216 elif inputs['multiprocess'] == 3:
218 current = templates.voltagespectra.format(**inputs)
217 current = templates.voltagespectra.format(**inputs)
219 scriptname = '{}_{}.py'.format(PREFIX, inputs['name'])
218 scriptname = '{}_{}.py'.format(PREFIX, inputs['name'])
220 script = open(scriptname, 'w')
219 script = open(scriptname, 'w')
221 try:
220 try:
222 script.write(current)
221 script.write(current)
223 log.success('Script {} generated'.format(scriptname))
222 log.success('Script {} generated'.format(scriptname))
224 except Exception as e:
223 except Exception as e:
225 log.error('I cannot create the file. Do you have writing permissions?')
224 log.error('I cannot create the file. Do you have writing permissions?')
226
225
227
226
228 def test():
227 def test():
229 log.warning('testing')
228 log.warning('testing')
230
229
231
230
232 def runFromXML(filename):
231 def runFromXML(filename):
233 controller = Project()
232 controller = Project()
234 if not controller.readXml(filename):
233 if not controller.readXml(filename):
235 return
234 return
236 controller.start()
235 controller.start()
237 return
236 return
@@ -1,1284 +1,1290
1 '''
1 '''
2 Updated on January , 2018, for multiprocessing purposes
2 Updated on January , 2018, for multiprocessing purposes
3 Author: Sergio Cortez
3 Author: Sergio Cortez
4 Created on September , 2012
4 Created on September , 2012
5 '''
5 '''
6 from platform import python_version
6 from platform import python_version
7 import sys
7 import sys
8 import ast
8 import ast
9 import datetime
9 import datetime
10 import traceback
10 import traceback
11 import math
11 import math
12 import time
12 import time
13 import zmq
13 import zmq
14 from multiprocessing import Process, Queue, cpu_count
14 from multiprocessing import Process, Queue, Event, cpu_count
15 from threading import Thread
15 from threading import Thread
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
16 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
17 from xml.dom import minidom
17 from xml.dom import minidom
18
18
19
19
20 from schainpy.admin import Alarm, SchainWarning
20 from schainpy.admin import Alarm, SchainWarning
21 from schainpy.model import *
21 from schainpy.model import *
22 from schainpy.utils import log
22 from schainpy.utils import log
23
23
24
24
25 DTYPES = {
25 DTYPES = {
26 'Voltage': '.r',
26 'Voltage': '.r',
27 'Spectra': '.pdata'
27 'Spectra': '.pdata'
28 }
28 }
29
29
30
30
31 def MPProject(project, n=cpu_count()):
31 def MPProject(project, n=cpu_count()):
32 '''
32 '''
33 Project wrapper to run schain in n processes
33 Project wrapper to run schain in n processes
34 '''
34 '''
35
35
36 rconf = project.getReadUnitObj()
36 rconf = project.getReadUnitObj()
37 op = rconf.getOperationObj('run')
37 op = rconf.getOperationObj('run')
38 dt1 = op.getParameterValue('startDate')
38 dt1 = op.getParameterValue('startDate')
39 dt2 = op.getParameterValue('endDate')
39 dt2 = op.getParameterValue('endDate')
40 tm1 = op.getParameterValue('startTime')
40 tm1 = op.getParameterValue('startTime')
41 tm2 = op.getParameterValue('endTime')
41 tm2 = op.getParameterValue('endTime')
42 days = (dt2 - dt1).days
42 days = (dt2 - dt1).days
43
43
44 for day in range(days + 1):
44 for day in range(days + 1):
45 skip = 0
45 skip = 0
46 cursor = 0
46 cursor = 0
47 processes = []
47 processes = []
48 dt = dt1 + datetime.timedelta(day)
48 dt = dt1 + datetime.timedelta(day)
49 dt_str = dt.strftime('%Y/%m/%d')
49 dt_str = dt.strftime('%Y/%m/%d')
50 reader = JRODataReader()
50 reader = JRODataReader()
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
51 paths, files = reader.searchFilesOffLine(path=rconf.path,
52 startDate=dt,
52 startDate=dt,
53 endDate=dt,
53 endDate=dt,
54 startTime=tm1,
54 startTime=tm1,
55 endTime=tm2,
55 endTime=tm2,
56 ext=DTYPES[rconf.datatype])
56 ext=DTYPES[rconf.datatype])
57 nFiles = len(files)
57 nFiles = len(files)
58 if nFiles == 0:
58 if nFiles == 0:
59 continue
59 continue
60 skip = int(math.ceil(nFiles / n))
60 skip = int(math.ceil(nFiles / n))
61 while nFiles > cursor * skip:
61 while nFiles > cursor * skip:
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 skip=skip)
63 skip=skip)
64 p = project.clone()
64 p = project.clone()
65 p.start()
65 p.start()
66 processes.append(p)
66 processes.append(p)
67 cursor += 1
67 cursor += 1
68
68
69 def beforeExit(exctype, value, trace):
69 def beforeExit(exctype, value, trace):
70 for process in processes:
70 for process in processes:
71 process.terminate()
71 process.terminate()
72 process.join()
72 process.join()
73 print(traceback.print_tb(trace))
73 print(traceback.print_tb(trace))
74
74
75 sys.excepthook = beforeExit
75 sys.excepthook = beforeExit
76
76
77 for process in processes:
77 for process in processes:
78 process.join()
78 process.join()
79 process.terminate()
79 process.terminate()
80
80
81 time.sleep(3)
81 time.sleep(3)
82
82
83 def wait(context):
83 def wait(context):
84
84
85 time.sleep(1)
85 time.sleep(1)
86 c = zmq.Context()
86 c = zmq.Context()
87 receiver = c.socket(zmq.SUB)
87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 msg = receiver.recv_multipart()[1]
90 msg = receiver.recv_multipart()[1]
91 context.terminate()
91 context.terminate()
92
92
93 class ParameterConf():
93 class ParameterConf():
94
94
95 id = None
95 id = None
96 name = None
96 name = None
97 value = None
97 value = None
98 format = None
98 format = None
99
99
100 __formated_value = None
100 __formated_value = None
101
101
102 ELEMENTNAME = 'Parameter'
102 ELEMENTNAME = 'Parameter'
103
103
104 def __init__(self):
104 def __init__(self):
105
105
106 self.format = 'str'
106 self.format = 'str'
107
107
108 def getElementName(self):
108 def getElementName(self):
109
109
110 return self.ELEMENTNAME
110 return self.ELEMENTNAME
111
111
112 def getValue(self):
112 def getValue(self):
113
113
114 value = self.value
114 value = self.value
115 format = self.format
115 format = self.format
116
116
117 if self.__formated_value != None:
117 if self.__formated_value != None:
118
118
119 return self.__formated_value
119 return self.__formated_value
120
120
121 if format == 'obj':
121 if format == 'obj':
122 return value
122 return value
123
123
124 if format == 'str':
124 if format == 'str':
125 self.__formated_value = str(value)
125 self.__formated_value = str(value)
126 return self.__formated_value
126 return self.__formated_value
127
127
128 if value == '':
128 if value == '':
129 raise ValueError('%s: This parameter value is empty' % self.name)
129 raise ValueError('%s: This parameter value is empty' % self.name)
130
130
131 if format == 'list':
131 if format == 'list':
132 strList = [s.strip() for s in value.split(',')]
132 strList = [s.strip() for s in value.split(',')]
133 self.__formated_value = strList
133 self.__formated_value = strList
134
134
135 return self.__formated_value
135 return self.__formated_value
136
136
137 if format == 'intlist':
137 if format == 'intlist':
138 '''
138 '''
139 Example:
139 Example:
140 value = (0,1,2)
140 value = (0,1,2)
141 '''
141 '''
142
142
143 new_value = ast.literal_eval(value)
143 new_value = ast.literal_eval(value)
144
144
145 if type(new_value) not in (tuple, list):
145 if type(new_value) not in (tuple, list):
146 new_value = [int(new_value)]
146 new_value = [int(new_value)]
147
147
148 self.__formated_value = new_value
148 self.__formated_value = new_value
149
149
150 return self.__formated_value
150 return self.__formated_value
151
151
152 if format == 'floatlist':
152 if format == 'floatlist':
153 '''
153 '''
154 Example:
154 Example:
155 value = (0.5, 1.4, 2.7)
155 value = (0.5, 1.4, 2.7)
156 '''
156 '''
157
157
158 new_value = ast.literal_eval(value)
158 new_value = ast.literal_eval(value)
159
159
160 if type(new_value) not in (tuple, list):
160 if type(new_value) not in (tuple, list):
161 new_value = [float(new_value)]
161 new_value = [float(new_value)]
162
162
163 self.__formated_value = new_value
163 self.__formated_value = new_value
164
164
165 return self.__formated_value
165 return self.__formated_value
166
166
167 if format == 'date':
167 if format == 'date':
168 strList = value.split('/')
168 strList = value.split('/')
169 intList = [int(x) for x in strList]
169 intList = [int(x) for x in strList]
170 date = datetime.date(intList[0], intList[1], intList[2])
170 date = datetime.date(intList[0], intList[1], intList[2])
171
171
172 self.__formated_value = date
172 self.__formated_value = date
173
173
174 return self.__formated_value
174 return self.__formated_value
175
175
176 if format == 'time':
176 if format == 'time':
177 strList = value.split(':')
177 strList = value.split(':')
178 intList = [int(x) for x in strList]
178 intList = [int(x) for x in strList]
179 time = datetime.time(intList[0], intList[1], intList[2])
179 time = datetime.time(intList[0], intList[1], intList[2])
180
180
181 self.__formated_value = time
181 self.__formated_value = time
182
182
183 return self.__formated_value
183 return self.__formated_value
184
184
185 if format == 'pairslist':
185 if format == 'pairslist':
186 '''
186 '''
187 Example:
187 Example:
188 value = (0,1),(1,2)
188 value = (0,1),(1,2)
189 '''
189 '''
190
190
191 new_value = ast.literal_eval(value)
191 new_value = ast.literal_eval(value)
192
192
193 if type(new_value) not in (tuple, list):
193 if type(new_value) not in (tuple, list):
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
194 raise ValueError('%s has to be a tuple or list of pairs' % value)
195
195
196 if type(new_value[0]) not in (tuple, list):
196 if type(new_value[0]) not in (tuple, list):
197 if len(new_value) != 2:
197 if len(new_value) != 2:
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
198 raise ValueError('%s has to be a tuple or list of pairs' % value)
199 new_value = [new_value]
199 new_value = [new_value]
200
200
201 for thisPair in new_value:
201 for thisPair in new_value:
202 if len(thisPair) != 2:
202 if len(thisPair) != 2:
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
203 raise ValueError('%s has to be a tuple or list of pairs' % value)
204
204
205 self.__formated_value = new_value
205 self.__formated_value = new_value
206
206
207 return self.__formated_value
207 return self.__formated_value
208
208
209 if format == 'multilist':
209 if format == 'multilist':
210 '''
210 '''
211 Example:
211 Example:
212 value = (0,1,2),(3,4,5)
212 value = (0,1,2),(3,4,5)
213 '''
213 '''
214 multiList = ast.literal_eval(value)
214 multiList = ast.literal_eval(value)
215
215
216 if type(multiList[0]) == int:
216 if type(multiList[0]) == int:
217 multiList = ast.literal_eval('(' + value + ')')
217 multiList = ast.literal_eval('(' + value + ')')
218
218
219 self.__formated_value = multiList
219 self.__formated_value = multiList
220
220
221 return self.__formated_value
221 return self.__formated_value
222
222
223 if format == 'bool':
223 if format == 'bool':
224 value = int(value)
224 value = int(value)
225
225
226 if format == 'int':
226 if format == 'int':
227 value = float(value)
227 value = float(value)
228
228
229 format_func = eval(format)
229 format_func = eval(format)
230
230
231 self.__formated_value = format_func(value)
231 self.__formated_value = format_func(value)
232
232
233 return self.__formated_value
233 return self.__formated_value
234
234
235 def updateId(self, new_id):
235 def updateId(self, new_id):
236
236
237 self.id = str(new_id)
237 self.id = str(new_id)
238
238
239 def setup(self, id, name, value, format='str'):
239 def setup(self, id, name, value, format='str'):
240 self.id = str(id)
240 self.id = str(id)
241 self.name = name
241 self.name = name
242 if format == 'obj':
242 if format == 'obj':
243 self.value = value
243 self.value = value
244 else:
244 else:
245 self.value = str(value)
245 self.value = str(value)
246 self.format = str.lower(format)
246 self.format = str.lower(format)
247
247
248 self.getValue()
248 self.getValue()
249
249
250 return 1
250 return 1
251
251
252 def update(self, name, value, format='str'):
252 def update(self, name, value, format='str'):
253
253
254 self.name = name
254 self.name = name
255 self.value = str(value)
255 self.value = str(value)
256 self.format = format
256 self.format = format
257
257
258 def makeXml(self, opElement):
258 def makeXml(self, opElement):
259 if self.name not in ('queue',):
259 if self.name not in ('queue',):
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
260 parmElement = SubElement(opElement, self.ELEMENTNAME)
261 parmElement.set('id', str(self.id))
261 parmElement.set('id', str(self.id))
262 parmElement.set('name', self.name)
262 parmElement.set('name', self.name)
263 parmElement.set('value', self.value)
263 parmElement.set('value', self.value)
264 parmElement.set('format', self.format)
264 parmElement.set('format', self.format)
265
265
266 def readXml(self, parmElement):
266 def readXml(self, parmElement):
267
267
268 self.id = parmElement.get('id')
268 self.id = parmElement.get('id')
269 self.name = parmElement.get('name')
269 self.name = parmElement.get('name')
270 self.value = parmElement.get('value')
270 self.value = parmElement.get('value')
271 self.format = str.lower(parmElement.get('format'))
271 self.format = str.lower(parmElement.get('format'))
272
272
273 # Compatible with old signal chain version
273 # Compatible with old signal chain version
274 if self.format == 'int' and self.name == 'idfigure':
274 if self.format == 'int' and self.name == 'idfigure':
275 self.name = 'id'
275 self.name = 'id'
276
276
277 def printattr(self):
277 def printattr(self):
278
278
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
279 print('Parameter[%s]: name = %s, value = %s, format = %s, project_id = %s' % (self.id, self.name, self.value, self.format, self.project_id))
280
280
281 class OperationConf():
281 class OperationConf():
282
282
283 ELEMENTNAME = 'Operation'
283 ELEMENTNAME = 'Operation'
284
284
285 def __init__(self):
285 def __init__(self):
286
286
287 self.id = '0'
287 self.id = '0'
288 self.name = None
288 self.name = None
289 self.priority = None
289 self.priority = None
290 self.topic = None
290 self.topic = None
291
291
292 def __getNewId(self):
292 def __getNewId(self):
293
293
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
294 return int(self.id) * 10 + len(self.parmConfObjList) + 1
295
295
296 def getId(self):
296 def getId(self):
297 return self.id
297 return self.id
298
298
299 def updateId(self, new_id):
299 def updateId(self, new_id):
300
300
301 self.id = str(new_id)
301 self.id = str(new_id)
302
302
303 n = 1
303 n = 1
304 for parmObj in self.parmConfObjList:
304 for parmObj in self.parmConfObjList:
305
305
306 idParm = str(int(new_id) * 10 + n)
306 idParm = str(int(new_id) * 10 + n)
307 parmObj.updateId(idParm)
307 parmObj.updateId(idParm)
308
308
309 n += 1
309 n += 1
310
310
311 def getElementName(self):
311 def getElementName(self):
312
312
313 return self.ELEMENTNAME
313 return self.ELEMENTNAME
314
314
315 def getParameterObjList(self):
315 def getParameterObjList(self):
316
316
317 return self.parmConfObjList
317 return self.parmConfObjList
318
318
319 def getParameterObj(self, parameterName):
319 def getParameterObj(self, parameterName):
320
320
321 for parmConfObj in self.parmConfObjList:
321 for parmConfObj in self.parmConfObjList:
322
322
323 if parmConfObj.name != parameterName:
323 if parmConfObj.name != parameterName:
324 continue
324 continue
325
325
326 return parmConfObj
326 return parmConfObj
327
327
328 return None
328 return None
329
329
330 def getParameterObjfromValue(self, parameterValue):
330 def getParameterObjfromValue(self, parameterValue):
331
331
332 for parmConfObj in self.parmConfObjList:
332 for parmConfObj in self.parmConfObjList:
333
333
334 if parmConfObj.getValue() != parameterValue:
334 if parmConfObj.getValue() != parameterValue:
335 continue
335 continue
336
336
337 return parmConfObj.getValue()
337 return parmConfObj.getValue()
338
338
339 return None
339 return None
340
340
341 def getParameterValue(self, parameterName):
341 def getParameterValue(self, parameterName):
342
342
343 parameterObj = self.getParameterObj(parameterName)
343 parameterObj = self.getParameterObj(parameterName)
344
344
345 # if not parameterObj:
345 # if not parameterObj:
346 # return None
346 # return None
347
347
348 value = parameterObj.getValue()
348 value = parameterObj.getValue()
349
349
350 return value
350 return value
351
351
352 def getKwargs(self):
352 def getKwargs(self):
353
353
354 kwargs = {}
354 kwargs = {}
355
355
356 for parmConfObj in self.parmConfObjList:
356 for parmConfObj in self.parmConfObjList:
357 if self.name == 'run' and parmConfObj.name == 'datatype':
357 if self.name == 'run' and parmConfObj.name == 'datatype':
358 continue
358 continue
359
359
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
360 kwargs[parmConfObj.name] = parmConfObj.getValue()
361
361
362 return kwargs
362 return kwargs
363
363
364 def setup(self, id, name, priority, type, project_id, err_queue):
364 def setup(self, id, name, priority, type, project_id, err_queue, lock):
365
365
366 self.id = str(id)
366 self.id = str(id)
367 self.project_id = project_id
367 self.project_id = project_id
368 self.name = name
368 self.name = name
369 self.type = type
369 self.type = type
370 self.priority = priority
370 self.priority = priority
371 self.err_queue = err_queue
371 self.err_queue = err_queue
372 self.lock = lock
372 self.parmConfObjList = []
373 self.parmConfObjList = []
373
374
374 def removeParameters(self):
375 def removeParameters(self):
375
376
376 for obj in self.parmConfObjList:
377 for obj in self.parmConfObjList:
377 del obj
378 del obj
378
379
379 self.parmConfObjList = []
380 self.parmConfObjList = []
380
381
381 def addParameter(self, name, value, format='str'):
382 def addParameter(self, name, value, format='str'):
382
383
383 if value is None:
384 if value is None:
384 return None
385 return None
385 id = self.__getNewId()
386 id = self.__getNewId()
386
387
387 parmConfObj = ParameterConf()
388 parmConfObj = ParameterConf()
388 if not parmConfObj.setup(id, name, value, format):
389 if not parmConfObj.setup(id, name, value, format):
389 return None
390 return None
390
391
391 self.parmConfObjList.append(parmConfObj)
392 self.parmConfObjList.append(parmConfObj)
392
393
393 return parmConfObj
394 return parmConfObj
394
395
395 def changeParameter(self, name, value, format='str'):
396 def changeParameter(self, name, value, format='str'):
396
397
397 parmConfObj = self.getParameterObj(name)
398 parmConfObj = self.getParameterObj(name)
398 parmConfObj.update(name, value, format)
399 parmConfObj.update(name, value, format)
399
400
400 return parmConfObj
401 return parmConfObj
401
402
402 def makeXml(self, procUnitElement):
403 def makeXml(self, procUnitElement):
403
404
404 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
405 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
405 opElement.set('id', str(self.id))
406 opElement.set('id', str(self.id))
406 opElement.set('name', self.name)
407 opElement.set('name', self.name)
407 opElement.set('type', self.type)
408 opElement.set('type', self.type)
408 opElement.set('priority', str(self.priority))
409 opElement.set('priority', str(self.priority))
409
410
410 for parmConfObj in self.parmConfObjList:
411 for parmConfObj in self.parmConfObjList:
411 parmConfObj.makeXml(opElement)
412 parmConfObj.makeXml(opElement)
412
413
413 def readXml(self, opElement, project_id):
414 def readXml(self, opElement, project_id):
414
415
415 self.id = opElement.get('id')
416 self.id = opElement.get('id')
416 self.name = opElement.get('name')
417 self.name = opElement.get('name')
417 self.type = opElement.get('type')
418 self.type = opElement.get('type')
418 self.priority = opElement.get('priority')
419 self.priority = opElement.get('priority')
419 self.project_id = str(project_id)
420 self.project_id = str(project_id)
420
421
421 # Compatible with old signal chain version
422 # Compatible with old signal chain version
422 # Use of 'run' method instead 'init'
423 # Use of 'run' method instead 'init'
423 if self.type == 'self' and self.name == 'init':
424 if self.type == 'self' and self.name == 'init':
424 self.name = 'run'
425 self.name = 'run'
425
426
426 self.parmConfObjList = []
427 self.parmConfObjList = []
427
428
428 parmElementList = opElement.iter(ParameterConf().getElementName())
429 parmElementList = opElement.iter(ParameterConf().getElementName())
429
430
430 for parmElement in parmElementList:
431 for parmElement in parmElementList:
431 parmConfObj = ParameterConf()
432 parmConfObj = ParameterConf()
432 parmConfObj.readXml(parmElement)
433 parmConfObj.readXml(parmElement)
433
434
434 # Compatible with old signal chain version
435 # Compatible with old signal chain version
435 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
436 # If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
436 if self.type != 'self' and self.name == 'Plot':
437 if self.type != 'self' and self.name == 'Plot':
437 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
438 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
438 self.name = parmConfObj.value
439 self.name = parmConfObj.value
439 continue
440 continue
440
441
441 self.parmConfObjList.append(parmConfObj)
442 self.parmConfObjList.append(parmConfObj)
442
443
443 def printattr(self):
444 def printattr(self):
444
445
445 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
446 print('%s[%s]: name = %s, type = %s, priority = %s, project_id = %s' % (self.ELEMENTNAME,
446 self.id,
447 self.id,
447 self.name,
448 self.name,
448 self.type,
449 self.type,
449 self.priority,
450 self.priority,
450 self.project_id))
451 self.project_id))
451
452
452 for parmConfObj in self.parmConfObjList:
453 for parmConfObj in self.parmConfObjList:
453 parmConfObj.printattr()
454 parmConfObj.printattr()
454
455
455 def createObject(self):
456 def createObject(self):
456
457
457 className = eval(self.name)
458 className = eval(self.name)
458
459
459 if self.type == 'other':
460 if self.type == 'other':
460 opObj = className()
461 opObj = className()
461 elif self.type == 'external':
462 elif self.type == 'external':
462 kwargs = self.getKwargs()
463 kwargs = self.getKwargs()
463 opObj = className(self.id, self.id, self.project_id, self.err_queue, 'Operation', **kwargs)
464 opObj = className(self.id, self.id, self.project_id, self.err_queue, self.lock, 'Operation', **kwargs)
464 opObj.start()
465 opObj.start()
465 self.opObj = opObj
466 self.opObj = opObj
466
467
467 return opObj
468 return opObj
468
469
469 class ProcUnitConf():
470 class ProcUnitConf():
470
471
471 ELEMENTNAME = 'ProcUnit'
472 ELEMENTNAME = 'ProcUnit'
472
473
473 def __init__(self):
474 def __init__(self):
474
475
475 self.id = None
476 self.id = None
476 self.datatype = None
477 self.datatype = None
477 self.name = None
478 self.name = None
478 self.inputId = None
479 self.inputId = None
479 self.opConfObjList = []
480 self.opConfObjList = []
480 self.procUnitObj = None
481 self.procUnitObj = None
481 self.opObjDict = {}
482 self.opObjDict = {}
483 self.mylock = Event()
482
484
483 def __getPriority(self):
485 def __getPriority(self):
484
486
485 return len(self.opConfObjList) + 1
487 return len(self.opConfObjList) + 1
486
488
487 def __getNewId(self):
489 def __getNewId(self):
488
490
489 return int(self.id) * 10 + len(self.opConfObjList) + 1
491 return int(self.id) * 10 + len(self.opConfObjList) + 1
490
492
491 def getElementName(self):
493 def getElementName(self):
492
494
493 return self.ELEMENTNAME
495 return self.ELEMENTNAME
494
496
495 def getId(self):
497 def getId(self):
496
498
497 return self.id
499 return self.id
498
500
499 def updateId(self, new_id):
501 def updateId(self, new_id):
500 '''
502 '''
501 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 new_id = int(parentId) * 10 + (int(self.id) % 10)
502 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
504 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
503
505
504 # If this proc unit has not inputs
506 # If this proc unit has not inputs
505 #if self.inputId == '0':
507 #if self.inputId == '0':
506 #new_inputId = 0
508 #new_inputId = 0
507
509
508 n = 1
510 n = 1
509 for opConfObj in self.opConfObjList:
511 for opConfObj in self.opConfObjList:
510
512
511 idOp = str(int(new_id) * 10 + n)
513 idOp = str(int(new_id) * 10 + n)
512 opConfObj.updateId(idOp)
514 opConfObj.updateId(idOp)
513
515
514 n += 1
516 n += 1
515
517
516 self.parentId = str(parentId)
518 self.parentId = str(parentId)
517 self.id = str(new_id)
519 self.id = str(new_id)
518 #self.inputId = str(new_inputId)
520 #self.inputId = str(new_inputId)
519 '''
521 '''
520 n = 1
522 n = 1
521
523
522 def getInputId(self):
524 def getInputId(self):
523
525
524 return self.inputId
526 return self.inputId
525
527
526 def getOperationObjList(self):
528 def getOperationObjList(self):
527
529
528 return self.opConfObjList
530 return self.opConfObjList
529
531
530 def getOperationObj(self, name=None):
532 def getOperationObj(self, name=None):
531
533
532 for opConfObj in self.opConfObjList:
534 for opConfObj in self.opConfObjList:
533
535
534 if opConfObj.name != name:
536 if opConfObj.name != name:
535 continue
537 continue
536
538
537 return opConfObj
539 return opConfObj
538
540
539 return None
541 return None
540
542
541 def getOpObjfromParamValue(self, value=None):
543 def getOpObjfromParamValue(self, value=None):
542
544
543 for opConfObj in self.opConfObjList:
545 for opConfObj in self.opConfObjList:
544 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
546 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
545 continue
547 continue
546 return opConfObj
548 return opConfObj
547 return None
549 return None
548
550
549 def getProcUnitObj(self):
551 def getProcUnitObj(self):
550
552
551 return self.procUnitObj
553 return self.procUnitObj
552
554
553 def setup(self, project_id, id, name, datatype, inputId, err_queue):
555 def setup(self, project_id, id, name, datatype, inputId, err_queue, lock):
554 '''
556 '''
555 id sera el topico a publicar
557 id sera el topico a publicar
556 inputId sera el topico a subscribirse
558 inputId sera el topico a subscribirse
557 '''
559 '''
558
560
559 # Compatible with old signal chain version
561 # Compatible with old signal chain version
560 if datatype == None and name == None:
562 if datatype == None and name == None:
561 raise ValueError('datatype or name should be defined')
563 raise ValueError('datatype or name should be defined')
562
564
563 #Definir una condicion para inputId cuando sea 0
565 #Definir una condicion para inputId cuando sea 0
564
566
565 if name == None:
567 if name == None:
566 if 'Proc' in datatype:
568 if 'Proc' in datatype:
567 name = datatype
569 name = datatype
568 else:
570 else:
569 name = '%sProc' % (datatype)
571 name = '%sProc' % (datatype)
570
572
571 if datatype == None:
573 if datatype == None:
572 datatype = name.replace('Proc', '')
574 datatype = name.replace('Proc', '')
573
575
574 self.id = str(id)
576 self.id = str(id)
575 self.project_id = project_id
577 self.project_id = project_id
576 self.name = name
578 self.name = name
577 self.datatype = datatype
579 self.datatype = datatype
578 self.inputId = inputId
580 self.inputId = inputId
579 self.err_queue = err_queue
581 self.err_queue = err_queue
582 self.lock = lock
580 self.opConfObjList = []
583 self.opConfObjList = []
581
584
582 self.addOperation(name='run', optype='self')
585 self.addOperation(name='run', optype='self')
583
586
584 def removeOperations(self):
587 def removeOperations(self):
585
588
586 for obj in self.opConfObjList:
589 for obj in self.opConfObjList:
587 del obj
590 del obj
588
591
589 self.opConfObjList = []
592 self.opConfObjList = []
590 self.addOperation(name='run')
593 self.addOperation(name='run')
591
594
592 def addParameter(self, **kwargs):
595 def addParameter(self, **kwargs):
593 '''
596 '''
594 Add parameters to 'run' operation
597 Add parameters to 'run' operation
595 '''
598 '''
596 opObj = self.opConfObjList[0]
599 opObj = self.opConfObjList[0]
597
600
598 opObj.addParameter(**kwargs)
601 opObj.addParameter(**kwargs)
599
602
600 return opObj
603 return opObj
601
604
602 def addOperation(self, name, optype='self'):
605 def addOperation(self, name, optype='self'):
603 '''
606 '''
604 Actualizacion - > proceso comunicacion
607 Actualizacion - > proceso comunicacion
605 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
608 En el caso de optype='self', elminar. DEfinir comuncacion IPC -> Topic
606 definir el tipoc de socket o comunicacion ipc++
609 definir el tipoc de socket o comunicacion ipc++
607
610
608 '''
611 '''
609
612
610 id = self.__getNewId()
613 id = self.__getNewId()
611 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
614 priority = self.__getPriority() # Sin mucho sentido, pero puede usarse
612 opConfObj = OperationConf()
615 opConfObj = OperationConf()
613 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue)
616 opConfObj.setup(id, name=name, priority=priority, type=optype, project_id=self.project_id, err_queue=self.err_queue, lock=self.mylock)
614 self.opConfObjList.append(opConfObj)
617 self.opConfObjList.append(opConfObj)
615
618
616 return opConfObj
619 return opConfObj
617
620
618 def makeXml(self, projectElement):
621 def makeXml(self, projectElement):
619
622
620 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
623 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
621 procUnitElement.set('id', str(self.id))
624 procUnitElement.set('id', str(self.id))
622 procUnitElement.set('name', self.name)
625 procUnitElement.set('name', self.name)
623 procUnitElement.set('datatype', self.datatype)
626 procUnitElement.set('datatype', self.datatype)
624 procUnitElement.set('inputId', str(self.inputId))
627 procUnitElement.set('inputId', str(self.inputId))
625
628
626 for opConfObj in self.opConfObjList:
629 for opConfObj in self.opConfObjList:
627 opConfObj.makeXml(procUnitElement)
630 opConfObj.makeXml(procUnitElement)
628
631
629 def readXml(self, upElement, project_id):
632 def readXml(self, upElement, project_id):
630
633
631 self.id = upElement.get('id')
634 self.id = upElement.get('id')
632 self.name = upElement.get('name')
635 self.name = upElement.get('name')
633 self.datatype = upElement.get('datatype')
636 self.datatype = upElement.get('datatype')
634 self.inputId = upElement.get('inputId')
637 self.inputId = upElement.get('inputId')
635 self.project_id = str(project_id)
638 self.project_id = str(project_id)
636
639
637 if self.ELEMENTNAME == 'ReadUnit':
640 if self.ELEMENTNAME == 'ReadUnit':
638 self.datatype = self.datatype.replace('Reader', '')
641 self.datatype = self.datatype.replace('Reader', '')
639
642
640 if self.ELEMENTNAME == 'ProcUnit':
643 if self.ELEMENTNAME == 'ProcUnit':
641 self.datatype = self.datatype.replace('Proc', '')
644 self.datatype = self.datatype.replace('Proc', '')
642
645
643 if self.inputId == 'None':
646 if self.inputId == 'None':
644 self.inputId = '0'
647 self.inputId = '0'
645
648
646 self.opConfObjList = []
649 self.opConfObjList = []
647
650
648 opElementList = upElement.iter(OperationConf().getElementName())
651 opElementList = upElement.iter(OperationConf().getElementName())
649
652
650 for opElement in opElementList:
653 for opElement in opElementList:
651 opConfObj = OperationConf()
654 opConfObj = OperationConf()
652 opConfObj.readXml(opElement, project_id)
655 opConfObj.readXml(opElement, project_id)
653 self.opConfObjList.append(opConfObj)
656 self.opConfObjList.append(opConfObj)
654
657
655 def printattr(self):
658 def printattr(self):
656
659
657 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
660 print('%s[%s]: name = %s, datatype = %s, inputId = %s, project_id = %s' % (self.ELEMENTNAME,
658 self.id,
661 self.id,
659 self.name,
662 self.name,
660 self.datatype,
663 self.datatype,
661 self.inputId,
664 self.inputId,
662 self.project_id))
665 self.project_id))
663
666
664 for opConfObj in self.opConfObjList:
667 for opConfObj in self.opConfObjList:
665 opConfObj.printattr()
668 opConfObj.printattr()
666
669
667 def getKwargs(self):
670 def getKwargs(self):
668
671
669 opObj = self.opConfObjList[0]
672 opObj = self.opConfObjList[0]
670 kwargs = opObj.getKwargs()
673 kwargs = opObj.getKwargs()
671
674
672 return kwargs
675 return kwargs
673
676
674 def createObjects(self):
677 def createObjects(self):
675 '''
678 '''
676 Instancia de unidades de procesamiento.
679 Instancia de unidades de procesamiento.
677 '''
680 '''
678
681
679 className = eval(self.name)
682 className = eval(self.name)
680 kwargs = self.getKwargs()
683 kwargs = self.getKwargs()
681 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, 'ProcUnit', **kwargs)
684 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
682 log.success('creating process...', self.name)
685 log.success('creating process...', self.name)
683
686
684 for opConfObj in self.opConfObjList:
687 for opConfObj in self.opConfObjList:
685
688
686 if opConfObj.type == 'self' and opConfObj.name == 'run':
689 if opConfObj.type == 'self' and opConfObj.name == 'run':
687 continue
690 continue
688 elif opConfObj.type == 'self':
691 elif opConfObj.type == 'self':
689 opObj = getattr(procUnitObj, opConfObj.name)
692 opObj = getattr(procUnitObj, opConfObj.name)
690 else:
693 else:
691 opObj = opConfObj.createObject()
694 opObj = opConfObj.createObject()
692
695
693 log.success('adding operation: {}, type:{}'.format(
696 log.success('adding operation: {}, type:{}'.format(
694 opConfObj.name,
697 opConfObj.name,
695 opConfObj.type), self.name)
698 opConfObj.type), self.name)
696
699
697 procUnitObj.addOperation(opConfObj, opObj)
700 procUnitObj.addOperation(opConfObj, opObj)
698
701
699 procUnitObj.start()
702 procUnitObj.start()
700 self.procUnitObj = procUnitObj
703 self.procUnitObj = procUnitObj
701
704
702 def close(self):
705 def close(self):
703
706
704 for opConfObj in self.opConfObjList:
707 for opConfObj in self.opConfObjList:
705 if opConfObj.type == 'self':
708 if opConfObj.type == 'self':
706 continue
709 continue
707
710
708 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
711 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
709 opObj.close()
712 opObj.close()
710
713
711 self.procUnitObj.close()
714 self.procUnitObj.close()
712
715
713 return
716 return
714
717
715
718
716 class ReadUnitConf(ProcUnitConf):
719 class ReadUnitConf(ProcUnitConf):
717
720
718 ELEMENTNAME = 'ReadUnit'
721 ELEMENTNAME = 'ReadUnit'
719
722
720 def __init__(self):
723 def __init__(self):
721
724
722 self.id = None
725 self.id = None
723 self.datatype = None
726 self.datatype = None
724 self.name = None
727 self.name = None
725 self.inputId = None
728 self.inputId = None
726 self.opConfObjList = []
729 self.opConfObjList = []
730 self.mylock = Event()
727
731
728 def getElementName(self):
732 def getElementName(self):
729
733
730 return self.ELEMENTNAME
734 return self.ELEMENTNAME
731
735
732 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
736 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
733 startTime='', endTime='', server=None, **kwargs):
737 startTime='', endTime='', server=None, **kwargs):
734
738
735
739
736 '''
740 '''
737 *****el id del proceso sera el Topico
741 *****el id del proceso sera el Topico
738
742
739 Adicion de {topic}, si no esta presente -> error
743 Adicion de {topic}, si no esta presente -> error
740 kwargs deben ser trasmitidos en la instanciacion
744 kwargs deben ser trasmitidos en la instanciacion
741
745
742 '''
746 '''
743
747
744 # Compatible with old signal chain version
748 # Compatible with old signal chain version
745 if datatype == None and name == None:
749 if datatype == None and name == None:
746 raise ValueError('datatype or name should be defined')
750 raise ValueError('datatype or name should be defined')
747 if name == None:
751 if name == None:
748 if 'Reader' in datatype:
752 if 'Reader' in datatype:
749 name = datatype
753 name = datatype
750 datatype = name.replace('Reader','')
754 datatype = name.replace('Reader','')
751 else:
755 else:
752 name = '{}Reader'.format(datatype)
756 name = '{}Reader'.format(datatype)
753 if datatype == None:
757 if datatype == None:
754 if 'Reader' in name:
758 if 'Reader' in name:
755 datatype = name.replace('Reader','')
759 datatype = name.replace('Reader','')
756 else:
760 else:
757 datatype = name
761 datatype = name
758 name = '{}Reader'.format(name)
762 name = '{}Reader'.format(name)
759
763
760 self.id = id
764 self.id = id
761 self.project_id = project_id
765 self.project_id = project_id
762 self.name = name
766 self.name = name
763 self.datatype = datatype
767 self.datatype = datatype
764 if path != '':
768 if path != '':
765 self.path = os.path.abspath(path)
769 self.path = os.path.abspath(path)
766 self.startDate = startDate
770 self.startDate = startDate
767 self.endDate = endDate
771 self.endDate = endDate
768 self.startTime = startTime
772 self.startTime = startTime
769 self.endTime = endTime
773 self.endTime = endTime
770 self.server = server
774 self.server = server
771 self.err_queue = err_queue
775 self.err_queue = err_queue
776 self.lock = self.mylock
772 self.addRunOperation(**kwargs)
777 self.addRunOperation(**kwargs)
773
778
774 def update(self, **kwargs):
779 def update(self, **kwargs):
775
780
776 if 'datatype' in kwargs:
781 if 'datatype' in kwargs:
777 datatype = kwargs.pop('datatype')
782 datatype = kwargs.pop('datatype')
778 if 'Reader' in datatype:
783 if 'Reader' in datatype:
779 self.name = datatype
784 self.name = datatype
780 else:
785 else:
781 self.name = '%sReader' % (datatype)
786 self.name = '%sReader' % (datatype)
782 self.datatype = self.name.replace('Reader', '')
787 self.datatype = self.name.replace('Reader', '')
783
788
784 attrs = ('path', 'startDate', 'endDate',
789 attrs = ('path', 'startDate', 'endDate',
785 'startTime', 'endTime')
790 'startTime', 'endTime')
786
791
787 for attr in attrs:
792 for attr in attrs:
788 if attr in kwargs:
793 if attr in kwargs:
789 setattr(self, attr, kwargs.pop(attr))
794 setattr(self, attr, kwargs.pop(attr))
790
795
791 self.updateRunOperation(**kwargs)
796 self.updateRunOperation(**kwargs)
792
797
793 def removeOperations(self):
798 def removeOperations(self):
794
799
795 for obj in self.opConfObjList:
800 for obj in self.opConfObjList:
796 del obj
801 del obj
797
802
798 self.opConfObjList = []
803 self.opConfObjList = []
799
804
800 def addRunOperation(self, **kwargs):
805 def addRunOperation(self, **kwargs):
801
806
802 opObj = self.addOperation(name='run', optype='self')
807 opObj = self.addOperation(name='run', optype='self')
803
808
804 if self.server is None:
809 if self.server is None:
805 opObj.addParameter(
810 opObj.addParameter(
806 name='datatype', value=self.datatype, format='str')
811 name='datatype', value=self.datatype, format='str')
807 opObj.addParameter(name='path', value=self.path, format='str')
812 opObj.addParameter(name='path', value=self.path, format='str')
808 opObj.addParameter(
813 opObj.addParameter(
809 name='startDate', value=self.startDate, format='date')
814 name='startDate', value=self.startDate, format='date')
810 opObj.addParameter(
815 opObj.addParameter(
811 name='endDate', value=self.endDate, format='date')
816 name='endDate', value=self.endDate, format='date')
812 opObj.addParameter(
817 opObj.addParameter(
813 name='startTime', value=self.startTime, format='time')
818 name='startTime', value=self.startTime, format='time')
814 opObj.addParameter(
819 opObj.addParameter(
815 name='endTime', value=self.endTime, format='time')
820 name='endTime', value=self.endTime, format='time')
816
821
817 for key, value in list(kwargs.items()):
822 for key, value in list(kwargs.items()):
818 opObj.addParameter(name=key, value=value,
823 opObj.addParameter(name=key, value=value,
819 format=type(value).__name__)
824 format=type(value).__name__)
820 else:
825 else:
821 opObj.addParameter(name='server', value=self.server, format='str')
826 opObj.addParameter(name='server', value=self.server, format='str')
822
827
823 return opObj
828 return opObj
824
829
825 def updateRunOperation(self, **kwargs):
830 def updateRunOperation(self, **kwargs):
826
831
827 opObj = self.getOperationObj(name='run')
832 opObj = self.getOperationObj(name='run')
828 opObj.removeParameters()
833 opObj.removeParameters()
829
834
830 opObj.addParameter(name='datatype', value=self.datatype, format='str')
835 opObj.addParameter(name='datatype', value=self.datatype, format='str')
831 opObj.addParameter(name='path', value=self.path, format='str')
836 opObj.addParameter(name='path', value=self.path, format='str')
832 opObj.addParameter(
837 opObj.addParameter(
833 name='startDate', value=self.startDate, format='date')
838 name='startDate', value=self.startDate, format='date')
834 opObj.addParameter(name='endDate', value=self.endDate, format='date')
839 opObj.addParameter(name='endDate', value=self.endDate, format='date')
835 opObj.addParameter(
840 opObj.addParameter(
836 name='startTime', value=self.startTime, format='time')
841 name='startTime', value=self.startTime, format='time')
837 opObj.addParameter(name='endTime', value=self.endTime, format='time')
842 opObj.addParameter(name='endTime', value=self.endTime, format='time')
838
843
839 for key, value in list(kwargs.items()):
844 for key, value in list(kwargs.items()):
840 opObj.addParameter(name=key, value=value,
845 opObj.addParameter(name=key, value=value,
841 format=type(value).__name__)
846 format=type(value).__name__)
842
847
843 return opObj
848 return opObj
844
849
845 def readXml(self, upElement, project_id):
850 def readXml(self, upElement, project_id):
846
851
847 self.id = upElement.get('id')
852 self.id = upElement.get('id')
848 self.name = upElement.get('name')
853 self.name = upElement.get('name')
849 self.datatype = upElement.get('datatype')
854 self.datatype = upElement.get('datatype')
850 self.project_id = str(project_id) #yong
855 self.project_id = str(project_id) #yong
851
856
852 if self.ELEMENTNAME == 'ReadUnit':
857 if self.ELEMENTNAME == 'ReadUnit':
853 self.datatype = self.datatype.replace('Reader', '')
858 self.datatype = self.datatype.replace('Reader', '')
854
859
855 self.opConfObjList = []
860 self.opConfObjList = []
856
861
857 opElementList = upElement.iter(OperationConf().getElementName())
862 opElementList = upElement.iter(OperationConf().getElementName())
858
863
859 for opElement in opElementList:
864 for opElement in opElementList:
860 opConfObj = OperationConf()
865 opConfObj = OperationConf()
861 opConfObj.readXml(opElement, project_id)
866 opConfObj.readXml(opElement, project_id)
862 self.opConfObjList.append(opConfObj)
867 self.opConfObjList.append(opConfObj)
863
868
864 if opConfObj.name == 'run':
869 if opConfObj.name == 'run':
865 self.path = opConfObj.getParameterValue('path')
870 self.path = opConfObj.getParameterValue('path')
866 self.startDate = opConfObj.getParameterValue('startDate')
871 self.startDate = opConfObj.getParameterValue('startDate')
867 self.endDate = opConfObj.getParameterValue('endDate')
872 self.endDate = opConfObj.getParameterValue('endDate')
868 self.startTime = opConfObj.getParameterValue('startTime')
873 self.startTime = opConfObj.getParameterValue('startTime')
869 self.endTime = opConfObj.getParameterValue('endTime')
874 self.endTime = opConfObj.getParameterValue('endTime')
870
875
871
876
872 class Project(Process):
877 class Project(Process):
873
878
874 ELEMENTNAME = 'Project'
879 ELEMENTNAME = 'Project'
875
880
876 def __init__(self):
881 def __init__(self):
877
882
878 Process.__init__(self)
883 Process.__init__(self)
879 self.id = None
884 self.id = None
880 self.filename = None
885 self.filename = None
881 self.description = None
886 self.description = None
882 self.email = None
887 self.email = None
883 self.alarm = None
888 self.alarm = None
884 self.procUnitConfObjDict = {}
889 self.procUnitConfObjDict = {}
885 self.err_queue = Queue()
890 self.err_queue = Queue()
886
891
887 def __getNewId(self):
892 def __getNewId(self):
888
893
889 idList = list(self.procUnitConfObjDict.keys())
894 idList = list(self.procUnitConfObjDict.keys())
890 id = int(self.id) * 10
895 id = int(self.id) * 10
891
896
892 while True:
897 while True:
893 id += 1
898 id += 1
894
899
895 if str(id) in idList:
900 if str(id) in idList:
896 continue
901 continue
897
902
898 break
903 break
899
904
900 return str(id)
905 return str(id)
901
906
902 def getElementName(self):
907 def getElementName(self):
903
908
904 return self.ELEMENTNAME
909 return self.ELEMENTNAME
905
910
906 def getId(self):
911 def getId(self):
907
912
908 return self.id
913 return self.id
909
914
910 def updateId(self, new_id):
915 def updateId(self, new_id):
911
916
912 self.id = str(new_id)
917 self.id = str(new_id)
913
918
914 keyList = list(self.procUnitConfObjDict.keys())
919 keyList = list(self.procUnitConfObjDict.keys())
915 keyList.sort()
920 keyList.sort()
916
921
917 n = 1
922 n = 1
918 newProcUnitConfObjDict = {}
923 newProcUnitConfObjDict = {}
919
924
920 for procKey in keyList:
925 for procKey in keyList:
921
926
922 procUnitConfObj = self.procUnitConfObjDict[procKey]
927 procUnitConfObj = self.procUnitConfObjDict[procKey]
923 idProcUnit = str(int(self.id) * 10 + n)
928 idProcUnit = str(int(self.id) * 10 + n)
924 procUnitConfObj.updateId(idProcUnit)
929 procUnitConfObj.updateId(idProcUnit)
925 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
930 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
926 n += 1
931 n += 1
927
932
928 self.procUnitConfObjDict = newProcUnitConfObjDict
933 self.procUnitConfObjDict = newProcUnitConfObjDict
929
934
930 def setup(self, id=1, name='', description='', email=None, alarm=[]):
935 def setup(self, id=1, name='', description='', email=None, alarm=[]):
931
936
932 print(' ')
937 print(' ')
933 print('*' * 60)
938 print('*' * 60)
934 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
939 print('* Starting SIGNAL CHAIN PROCESSING (Multiprocessing) v%s *' % schainpy.__version__)
935 print('*' * 60)
940 print('*' * 60)
936 print("* Python " + python_version() + " *")
941 print("* Python " + python_version() + " *")
937 print('*' * 19)
942 print('*' * 19)
938 print(' ')
943 print(' ')
939 self.id = str(id)
944 self.id = str(id)
940 self.description = description
945 self.description = description
941 self.email = email
946 self.email = email
942 self.alarm = alarm
947 self.alarm = alarm
943 if name:
948 if name:
944 self.name = '{} ({})'.format(Process.__name__, name)
949 self.name = '{} ({})'.format(Process.__name__, name)
945
950
946 def update(self, **kwargs):
951 def update(self, **kwargs):
947
952
948 for key, value in list(kwargs.items()):
953 for key, value in list(kwargs.items()):
949 setattr(self, key, value)
954 setattr(self, key, value)
950
955
951 def clone(self):
956 def clone(self):
952
957
953 p = Project()
958 p = Project()
954 p.procUnitConfObjDict = self.procUnitConfObjDict
959 p.procUnitConfObjDict = self.procUnitConfObjDict
955 return p
960 return p
956
961
957 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
962 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
958
963
959 '''
964 '''
960 Actualizacion:
965 Actualizacion:
961 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
966 Se agrego un nuevo argumento: topic -relativo a la forma de comunicar los procesos simultaneos
962
967
963 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
968 * El id del proceso sera el topico al que se deben subscribir los procUnits para recibir la informacion(data)
964
969
965 '''
970 '''
966
971
967 if id is None:
972 if id is None:
968 idReadUnit = self.__getNewId()
973 idReadUnit = self.__getNewId()
969 else:
974 else:
970 idReadUnit = str(id)
975 idReadUnit = str(id)
971
976
972 readUnitConfObj = ReadUnitConf()
977 readUnitConfObj = ReadUnitConf()
973 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
978 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
974 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
979 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
975
980
976 return readUnitConfObj
981 return readUnitConfObj
977
982
978 def addProcUnit(self, inputId='0', datatype=None, name=None):
983 def addProcUnit(self, inputId='0', datatype=None, name=None):
979
984
980 '''
985 '''
981 Actualizacion:
986 Actualizacion:
982 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
987 Se agrego dos nuevos argumentos: topic_read (lee data de otro procUnit) y topic_write(escribe o envia data a otro procUnit)
983 Deberia reemplazar a "inputId"
988 Deberia reemplazar a "inputId"
984
989
985 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
990 ** A fin de mantener el inputID, este sera la representaacion del topicoal que deben subscribirse. El ID propio de la intancia
986 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
991 (proceso) sera el topico de la publicacion, todo sera asignado de manera dinamica.
987
992
988 '''
993 '''
989
994
990 idProcUnit = self.__getNewId()
995 idProcUnit = self.__getNewId()
991 procUnitConfObj = ProcUnitConf()
996 procUnitConfObj = ProcUnitConf()
992 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue)
997 input_proc = self.procUnitConfObjDict[inputId]
998 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.mylock)
993 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
999 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
994
1000
995 return procUnitConfObj
1001 return procUnitConfObj
996
1002
997 def removeProcUnit(self, id):
1003 def removeProcUnit(self, id):
998
1004
999 if id in list(self.procUnitConfObjDict.keys()):
1005 if id in list(self.procUnitConfObjDict.keys()):
1000 self.procUnitConfObjDict.pop(id)
1006 self.procUnitConfObjDict.pop(id)
1001
1007
1002 def getReadUnitId(self):
1008 def getReadUnitId(self):
1003
1009
1004 readUnitConfObj = self.getReadUnitObj()
1010 readUnitConfObj = self.getReadUnitObj()
1005
1011
1006 return readUnitConfObj.id
1012 return readUnitConfObj.id
1007
1013
1008 def getReadUnitObj(self):
1014 def getReadUnitObj(self):
1009
1015
1010 for obj in list(self.procUnitConfObjDict.values()):
1016 for obj in list(self.procUnitConfObjDict.values()):
1011 if obj.getElementName() == 'ReadUnit':
1017 if obj.getElementName() == 'ReadUnit':
1012 return obj
1018 return obj
1013
1019
1014 return None
1020 return None
1015
1021
1016 def getProcUnitObj(self, id=None, name=None):
1022 def getProcUnitObj(self, id=None, name=None):
1017
1023
1018 if id != None:
1024 if id != None:
1019 return self.procUnitConfObjDict[id]
1025 return self.procUnitConfObjDict[id]
1020
1026
1021 if name != None:
1027 if name != None:
1022 return self.getProcUnitObjByName(name)
1028 return self.getProcUnitObjByName(name)
1023
1029
1024 return None
1030 return None
1025
1031
1026 def getProcUnitObjByName(self, name):
1032 def getProcUnitObjByName(self, name):
1027
1033
1028 for obj in list(self.procUnitConfObjDict.values()):
1034 for obj in list(self.procUnitConfObjDict.values()):
1029 if obj.name == name:
1035 if obj.name == name:
1030 return obj
1036 return obj
1031
1037
1032 return None
1038 return None
1033
1039
1034 def procUnitItems(self):
1040 def procUnitItems(self):
1035
1041
1036 return list(self.procUnitConfObjDict.items())
1042 return list(self.procUnitConfObjDict.items())
1037
1043
1038 def makeXml(self):
1044 def makeXml(self):
1039
1045
1040 projectElement = Element('Project')
1046 projectElement = Element('Project')
1041 projectElement.set('id', str(self.id))
1047 projectElement.set('id', str(self.id))
1042 projectElement.set('name', self.name)
1048 projectElement.set('name', self.name)
1043 projectElement.set('description', self.description)
1049 projectElement.set('description', self.description)
1044
1050
1045 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1051 for procUnitConfObj in list(self.procUnitConfObjDict.values()):
1046 procUnitConfObj.makeXml(projectElement)
1052 procUnitConfObj.makeXml(projectElement)
1047
1053
1048 self.projectElement = projectElement
1054 self.projectElement = projectElement
1049
1055
1050 def writeXml(self, filename=None):
1056 def writeXml(self, filename=None):
1051
1057
1052 if filename == None:
1058 if filename == None:
1053 if self.filename:
1059 if self.filename:
1054 filename = self.filename
1060 filename = self.filename
1055 else:
1061 else:
1056 filename = 'schain.xml'
1062 filename = 'schain.xml'
1057
1063
1058 if not filename:
1064 if not filename:
1059 print('filename has not been defined. Use setFilename(filename) for do it.')
1065 print('filename has not been defined. Use setFilename(filename) for do it.')
1060 return 0
1066 return 0
1061
1067
1062 abs_file = os.path.abspath(filename)
1068 abs_file = os.path.abspath(filename)
1063
1069
1064 if not os.access(os.path.dirname(abs_file), os.W_OK):
1070 if not os.access(os.path.dirname(abs_file), os.W_OK):
1065 print('No write permission on %s' % os.path.dirname(abs_file))
1071 print('No write permission on %s' % os.path.dirname(abs_file))
1066 return 0
1072 return 0
1067
1073
1068 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1074 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1069 print('File %s already exists and it could not be overwriten' % abs_file)
1075 print('File %s already exists and it could not be overwriten' % abs_file)
1070 return 0
1076 return 0
1071
1077
1072 self.makeXml()
1078 self.makeXml()
1073
1079
1074 ElementTree(self.projectElement).write(abs_file, method='xml')
1080 ElementTree(self.projectElement).write(abs_file, method='xml')
1075
1081
1076 self.filename = abs_file
1082 self.filename = abs_file
1077
1083
1078 return 1
1084 return 1
1079
1085
1080 def readXml(self, filename=None):
1086 def readXml(self, filename=None):
1081
1087
1082 if not filename:
1088 if not filename:
1083 print('filename is not defined')
1089 print('filename is not defined')
1084 return 0
1090 return 0
1085
1091
1086 abs_file = os.path.abspath(filename)
1092 abs_file = os.path.abspath(filename)
1087
1093
1088 if not os.path.isfile(abs_file):
1094 if not os.path.isfile(abs_file):
1089 print('%s file does not exist' % abs_file)
1095 print('%s file does not exist' % abs_file)
1090 return 0
1096 return 0
1091
1097
1092 self.projectElement = None
1098 self.projectElement = None
1093 self.procUnitConfObjDict = {}
1099 self.procUnitConfObjDict = {}
1094
1100
1095 try:
1101 try:
1096 self.projectElement = ElementTree().parse(abs_file)
1102 self.projectElement = ElementTree().parse(abs_file)
1097 except:
1103 except:
1098 print('Error reading %s, verify file format' % filename)
1104 print('Error reading %s, verify file format' % filename)
1099 return 0
1105 return 0
1100
1106
1101 self.project = self.projectElement.tag
1107 self.project = self.projectElement.tag
1102
1108
1103 self.id = self.projectElement.get('id')
1109 self.id = self.projectElement.get('id')
1104 self.name = self.projectElement.get('name')
1110 self.name = self.projectElement.get('name')
1105 self.description = self.projectElement.get('description')
1111 self.description = self.projectElement.get('description')
1106
1112
1107 readUnitElementList = self.projectElement.iter(
1113 readUnitElementList = self.projectElement.iter(
1108 ReadUnitConf().getElementName())
1114 ReadUnitConf().getElementName())
1109
1115
1110 for readUnitElement in readUnitElementList:
1116 for readUnitElement in readUnitElementList:
1111 readUnitConfObj = ReadUnitConf()
1117 readUnitConfObj = ReadUnitConf()
1112 readUnitConfObj.readXml(readUnitElement, self.id)
1118 readUnitConfObj.readXml(readUnitElement, self.id)
1113 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1119 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1114
1120
1115 procUnitElementList = self.projectElement.iter(
1121 procUnitElementList = self.projectElement.iter(
1116 ProcUnitConf().getElementName())
1122 ProcUnitConf().getElementName())
1117
1123
1118 for procUnitElement in procUnitElementList:
1124 for procUnitElement in procUnitElementList:
1119 procUnitConfObj = ProcUnitConf()
1125 procUnitConfObj = ProcUnitConf()
1120 procUnitConfObj.readXml(procUnitElement, self.id)
1126 procUnitConfObj.readXml(procUnitElement, self.id)
1121 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1127 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1122
1128
1123 self.filename = abs_file
1129 self.filename = abs_file
1124
1130
1125 return 1
1131 return 1
1126
1132
1127 def __str__(self):
1133 def __str__(self):
1128
1134
1129 print('Project: name = %s, description = %s, id = %s' % (
1135 print('Project: name = %s, description = %s, id = %s' % (
1130 self.name,
1136 self.name,
1131 self.description,
1137 self.description,
1132 self.id))
1138 self.id))
1133
1139
1134 for procUnitConfObj in self.procUnitConfObjDict.values():
1140 for procUnitConfObj in self.procUnitConfObjDict.values():
1135 print(procUnitConfObj)
1141 print(procUnitConfObj)
1136
1142
1137 def createObjects(self):
1143 def createObjects(self):
1138
1144
1139
1145
1140 keys = list(self.procUnitConfObjDict.keys())
1146 keys = list(self.procUnitConfObjDict.keys())
1141 keys.sort()
1147 keys.sort()
1142 for key in keys:
1148 for key in keys:
1143 self.procUnitConfObjDict[key].createObjects()
1149 self.procUnitConfObjDict[key].createObjects()
1144
1150
1145 def monitor(self):
1151 def monitor(self):
1146
1152
1147 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1153 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1148 t.start()
1154 t.start()
1149
1155
1150 def __monitor(self, queue, ctx):
1156 def __monitor(self, queue, ctx):
1151
1157
1152 import socket
1158 import socket
1153
1159
1154 procs = 0
1160 procs = 0
1155 err_msg = ''
1161 err_msg = ''
1156
1162
1157 while True:
1163 while True:
1158 msg = queue.get()
1164 msg = queue.get()
1159 if '#_start_#' in msg:
1165 if '#_start_#' in msg:
1160 procs += 1
1166 procs += 1
1161 elif '#_end_#' in msg:
1167 elif '#_end_#' in msg:
1162 procs -=1
1168 procs -=1
1163 else:
1169 else:
1164 err_msg = msg
1170 err_msg = msg
1165
1171
1166 if procs == 0 or 'Traceback' in err_msg:
1172 if procs == 0 or 'Traceback' in err_msg:
1167 break
1173 break
1168 time.sleep(0.1)
1174 time.sleep(0.1)
1169
1175
1170 if '|' in err_msg:
1176 if '|' in err_msg:
1171 name, err = err_msg.split('|')
1177 name, err = err_msg.split('|')
1172 if 'SchainWarning' in err:
1178 if 'SchainWarning' in err:
1173 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1179 log.warning(err.split('SchainWarning:')[-1].split('\n')[0].strip(), name)
1174 elif 'SchainError' in err:
1180 elif 'SchainError' in err:
1175 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1181 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1176 else:
1182 else:
1177 log.error(err, name)
1183 log.error(err, name)
1178 else:
1184 else:
1179 name, err = self.name, err_msg
1185 name, err = self.name, err_msg
1180
1186
1181 time.sleep(2)
1187 time.sleep(2)
1182
1188
1183 for conf in self.procUnitConfObjDict.values():
1189 for conf in self.procUnitConfObjDict.values():
1184 for confop in conf.opConfObjList:
1190 for confop in conf.opConfObjList:
1185 if confop.type == 'external':
1191 if confop.type == 'external':
1186 confop.opObj.terminate()
1192 confop.opObj.terminate()
1187 conf.procUnitObj.terminate()
1193 conf.procUnitObj.terminate()
1188
1194
1189 ctx.term()
1195 ctx.term()
1190
1196
1191 message = ''.join(err)
1197 message = ''.join(err)
1192
1198
1193 if err_msg:
1199 if err_msg:
1194 subject = 'SChain v%s: Error running %s\n' % (
1200 subject = 'SChain v%s: Error running %s\n' % (
1195 schainpy.__version__, self.name)
1201 schainpy.__version__, self.name)
1196
1202
1197 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1203 subtitle = 'Hostname: %s\n' % socket.gethostbyname(
1198 socket.gethostname())
1204 socket.gethostname())
1199 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1205 subtitle += 'Working directory: %s\n' % os.path.abspath('./')
1200 subtitle += 'Configuration file: %s\n' % self.filename
1206 subtitle += 'Configuration file: %s\n' % self.filename
1201 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1207 subtitle += 'Time: %s\n' % str(datetime.datetime.now())
1202
1208
1203 readUnitConfObj = self.getReadUnitObj()
1209 readUnitConfObj = self.getReadUnitObj()
1204 if readUnitConfObj:
1210 if readUnitConfObj:
1205 subtitle += '\nInput parameters:\n'
1211 subtitle += '\nInput parameters:\n'
1206 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1212 subtitle += '[Data path = %s]\n' % readUnitConfObj.path
1207 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1213 subtitle += '[Data type = %s]\n' % readUnitConfObj.datatype
1208 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1214 subtitle += '[Start date = %s]\n' % readUnitConfObj.startDate
1209 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1215 subtitle += '[End date = %s]\n' % readUnitConfObj.endDate
1210 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1216 subtitle += '[Start time = %s]\n' % readUnitConfObj.startTime
1211 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1217 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1212
1218
1213 a = Alarm(
1219 a = Alarm(
1214 modes=self.alarm,
1220 modes=self.alarm,
1215 email=self.email,
1221 email=self.email,
1216 message=message,
1222 message=message,
1217 subject=subject,
1223 subject=subject,
1218 subtitle=subtitle,
1224 subtitle=subtitle,
1219 filename=self.filename
1225 filename=self.filename
1220 )
1226 )
1221
1227
1222 a.start()
1228 a.start()
1223
1229
1224 def isPaused(self):
1230 def isPaused(self):
1225 return 0
1231 return 0
1226
1232
1227 def isStopped(self):
1233 def isStopped(self):
1228 return 0
1234 return 0
1229
1235
1230 def runController(self):
1236 def runController(self):
1231 '''
1237 '''
1232 returns 0 when this process has been stopped, 1 otherwise
1238 returns 0 when this process has been stopped, 1 otherwise
1233 '''
1239 '''
1234
1240
1235 if self.isPaused():
1241 if self.isPaused():
1236 print('Process suspended')
1242 print('Process suspended')
1237
1243
1238 while True:
1244 while True:
1239 time.sleep(0.1)
1245 time.sleep(0.1)
1240
1246
1241 if not self.isPaused():
1247 if not self.isPaused():
1242 break
1248 break
1243
1249
1244 if self.isStopped():
1250 if self.isStopped():
1245 break
1251 break
1246
1252
1247 print('Process reinitialized')
1253 print('Process reinitialized')
1248
1254
1249 if self.isStopped():
1255 if self.isStopped():
1250 print('Process stopped')
1256 print('Process stopped')
1251 return 0
1257 return 0
1252
1258
1253 return 1
1259 return 1
1254
1260
1255 def setFilename(self, filename):
1261 def setFilename(self, filename):
1256
1262
1257 self.filename = filename
1263 self.filename = filename
1258
1264
1259 def setProxy(self):
1265 def setProxy(self):
1260
1266
1261 if not os.path.exists('/tmp/schain'):
1267 if not os.path.exists('/tmp/schain'):
1262 os.mkdir('/tmp/schain')
1268 os.mkdir('/tmp/schain')
1263
1269
1264 self.ctx = zmq.Context()
1270 self.ctx = zmq.Context()
1265 xpub = self.ctx.socket(zmq.XPUB)
1271 xpub = self.ctx.socket(zmq.XPUB)
1266 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1272 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
1267 xsub = self.ctx.socket(zmq.XSUB)
1273 xsub = self.ctx.socket(zmq.XSUB)
1268 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1274 xsub.bind('ipc:///tmp/schain/{}_sub'.format(self.id))
1269 self.monitor()
1275 self.monitor()
1270 try:
1276 try:
1271 zmq.proxy(xpub, xsub)
1277 zmq.proxy(xpub, xsub)
1272 except zmq.ContextTerminated:
1278 except zmq.ContextTerminated:
1273 xpub.close()
1279 xpub.close()
1274 xsub.close()
1280 xsub.close()
1275
1281
1276 def run(self):
1282 def run(self):
1277
1283
1278 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1284 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1279 self.start_time = time.time()
1285 self.start_time = time.time()
1280 self.createObjects()
1286 self.createObjects()
1281 self.setProxy()
1287 self.setProxy()
1282 log.success('{} Done (Time: {}s)'.format(
1288 log.success('{} Done (Time: {}s)'.format(
1283 self.name,
1289 self.name,
1284 time.time()-self.start_time), '')
1290 time.time()-self.start_time), '')
@@ -1,415 +1,416
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import os
15 import os
16 import sys
16 import inspect
17 import inspect
17 import zmq
18 import zmq
18 import time
19 import time
19 import pickle
20 import pickle
20 import traceback
21 import traceback
21 try:
22 try:
22 from queue import Queue
23 from queue import Queue
23 except:
24 except:
24 from Queue import Queue
25 from Queue import Queue
25 from threading import Thread
26 from threading import Thread
26 from multiprocessing import Process
27 from multiprocessing import Process
27
28
28 from schainpy.utils import log
29 from schainpy.utils import log
29
30
30
31
31 class ProcessingUnit(object):
32 class ProcessingUnit(object):
32
33
33 """
34 """
34 Update - Jan 2018 - MULTIPROCESSING
35 Update - Jan 2018 - MULTIPROCESSING
35 All the "call" methods present in the previous base were removed.
36 All the "call" methods present in the previous base were removed.
36 The majority of operations are independant processes, thus
37 The majority of operations are independant processes, thus
37 the decorator is in charge of communicate the operation processes
38 the decorator is in charge of communicate the operation processes
38 with the proccessing unit via IPC.
39 with the proccessing unit via IPC.
39
40
40 The constructor does not receive any argument. The remaining methods
41 The constructor does not receive any argument. The remaining methods
41 are related with the operations to execute.
42 are related with the operations to execute.
42
43
43
44
44 """
45 """
45 proc_type = 'processing'
46 proc_type = 'processing'
46 __attrs__ = []
47 __attrs__ = []
47
48
48 def __init__(self):
49 def __init__(self):
49
50
50 self.dataIn = None
51 self.dataIn = None
51 self.dataOut = None
52 self.dataOut = None
52 self.isConfig = False
53 self.isConfig = False
53 self.operations = []
54 self.operations = []
54 self.plots = []
55 self.plots = []
55
56
56 def getAllowedArgs(self):
57 def getAllowedArgs(self):
57 if hasattr(self, '__attrs__'):
58 if hasattr(self, '__attrs__'):
58 return self.__attrs__
59 return self.__attrs__
59 else:
60 else:
60 return inspect.getargspec(self.run).args
61 return inspect.getargspec(self.run).args
61
62
62 def addOperation(self, conf, operation):
63 def addOperation(self, conf, operation):
63 """
64 """
64 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
65 posses the id of the operation process (IPC purposes)
66 posses the id of the operation process (IPC purposes)
66
67
67 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
68 identificador asociado a este objeto.
69 identificador asociado a este objeto.
69
70
70 Input:
71 Input:
71
72
72 object : objeto de la clase "Operation"
73 object : objeto de la clase "Operation"
73
74
74 Return:
75 Return:
75
76
76 objId : identificador del objeto, necesario para comunicar con master(procUnit)
77 objId : identificador del objeto, necesario para comunicar con master(procUnit)
77 """
78 """
78
79
79 self.operations.append(
80 self.operations.append(
80 (operation, conf.type, conf.id, conf.getKwargs()))
81 (operation, conf.type, conf.id, conf.getKwargs()))
81
82
82 if 'plot' in self.name.lower():
83 if 'plot' in self.name.lower():
83 self.plots.append(operation.CODE)
84 self.plots.append(operation.CODE)
84
85
85 def getOperationObj(self, objId):
86 def getOperationObj(self, objId):
86
87
87 if objId not in list(self.operations.keys()):
88 if objId not in list(self.operations.keys()):
88 return None
89 return None
89
90
90 return self.operations[objId]
91 return self.operations[objId]
91
92
92 def operation(self, **kwargs):
93 def operation(self, **kwargs):
93 """
94 """
94 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
95 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
95 atributos del objeto dataOut
96 atributos del objeto dataOut
96
97
97 Input:
98 Input:
98
99
99 **kwargs : Diccionario de argumentos de la funcion a ejecutar
100 **kwargs : Diccionario de argumentos de la funcion a ejecutar
100 """
101 """
101
102
102 raise NotImplementedError
103 raise NotImplementedError
103
104
104 def setup(self):
105 def setup(self):
105
106
106 raise NotImplementedError
107 raise NotImplementedError
107
108
108 def run(self):
109 def run(self):
109
110
110 raise NotImplementedError
111 raise NotImplementedError
111
112
112 def close(self):
113 def close(self):
113
114
114 return
115 return
115
116
116
117
117 class Operation(object):
118 class Operation(object):
118
119
119 """
120 """
120 Update - Jan 2018 - MULTIPROCESSING
121 Update - Jan 2018 - MULTIPROCESSING
121
122
122 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
123 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
123 The constructor doe snot receive any argument, neither the baseclass.
124 The constructor doe snot receive any argument, neither the baseclass.
124
125
125
126
126 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
127 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
127 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
128 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
128 acumulacion dentro de esta clase
129 acumulacion dentro de esta clase
129
130
130 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
131 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
131
132
132 """
133 """
133 proc_type = 'operation'
134 proc_type = 'operation'
134 __attrs__ = []
135 __attrs__ = []
135
136
136 def __init__(self):
137 def __init__(self):
137
138
138 self.id = None
139 self.id = None
139 self.isConfig = False
140 self.isConfig = False
140
141
141 if not hasattr(self, 'name'):
142 if not hasattr(self, 'name'):
142 self.name = self.__class__.__name__
143 self.name = self.__class__.__name__
143
144
144 def getAllowedArgs(self):
145 def getAllowedArgs(self):
145 if hasattr(self, '__attrs__'):
146 if hasattr(self, '__attrs__'):
146 return self.__attrs__
147 return self.__attrs__
147 else:
148 else:
148 return inspect.getargspec(self.run).args
149 return inspect.getargspec(self.run).args
149
150
150 def setup(self):
151 def setup(self):
151
152
152 self.isConfig = True
153 self.isConfig = True
153
154
154 raise NotImplementedError
155 raise NotImplementedError
155
156
156 def run(self, dataIn, **kwargs):
157 def run(self, dataIn, **kwargs):
157 """
158 """
158 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
159 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
159 atributos del objeto dataIn.
160 atributos del objeto dataIn.
160
161
161 Input:
162 Input:
162
163
163 dataIn : objeto del tipo JROData
164 dataIn : objeto del tipo JROData
164
165
165 Return:
166 Return:
166
167
167 None
168 None
168
169
169 Affected:
170 Affected:
170 __buffer : buffer de recepcion de datos.
171 __buffer : buffer de recepcion de datos.
171
172
172 """
173 """
173 if not self.isConfig:
174 if not self.isConfig:
174 self.setup(**kwargs)
175 self.setup(**kwargs)
175
176
176 raise NotImplementedError
177 raise NotImplementedError
177
178
178 def close(self):
179 def close(self):
179
180
180 return
181 return
181
182
182 class InputQueue(Thread):
183 class InputQueue(Thread):
183
184
184 '''
185 '''
185 Class to hold input data for Proccessing Units and external Operations,
186 Class to hold input data for Proccessing Units and external Operations,
186 '''
187 '''
187
188
188 def __init__(self, project_id, inputId):
189 def __init__(self, project_id, inputId, lock=None):
189
190
190 Thread.__init__(self)
191 Thread.__init__(self)
191 self.queue = Queue()
192 self.queue = Queue()
192 self.project_id = project_id
193 self.project_id = project_id
193 self.inputId = inputId
194 self.inputId = inputId
195 self.lock = lock
196 self.size = 0
194
197
195 def run(self):
198 def run(self):
196
199
197 c = zmq.Context()
200 c = zmq.Context()
198 self.receiver = c.socket(zmq.SUB)
201 self.receiver = c.socket(zmq.SUB)
199 self.receiver.connect(
202 self.receiver.connect(
200 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
203 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
201 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
204 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
202
205
203 while True:
206 while True:
204 self.queue.put(self.receiver.recv_multipart()[1])
207 obj = self.receiver.recv_multipart()[1]
208 self.size += sys.getsizeof(obj)
209 self.queue.put(obj)
205
210
206 def get(self):
211 def get(self):
207
212 if self.size/1000000 > 2048:
208 return pickle.loads(self.queue.get())
213 self.lock.clear()
214 else:
215 self.lock.set()
216 obj = self.queue.get()
217 self.size -= sys.getsizeof(obj)
218 return pickle.loads(obj)
209
219
210
220
211 def MPDecorator(BaseClass):
221 def MPDecorator(BaseClass):
212 """
222 """
213 Multiprocessing class decorator
223 Multiprocessing class decorator
214
224
215 This function add multiprocessing features to a BaseClass. Also, it handle
225 This function add multiprocessing features to a BaseClass. Also, it handle
216 the communication beetween processes (readers, procUnits and operations).
226 the communication beetween processes (readers, procUnits and operations).
217 """
227 """
218
228
219 class MPClass(BaseClass, Process):
229 class MPClass(BaseClass, Process):
220
230
221 def __init__(self, *args, **kwargs):
231 def __init__(self, *args, **kwargs):
222 super(MPClass, self).__init__()
232 super(MPClass, self).__init__()
223 Process.__init__(self)
233 Process.__init__(self)
224 self.operationKwargs = {}
234 self.operationKwargs = {}
225 self.args = args
235 self.args = args
226 self.kwargs = kwargs
236 self.kwargs = kwargs
227 self.sender = None
237 self.sender = None
228 self.receiver = None
238 self.receiver = None
229 self.i = 0
239 self.i = 0
230 self.t = time.time()
240 self.t = time.time()
231 self.name = BaseClass.__name__
241 self.name = BaseClass.__name__
232 self.__doc__ = BaseClass.__doc__
242 self.__doc__ = BaseClass.__doc__
233
243
234 if 'plot' in self.name.lower() and not self.name.endswith('_'):
244 if 'plot' in self.name.lower() and not self.name.endswith('_'):
235 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
245 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
236
246
237 self.start_time = time.time()
247 self.start_time = time.time()
238 self.id = args[0]
248 self.id = args[0]
239 self.inputId = args[1]
249 self.inputId = args[1]
240 self.project_id = args[2]
250 self.project_id = args[2]
241 self.err_queue = args[3]
251 self.err_queue = args[3]
242 self.typeProc = args[4]
252 self.lock = args[4]
253 self.typeProc = args[5]
243 self.err_queue.put('#_start_#')
254 self.err_queue.put('#_start_#')
244 self.queue = InputQueue(self.project_id, self.inputId)
255 self.queue = InputQueue(self.project_id, self.inputId, self.lock)
245
256
246 def subscribe(self):
257 def subscribe(self):
247 '''
258 '''
248 Start the zmq socket receiver and subcribe to input ID.
259 Start the zmq socket receiver and subcribe to input ID.
249 '''
260 '''
250
261
251 self.queue.start()
262 self.queue.start()
252
263
253 def listen(self):
264 def listen(self):
254 '''
265 '''
255 This function waits for objects
266 This function waits for objects
256 '''
267 '''
257
268
258 return self.queue.get()
269 return self.queue.get()
259
270
260 def set_publisher(self):
271 def set_publisher(self):
261 '''
272 '''
262 This function create a zmq socket for publishing objects.
273 This function create a zmq socket for publishing objects.
263 '''
274 '''
264
275
265 time.sleep(0.5)
276 time.sleep(0.5)
266
277
267 c = zmq.Context()
278 c = zmq.Context()
268 self.sender = c.socket(zmq.PUB)
279 self.sender = c.socket(zmq.PUB)
269 self.sender.connect(
280 self.sender.connect(
270 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
281 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
271
282
272 def publish(self, data, id):
283 def publish(self, data, id):
273 '''
284 '''
274 This function publish an object, to an specific topic.
285 This function publish an object, to an specific topic.
275 For Read Units (inputId == None) adds a little delay
286 It blocks publishing when receiver queue is full to avoid data loss
276 to avoid data loss
277 '''
287 '''
278
288
279 if self.inputId is None:
289 if self.inputId is None:
280 self.i += 1
290 self.lock.wait()
281 if self.i % 40 == 0 and time.time()-self.t > 0.1:
282 self.i = 0
283 self.t = time.time()
284 time.sleep(0.05)
285 elif self.i % 40 == 0:
286 self.i = 0
287 self.t = time.time()
288 time.sleep(0.01)
289
290 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
291
292
292 def runReader(self):
293 def runReader(self):
293 '''
294 '''
294 Run fuction for read units
295 Run fuction for read units
295 '''
296 '''
296 while True:
297 while True:
297
298
298 try:
299 try:
299 BaseClass.run(self, **self.kwargs)
300 BaseClass.run(self, **self.kwargs)
300 except:
301 except:
301 err = traceback.format_exc()
302 err = traceback.format_exc()
302 if 'No more files' in err:
303 if 'No more files' in err:
303 log.warning('No more files to read', self.name)
304 log.warning('No more files to read', self.name)
304 else:
305 else:
305 self.err_queue.put('{}|{}'.format(self.name, err))
306 self.err_queue.put('{}|{}'.format(self.name, err))
306 self.dataOut.error = True
307 self.dataOut.error = True
307
308
308 for op, optype, opId, kwargs in self.operations:
309 for op, optype, opId, kwargs in self.operations:
309 if optype == 'self' and not self.dataOut.flagNoData:
310 if optype == 'self' and not self.dataOut.flagNoData:
310 op(**kwargs)
311 op(**kwargs)
311 elif optype == 'other' and not self.dataOut.flagNoData:
312 elif optype == 'other' and not self.dataOut.flagNoData:
312 self.dataOut = op.run(self.dataOut, **self.kwargs)
313 self.dataOut = op.run(self.dataOut, **self.kwargs)
313 elif optype == 'external':
314 elif optype == 'external':
314 self.publish(self.dataOut, opId)
315 self.publish(self.dataOut, opId)
315
316
316 if self.dataOut.flagNoData and not self.dataOut.error:
317 if self.dataOut.flagNoData and not self.dataOut.error:
317 continue
318 continue
318
319
319 self.publish(self.dataOut, self.id)
320 self.publish(self.dataOut, self.id)
320
321
321 if self.dataOut.error:
322 if self.dataOut.error:
322 break
323 break
323
324
324 time.sleep(0.5)
325 time.sleep(0.5)
325
326
326 def runProc(self):
327 def runProc(self):
327 '''
328 '''
328 Run function for proccessing units
329 Run function for proccessing units
329 '''
330 '''
330
331
331 while True:
332 while True:
332 self.dataIn = self.listen()
333 self.dataIn = self.listen()
333
334
334 if self.dataIn.flagNoData and self.dataIn.error is None:
335 if self.dataIn.flagNoData and self.dataIn.error is None:
335 continue
336 continue
336 elif not self.dataIn.error:
337 elif not self.dataIn.error:
337 try:
338 try:
338 BaseClass.run(self, **self.kwargs)
339 BaseClass.run(self, **self.kwargs)
339 except:
340 except:
340 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
341 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
341 self.dataOut.error = True
342 self.dataOut.error = True
342 elif self.dataIn.error:
343 elif self.dataIn.error:
343 self.dataOut.error = self.dataIn.error
344 self.dataOut.error = self.dataIn.error
344 self.dataOut.flagNoData = True
345 self.dataOut.flagNoData = True
345
346
346 for op, optype, opId, kwargs in self.operations:
347 for op, optype, opId, kwargs in self.operations:
347 if optype == 'self' and not self.dataOut.flagNoData:
348 if optype == 'self' and not self.dataOut.flagNoData:
348 op(**kwargs)
349 op(**kwargs)
349 elif optype == 'other' and not self.dataOut.flagNoData:
350 elif optype == 'other' and not self.dataOut.flagNoData:
350 self.dataOut = op.run(self.dataOut, **kwargs)
351 self.dataOut = op.run(self.dataOut, **kwargs)
351 elif optype == 'external' and not self.dataOut.flagNoData:
352 elif optype == 'external' and not self.dataOut.flagNoData:
352 self.publish(self.dataOut, opId)
353 self.publish(self.dataOut, opId)
353
354
354 self.publish(self.dataOut, self.id)
355 self.publish(self.dataOut, self.id)
355 for op, optype, opId, kwargs in self.operations:
356 for op, optype, opId, kwargs in self.operations:
356 if optype == 'external' and self.dataOut.error:
357 if optype == 'external' and self.dataOut.error:
357 self.publish(self.dataOut, opId)
358 self.publish(self.dataOut, opId)
358
359
359 if self.dataOut.error:
360 if self.dataOut.error:
360 break
361 break
361
362
362 time.sleep(0.5)
363 time.sleep(0.5)
363
364
364 def runOp(self):
365 def runOp(self):
365 '''
366 '''
366 Run function for external operations (this operations just receive data
367 Run function for external operations (this operations just receive data
367 ex: plots, writers, publishers)
368 ex: plots, writers, publishers)
368 '''
369 '''
369
370
370 while True:
371 while True:
371
372
372 dataOut = self.listen()
373 dataOut = self.listen()
373
374
374 if not dataOut.error:
375 if not dataOut.error:
375 BaseClass.run(self, dataOut, **self.kwargs)
376 BaseClass.run(self, dataOut, **self.kwargs)
376 else:
377 else:
377 break
378 break
378
379
379 def run(self):
380 def run(self):
380 if self.typeProc is "ProcUnit":
381 if self.typeProc is "ProcUnit":
381
382
382 if self.inputId is not None:
383 if self.inputId is not None:
383 self.subscribe()
384 self.subscribe()
384
385
385 self.set_publisher()
386 self.set_publisher()
386
387
387 if 'Reader' not in BaseClass.__name__:
388 if 'Reader' not in BaseClass.__name__:
388 self.runProc()
389 self.runProc()
389 else:
390 else:
390 self.runReader()
391 self.runReader()
391
392
392 elif self.typeProc is "Operation":
393 elif self.typeProc is "Operation":
393
394
394 self.subscribe()
395 self.subscribe()
395 self.runOp()
396 self.runOp()
396
397
397 else:
398 else:
398 raise ValueError("Unknown type")
399 raise ValueError("Unknown type")
399
400
400 self.close()
401 self.close()
401
402
402 def close(self):
403 def close(self):
403
404
404 BaseClass.close(self)
405 BaseClass.close(self)
405 self.err_queue.put('#_end_#')
406 self.err_queue.put('#_end_#')
406
407
407 if self.sender:
408 if self.sender:
408 self.sender.close()
409 self.sender.close()
409
410
410 if self.receiver:
411 if self.receiver:
411 self.receiver.close()
412 self.receiver.close()
412
413
413 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
414 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
414
415
415 return MPClass
416 return MPClass
General Comments 0
You need to be logged in to leave comments. Login now