##// END OF EJS Templates
Writing Unit for Madrigal, decorated (just for Python 2.x)
George Yong -
r1206:59caf7a2130e
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,642 +1,645
1 '''
1 '''
2 Created on Aug 1, 2017
2 Created on Aug 1, 2017
3
3
4 @author: Juan C. Espinoza
4 @author: Juan C. Espinoza
5 '''
5 '''
6
6
7 import os
7 import os
8 import sys
8 import sys
9 import time
9 import time
10 import json
10 import json
11 import glob
11 import glob
12 import datetime
12 import datetime
13
13
14 import numpy
14 import numpy
15 import h5py
15 import h5py
16
16 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
17 from schainpy.model.io.jroIO_base import JRODataReader
18 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 from schainpy.model.data.jrodata import Parameters
18 from schainpy.model.data.jrodata import Parameters
20 from schainpy.utils import log
19 from schainpy.utils import log
21
20
try:
    import madrigal.cedar
# BUGFIX: a bare `except:` also swallows SystemExit/KeyboardInterrupt;
# only a failed import should be tolerated here.
except ImportError:
    # madrigal is optional at import time; reading/writing Madrigal data
    # will fail later if the library is actually needed.
    log.warning(
        'You should install "madrigal library" module if you want to read/write Madrigal data'
    )
28
27
# Default Madrigal catalog metadata used when the caller supplies none.
DEF_CATALOG = {
    'principleInvestigator': 'Marco Milla',
    'expPurpose': '',
    'cycleTime': '',
    'correlativeExp': '',
    'sciRemarks': '',
    'instRemarks': ''
}

# Default Madrigal header metadata used when the caller supplies none.
DEF_HEADER = {
    'kindatDesc': '',
    'analyst': 'Jicamarca User',
    'comments': '',
    'history': ''
}

# Instrument code (kinst) -> file-name mnemonic.
MNEMONICS = {
    10: 'jro',
    11: 'jbr',
    840: 'jul',
    13: 'jas',
    1000: 'pbr',
    1001: 'hbr',
    1002: 'obr',
    400: 'clr'
}

# Unix epoch expressed in local time (offset by the system timezone).
UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
54
57
def load_json(obj):
    '''
    Parse JSON (or pass through an already-parsed object) coercing text
    values to native `str`.

    On Python 2 the json module returns `unicode` strings; keys and text
    values are converted to `str` so downstream code keeps working.

    BUGFIX: the previous code referenced `unicode` unconditionally, which
    raises NameError on Python 3; the text types are now selected by
    interpreter version.
    '''

    # `unicode` only exists on Python 2; collapse to plain `str` on 3.x.
    text_types = (str, unicode) if sys.version_info[0] == 2 else (str,)

    if isinstance(obj, str):
        iterable = json.loads(obj)
    else:
        iterable = obj

    if isinstance(iterable, dict):
        return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, text_types) else v
                for k, v in list(iterable.items())}
    elif isinstance(iterable, (list, tuple)):
        return [str(v) if isinstance(v, text_types) else v for v in iterable]

    return iterable
72
75
@MPDecorator
class MADReader(JRODataReader, ProcessingUnit):
    '''
    Reading unit for Madrigal files in plain-text (.txt), CEDAR (.001)
    or HDF5 (.hdf5) format.  Records are read block by block and stored
    in a `Parameters` data object.
    '''

    def __init__(self):

        ProcessingUnit.__init__(self)

        self.dataOut = Parameters()
        self.counter_records = 0     # records consumed from the current file
        self.nrecords = None         # total records in the current file
        self.flagNoMoreFiles = 0
        self.isConfig = False
        self.filename = None
        self.intervals = set()       # distinct inter-record intervals seen (seconds)

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              **kwargs):
        '''
        Configure the reader: file format, date/time window and the JSON
        mappings between Madrigal parameters and dataOut attributes
        (kwargs: oneDDict, twoDDict, ind2DList).

        Raises:
            ValueError: if `path` or `format` is missing/invalid.
            Warning: if no file in `path` matches the date range.
        '''

        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        # sentinel "far past" datetime so the first block never looks discontinuous
        self.datatime = datetime.datetime(1900, 1, 1)
        self.oneDDict = load_json(kwargs.get('oneDDict',
                                             "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
        self.twoDDict = load_json(kwargs.get('twoDDict',
                                             "{\"GDALT\": \"heightList\"}"))
        self.ind2DList = load_json(kwargs.get('ind2DList',
                                              "[\"GDALT\"]"))
        if self.path is None:
            raise ValueError('The path is not valid')

        if format is None:
            raise ValueError('The format is not valid choose simple or hdf5')
        elif format.lower() in ('simple', 'txt'):
            self.ext = '.txt'
        elif format.lower() in ('cedar',):
            self.ext = '.001'
        else:
            self.ext = '.hdf5'

        self.search_files(self.path)
        self.fileId = 0

        if not self.fileList:
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

        self.setNextFile()

    def search_files(self, path):
        '''
        Search for Madrigal files in `path`, keeping those whose
        file-name date (characters [3:11] as YYYYMMDD) falls inside
        [startDate - 1 day, endDate + 1 day].

        Populates self.fileList and self.dateFileList (sorted by name).

        Input:
            path - Path to find files
        '''

        log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
        fileList0 = glob.glob1(path, '*{}'.format(self.ext))
        fileList0.sort()

        self.fileList = []
        self.dateFileList = []

        # widen the window by one day on each side to tolerate timezone skew
        startDate = self.startDate - datetime.timedelta(1)
        endDate = self.endDate + datetime.timedelta(1)

        for thisFile in fileList0:
            year = thisFile[3:7]
            if not year.isdigit():
                continue

            month = thisFile[7:9]
            if not month.isdigit():
                continue

            day = thisFile[9:11]
            if not day.isdigit():
                continue

            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            self.fileList.append(thisFile)
            self.dateFileList.append(dateFile)

        return

    def parseHeader(self):
        '''
        Read parameter names (and, for HDF5, their dimensionality) from
        the current file and validate the configured oneDDict/twoDDict
        against them.
        '''

        self.output = {}
        self.version = '2'
        s_parameters = None
        if self.ext == '.txt':
            self.parameters = [s.strip().lower() for s in self.fp.readline().strip().split(' ') if s]
        elif self.ext == '.hdf5':
            metadata = self.fp['Metadata']
            data = self.fp['Data']['Array Layout']
            if 'Independent Spatial Parameters' in metadata:
                s_parameters = [s[0].lower() for s in metadata['Independent Spatial Parameters']]
                self.version = '3'
            one = [s[0].lower() for s in data['1D Parameters']['Data Parameters']]
            one_d = [1 for s in one]
            two = [s[0].lower() for s in data['2D Parameters']['Data Parameters']]
            two_d = [2 for s in two]
            self.parameters = one + two
            self.parameters_d = one_d + two_d

        log.success('Parameters found: {}'.format(self.parameters),
                    'MADReader')
        if s_parameters:
            # BUGFIX: ','.join(str(s_parameters)) joined the characters of the
            # list's repr; join the parameter names themselves instead.
            log.success('Spatial parameters: {}'.format(','.join(str(s) for s in s_parameters)),
                        'MADReader')

        for param in list(self.oneDDict.keys()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found will be ignored'.format(
                        param),
                    'MADReader')
                self.oneDDict.pop(param, None)

        for param, value in list(self.twoDDict.items()):
            if param.lower() not in self.parameters:
                log.warning(
                    'Parameter {} not found, it will be ignored'.format(
                        param),
                    'MADReader')
                self.twoDDict.pop(param, None)
                continue
            if isinstance(value, list):
                if value[0] not in self.output:
                    self.output[value[0]] = []
                self.output[value[0]].append(None)

    def parseData(self):
        '''
        Load the data section of the current file and the list of unique
        ranges (heights).
        '''

        if self.ext == '.txt':
            self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
            self.nrecords = self.data.shape[0]
            self.ranges = numpy.unique(self.data[:, self.parameters.index(self.ind2DList[0].lower())])
        elif self.ext == '.hdf5':
            self.data = self.fp['Data']['Array Layout']
            # NOTE(review): Dataset.value is deprecated in newer h5py; kept for
            # compatibility with the version this project pins.
            self.nrecords = len(self.data['timestamps'].value)
            self.ranges = self.data['range'].value

    def setNextFile(self):
        '''
        Open the next file in fileList.

        Returns:
            1 on success, 0 when there are no files left.
        '''

        file_id = self.fileId

        if file_id == len(self.fileList):
            log.success('No more files', 'MADReader')
            self.flagNoMoreFiles = 1
            return 0

        log.success(
            'Opening: {}'.format(self.fileList[file_id]),
            'MADReader'
        )

        filename = os.path.join(self.path, self.fileList[file_id])

        if self.filename is not None:
            self.fp.close()

        self.filename = filename
        self.filedate = self.dateFileList[file_id]

        if self.ext == '.hdf5':
            self.fp = h5py.File(self.filename, 'r')
        else:
            self.fp = open(self.filename, 'rb')

        self.parseHeader()
        self.parseData()
        self.sizeOfFile = os.path.getsize(self.filename)
        self.counter_records = 0
        self.flagIsNewFile = 0
        self.fileId += 1

        return 1

    def readNextBlock(self):
        '''
        Read blocks until one falls inside the configured
        [startDate+startTime, endDate+endTime] window.

        Returns:
            1 on success, 0 when files run out.
        '''

        while True:
            self.flagDiscontinuousBlock = 0
            if self.flagIsNewFile:
                if not self.setNextFile():
                    return 0

            self.readBlock()

            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')

        return 1

    def readBlock(self):
        '''
        Read one block into self.buffer: for .txt, all consecutive rows
        sharing the same timestamp; for .hdf5, one record.  Sets
        flagDiscontinuousBlock when the date changes between blocks.
        '''
        dum = []
        if self.ext == '.txt':
            dt = self.data[self.counter_records][:6].astype(int)
            if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
            while True:
                dt = self.data[self.counter_records][:6].astype(int)
                datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
                if datatime == self.datatime:
                    dum.append(self.data[self.counter_records])
                    self.counter_records += 1
                    if self.counter_records == self.nrecords:
                        self.flagIsNewFile = True
                        break
                    continue
                self.intervals.add((datatime - self.datatime).seconds)
                break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.data['timestamps'][self.counter_records])
            nHeights = len(self.ranges)
            for n, param in enumerate(self.parameters):
                if self.parameters_d[n] == 1:
                    # 1D parameter: broadcast the scalar along heights
                    dum.append(numpy.ones(nHeights)*self.data['1D Parameters'][param][self.counter_records])
                else:
                    if self.version == '2':
                        dum.append(self.data['2D Parameters'][param][self.counter_records])
                    else:
                        # version 3 stores 2D parameters transposed
                        tmp = self.data['2D Parameters'][param].value.T
                        dum.append(tmp[self.counter_records])
            self.intervals.add((datatime - self.datatime).seconds)
            if datatime.date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1
            if self.counter_records == self.nrecords:
                self.flagIsNewFile = True

        self.buffer = numpy.array(dum)
        return

    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''

        parameters = [None for __ in self.parameters]

        for param, attr in list(self.oneDDict.items()):
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])

        for param, value in list(self.twoDDict.items()):
            x = self.parameters.index(param.lower())
            if self.ext == '.txt':
                # align this block's heights against the file's full range
                # axis, padding missing heights with NaN
                y = self.parameters.index(self.ind2DList[0].lower())
                ranges = self.buffer[:, y]
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy = numpy.zeros(self.ranges.shape) + numpy.nan
                dummy[index] = self.buffer[:, x]
            else:
                dummy = self.buffer[x]

            if isinstance(value, str):
                if value not in self.ind2DList:
                    setattr(self.dataOut, value, dummy.reshape(1, -1))
            elif isinstance(value, list):
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param

        for key, value in list(self.output.items()):
            setattr(self.dataOut, key, numpy.array(value))

        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.paramInterval = min(self.intervals)
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

    def getData(self):
        '''
        Read the next block into dataOut.

        Returns:
            1 on success, 0 when there is no data left.
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            self.dataOut.error = 'No file left to process'
            return 0

        if not self.readNextBlock():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1
408
411
409
412 @MPDecorator
410 class MADWriter(Operation):
413 class MADWriter(Operation):
411
414
412 missing = -32767
415 missing = -32767
413
416
414 def __init__(self, **kwargs):
417 def __init__(self):
415
418
416 Operation.__init__(self, **kwargs)
419 Operation.__init__(self)
417 self.dataOut = Parameters()
420 self.dataOut = Parameters()
418 self.counter = 0
421 self.counter = 0
419 self.path = None
422 self.path = None
420 self.fp = None
423 self.fp = None
421
424
422 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
425 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
423 metadata='{}', format='cedar', **kwargs):
426 metadata='{}', format='cedar', **kwargs):
424 '''
427 '''
425 Inputs:
428 Inputs:
426 path - path where files will be created
429 path - path where files will be created
427 oneDDict - json of one-dimensional parameters in record where keys
430 oneDDict - json of one-dimensional parameters in record where keys
428 are Madrigal codes (integers or mnemonics) and values the corresponding
431 are Madrigal codes (integers or mnemonics) and values the corresponding
429 dataOut attribute e.g: {
432 dataOut attribute e.g: {
430 'gdlatr': 'lat',
433 'gdlatr': 'lat',
431 'gdlonr': 'lon',
434 'gdlonr': 'lon',
432 'gdlat2':'lat',
435 'gdlat2':'lat',
433 'glon2':'lon'}
436 'glon2':'lon'}
434 ind2DList - list of independent spatial two-dimensional parameters e.g:
437 ind2DList - list of independent spatial two-dimensional parameters e.g:
435 ['heighList']
438 ['heighList']
436 twoDDict - json of two-dimensional parameters in record where keys
439 twoDDict - json of two-dimensional parameters in record where keys
437 are Madrigal codes (integers or mnemonics) and values the corresponding
440 are Madrigal codes (integers or mnemonics) and values the corresponding
438 dataOut attribute if multidimensional array specify as tupple
441 dataOut attribute if multidimensional array specify as tupple
439 ('attr', pos) e.g: {
442 ('attr', pos) e.g: {
440 'gdalt': 'heightList',
443 'gdalt': 'heightList',
441 'vn1p2': ('data_output', 0),
444 'vn1p2': ('data_output', 0),
442 'vn2p2': ('data_output', 1),
445 'vn2p2': ('data_output', 1),
443 'vn3': ('data_output', 2),
446 'vn3': ('data_output', 2),
444 'snl': ('data_SNR', 'db')
447 'snl': ('data_SNR', 'db')
445 }
448 }
446 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
449 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
447 '''
450 '''
448 if not self.isConfig:
451 if not self.isConfig:
449 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
452 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
450 self.isConfig = True
453 self.isConfig = True
451
454
452 self.dataOut = dataOut
455 self.dataOut = dataOut
453 self.putData()
456 self.putData()
454 return
457 return 1
455
458
456 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
459 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
457 '''
460 '''
458 Configure Operation
461 Configure Operation
459 '''
462 '''
460
463
461 self.path = path
464 self.path = path
462 self.blocks = kwargs.get('blocks', None)
465 self.blocks = kwargs.get('blocks', None)
463 self.counter = 0
466 self.counter = 0
464 self.oneDDict = load_json(oneDDict)
467 self.oneDDict = load_json(oneDDict)
465 self.twoDDict = load_json(twoDDict)
468 self.twoDDict = load_json(twoDDict)
466 self.ind2DList = load_json(ind2DList)
469 self.ind2DList = load_json(ind2DList)
467 meta = load_json(metadata)
470 meta = load_json(metadata)
468 self.kinst = meta.get('kinst')
471 self.kinst = meta.get('kinst')
469 self.kindat = meta.get('kindat')
472 self.kindat = meta.get('kindat')
470 self.catalog = meta.get('catalog', DEF_CATALOG)
473 self.catalog = meta.get('catalog', DEF_CATALOG)
471 self.header = meta.get('header', DEF_HEADER)
474 self.header = meta.get('header', DEF_HEADER)
472 if format == 'cedar':
475 if format == 'cedar':
473 self.ext = '.dat'
476 self.ext = '.dat'
474 self.extra_args = {}
477 self.extra_args = {}
475 elif format == 'hdf5':
478 elif format == 'hdf5':
476 self.ext = '.hdf5'
479 self.ext = '.hdf5'
477 self.extra_args = {'ind2DList': self.ind2DList}
480 self.extra_args = {'ind2DList': self.ind2DList}
478
481
479 self.keys = [k.lower() for k in self.twoDDict]
482 self.keys = [k.lower() for k in self.twoDDict]
480 if 'range' in self.keys:
483 if 'range' in self.keys:
481 self.keys.remove('range')
484 self.keys.remove('range')
482 if 'gdalt' in self.keys:
485 if 'gdalt' in self.keys:
483 self.keys.remove('gdalt')
486 self.keys.remove('gdalt')
484
487
485 def setFile(self):
488 def setFile(self):
486 '''
489 '''
487 Create new cedar file object
490 Create new cedar file object
488 '''
491 '''
489
492
490 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
493 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
491 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
494 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
492
495
493 filename = '{}{}{}'.format(self.mnemonic,
496 filename = '{}{}{}'.format(self.mnemonic,
494 date.strftime('%Y%m%d_%H%M%S'),
497 date.strftime('%Y%m%d_%H%M%S'),
495 self.ext)
498 self.ext)
496
499
497 self.fullname = os.path.join(self.path, filename)
500 self.fullname = os.path.join(self.path, filename)
498
501
499 if os.path.isfile(self.fullname) :
502 if os.path.isfile(self.fullname) :
500 log.warning(
503 log.warning(
501 'Destination file {} already exists, previous file deleted.'.format(
504 'Destination file {} already exists, previous file deleted.'.format(
502 self.fullname),
505 self.fullname),
503 'MADWriter')
506 'MADWriter')
504 os.remove(self.fullname)
507 os.remove(self.fullname)
505
508
506 try:
509 try:
507 log.success(
510 log.success(
508 'Creating file: {}'.format(self.fullname),
511 'Creating file: {}'.format(self.fullname),
509 'MADWriter')
512 'MADWriter')
510 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
513 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
511 except ValueError as e:
514 except ValueError as e:
512 log.error(
515 log.error(
513 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
516 'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
514 'MADWriter')
517 'MADWriter')
515 return
518 return
516
519
517 return 1
520 return 1
518
521
519 def writeBlock(self):
522 def writeBlock(self):
520 '''
523 '''
521 Add data records to cedar file taking data from oneDDict and twoDDict
524 Add data records to cedar file taking data from oneDDict and twoDDict
522 attributes.
525 attributes.
523 Allowed parameters in: parcodes.tab
526 Allowed parameters in: parcodes.tab
524 '''
527 '''
525
528
526 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
529 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
527 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
530 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
528 heights = self.dataOut.heightList
531 heights = self.dataOut.heightList
529
532
530 if self.ext == '.dat':
533 if self.ext == '.dat':
531 for key, value in list(self.twoDDict.items()):
534 for key, value in list(self.twoDDict.items()):
532 if isinstance(value, str):
535 if isinstance(value, str):
533 data = getattr(self.dataOut, value)
536 data = getattr(self.dataOut, value)
534 invalid = numpy.isnan(data)
537 invalid = numpy.isnan(data)
535 data[invalid] = self.missing
538 data[invalid] = self.missing
536 elif isinstance(value, (tuple, list)):
539 elif isinstance(value, (tuple, list)):
537 attr, key = value
540 attr, key = value
538 data = getattr(self.dataOut, attr)
541 data = getattr(self.dataOut, attr)
539 invalid = numpy.isnan(data)
542 invalid = numpy.isnan(data)
540 data[invalid] = self.missing
543 data[invalid] = self.missing
541
544
542 out = {}
545 out = {}
543 for key, value in list(self.twoDDict.items()):
546 for key, value in list(self.twoDDict.items()):
544 key = key.lower()
547 key = key.lower()
545 if isinstance(value, str):
548 if isinstance(value, str):
546 if 'db' in value.lower():
549 if 'db' in value.lower():
547 tmp = getattr(self.dataOut, value.replace('_db', ''))
550 tmp = getattr(self.dataOut, value.replace('_db', ''))
548 SNRavg = numpy.average(tmp, axis=0)
551 SNRavg = numpy.average(tmp, axis=0)
549 tmp = 10*numpy.log10(SNRavg)
552 tmp = 10*numpy.log10(SNRavg)
550 else:
553 else:
551 tmp = getattr(self.dataOut, value)
554 tmp = getattr(self.dataOut, value)
552 out[key] = tmp.flatten()
555 out[key] = tmp.flatten()
553 elif isinstance(value, (tuple, list)):
556 elif isinstance(value, (tuple, list)):
554 attr, x = value
557 attr, x = value
555 data = getattr(self.dataOut, attr)
558 data = getattr(self.dataOut, attr)
556 out[key] = data[int(x)]
559 out[key] = data[int(x)]
557
560
558 a = numpy.array([out[k] for k in self.keys])
561 a = numpy.array([out[k] for k in self.keys])
559 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
562 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
560 index = numpy.where(nrows == False)[0]
563 index = numpy.where(nrows == False)[0]
561
564
562 rec = madrigal.cedar.MadrigalDataRecord(
565 rec = madrigal.cedar.MadrigalDataRecord(
563 self.kinst,
566 self.kinst,
564 self.kindat,
567 self.kindat,
565 startTime.year,
568 startTime.year,
566 startTime.month,
569 startTime.month,
567 startTime.day,
570 startTime.day,
568 startTime.hour,
571 startTime.hour,
569 startTime.minute,
572 startTime.minute,
570 startTime.second,
573 startTime.second,
571 startTime.microsecond/10000,
574 startTime.microsecond/10000,
572 endTime.year,
575 endTime.year,
573 endTime.month,
576 endTime.month,
574 endTime.day,
577 endTime.day,
575 endTime.hour,
578 endTime.hour,
576 endTime.minute,
579 endTime.minute,
577 endTime.second,
580 endTime.second,
578 endTime.microsecond/10000,
581 endTime.microsecond/10000,
579 list(self.oneDDict.keys()),
582 list(self.oneDDict.keys()),
580 list(self.twoDDict.keys()),
583 list(self.twoDDict.keys()),
581 len(index),
584 len(index),
582 **self.extra_args
585 **self.extra_args
583 )
586 )
584
587
585 # Setting 1d values
588 # Setting 1d values
586 for key in self.oneDDict:
589 for key in self.oneDDict:
587 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
590 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
588
591
589 # Setting 2d values
592 # Setting 2d values
590 nrec = 0
593 nrec = 0
591 for n in index:
594 for n in index:
592 for key in out:
595 for key in out:
593 rec.set2D(key, nrec, out[key][n])
596 rec.set2D(key, nrec, out[key][n])
594 nrec += 1
597 nrec += 1
595
598
596 self.fp.append(rec)
599 self.fp.append(rec)
597 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
600 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
598 self.fp.dump()
601 self.fp.dump()
599 if self.counter % 100 == 0 and self.counter > 0:
602 if self.counter % 20 == 0 and self.counter > 0:
600 log.log(
603 log.log(
601 'Writing {} records'.format(
604 'Writing {} records'.format(
602 self.counter),
605 self.counter),
603 'MADWriter')
606 'MADWriter')
604
607
605 def setHeader(self):
608 def setHeader(self):
606 '''
609 '''
607 Create an add catalog and header to cedar file
610 Create an add catalog and header to cedar file
608 '''
611 '''
609
612
610 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
613 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
611
614
612 if self.ext == '.dat':
615 if self.ext == '.dat':
613 self.fp.write()
616 self.fp.write()
614 else:
617 else:
615 self.fp.dump()
618 self.fp.dump()
616 self.fp.close()
619 self.fp.close()
617
620
618 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
621 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
619 header.createCatalog(**self.catalog)
622 header.createCatalog(**self.catalog)
620 header.createHeader(**self.header)
623 header.createHeader(**self.header)
621 header.write()
624 header.write()
622
625
623 def putData(self):
626 def putData(self):
624
627
625 if self.dataOut.flagNoData:
628 if self.dataOut.flagNoData:
626 return 0
629 return 0
627
630
628 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
631 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
629 if self.counter > 0:
632 if self.counter > 0:
630 self.setHeader()
633 self.setHeader()
631 self.counter = 0
634 self.counter = 0
632
635
633 if self.counter == 0:
636 if self.counter == 0:
634 self.setFile()
637 self.setFile()
635
638
636 self.writeBlock()
639 self.writeBlock()
637 self.counter += 1
640 self.counter += 1
638
641
639 def close(self):
642 def close(self):
640
643
641 if self.counter > 0:
644 if self.counter > 0:
642 self.setHeader() No newline at end of file
645 self.setHeader()
@@ -1,384 +1,390
1 '''
1 '''
2 Updated for multiprocessing
2 Updated for multiprocessing
3 Author : Sergio Cortez
3 Author : Sergio Cortez
4 Jan 2018
4 Jan 2018
5 Abstract:
5 Abstract:
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9
9
10 Based on:
10 Based on:
11 $Author: murco $
11 $Author: murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 '''
13 '''
14
14
15 import inspect
15 import inspect
16 import zmq
16 import zmq
17 import time
17 import time
18 import pickle
18 import pickle
19 import os
19 import os
20 from multiprocessing import Process
20 from multiprocessing import Process
21 from zmq.utils.monitor import recv_monitor_message
21 from zmq.utils.monitor import recv_monitor_message
22
22
23 from schainpy.utils import log
23 from schainpy.utils import log
24
24
25
25
26 class ProcessingUnit(object):
26 class ProcessingUnit(object):
27
27
28 """
28 """
29 Update - Jan 2018 - MULTIPROCESSING
29 Update - Jan 2018 - MULTIPROCESSING
30 All the "call" methods present in the previous base were removed.
30 All the "call" methods present in the previous base were removed.
31 The majority of operations are independant processes, thus
31 The majority of operations are independant processes, thus
32 the decorator is in charge of communicate the operation processes
32 the decorator is in charge of communicate the operation processes
33 with the proccessing unit via IPC.
33 with the proccessing unit via IPC.
34
34
35 The constructor does not receive any argument. The remaining methods
35 The constructor does not receive any argument. The remaining methods
36 are related with the operations to execute.
36 are related with the operations to execute.
37
37
38
38
39 """
39 """
40
40
41 def __init__(self):
41 def __init__(self):
42
42
43 self.dataIn = None
43 self.dataIn = None
44 self.dataOut = None
44 self.dataOut = None
45 self.isConfig = False
45 self.isConfig = False
46 self.operations = []
46 self.operations = []
47 self.plots = []
47 self.plots = []
48
48
49 def getAllowedArgs(self):
49 def getAllowedArgs(self):
50 if hasattr(self, '__attrs__'):
50 if hasattr(self, '__attrs__'):
51 return self.__attrs__
51 return self.__attrs__
52 else:
52 else:
53 return inspect.getargspec(self.run).args
53 return inspect.getargspec(self.run).args
54
54
55 def addOperation(self, conf, operation):
55 def addOperation(self, conf, operation):
56 """
56 """
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
57 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
58 posses the id of the operation process (IPC purposes)
58 posses the id of the operation process (IPC purposes)
59
59
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
60 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
61 identificador asociado a este objeto.
61 identificador asociado a este objeto.
62
62
63 Input:
63 Input:
64
64
65 object : objeto de la clase "Operation"
65 object : objeto de la clase "Operation"
66
66
67 Return:
67 Return:
68
68
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
69 objId : identificador del objeto, necesario para comunicar con master(procUnit)
70 """
70 """
71
71
72 self.operations.append(
72 self.operations.append(
73 (operation, conf.type, conf.id, conf.getKwargs()))
73 (operation, conf.type, conf.id, conf.getKwargs()))
74
74
75 if 'plot' in self.name.lower():
75 if 'plot' in self.name.lower():
76 self.plots.append(operation.CODE)
76 self.plots.append(operation.CODE)
77
77
78 def getOperationObj(self, objId):
78 def getOperationObj(self, objId):
79
79
80 if objId not in list(self.operations.keys()):
80 if objId not in list(self.operations.keys()):
81 return None
81 return None
82
82
83 return self.operations[objId]
83 return self.operations[objId]
84
84
85 def operation(self, **kwargs):
85 def operation(self, **kwargs):
86 """
86 """
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
87 Operacion directa sobre la data (dataOut.data). Es necesario actualizar los valores de los
88 atributos del objeto dataOut
88 atributos del objeto dataOut
89
89
90 Input:
90 Input:
91
91
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
92 **kwargs : Diccionario de argumentos de la funcion a ejecutar
93 """
93 """
94
94
95 raise NotImplementedError
95 raise NotImplementedError
96
96
97 def setup(self):
97 def setup(self):
98
98
99 raise NotImplementedError
99 raise NotImplementedError
100
100
101 def run(self):
101 def run(self):
102
102
103 raise NotImplementedError
103 raise NotImplementedError
104
104
105 def close(self):
105 def close(self):
106
106
107 return
107 return
108
108
109
109
110 class Operation(object):
110 class Operation(object):
111
111
112 """
112 """
113 Update - Jan 2018 - MULTIPROCESSING
113 Update - Jan 2018 - MULTIPROCESSING
114
114
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
115 Most of the methods remained the same. The decorator parse the arguments and executed the run() method for each process.
116 The constructor doe snot receive any argument, neither the baseclass.
116 The constructor doe snot receive any argument, neither the baseclass.
117
117
118
118
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
119 Clase base para definir las operaciones adicionales que se pueden agregar a la clase ProcessingUnit
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
120 y necesiten acumular informacion previa de los datos a procesar. De preferencia usar un buffer de
121 acumulacion dentro de esta clase
121 acumulacion dentro de esta clase
122
122
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
123 Ejemplo: Integraciones coherentes, necesita la informacion previa de los n perfiles anteriores (bufffer)
124
124
125 """
125 """
126
126
127 def __init__(self):
127 def __init__(self):
128
128
129 self.id = None
129 self.id = None
130 self.isConfig = False
130 self.isConfig = False
131
131
132 if not hasattr(self, 'name'):
132 if not hasattr(self, 'name'):
133 self.name = self.__class__.__name__
133 self.name = self.__class__.__name__
134
134
135 def getAllowedArgs(self):
135 def getAllowedArgs(self):
136 if hasattr(self, '__attrs__'):
136 if hasattr(self, '__attrs__'):
137 return self.__attrs__
137 return self.__attrs__
138 else:
138 else:
139 return inspect.getargspec(self.run).args
139 return inspect.getargspec(self.run).args
140
140
141 def setup(self):
141 def setup(self):
142
142
143 self.isConfig = True
143 self.isConfig = True
144
144
145 raise NotImplementedError
145 raise NotImplementedError
146
146
147 def run(self, dataIn, **kwargs):
147 def run(self, dataIn, **kwargs):
148 """
148 """
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
149 Realiza las operaciones necesarias sobre la dataIn.data y actualiza los
150 atributos del objeto dataIn.
150 atributos del objeto dataIn.
151
151
152 Input:
152 Input:
153
153
154 dataIn : objeto del tipo JROData
154 dataIn : objeto del tipo JROData
155
155
156 Return:
156 Return:
157
157
158 None
158 None
159
159
160 Affected:
160 Affected:
161 __buffer : buffer de recepcion de datos.
161 __buffer : buffer de recepcion de datos.
162
162
163 """
163 """
164 if not self.isConfig:
164 if not self.isConfig:
165 self.setup(**kwargs)
165 self.setup(**kwargs)
166
166
167 raise NotImplementedError
167 raise NotImplementedError
168
168
169 def close(self):
169 def close(self):
170
170
171 return
171 return
172
172
173
173
174 def MPDecorator(BaseClass):
174 def MPDecorator(BaseClass):
175 """
175 """
176 Multiprocessing class decorator
176 Multiprocessing class decorator
177
177
178 This function add multiprocessing features to a BaseClass. Also, it handle
178 This function add multiprocessing features to a BaseClass. Also, it handle
179 the communication beetween processes (readers, procUnits and operations).
179 the communication beetween processes (readers, procUnits and operations).
180 """
180 """
181
181
182 class MPClass(BaseClass, Process):
182 class MPClass(BaseClass, Process):
183
183
184 def __init__(self, *args, **kwargs):
184 def __init__(self, *args, **kwargs):
185 super(MPClass, self).__init__()
185 super(MPClass, self).__init__()
186 Process.__init__(self)
186 Process.__init__(self)
187 self.operationKwargs = {}
187 self.operationKwargs = {}
188 self.args = args
188 self.args = args
189 self.kwargs = kwargs
189 self.kwargs = kwargs
190 self.sender = None
190 self.sender = None
191 self.receiver = None
191 self.receiver = None
192 self.name = BaseClass.__name__
192 self.name = BaseClass.__name__
193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 self.start_time = time.time()
195 self.start_time = time.time()
196
196
197 if len(self.args) is 3:
197 if len(self.args) is 3:
198 self.typeProc = "ProcUnit"
198 self.typeProc = "ProcUnit"
199 self.id = args[0]
199 self.id = args[0]
200 self.inputId = args[1]
200 self.inputId = args[1]
201 self.project_id = args[2]
201 self.project_id = args[2]
202 elif len(self.args) is 2:
202 elif len(self.args) is 2:
203 self.id = args[0]
203 self.id = args[0]
204 self.inputId = args[0]
204 self.inputId = args[0]
205 self.project_id = args[1]
205 self.project_id = args[1]
206 self.typeProc = "Operation"
206 self.typeProc = "Operation"
207
207
208 def subscribe(self):
208 def subscribe(self):
209 '''
209 '''
210 This function create a socket to receive objects from the
210 This function create a socket to receive objects from the
211 topic `inputId`.
211 topic `inputId`.
212 '''
212 '''
213
213
214 c = zmq.Context()
214 c = zmq.Context()
215 self.receiver = c.socket(zmq.SUB)
215 self.receiver = c.socket(zmq.SUB)
216 self.receiver.connect(
216 self.receiver.connect(
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219
219
220 def listen(self):
220 def listen(self):
221 '''
221 '''
222 This function waits for objects and deserialize using pickle
222 This function waits for objects and deserialize using pickle
223 '''
223 '''
224
224
225 data = pickle.loads(self.receiver.recv_multipart()[1])
225 data = pickle.loads(self.receiver.recv_multipart()[1])
226
226
227 return data
227 return data
228
228
229 def set_publisher(self):
229 def set_publisher(self):
230 '''
230 '''
231 This function create a socket for publishing purposes.
231 This function create a socket for publishing purposes.
232 '''
232 '''
233
233
234 time.sleep(1)
234 time.sleep(1)
235 c = zmq.Context()
235 c = zmq.Context()
236 self.sender = c.socket(zmq.PUB)
236 self.sender = c.socket(zmq.PUB)
237 self.sender.connect(
237 self.sender.connect(
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239
239
240 def publish(self, data, id):
240 def publish(self, data, id):
241 '''
241 '''
242 This function publish an object, to a specific topic.
242 This function publish an object, to a specific topic.
243 '''
243 '''
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245
245
246 def runReader(self):
246 def runReader(self):
247 '''
247 '''
248 Run fuction for read units
248 Run fuction for read units
249 '''
249 '''
250 while True:
250 while True:
251
251
252 BaseClass.run(self, **self.kwargs)
252 BaseClass.run(self, **self.kwargs)
253
253
254 for op, optype, opId, kwargs in self.operations:
254 for op, optype, opId, kwargs in self.operations:
255 if optype == 'self' and not self.dataOut.flagNoData:
255 if optype == 'self' and not self.dataOut.flagNoData:
256 op(**kwargs)
256 op(**kwargs)
257 elif optype == 'other' and not self.dataOut.flagNoData:
257 elif optype == 'other' and not self.dataOut.flagNoData:
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
258 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 elif optype == 'external':
259 elif optype == 'external':
260 self.publish(self.dataOut, opId)
260 self.publish(self.dataOut, opId)
261
261
262 if self.dataOut.flagNoData and not self.dataOut.error:
262 if self.dataOut.flagNoData and not self.dataOut.error:
263 continue
263 continue
264
264
265 self.publish(self.dataOut, self.id)
265 self.publish(self.dataOut, self.id)
266
266
267 if self.dataOut.error:
267 if self.dataOut.error:
268 log.error(self.dataOut.error, self.name)
268 log.error(self.dataOut.error, self.name)
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 break
270 break
271
271
272 time.sleep(1)
272 time.sleep(1)
273
273
274 def runProc(self):
274 def runProc(self):
275 '''
275 '''
276 Run function for proccessing units
276 Run function for proccessing units
277 '''
277 '''
278
278
279 while True:
279 while True:
280 self.dataIn = self.listen()
280 self.dataIn = self.listen()
281
281
282 if self.dataIn.flagNoData and self.dataIn.error is None:
282 if self.dataIn.flagNoData and self.dataIn.error is None:
283 continue
283 continue
284
284
285 BaseClass.run(self, **self.kwargs)
285 BaseClass.run(self, **self.kwargs)
286
286
287 if self.dataIn.error:
287 if self.dataIn.error:
288 self.dataOut.error = self.dataIn.error
288 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
289 self.dataOut.flagNoData = True
290
290
291 for op, optype, opId, kwargs in self.operations:
291 for op, optype, opId, kwargs in self.operations:
292 if optype == 'self' and not self.dataOut.flagNoData:
292 if optype == 'self' and not self.dataOut.flagNoData:
293 op(**kwargs)
293 op(**kwargs)
294 elif optype == 'other' and not self.dataOut.flagNoData:
294 elif optype == 'other' and not self.dataOut.flagNoData:
295 self.dataOut = op.run(self.dataOut, **kwargs)
295 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external' and not self.dataOut.flagNoData:
296 elif optype == 'external' and not self.dataOut.flagNoData:
297 if not self.dataOut.flagNoData or self.dataOut.error:
298 self.publish(self.dataOut, opId)
297 self.publish(self.dataOut, opId)
299
298
300 if not self.dataOut.flagNoData or self.dataOut.error:
299 if not self.dataOut.flagNoData or self.dataOut.error:
301 self.publish(self.dataOut, self.id)
300 self.publish(self.dataOut, self.id)
301 for op, optype, opId, kwargs in self.operations:
302 if optype == 'self' and self.dataOut.error:
303 op(**kwargs)
304 elif optype == 'other' and self.dataOut.error:
305 self.dataOut = op.run(self.dataOut, **kwargs)
306 elif optype == 'external' and self.dataOut.error:
307 self.publish(self.dataOut, opId)
302
308
303 if self.dataIn.error:
309 if self.dataIn.error:
304 break
310 break
305
311
306 time.sleep(1)
312 time.sleep(1)
307
313
308 def runOp(self):
314 def runOp(self):
309 '''
315 '''
310 Run function for external operations (this operations just receive data
316 Run function for external operations (this operations just receive data
311 ex: plots, writers, publishers)
317 ex: plots, writers, publishers)
312 '''
318 '''
313
319
314 while True:
320 while True:
315
321
316 dataOut = self.listen()
322 dataOut = self.listen()
317
323
318 BaseClass.run(self, dataOut, **self.kwargs)
324 BaseClass.run(self, dataOut, **self.kwargs)
319
325
320 if dataOut.error:
326 if dataOut.error:
321 break
327 break
322
328
323 time.sleep(1)
329 time.sleep(1)
324
330
325 def run(self):
331 def run(self):
326 if self.typeProc is "ProcUnit":
332 if self.typeProc is "ProcUnit":
327
333
328 if self.inputId is not None:
334 if self.inputId is not None:
329
335
330 self.subscribe()
336 self.subscribe()
331
337
332 self.set_publisher()
338 self.set_publisher()
333
339
334 if 'Reader' not in BaseClass.__name__:
340 if 'Reader' not in BaseClass.__name__:
335 self.runProc()
341 self.runProc()
336 else:
342 else:
337 self.runReader()
343 self.runReader()
338
344
339 elif self.typeProc is "Operation":
345 elif self.typeProc is "Operation":
340
346
341 self.subscribe()
347 self.subscribe()
342 self.runOp()
348 self.runOp()
343
349
344 else:
350 else:
345 raise ValueError("Unknown type")
351 raise ValueError("Unknown type")
346
352
347 self.close()
353 self.close()
348
354
349 def event_monitor(self, monitor):
355 def event_monitor(self, monitor):
350
356
351 events = {}
357 events = {}
352
358
353 for name in dir(zmq):
359 for name in dir(zmq):
354 if name.startswith('EVENT_'):
360 if name.startswith('EVENT_'):
355 value = getattr(zmq, name)
361 value = getattr(zmq, name)
356 events[value] = name
362 events[value] = name
357
363
358 while monitor.poll():
364 while monitor.poll():
359 evt = recv_monitor_message(monitor)
365 evt = recv_monitor_message(monitor)
360 if evt['event'] == 32:
366 if evt['event'] == 32:
361 self.connections += 1
367 self.connections += 1
362 if evt['event'] == 512:
368 if evt['event'] == 512:
363 pass
369 pass
364
370
365 evt.update({'description': events[evt['event']]})
371 evt.update({'description': events[evt['event']]})
366
372
367 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
368 break
374 break
369 monitor.close()
375 monitor.close()
370 print('event monitor thread done!')
376 print('event monitor thread done!')
371
377
372 def close(self):
378 def close(self):
373
379
374 BaseClass.close(self)
380 BaseClass.close(self)
375
381
376 if self.sender:
382 if self.sender:
377 self.sender.close()
383 self.sender.close()
378
384
379 if self.receiver:
385 if self.receiver:
380 self.receiver.close()
386 self.receiver.close()
381
387
382 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
383
389
384 return MPClass
390 return MPClass
1 NO CONTENT: modified file
NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
General Comments 0
You need to be logged in to leave comments. Login now