Writing Unit for Madrigal decorated (Python 2.x only)
George Yong
r1206:59caf7a2130e

The requested changes are too big and content was truncated.

@@ -1,642 +1,645 @@
1 1 '''
2 2 Created on Aug 1, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15 import h5py
16
17 from schainpy.model.io.jroIO_base import JRODataReader
16 from schainpy.model.io.jroIO_base import LOCALTIME, JRODataReader, JRODataWriter
18 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
19 18 from schainpy.model.data.jrodata import Parameters
20 19 from schainpy.utils import log
21 20
22 21 try:
23 22 import madrigal.cedar
24 23 except:
25 24 log.warning(
26 25 'You should install "madrigal library" module if you want to read/write Madrigal data'
27 26 )
28 27
29 28 DEF_CATALOG = {
30 29 'principleInvestigator': 'Marco Milla',
31 'expPurpose': None,
32 'cycleTime': None,
33 'correlativeExp': None,
34 'sciRemarks': None,
35 'instRemarks': None
30 'expPurpose': '',
31 'cycleTime': '',
32 'correlativeExp': '',
33 'sciRemarks': '',
34 'instRemarks': ''
36 35 }
36
37 37 DEF_HEADER = {
38 'kindatDesc': None,
38 'kindatDesc': '',
39 39 'analyst': 'Jicamarca User',
40 'comments': None,
41 'history': None
40 'comments': '',
41 'history': ''
42 42 }
43
43 44 MNEMONICS = {
44 45 10: 'jro',
45 46 11: 'jbr',
46 47 840: 'jul',
47 48 13: 'jas',
48 49 1000: 'pbr',
49 50 1001: 'hbr',
50 51 1002: 'obr',
52 400: 'clr'
53
51 54 }
52 55
53 56 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
54 57
55 58 def load_json(obj):
56 59 '''
57 60 Parse JSON, coercing unicode values to str
58 61 '''
59 62
60 63 if isinstance(obj, str):
61 64 iterable = json.loads(obj)
62 65 else:
63 66 iterable = obj
64 67
65 68 if isinstance(iterable, dict):
66 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, str) else v
69 return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, (str,unicode)) else v
67 70 for k, v in list(iterable.items())}
68 71 elif isinstance(iterable, (list, tuple)):
69 72 return [str(v) if isinstance(v, str) else v for v in iterable]
70 73
71 74 return iterable
72 75
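
A quick sketch of load_json in use (illustrative, not part of the diff): nested dicts are converted recursively and, on Python 2.x where json.loads returns unicode, string keys and values are coerced to str, while other values pass through unchanged. The JSON string below is made up:

cfg = load_json('{"GDALT": "heightList", "meta": {"kinst": 10}}')
# cfg == {'GDALT': 'heightList', 'meta': {'kinst': 10}}
# keys and string values are plain str; the int is left as-is
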
73 76 @MPDecorator
74 77 class MADReader(JRODataReader, ProcessingUnit):
75 78
76 79 def __init__(self):
77 80
78 81 ProcessingUnit.__init__(self)
79 82
80 83 self.dataOut = Parameters()
81 84 self.counter_records = 0
82 85 self.nrecords = None
83 86 self.flagNoMoreFiles = 0
84 87 self.isConfig = False
85 88 self.filename = None
86 89 self.intervals = set()
87 90
88 91 def setup(self,
89 92 path=None,
90 93 startDate=None,
91 94 endDate=None,
92 95 format=None,
93 96 startTime=datetime.time(0, 0, 0),
94 97 endTime=datetime.time(23, 59, 59),
95 98 **kwargs):
96 99
97 100 self.path = path
98 101 self.startDate = startDate
99 102 self.endDate = endDate
100 103 self.startTime = startTime
101 104 self.endTime = endTime
102 105 self.datatime = datetime.datetime(1900,1,1)
103 106 self.oneDDict = load_json(kwargs.get('oneDDict',
104 107 "{\"GDLATR\":\"lat\", \"GDLONR\":\"lon\"}"))
105 108 self.twoDDict = load_json(kwargs.get('twoDDict',
106 109 "{\"GDALT\": \"heightList\"}"))
107 110 self.ind2DList = load_json(kwargs.get('ind2DList',
108 111 "[\"GDALT\"]"))
109 112 if self.path is None:
110 113 raise ValueError('The path is not valid')
111 114
112 115 if format is None:
113 116 raise ValueError('The format is not valid, choose simple or hdf5')
114 117 elif format.lower() in ('simple', 'txt'):
115 118 self.ext = '.txt'
116 119 elif format.lower() in ('cedar',):
117 120 self.ext = '.001'
118 121 else:
119 122 self.ext = '.hdf5'
120 123
121 124 self.search_files(self.path)
122 125 self.fileId = 0
123 126
124 127 if not self.fileList:
125 128 raise Warning('There are no files matching these dates in the folder: {}. \n Check startDate and endDate'.format(path))
126 129
127 130 self.setNextFile()
128 131
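
A hedged sketch of configuring MADReader directly, mirroring the setup() signature above; the path is hypothetical and the JSON kwargs repeat the defaults shown. Note that with @MPDecorator applied, the class is normally instantiated and driven by the controller rather than by hand:

import datetime

reader = MADReader()
reader.setup(
    path='/data/madrigal',                      # hypothetical input folder
    startDate=datetime.date(2017, 8, 1),
    endDate=datetime.date(2017, 8, 2),
    format='hdf5',                              # or 'simple'/'txt'/'cedar'
    oneDDict='{"GDLATR": "lat", "GDLONR": "lon"}',
    twoDDict='{"GDALT": "heightList"}',
    ind2DList='["GDALT"]')                      # scans the folder, opens the first file
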
129 132 def search_files(self, path):
130 133 '''
131 134 Searching for madrigal files in path
132 135 Creating a list of files to process included in [startDate, endDate]
133 136
134 137 Input:
135 138 path - Path to find files
136 139 '''
137 140
138 141 log.log('Searching files {} in {} '.format(self.ext, path), 'MADReader')
139 142 foldercounter = 0
140 143 fileList0 = glob.glob1(path, '*{}'.format(self.ext))
141 144 fileList0.sort()
142 145
143 146 self.fileList = []
144 147 self.dateFileList = []
145 148
146 149 startDate = self.startDate - datetime.timedelta(1)
147 150 endDate = self.endDate + datetime.timedelta(1)
148 151
149 152 for thisFile in fileList0:
150 153 year = thisFile[3:7]
151 154 if not year.isdigit():
152 155 continue
153 156
154 157 month = thisFile[7:9]
155 158 if not month.isdigit():
156 159 continue
157 160
158 161 day = thisFile[9:11]
159 162 if not day.isdigit():
160 163 continue
161 164
162 165 year, month, day = int(year), int(month), int(day)
163 166 dateFile = datetime.date(year, month, day)
164 167
165 168 if (startDate > dateFile) or (endDate < dateFile):
166 169 continue
167 170
168 171 self.fileList.append(thisFile)
169 172 self.dateFileList.append(dateFile)
170 173
171 174 return
172 175
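
The date is read from fixed positions in the file name, a 3-character station mnemonic followed by YYYYMMDD; a sketch with a hypothetical name:

name = 'jul20170801.hdf5'                       # hypothetical Madrigal file
year, month, day = int(name[3:7]), int(name[7:9]), int(name[9:11])
# -> (2017, 8, 1), compared against [startDate - 1 day, endDate + 1 day]
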
173 176 def parseHeader(self):
174 177 '''
175 178 '''
176 179
177 180 self.output = {}
178 181 self.version = '2'
179 182 s_parameters = None
180 183 if self.ext == '.txt':
181 184 self.parameters = [s.strip().lower() for s in self.fp.readline().strip().split(' ') if s]
182 185 elif self.ext == '.hdf5':
183 186 metadata = self.fp['Metadata']
184 187 data = self.fp['Data']['Array Layout']
185 188 if 'Independent Spatial Parameters' in metadata:
186 189 s_parameters = [s[0].lower() for s in metadata['Independent Spatial Parameters']]
187 190 self.version = '3'
188 191 one = [s[0].lower() for s in data['1D Parameters']['Data Parameters']]
189 192 one_d = [1 for s in one]
190 193 two = [s[0].lower() for s in data['2D Parameters']['Data Parameters']]
191 194 two_d = [2 for s in two]
192 195 self.parameters = one + two
193 196 self.parameters_d = one_d + two_d
194 197
195 log.success('Parameters found: {}'.format(','.join(str(self.parameters))),
198 log.success('Parameters found: {}'.format(self.parameters),
196 199 'MADReader')
197 200 if s_parameters:
198 201 log.success('Spatial parameters: {}'.format(','.join(str(s_parameters))),
199 202 'MADReader')
200 203
201 204 for param in list(self.oneDDict.keys()):
202 205 if param.lower() not in self.parameters:
203 206 log.warning(
204 207 'Parameter {} not found, it will be ignored'.format(
205 208 param),
206 209 'MADReader')
207 210 self.oneDDict.pop(param, None)
208 211
209 212 for param, value in list(self.twoDDict.items()):
210 213 if param.lower() not in self.parameters:
211 214 log.warning(
212 215 'Parameter {} not found, it will be ignored'.format(
213 216 param),
214 217 'MADReader')
215 218 self.twoDDict.pop(param, None)
216 219 continue
217 220 if isinstance(value, list):
218 221 if value[0] not in self.output:
219 222 self.output[value[0]] = []
220 223 self.output[value[0]].append(None)
221 224
222 225 def parseData(self):
223 226 '''
224 227 '''
225 228
226 229 if self.ext == '.txt':
227 230 self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
228 231 self.nrecords = self.data.shape[0]
229 232 self.ranges = numpy.unique(self.data[:,self.parameters.index(self.ind2DList[0].lower())])
230 233 elif self.ext == '.hdf5':
231 234 self.data = self.fp['Data']['Array Layout']
232 235 self.nrecords = len(self.data['timestamps'].value)
233 236 self.ranges = self.data['range'].value
234 237
235 238 def setNextFile(self):
236 239 '''
237 240 '''
238 241
239 242 file_id = self.fileId
240 243
241 244 if file_id == len(self.fileList):
242 245 log.success('No more files', 'MADReader')
243 246 self.flagNoMoreFiles = 1
244 247 return 0
245 248
246 249 log.success(
247 250 'Opening: {}'.format(self.fileList[file_id]),
248 251 'MADReader'
249 252 )
250 253
251 254 filename = os.path.join(self.path, self.fileList[file_id])
252 255
253 256 if self.filename is not None:
254 257 self.fp.close()
255 258
256 259 self.filename = filename
257 260 self.filedate = self.dateFileList[file_id]
258 261
259 262 if self.ext=='.hdf5':
260 263 self.fp = h5py.File(self.filename, 'r')
261 264 else:
262 265 self.fp = open(self.filename, 'rb')
263 266
264 267 self.parseHeader()
265 268 self.parseData()
266 269 self.sizeOfFile = os.path.getsize(self.filename)
267 270 self.counter_records = 0
268 271 self.flagIsNewFile = 0
269 272 self.fileId += 1
270 273
271 274 return 1
272 275
273 276 def readNextBlock(self):
274 277
275 278 while True:
276 279 self.flagDiscontinuousBlock = 0
277 280 if self.flagIsNewFile:
278 281 if not self.setNextFile():
279 282 return 0
280 283
281 284 self.readBlock()
282 285
283 286 if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
284 287 (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
285 288 log.warning(
286 289 'Reading Record No. {}/{} -> {} [Skipping]'.format(
287 290 self.counter_records,
288 291 self.nrecords,
289 292 self.datatime.ctime()),
290 293 'MADReader')
291 294 continue
292 295 break
293 296
294 297 log.log(
295 298 'Reading Record No. {}/{} -> {}'.format(
296 299 self.counter_records,
297 300 self.nrecords,
298 301 self.datatime.ctime()),
299 302 'MADReader')
300 303
301 304 return 1
302 305
303 306 def readBlock(self):
304 307 '''
305 308 '''
306 309 dum = []
307 310 if self.ext == '.txt':
308 311 dt = self.data[self.counter_records][:6].astype(int)
309 312 if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
310 313 self.flagDiscontinuousBlock = 1
311 314 self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
312 315 while True:
313 316 dt = self.data[self.counter_records][:6].astype(int)
314 317 datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
315 318 if datatime == self.datatime:
316 319 dum.append(self.data[self.counter_records])
317 320 self.counter_records += 1
318 321 if self.counter_records == self.nrecords:
319 322 self.flagIsNewFile = True
320 323 break
321 324 continue
322 325 self.intervals.add((datatime-self.datatime).seconds)
323 326 break
324 327 elif self.ext == '.hdf5':
325 328 datatime = datetime.datetime.utcfromtimestamp(
326 329 self.data['timestamps'][self.counter_records])
327 330 nHeights = len(self.ranges)
328 331 for n, param in enumerate(self.parameters):
329 332 if self.parameters_d[n] == 1:
330 333 dum.append(numpy.ones(nHeights)*self.data['1D Parameters'][param][self.counter_records])
331 334 else:
332 335 if self.version == '2':
333 336 dum.append(self.data['2D Parameters'][param][self.counter_records])
334 337 else:
335 338 tmp = self.data['2D Parameters'][param].value.T
336 339 dum.append(tmp[self.counter_records])
337 340 self.intervals.add((datatime-self.datatime).seconds)
338 341 if datatime.date()>self.datatime.date():
339 342 self.flagDiscontinuousBlock = 1
340 343 self.datatime = datatime
341 344 self.counter_records += 1
342 345 if self.counter_records == self.nrecords:
343 346 self.flagIsNewFile = True
344 347
345 348 self.buffer = numpy.array(dum)
346 349 return
347 350
348 351 def set_output(self):
349 352 '''
350 353 Storing data from buffer to dataOut object
351 354 '''
352 355
353 356 parameters = [None for __ in self.parameters]
354 357
355 358 for param, attr in list(self.oneDDict.items()):
356 359 x = self.parameters.index(param.lower())
357 360 setattr(self.dataOut, attr, self.buffer[0][x])
358 361
359 for param, value in list(self.twoDDict.items()):
362 for param, value in list(self.twoDDict.items()):
360 363 x = self.parameters.index(param.lower())
361 364 if self.ext == '.txt':
362 365 y = self.parameters.index(self.ind2DList[0].lower())
363 366 ranges = self.buffer[:,y]
364 if self.ranges.size == ranges.size:
365 continue
367 #if self.ranges.size == ranges.size:
368 # continue
366 369 index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
367 370 dummy = numpy.zeros(self.ranges.shape) + numpy.nan
368 371 dummy[index] = self.buffer[:,x]
369 372 else:
370 373 dummy = self.buffer[x]
371
374
372 375 if isinstance(value, str):
373 376 if value not in self.ind2DList:
374 377 setattr(self.dataOut, value, dummy.reshape(1,-1))
375 378 elif isinstance(value, list):
376 379 self.output[value[0]][value[1]] = dummy
377 380 parameters[value[1]] = param
378 381
379 382 for key, value in list(self.output.items()):
380 383 setattr(self.dataOut, key, numpy.array(value))
381 384
382 385 self.dataOut.parameters = [s for s in parameters if s]
383 386 self.dataOut.heightList = self.ranges
384 387 self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
385 388 self.dataOut.utctimeInit = self.dataOut.utctime
386 389 self.dataOut.paramInterval = min(self.intervals)
387 390 self.dataOut.useLocalTime = False
388 391 self.dataOut.flagNoData = False
389 392 self.dataOut.nrecords = self.nrecords
390 393 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
391 394
392 395 def getData(self):
393 396 '''
394 397 Storing data from the data buffer to the dataOut object
395 398 '''
396 399 if self.flagNoMoreFiles:
397 400 self.dataOut.flagNoData = True
398 401 self.dataOut.error = 'No file left to process'
399 402 return 0
400 403
401 404 if not self.readNextBlock():
402 405 self.dataOut.flagNoData = True
403 406 return 0
404 407
405 408 self.set_output()
406 409
407 410 return 1
408 411
409
412 @MPDecorator
410 413 class MADWriter(Operation):
411 414
412 415 missing = -32767
413 416
414 def __init__(self, **kwargs):
417 def __init__(self):
415 418
416 Operation.__init__(self, **kwargs)
419 Operation.__init__(self)
417 420 self.dataOut = Parameters()
418 421 self.counter = 0
419 422 self.path = None
420 423 self.fp = None
421 424
422 425 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
423 426 metadata='{}', format='cedar', **kwargs):
424 427 '''
425 428 Inputs:
426 429 path - path where files will be created
427 430 oneDDict - json of one-dimensional parameters in record where keys
428 431 are Madrigal codes (integers or mnemonics) and values the corresponding
429 432 dataOut attribute e.g: {
430 433 'gdlatr': 'lat',
431 434 'gdlonr': 'lon',
432 435 'gdlat2':'lat',
433 436 'glon2':'lon'}
434 437 ind2DList - list of independent spatial two-dimensional parameters e.g:
435 438 ['heightList']
436 439 twoDDict - json of two-dimensional parameters in record where keys
437 440 are Madrigal codes (integers or mnemonics) and values the corresponding
438 441 dataOut attribute; if a multidimensional array, specify it as a tuple
439 442 ('attr', pos) e.g: {
440 443 'gdalt': 'heightList',
441 444 'vn1p2': ('data_output', 0),
442 445 'vn2p2': ('data_output', 1),
443 446 'vn3': ('data_output', 2),
444 447 'snl': ('data_SNR', 'db')
445 448 }
446 449 metadata - json of madrigal metadata (kinst, kindat, catalog and header)
447 450 '''
448 451 if not self.isConfig:
449 452 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
450 453 self.isConfig = True
451 454
452 455 self.dataOut = dataOut
453 456 self.putData()
454 return
457 return 1
455 458
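
A hedged configuration sketch for the run() call documented above; the output path and the kinst/kindat codes are placeholders, and dataOut stands for a populated Parameters object delivered by an upstream unit (normally the decorated writer runs as an external operation rather than being called by hand):

writer = MADWriter()
writer.run(
    dataOut,                                    # Parameters object from upstream
    path='/tmp/madrigal',                       # hypothetical output folder
    oneDDict='{"gdlatr": "lat", "gdlonr": "lon"}',
    ind2DList='["gdalt"]',
    twoDDict='{"gdalt": "heightList", "vn1p2": ["data_output", 0]}',
    metadata='{"kinst": 10, "kindat": 1910}',   # 10 -> 'jro'; kindat is a placeholder
    format='cedar')
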
456 459 def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
457 460 '''
458 461 Configure Operation
459 462 '''
460 463
461 464 self.path = path
462 465 self.blocks = kwargs.get('blocks', None)
463 466 self.counter = 0
464 467 self.oneDDict = load_json(oneDDict)
465 468 self.twoDDict = load_json(twoDDict)
466 469 self.ind2DList = load_json(ind2DList)
467 470 meta = load_json(metadata)
468 471 self.kinst = meta.get('kinst')
469 472 self.kindat = meta.get('kindat')
470 473 self.catalog = meta.get('catalog', DEF_CATALOG)
471 474 self.header = meta.get('header', DEF_HEADER)
472 475 if format == 'cedar':
473 476 self.ext = '.dat'
474 477 self.extra_args = {}
475 478 elif format == 'hdf5':
476 479 self.ext = '.hdf5'
477 480 self.extra_args = {'ind2DList': self.ind2DList}
478 481
479 482 self.keys = [k.lower() for k in self.twoDDict]
480 483 if 'range' in self.keys:
481 484 self.keys.remove('range')
482 485 if 'gdalt' in self.keys:
483 486 self.keys.remove('gdalt')
484 487
485 488 def setFile(self):
486 489 '''
487 490 Create new cedar file object
488 491 '''
489 492
490 493 self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
491 494 date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
492 495
493 496 filename = '{}{}{}'.format(self.mnemonic,
494 497 date.strftime('%Y%m%d_%H%M%S'),
495 498 self.ext)
496 499
497 500 self.fullname = os.path.join(self.path, filename)
498 501
499 502 if os.path.isfile(self.fullname) :
500 503 log.warning(
501 504 'Destination file {} already exists; previous file deleted.'.format(
502 505 self.fullname),
503 506 'MADWriter')
504 507 os.remove(self.fullname)
505 508
506 509 try:
507 510 log.success(
508 511 'Creating file: {}'.format(self.fullname),
509 512 'MADWriter')
510 513 self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
511 514 except ValueError as e:
512 515 log.error(
513 516 'Unable to create a cedar file object with "madrigal.cedar.MadrigalCedarFile"',
514 517 'MADWriter')
515 518 return
516 519
517 520 return 1
518 521
519 522 def writeBlock(self):
520 523 '''
521 524 Add data records to cedar file taking data from oneDDict and twoDDict
522 525 attributes.
523 526 Allowed parameters in: parcodes.tab
524 527 '''
525 528
526 529 startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
527 530 endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
528 531 heights = self.dataOut.heightList
529 532
530 533 if self.ext == '.dat':
531 534 for key, value in list(self.twoDDict.items()):
532 535 if isinstance(value, str):
533 536 data = getattr(self.dataOut, value)
534 537 invalid = numpy.isnan(data)
535 538 data[invalid] = self.missing
536 539 elif isinstance(value, (tuple, list)):
537 540 attr, key = value
538 541 data = getattr(self.dataOut, attr)
539 542 invalid = numpy.isnan(data)
540 543 data[invalid] = self.missing
541 544
542 545 out = {}
543 546 for key, value in list(self.twoDDict.items()):
544 547 key = key.lower()
545 548 if isinstance(value, str):
546 549 if 'db' in value.lower():
547 550 tmp = getattr(self.dataOut, value.replace('_db', ''))
548 551 SNRavg = numpy.average(tmp, axis=0)
549 552 tmp = 10*numpy.log10(SNRavg)
550 553 else:
551 554 tmp = getattr(self.dataOut, value)
552 555 out[key] = tmp.flatten()
553 556 elif isinstance(value, (tuple, list)):
554 557 attr, x = value
555 558 data = getattr(self.dataOut, attr)
556 559 out[key] = data[int(x)]
557 560
558 561 a = numpy.array([out[k] for k in self.keys])
559 562 nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
560 563 index = numpy.where(nrows == False)[0]
561 564
562 565 rec = madrigal.cedar.MadrigalDataRecord(
563 566 self.kinst,
564 567 self.kindat,
565 568 startTime.year,
566 569 startTime.month,
567 570 startTime.day,
568 571 startTime.hour,
569 572 startTime.minute,
570 573 startTime.second,
571 574 startTime.microsecond/10000,
572 575 endTime.year,
573 576 endTime.month,
574 577 endTime.day,
575 578 endTime.hour,
576 579 endTime.minute,
577 580 endTime.second,
578 581 endTime.microsecond/10000,
579 582 list(self.oneDDict.keys()),
580 583 list(self.twoDDict.keys()),
581 584 len(index),
582 585 **self.extra_args
583 586 )
584 587
585 588 # Setting 1d values
586 589 for key in self.oneDDict:
587 590 rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))
588 591
589 592 # Setting 2d values
590 593 nrec = 0
591 594 for n in index:
592 595 for key in out:
593 596 rec.set2D(key, nrec, out[key][n])
594 597 nrec += 1
595 598
596 599 self.fp.append(rec)
597 600 if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
598 601 self.fp.dump()
599 if self.counter % 100 == 0 and self.counter > 0:
602 if self.counter % 20 == 0 and self.counter > 0:
600 603 log.log(
601 604 'Writing {} records'.format(
602 605 self.counter),
603 606 'MADWriter')
604 607
605 608 def setHeader(self):
606 609 '''
607 610 Create and add catalog and header to the cedar file
608 611 '''
609 612
610 613 log.success('Closing file {}'.format(self.fullname), 'MADWriter')
611 614
612 615 if self.ext == '.dat':
613 616 self.fp.write()
614 617 else:
615 618 self.fp.dump()
616 619 self.fp.close()
617 620
618 621 header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
619 622 header.createCatalog(**self.catalog)
620 623 header.createHeader(**self.header)
621 624 header.write()
622 625
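
The catalog and header fall back to DEF_CATALOG and DEF_HEADER when not given; a sketch of overriding them through the metadata argument (all values illustrative):

import json

metadata = json.dumps({
    'kinst': 10,                                # 'jro' in MNEMONICS
    'kindat': 1910,                             # placeholder kindat code
    'catalog': {'principleInvestigator': 'Marco Milla',
                'expPurpose': 'illustrative experiment'},
    'header': {'analyst': 'Jicamarca User',
               'comments': 'illustrative comments'}})
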
623 626 def putData(self):
624 627
625 628 if self.dataOut.flagNoData:
626 629 return 0
627 630
628 631 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
629 632 if self.counter > 0:
630 633 self.setHeader()
631 634 self.counter = 0
632 635
633 636 if self.counter == 0:
634 637 self.setFile()
635 638
636 639 self.writeBlock()
637 640 self.counter += 1
638 641
639 642 def close(self):
640 643
641 644 if self.counter > 0:
642 645 self.setHeader()
\ No newline at end of file
@@ -1,384 +1,390 @@
1 1 '''
2 2 Updated for multiprocessing
3 3 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnects the processes created.
7 7 The arguments (kwargs) sent from the controller are parsed and filtered via the decorator for each processing unit or operation instantiated.
8 8 The decorator also handles the methods inside the processing unit that are called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
12 12 $Id: jroproc_base.py 1 2012-11-12 18:56:07Z murco $
13 13 '''
14 14
15 15 import inspect
16 16 import zmq
17 17 import time
18 18 import pickle
19 19 import os
20 20 from multiprocessing import Process
21 21 from zmq.utils.monitor import recv_monitor_message
22 22
23 23 from schainpy.utils import log
24 24
25 25
26 26 class ProcessingUnit(object):
27 27
28 28 """
29 29 Update - Jan 2018 - MULTIPROCESSING
30 30 All the "call" methods present in the previous base were removed.
31 31 The majority of operations are independent processes, thus
32 32 the decorator is in charge of communicating the operation processes
33 33 with the processing unit via IPC.
34 34
35 35 The constructor does not receive any argument. The remaining methods
36 36 are related to the operations to execute.
37 37
38 38
39 39 """
40 40
41 41 def __init__(self):
42 42
43 43 self.dataIn = None
44 44 self.dataOut = None
45 45 self.isConfig = False
46 46 self.operations = []
47 47 self.plots = []
48 48
49 49 def getAllowedArgs(self):
50 50 if hasattr(self, '__attrs__'):
51 51 return self.__attrs__
52 52 else:
53 53 return inspect.getargspec(self.run).args
54 54
55 55 def addOperation(self, conf, operation):
56 56 """
57 57 This method is used in the controller and updates the dictionary containing the operations to execute. The dict
58 58 holds the id of the operation process (for IPC purposes).
59 59
60 60 Adds an object of type "Operation" (opObj) to the list of objects "self.objectList" and returns the
61 61 identifier associated with this object.
62 62
63 63 Input:
64 64
65 65 object : an object of the "Operation" class
66 66
67 67 Return:
68 68
69 69 objId : identifier of the object, needed to communicate with the master (procUnit)
70 70 """
71 71
72 72 self.operations.append(
73 73 (operation, conf.type, conf.id, conf.getKwargs()))
74 74
75 75 if 'plot' in self.name.lower():
76 76 self.plots.append(operation.CODE)
77 77
78 78 def getOperationObj(self, objId):
79 79
80 80 matches = [op for op, _, opId, _ in self.operations if opId == objId]
81 81 if not matches:
82 82 return None
83 83 return matches[0]
84 84
85 85 def operation(self, **kwargs):
86 86 """
87 87 Direct operation on the data (dataOut.data). The attribute values of the
88 88 dataOut object must be updated.
89 89
90 90 Input:
91 91
92 92 **kwargs : dictionary of arguments for the function to execute
93 93 """
94 94
95 95 raise NotImplementedError
96 96
97 97 def setup(self):
98 98
99 99 raise NotImplementedError
100 100
101 101 def run(self):
102 102
103 103 raise NotImplementedError
104 104
105 105 def close(self):
106 106
107 107 return
108 108
109 109
110 110 class Operation(object):
111 111
112 112 """
113 113 Update - Jan 2018 - MULTIPROCESSING
114 114
115 115 Most of the methods remain the same. The decorator parses the arguments and executes the run() method for each process.
116 116 The constructor does not receive any arguments, nor does the base class.
117 117
118 118
119 119 Base class for defining additional operations that can be added to the ProcessingUnit class
120 120 and that need to accumulate prior information about the data to process. Preferably, use an
121 121 accumulation buffer inside this class.
122 122
123 123 Example: coherent integration needs the prior information of the n previous profiles (buffer).
124 124
125 125 """
126 126
127 127 def __init__(self):
128 128
129 129 self.id = None
130 130 self.isConfig = False
131 131
132 132 if not hasattr(self, 'name'):
133 133 self.name = self.__class__.__name__
134 134
135 135 def getAllowedArgs(self):
136 136 if hasattr(self, '__attrs__'):
137 137 return self.__attrs__
138 138 else:
139 139 return inspect.getargspec(self.run).args
140 140
141 141 def setup(self):
142 142
143 143 self.isConfig = True
144 144
145 145 raise NotImplementedError
146 146
147 147 def run(self, dataIn, **kwargs):
148 148 """
149 149 Performs the necessary operations on dataIn.data and updates the
150 150 attributes of the dataIn object.
151 151
152 152 Input:
153 153
154 154 dataIn : an object of type JROData
155 155
156 156 Return:
157 157
158 158 None
159 159
160 160 Affected:
161 161 __buffer : data reception buffer.
162 162
163 163 """
164 164 if not self.isConfig:
165 165 self.setup(**kwargs)
166 166
167 167 raise NotImplementedError
168 168
169 169 def close(self):
170 170
171 171 return
172 172
173 173
174 174 def MPDecorator(BaseClass):
175 175 """
176 176 Multiprocessing class decorator
177 177
178 178 This function adds multiprocessing features to a BaseClass. It also handles
179 179 the communication between processes (readers, procUnits and operations).
180 180 """
181 181
182 182 class MPClass(BaseClass, Process):
183 183
184 184 def __init__(self, *args, **kwargs):
185 185 super(MPClass, self).__init__()
186 186 Process.__init__(self)
187 187 self.operationKwargs = {}
188 188 self.args = args
189 189 self.kwargs = kwargs
190 190 self.sender = None
191 191 self.receiver = None
192 192 self.name = BaseClass.__name__
193 193 if 'plot' in self.name.lower() and not self.name.endswith('_'):
194 194 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
195 195 self.start_time = time.time()
196 196
197 197 if len(self.args) == 3:
198 198 self.typeProc = "ProcUnit"
199 199 self.id = args[0]
200 200 self.inputId = args[1]
201 201 self.project_id = args[2]
202 202 elif len(self.args) == 2:
203 203 self.id = args[0]
204 204 self.inputId = args[0]
205 205 self.project_id = args[1]
206 206 self.typeProc = "Operation"
207 207
208 208 def subscribe(self):
209 209 '''
210 210 This function creates a socket to receive objects from the
211 211 topic `inputId`.
212 212 '''
213 213
214 214 c = zmq.Context()
215 215 self.receiver = c.socket(zmq.SUB)
216 216 self.receiver.connect(
217 217 'ipc:///tmp/schain/{}_pub'.format(self.project_id))
218 218 self.receiver.setsockopt(zmq.SUBSCRIBE, self.inputId.encode())
219 219
220 220 def listen(self):
221 221 '''
222 222 This function waits for objects and deserializes them using pickle
223 223 '''
224 224
225 225 data = pickle.loads(self.receiver.recv_multipart()[1])
226 226
227 227 return data
228 228
229 229 def set_publisher(self):
230 230 '''
231 231 This function creates a socket for publishing purposes.
232 232 '''
233 233
234 234 time.sleep(1)
235 235 c = zmq.Context()
236 236 self.sender = c.socket(zmq.PUB)
237 237 self.sender.connect(
238 238 'ipc:///tmp/schain/{}_sub'.format(self.project_id))
239 239
240 240 def publish(self, data, id):
241 241 '''
242 242 This function publishes an object to a specific topic.
243 243 '''
244 244 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
245 245
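
subscribe()/publish() above implement topic-filtered PUB/SUB messaging where frame 0 is the topic and frame 1 a pickled object; a standalone sketch of the same framing (the endpoints are made up, and a forwarder bridging the *_sub and *_pub endpoints is assumed, as in the project):

import pickle
import zmq

context = zmq.Context()

# Publisher side: topic as frame 0, pickled payload as frame 1.
sender = context.socket(zmq.PUB)
sender.connect('ipc:///tmp/schain/demo_sub')    # hypothetical endpoint
sender.send_multipart([b'unit01', pickle.dumps({'utctime': 0})])

# Subscriber side: prefix-filter on the topic, unpickle frame 1.
receiver = context.socket(zmq.SUB)
receiver.connect('ipc:///tmp/schain/demo_pub')  # hypothetical endpoint
receiver.setsockopt(zmq.SUBSCRIBE, b'unit01')
data = pickle.loads(receiver.recv_multipart()[1])
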
246 246 def runReader(self):
247 247 '''
248 248 Run function for reader units
249 249 '''
250 250 while True:
251 251
252 252 BaseClass.run(self, **self.kwargs)
253 253
254 254 for op, optype, opId, kwargs in self.operations:
255 255 if optype == 'self' and not self.dataOut.flagNoData:
256 256 op(**kwargs)
257 257 elif optype == 'other' and not self.dataOut.flagNoData:
258 258 self.dataOut = op.run(self.dataOut, **self.kwargs)
259 259 elif optype == 'external':
260 260 self.publish(self.dataOut, opId)
261 261
262 262 if self.dataOut.flagNoData and not self.dataOut.error:
263 263 continue
264 264
265 265 self.publish(self.dataOut, self.id)
266 266
267 267 if self.dataOut.error:
268 268 log.error(self.dataOut.error, self.name)
269 269 # self.sender.send_multipart([str(self.project_id).encode(), 'end'.encode()])
270 270 break
271 271
272 272 time.sleep(1)
273 273
274 274 def runProc(self):
275 275 '''
276 276 Run function for processing units
277 277 '''
278 278
279 279 while True:
280 280 self.dataIn = self.listen()
281 281
282 282 if self.dataIn.flagNoData and self.dataIn.error is None:
283 283 continue
284 284
285 285 BaseClass.run(self, **self.kwargs)
286 286
287 287 if self.dataIn.error:
288 288 self.dataOut.error = self.dataIn.error
289 self.dataOut.flagNoData = True
290
289 self.dataOut.flagNoData = True
290
291 291 for op, optype, opId, kwargs in self.operations:
292 292 if optype == 'self' and not self.dataOut.flagNoData:
293 293 op(**kwargs)
294 294 elif optype == 'other' and not self.dataOut.flagNoData:
295 295 self.dataOut = op.run(self.dataOut, **kwargs)
296 elif optype == 'external' and not self.dataOut.flagNoData:
297 if not self.dataOut.flagNoData or self.dataOut.error:
298 self.publish(self.dataOut, opId)
296 elif optype == 'external' and not self.dataOut.flagNoData:
297 self.publish(self.dataOut, opId)
299 298
300 299 if not self.dataOut.flagNoData or self.dataOut.error:
301 300 self.publish(self.dataOut, self.id)
301 for op, optype, opId, kwargs in self.operations:
302 if optype == 'self' and self.dataOut.error:
303 op(**kwargs)
304 elif optype == 'other' and self.dataOut.error:
305 self.dataOut = op.run(self.dataOut, **kwargs)
306 elif optype == 'external' and self.dataOut.error:
307 self.publish(self.dataOut, opId)
302 308
303 309 if self.dataIn.error:
304 310 break
305 311
306 312 time.sleep(1)
307 313
308 314 def runOp(self):
309 315 '''
310 316 Run function for external operations (these operations just receive data,
311 317 e.g. plots, writers, publishers)
312 318 '''
313 319
314 320 while True:
315 321
316 322 dataOut = self.listen()
317 323
318 324 BaseClass.run(self, dataOut, **self.kwargs)
319 325
320 326 if dataOut.error:
321 327 break
322 328
323 329 time.sleep(1)
324 330
325 331 def run(self):
326 332 if self.typeProc == "ProcUnit":
327 333
328 334 if self.inputId is not None:
329 335
330 336 self.subscribe()
331 337
332 338 self.set_publisher()
333 339
334 340 if 'Reader' not in BaseClass.__name__:
335 341 self.runProc()
336 342 else:
337 343 self.runReader()
338 344
339 345 elif self.typeProc == "Operation":
340 346
341 347 self.subscribe()
342 348 self.runOp()
343 349
344 350 else:
345 351 raise ValueError("Unknown type")
346 352
347 353 self.close()
348 354
349 355 def event_monitor(self, monitor):
350 356
351 357 events = {}
352 358
353 359 for name in dir(zmq):
354 360 if name.startswith('EVENT_'):
355 361 value = getattr(zmq, name)
356 362 events[value] = name
357 363
358 364 while monitor.poll():
359 365 evt = recv_monitor_message(monitor)
360 366 if evt['event'] == 32: # zmq.EVENT_ACCEPTED
361 367 self.connections += 1
362 368 if evt['event'] == 512: # zmq.EVENT_DISCONNECTED
363 369 pass
364 370
365 371 evt.update({'description': events[evt['event']]})
366 372
367 373 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
368 374 break
369 375 monitor.close()
370 376 print('event monitor thread done!')
371 377
372 378 def close(self):
373 379
374 380 BaseClass.close(self)
375 381
376 382 if self.sender:
377 383 self.sender.close()
378 384
379 385 if self.receiver:
380 386 self.receiver.close()
381 387
382 388 log.success('Done...(Time:{:4.2f} secs)'.format(time.time()-self.start_time), self.name)
383 389
384 390 return MPClass
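
A hedged sketch of the decorator in use on a minimal external operation; the ids are placeholders. Per __init__ above, an operation receives (id, project_id) while a processing unit receives (id, inputId, project_id); start() launches the child process, whose run() subscribes and feeds each received object to the operation's run():

from schainpy.model.proc.jroproc_base import MPDecorator, Operation

@MPDecorator
class PrintTime(Operation):
    '''Toy external operation: print the timestamp of each received object.'''

    def run(self, dataOut, **kwargs):
        print(dataOut.utctime)

op = PrintTime('2', 'project01')                # placeholder id and project_id
op.start()                                      # runOp() loops in a child process
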
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated.