##// END OF EJS Templates
Modificación a kmamisr para ejecutarse en la versión 3, creación de scripts con terminación v3 para difereciarlos, se comentó la linea #720 de JroIO_param.py debido a que reiniciaba la lista de archivos, ocasionando la reescritura del archivo hdf5. Alguna otra modificación aparente es producto de algunas variaciones en espacios al usar la función print()
joabAM -
r1279:c53fe2a4a291
parent child
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -57,7 +57,7 def MPProject(project, n=cpu_count()):
57 57 nFiles = len(files)
58 58 if nFiles == 0:
59 59 continue
60 skip = int(math.ceil(nFiles / n))
60 skip = int(math.ceil(nFiles / n))
61 61 while nFiles > cursor * skip:
62 62 rconf.update(startDate=dt_str, endDate=dt_str, cursor=cursor,
63 63 skip=skip)
@@ -81,11 +81,11 def MPProject(project, n=cpu_count()):
81 81 time.sleep(3)
82 82
83 83 def wait(context):
84
84
85 85 time.sleep(1)
86 86 c = zmq.Context()
87 87 receiver = c.socket(zmq.SUB)
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
88 receiver.connect('ipc:///tmp/schain_{}_pub'.format(self.id))
89 89 receiver.setsockopt(zmq.SUBSCRIBE, self.id.encode())
90 90 msg = receiver.recv_multipart()[1]
91 91 context.terminate()
@@ -262,7 +262,7 class ParameterConf():
262 262 parmElement.set('name', self.name)
263 263 parmElement.set('value', self.value)
264 264 parmElement.set('format', self.format)
265
265
266 266 def readXml(self, parmElement):
267 267
268 268 self.id = parmElement.get('id')
@@ -417,7 +417,7 class OperationConf():
417 417 self.name = opElement.get('name')
418 418 self.type = opElement.get('type')
419 419 self.priority = opElement.get('priority')
420 self.project_id = str(project_id)
420 self.project_id = str(project_id)
421 421
422 422 # Compatible with old signal chain version
423 423 # Use of 'run' method instead 'init'
@@ -476,7 +476,7 class ProcUnitConf():
476 476 self.id = None
477 477 self.datatype = None
478 478 self.name = None
479 self.inputId = None
479 self.inputId = None
480 480 self.opConfObjList = []
481 481 self.procUnitObj = None
482 482 self.opObjDict = {}
@@ -497,7 +497,7 class ProcUnitConf():
497 497
498 498 return self.id
499 499
500 def updateId(self, new_id):
500 def updateId(self, new_id):
501 501 '''
502 502 new_id = int(parentId) * 10 + (int(self.id) % 10)
503 503 new_inputId = int(parentId) * 10 + (int(self.inputId) % 10)
@@ -556,7 +556,7 class ProcUnitConf():
556 556 id sera el topico a publicar
557 557 inputId sera el topico a subscribirse
558 558 '''
559
559
560 560 # Compatible with old signal chain version
561 561 if datatype == None and name == None:
562 562 raise ValueError('datatype or name should be defined')
@@ -581,7 +581,7 class ProcUnitConf():
581 581 self.lock = lock
582 582 self.opConfObjList = []
583 583
584 self.addOperation(name='run', optype='self')
584 self.addOperation(name='run', optype='self')
585 585
586 586 def removeOperations(self):
587 587
@@ -679,28 +679,32 class ProcUnitConf():
679 679 '''
680 680
681 681 className = eval(self.name)
682 #print(self.name)
682 683 kwargs = self.getKwargs()
684 #print(kwargs)
685 #print("mark_a")
683 686 procUnitObj = className(self.id, self.inputId, self.project_id, self.err_queue, self.lock, 'ProcUnit', **kwargs)
687 #print("mark_b")
684 688 log.success('creating process...', self.name)
685 689
686 690 for opConfObj in self.opConfObjList:
687
691
688 692 if opConfObj.type == 'self' and opConfObj.name == 'run':
689 693 continue
690 694 elif opConfObj.type == 'self':
691 695 opObj = getattr(procUnitObj, opConfObj.name)
692 696 else:
693 697 opObj = opConfObj.createObject()
694
698
695 699 log.success('adding operation: {}, type:{}'.format(
696 700 opConfObj.name,
697 701 opConfObj.type), self.name)
698
702
699 703 procUnitObj.addOperation(opConfObj, opObj)
700
704
701 705 procUnitObj.start()
702 706 self.procUnitObj = procUnitObj
703
707
704 708 def close(self):
705 709
706 710 for opConfObj in self.opConfObjList:
@@ -732,8 +736,8 class ReadUnitConf(ProcUnitConf):
732 736
733 737 def getElementName(self):
734 738
735 return self.ELEMENTNAME
736
739 return self.ELEMENTNAME
740
737 741 def setup(self, project_id, id, name, datatype, err_queue, path='', startDate='', endDate='',
738 742 startTime='', endTime='', server=None, **kwargs):
739 743
@@ -745,7 +749,7 class ReadUnitConf(ProcUnitConf):
745 749 kwargs deben ser trasmitidos en la instanciacion
746 750
747 751 '''
748
752
749 753 # Compatible with old signal chain version
750 754 if datatype == None and name == None:
751 755 raise ValueError('datatype or name should be defined')
@@ -768,12 +772,13 class ReadUnitConf(ProcUnitConf):
768 772 self.datatype = datatype
769 773 if path != '':
770 774 self.path = os.path.abspath(path)
775 print (self.path)
771 776 self.startDate = startDate
772 777 self.endDate = endDate
773 778 self.startTime = startTime
774 779 self.endTime = endTime
775 780 self.server = server
776 self.err_queue = err_queue
781 self.err_queue = err_queue
777 782 self.addRunOperation(**kwargs)
778 783
779 784 def update(self, **kwargs):
@@ -804,7 +809,7 class ReadUnitConf(ProcUnitConf):
804 809
805 810 def addRunOperation(self, **kwargs):
806 811
807 opObj = self.addOperation(name='run', optype='self')
812 opObj = self.addOperation(name='run', optype='self')
808 813
809 814 if self.server is None:
810 815 opObj.addParameter(
@@ -942,7 +947,7 class Project(Process):
942 947 print('*' * 19)
943 948 print(' ')
944 949 self.id = str(id)
945 self.description = description
950 self.description = description
946 951 self.email = email
947 952 self.alarm = alarm
948 953 if name:
@@ -977,7 +982,7 class Project(Process):
977 982 readUnitConfObj = ReadUnitConf()
978 983 readUnitConfObj.setup(self.id, idReadUnit, name, datatype, self.err_queue, **kwargs)
979 984 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
980
985
981 986 return readUnitConfObj
982 987
983 988 def addProcUnit(self, inputId='0', datatype=None, name=None):
@@ -994,7 +999,7 class Project(Process):
994 999
995 1000 idProcUnit = self.__getNewId()
996 1001 procUnitConfObj = ProcUnitConf()
997 input_proc = self.procUnitConfObjDict[inputId]
1002 input_proc = self.procUnitConfObjDict[inputId]
998 1003 procUnitConfObj.setup(self.id, idProcUnit, name, datatype, inputId, self.err_queue, input_proc.lock)
999 1004 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1000 1005
@@ -1152,14 +1157,14 class Project(Process):
1152 1157
1153 1158 t = Thread(target=self.__monitor, args=(self.err_queue, self.ctx))
1154 1159 t.start()
1155
1160
1156 1161 def __monitor(self, queue, ctx):
1157 1162
1158 1163 import socket
1159
1164
1160 1165 procs = 0
1161 1166 err_msg = ''
1162
1167
1163 1168 while True:
1164 1169 msg = queue.get()
1165 1170 if '#_start_#' in msg:
@@ -1168,11 +1173,11 class Project(Process):
1168 1173 procs -=1
1169 1174 else:
1170 1175 err_msg = msg
1171
1172 if procs == 0 or 'Traceback' in err_msg:
1176
1177 if procs == 0 or 'Traceback' in err_msg:
1173 1178 break
1174 1179 time.sleep(0.1)
1175
1180
1176 1181 if '|' in err_msg:
1177 1182 name, err = err_msg.split('|')
1178 1183 if 'SchainWarning' in err:
@@ -1181,9 +1186,9 class Project(Process):
1181 1186 log.error(err.split('SchainError:')[-1].split('\n')[0].strip(), name)
1182 1187 else:
1183 1188 log.error(err, name)
1184 else:
1189 else:
1185 1190 name, err = self.name, err_msg
1186
1191
1187 1192 time.sleep(2)
1188 1193
1189 1194 for conf in self.procUnitConfObjDict.values():
@@ -1191,7 +1196,7 class Project(Process):
1191 1196 if confop.type == 'external':
1192 1197 confop.opObj.terminate()
1193 1198 conf.procUnitObj.terminate()
1194
1199
1195 1200 ctx.term()
1196 1201
1197 1202 message = ''.join(err)
@@ -1217,7 +1222,7 class Project(Process):
1217 1222 subtitle += '[End time = %s]\n' % readUnitConfObj.endTime
1218 1223
1219 1224 a = Alarm(
1220 modes=self.alarm,
1225 modes=self.alarm,
1221 1226 email=self.email,
1222 1227 message=message,
1223 1228 subject=subject,
@@ -1266,7 +1271,7 class Project(Process):
1266 1271
1267 1272 if not os.path.exists('/tmp/schain'):
1268 1273 os.mkdir('/tmp/schain')
1269
1274
1270 1275 self.ctx = zmq.Context()
1271 1276 xpub = self.ctx.socket(zmq.XPUB)
1272 1277 xpub.bind('ipc:///tmp/schain/{}_pub'.format(self.id))
@@ -1282,9 +1287,9 class Project(Process):
1282 1287 def run(self):
1283 1288
1284 1289 log.success('Starting {}: {}'.format(self.name, self.id), tag='')
1285 self.start_time = time.time()
1286 self.createObjects()
1287 self.setProxy()
1290 self.start_time = time.time()
1291 self.createObjects()
1292 self.setProxy()
1288 1293 log.success('{} Done (Time: {}s)'.format(
1289 1294 self.name,
1290 1295 time.time()-self.start_time), '')
@@ -114,7 +114,7 class GenericData(object):
114 114 flagNoData = True
115 115
116 116 def copy(self, inputObj=None):
117
117
118 118 if inputObj == None:
119 119 return copy.deepcopy(self)
120 120
@@ -548,7 +548,7 class Spectra(JROData):
548 548
549 549 deltav = self.getVmax() / (self.nFFTPoints * self.ippFactor)
550 550 velrange = deltav * (numpy.arange(self.nFFTPoints + extrapoints) - self.nFFTPoints / 2.)
551
551
552 552 if self.nmodes:
553 553 return velrange/self.nmodes
554 554 else:
@@ -1104,7 +1104,7 class PlotterData(object):
1104 1104 MAXNUMY = 100
1105 1105
1106 1106 def __init__(self, code, throttle_value, exp_code, buffering=True, snr=False):
1107
1107
1108 1108 self.key = code
1109 1109 self.throttle = throttle_value
1110 1110 self.exp_code = exp_code
@@ -1139,7 +1139,7 class PlotterData(object):
1139 1139 return len(self.__times)
1140 1140
1141 1141 def __getitem__(self, key):
1142
1142
1143 1143 if key not in self.data:
1144 1144 raise KeyError(log.error('Missing key: {}'.format(key)))
1145 1145 if 'spc' in key or not self.buffering:
@@ -1172,7 +1172,7 class PlotterData(object):
1172 1172 elif 'spc_moments' == plot:
1173 1173 plot = 'moments'
1174 1174 self.data[plot] = {}
1175
1175
1176 1176 if 'spc' in self.data or 'rti' in self.data or 'cspc' in self.data or 'moments' in self.data:
1177 1177 self.data['noise'] = {}
1178 1178 self.data['rti'] = {}
@@ -1180,7 +1180,7 class PlotterData(object):
1180 1180 self.plottypes.append('noise')
1181 1181 if 'rti' not in self.plottypes:
1182 1182 self.plottypes.append('rti')
1183
1183
1184 1184 def shape(self, key):
1185 1185 '''
1186 1186 Get the shape of the one-element data for the given key
@@ -1196,17 +1196,17 class PlotterData(object):
1196 1196 '''
1197 1197 Update data object with new dataOut
1198 1198 '''
1199
1199
1200 1200 if tm in self.__times:
1201 1201 return
1202 1202 self.profileIndex = dataOut.profileIndex
1203 1203 self.tm = tm
1204 1204 self.type = dataOut.type
1205 1205 self.parameters = getattr(dataOut, 'parameters', [])
1206
1206
1207 1207 if hasattr(dataOut, 'meta'):
1208 1208 self.meta.update(dataOut.meta)
1209
1209
1210 1210 self.pairs = dataOut.pairsList
1211 1211 self.interval = dataOut.getTimeInterval()
1212 1212 self.localtime = dataOut.useLocalTime
@@ -1217,7 +1217,7 class PlotterData(object):
1217 1217 self.__heights.append(dataOut.heightList)
1218 1218 self.__all_heights.update(dataOut.heightList)
1219 1219 self.__times.append(tm)
1220
1220
1221 1221 for plot in self.plottypes:
1222 1222 if plot in ('spc', 'spc_moments'):
1223 1223 z = dataOut.data_spc/dataOut.normFactor
@@ -1250,8 +1250,8 class PlotterData(object):
1250 1250 if plot == 'scope':
1251 1251 buffer = dataOut.data
1252 1252 self.flagDataAsBlock = dataOut.flagDataAsBlock
1253 self.nProfiles = dataOut.nProfiles
1254
1253 self.nProfiles = dataOut.nProfiles
1254
1255 1255 if plot == 'spc':
1256 1256 self.data['spc'] = buffer
1257 1257 elif plot == 'cspc':
@@ -1326,7 +1326,7 class PlotterData(object):
1326 1326 else:
1327 1327 meta['xrange'] = []
1328 1328
1329 meta.update(self.meta)
1329 meta.update(self.meta)
1330 1330 ret['metadata'] = meta
1331 1331 return json.dumps(ret)
1332 1332
@@ -218,7 +218,7 class SystemHeader(Header):
218 218 structure = SYSTEM_STRUCTURE
219 219
220 220 def __init__(self, nSamples=0, nProfiles=0, nChannels=0, adcResolution=14, pciDioBusWidth=0):
221
221
222 222 self.size = 24
223 223 self.nSamples = nSamples
224 224 self.nProfiles = nProfiles
@@ -903,4 +903,4 def get_procflag_dtype(index):
903 903
904 904 def get_dtype_width(index):
905 905
906 return DTYPE_WIDTH[index] No newline at end of file
906 return DTYPE_WIDTH[index]
@@ -228,7 +228,7 class Plot(Operation):
228 228 self.__throttle_plot = apply_throttle(self.throttle)
229 229 self.data = PlotterData(
230 230 self.CODE, self.throttle, self.exp_code, self.buffering, snr=self.showSNR)
231
231
232 232 if self.plot_server:
233 233 if not self.plot_server.startswith('tcp://'):
234 234 self.plot_server = 'tcp://{}'.format(self.plot_server)
@@ -246,7 +246,7 class Plot(Operation):
246 246
247 247 self.setup()
248 248
249 self.time_label = 'LT' if self.localtime else 'UTC'
249 self.time_label = 'LT' if self.localtime else 'UTC'
250 250
251 251 if self.width is None:
252 252 self.width = 8
@@ -305,7 +305,7 class Plot(Operation):
305 305 cmap = plt.get_cmap(self.colormap)
306 306 cmap.set_bad(self.bgcolor, 1.)
307 307 self.cmaps.append(cmap)
308
308
309 309 for fig in self.figures:
310 310 fig.canvas.mpl_connect('key_press_event', self.OnKeyPress)
311 311 fig.canvas.mpl_connect('scroll_event', self.OnBtnScroll)
@@ -474,11 +474,11 class Plot(Operation):
474 474 xmax += time.timezone
475 475 else:
476 476 xmax = self.xmax
477
477
478 478 ymin = self.ymin if self.ymin else numpy.nanmin(self.y)
479 479 ymax = self.ymax if self.ymax else numpy.nanmax(self.y)
480 480 #Y = numpy.array([1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000])
481
481
482 482 #i = 1 if numpy.where(
483 483 # abs(ymax-ymin) <= Y)[0][0] < 0 else numpy.where(abs(ymax-ymin) <= Y)[0][0]
484 484 #ystep = Y[i] / 10.
@@ -492,14 +492,14 class Plot(Operation):
492 492 ystep = ystep/5
493 493 ystep = ystep/(10**digD)
494 494
495 else:
495 else:
496 496 ystep = ((ymax + (10**(dig)))//10**(dig))*(10**(dig))
497 497 ystep = ystep/5
498
498
499 499 if self.xaxis is not 'time':
500
500
501 501 dig = int(numpy.log10(xmax))
502
502
503 503 if dig <= 0:
504 504 digD = len(str(xmax)) - 2
505 505 xdec = xmax*(10**digD)
@@ -508,11 +508,11 class Plot(Operation):
508 508 xstep = ((xdec + (10**(dig)))//10**(dig))*(10**(dig))
509 509 xstep = xstep*0.5
510 510 xstep = xstep/(10**digD)
511
512 else:
511
512 else:
513 513 xstep = ((xmax + (10**(dig)))//10**(dig))*(10**(dig))
514 514 xstep = xstep/5
515
515
516 516 for n, ax in enumerate(self.axes):
517 517 if ax.firsttime:
518 518 ax.set_facecolor(self.bgcolor)
@@ -610,7 +610,7 class Plot(Operation):
610 610
611 611 if self.save:
612 612 self.save_figure(n)
613
613
614 614 if self.plot_server:
615 615 self.send_to_server()
616 616 # t = Thread(target=self.send_to_server)
@@ -643,11 +643,10 class Plot(Operation):
643 643 '{}{}_{}.png'.format(
644 644 self.CODE,
645 645 label,
646 self.getDateTime(self.data.max_time).strftime(
647 '%Y%m%d_%H%M%S'
648 ),
646 self.getDateTime(self.data.max_time).strftime('%Y%m%d_%H%M%S'),
649 647 )
650 648 )
649
651 650 log.log('Saving figure: {}'.format(figname), self.name)
652 651 if not os.path.isdir(os.path.dirname(figname)):
653 652 os.makedirs(os.path.dirname(figname))
@@ -718,7 +717,7 class Plot(Operation):
718 717 self.ncols: number of cols
719 718 self.nplots: number of plots (channels or pairs)
720 719 self.ylabel: label for Y axes
721 self.titles: list of axes title
720 self.titles: list of axes title
722 721
723 722 '''
724 723 raise NotImplementedError
@@ -728,18 +727,18 class Plot(Operation):
728 727 Must be defined in the child class
729 728 '''
730 729 raise NotImplementedError
731
730
732 731 def run(self, dataOut, **kwargs):
733 732 '''
734 733 Main plotting routine
735 734 '''
736
735
737 736 if self.isConfig is False:
738 737 self.__setup(**kwargs)
739 738 if dataOut.type == 'Parameters':
740 739 t = dataOut.utctimeInit
741 740 else:
742 t = dataOut.utctime
741 t = dataOut.utctime
743 742
744 743 if dataOut.useLocalTime:
745 744 self.getDateTime = datetime.datetime.fromtimestamp
@@ -749,15 +748,15 class Plot(Operation):
749 748 self.getDateTime = datetime.datetime.utcfromtimestamp
750 749 if self.localtime:
751 750 t -= time.timezone
752
751
753 752 if 'buffer' in self.plot_type:
754 753 if self.xmin is None:
755 754 self.tmin = t
756 755 else:
757 756 self.tmin = (
758 757 self.getDateTime(t).replace(
759 hour=self.xmin,
760 minute=0,
758 hour=self.xmin,
759 minute=0,
761 760 second=0) - self.getDateTime(0)).total_seconds()
762 761
763 762 self.data.setup()
@@ -779,7 +778,7 class Plot(Operation):
779 778 if dataOut.useLocalTime and not self.localtime:
780 779 tm += time.timezone
781 780
782 if self.xaxis is 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60:
781 if self.xaxis is 'time' and self.data and (tm - self.tmin) >= self.xrange*60*60:
783 782 self.save_counter = self.save_period
784 783 self.__plot()
785 784 self.xmin += self.xrange
@@ -807,4 +806,3 class Plot(Operation):
807 806 self.__plot()
808 807 if self.data and self.pause:
809 808 figpause(10)
810
@@ -21,9 +21,10 except:
21 21
22 22 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
23 23 from schainpy.model.data.jrodata import Voltage
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
25 25 from numpy import imag
26 26
27 @MPDecorator
27 28 class AMISRReader(ProcessingUnit):
28 29 '''
29 30 classdocs
@@ -33,9 +34,9 class AMISRReader(ProcessingUnit):
33 34 '''
34 35 Constructor
35 36 '''
36
37
37 38 ProcessingUnit.__init__(self)
38
39
39 40 self.set = None
40 41 self.subset = None
41 42 self.extension_file = '.h5'
@@ -50,40 +51,41 class AMISRReader(ProcessingUnit):
50 51 self.flagIsNewFile = 0
51 52 self.filename = ''
52 53 self.amisrFilePointer = None
53
54
55 self.dataset = None
56
57
58
54
55
56 #self.dataset = None
57
58
59
59 60
60 61 self.profileIndex = 0
61
62
62
63
63 64 self.beamCodeByFrame = None
64 65 self.radacTimeByFrame = None
65
66
66 67 self.dataset = None
67
68
69
70
68
69
70
71
71 72 self.__firstFile = True
72
73
73 74 self.buffer = None
74
75
75
76
76 77 self.timezone = 'ut'
77
78
78 79 self.__waitForNewFile = 20
79 self.__filename_online = None
80 self.__filename_online = None
80 81 #Is really necessary create the output object in the initializer
81 82 self.dataOut = Voltage()
82
83 self.dataOut.error=False
84
83 85 def setup(self,path=None,
84 startDate=None,
85 endDate=None,
86 startTime=None,
86 startDate=None,
87 endDate=None,
88 startTime=None,
87 89 endTime=None,
88 90 walk=True,
89 91 timezone='ut',
@@ -92,41 +94,42 class AMISRReader(ProcessingUnit):
92 94 nCode = 0,
93 95 nBaud = 0,
94 96 online=False):
95
97
98 #print ("T",path)
99
96 100 self.timezone = timezone
97 101 self.all = all
98 102 self.online = online
99
103
100 104 self.code = code
101 105 self.nCode = int(nCode)
102 106 self.nBaud = int(nBaud)
103
104
105
107
108
109
106 110 #self.findFiles()
107 111 if not(online):
108 112 #Busqueda de archivos offline
109 113 self.searchFilesOffLine(path, startDate, endDate, startTime, endTime, walk)
110 114 else:
111 115 self.searchFilesOnLine(path, startDate, endDate, startTime,endTime,walk)
112
116
113 117 if not(self.filenameList):
114 118 print("There is no files into the folder: %s"%(path))
115
116 119 sys.exit(-1)
117
120
118 121 self.fileIndex = -1
119
120 self.readNextFile(online)
121
122
123 self.readNextFile(online)
124
122 125 '''
123 126 Add code
124 '''
127 '''
125 128 self.isConfig = True
126
129
127 130 pass
128
129
131
132
130 133 def readAMISRHeader(self,fp):
131 134 header = 'Raw11/Data/RadacHeader'
132 135 self.beamCodeByPulse = fp.get(header+'/BeamCode') # LIST OF BEAMS PER PROFILE, TO BE USED ON REARRANGE
@@ -142,26 +145,26 class AMISRReader(ProcessingUnit):
142 145 self.rangeFromFile = fp.get('Raw11/Data/Samples/Range')
143 146 self.frequency = fp.get('Rx/Frequency')
144 147 txAus = fp.get('Raw11/Data/Pulsewidth')
145
146
148
149
147 150 self.nblocks = self.pulseCount.shape[0] #nblocks
148
151
149 152 self.nprofiles = self.pulseCount.shape[1] #nprofile
150 153 self.nsa = self.nsamplesPulse[0,0] #ngates
151 154 self.nchannels = self.beamCode.shape[1]
152 155 self.ippSeconds = (self.radacTime[0][1] -self.radacTime[0][0]) #Ipp in seconds
153 156 #self.__waitForNewFile = self.nblocks # wait depending on the number of blocks since each block is 1 sec
154 157 self.__waitForNewFile = self.nblocks * self.nprofiles * self.ippSeconds # wait until new file is created
155
158
156 159 #filling radar controller header parameters
157 160 self.__ippKm = self.ippSeconds *.15*1e6 # in km
158 161 self.__txA = (txAus.value)*.15 #(ipp[us]*.15km/1us) in km
159 162 self.__txB = 0
160 163 nWindows=1
161 self.__nSamples = self.nsa
164 self.__nSamples = self.nsa
162 165 self.__firstHeight = self.rangeFromFile[0][0]/1000 #in km
163 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
164
166 self.__deltaHeight = (self.rangeFromFile[0][1] - self.rangeFromFile[0][0])/1000
167
165 168 #for now until understand why the code saved is different (code included even though code not in tuf file)
166 169 #self.__codeType = 0
167 170 # self.__nCode = None
@@ -173,20 +176,20 class AMISRReader(ProcessingUnit):
173 176 self.__nCode = self.nCode
174 177 self.__nBaud = self.nBaud
175 178 #self.__code = 0
176
179
177 180 #filling system header parameters
178 181 self.__nSamples = self.nsa
179 self.newProfiles = self.nprofiles/self.nchannels
182 self.newProfiles = self.nprofiles/self.nchannels
180 183 self.__channelList = list(range(self.nchannels))
181
184
182 185 self.__frequency = self.frequency[0][0]
183
184 186
185
187
188
186 189 def createBuffers(self):
187
188 pass
189
190
191 pass
192
190 193 def __setParameters(self,path='', startDate='',endDate='',startTime='', endTime='', walk=''):
191 194 self.path = path
192 195 self.startDate = startDate
@@ -194,35 +197,35 class AMISRReader(ProcessingUnit):
194 197 self.startTime = startTime
195 198 self.endTime = endTime
196 199 self.walk = walk
197
200
198 201 def __checkPath(self):
199 202 if os.path.exists(self.path):
200 203 self.status = 1
201 204 else:
202 205 self.status = 0
203 206 print('Path:%s does not exists'%self.path)
204
207
205 208 return
206
207
209
210
208 211 def __selDates(self, amisr_dirname_format):
209 212 try:
210 213 year = int(amisr_dirname_format[0:4])
211 214 month = int(amisr_dirname_format[4:6])
212 215 dom = int(amisr_dirname_format[6:8])
213 216 thisDate = datetime.date(year,month,dom)
214
217
215 218 if (thisDate>=self.startDate and thisDate <= self.endDate):
216 219 return amisr_dirname_format
217 220 except:
218 221 return None
219
220
222
223
221 224 def __findDataForDates(self,online=False):
222
225
223 226 if not(self.status):
224 227 return None
225
228
226 229 pat = '\d+.\d+'
227 230 dirnameList = [re.search(pat,x) for x in os.listdir(self.path)]
228 231 dirnameList = [x for x in dirnameList if x!=None]
@@ -237,7 +240,7 class AMISRReader(ProcessingUnit):
237 240 else:
238 241 self.status = 0
239 242 return None
240
243
241 244 def __getTimeFromData(self):
242 245 startDateTime_Reader = datetime.datetime.combine(self.startDate,self.startTime)
243 246 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
@@ -251,33 +254,35 class AMISRReader(ProcessingUnit):
251 254 filename = self.filenameList[i]
252 255 fp = h5py.File(filename,'r')
253 256 time_str = fp.get('Time/RadacTimeString')
254
255 startDateTimeStr_File = time_str[0][0].split('.')[0]
257
258 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
259 #startDateTimeStr_File = "2019-12-16 09:21:11"
256 260 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
257 261 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
258
259 endDateTimeStr_File = time_str[-1][-1].split('.')[0]
262
263 #endDateTimeStr_File = "2019-12-16 11:10:11"
264 endDateTimeStr_File = time_str[-1][-1].decode('UTF-8').split('.')[0]
260 265 junk = time.strptime(endDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
261 266 endDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
262
267
263 268 fp.close()
264
269
270 #print("check time", startDateTime_File)
265 271 if self.timezone == 'lt':
266 272 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
267 273 endDateTime_File = endDateTime_File - datetime.timedelta(minutes = 300)
268
269 274 if (endDateTime_File>=startDateTime_Reader and endDateTime_File<endDateTime_Reader):
270 275 #self.filenameList.remove(filename)
271 276 filter_filenameList.append(filename)
272
277
273 278 if (endDateTime_File>=endDateTime_Reader):
274 279 break
275
276
280
281
277 282 filter_filenameList.sort()
278 283 self.filenameList = filter_filenameList
279 284 return 1
280
285
281 286 def __filterByGlob1(self, dirName):
282 287 filter_files = glob.glob1(dirName, '*.*%s'%self.extension_file)
283 288 filter_files.sort()
@@ -285,24 +290,24 class AMISRReader(ProcessingUnit):
285 290 filterDict.setdefault(dirName)
286 291 filterDict[dirName] = filter_files
287 292 return filterDict
288
293
289 294 def __getFilenameList(self, fileListInKeys, dirList):
290 295 for value in fileListInKeys:
291 296 dirName = list(value.keys())[0]
292 297 for file in value[dirName]:
293 298 filename = os.path.join(dirName, file)
294 299 self.filenameList.append(filename)
295
296
300
301
297 302 def __selectDataForTimes(self, online=False):
298 303 #aun no esta implementado el filtro for tiempo
299 304 if not(self.status):
300 305 return None
301
306
302 307 dirList = [os.path.join(self.path,x) for x in self.dirnameList]
303
308
304 309 fileListInKeys = [self.__filterByGlob1(x) for x in dirList]
305
310
306 311 self.__getFilenameList(fileListInKeys, dirList)
307 312 if not(online):
308 313 #filtro por tiempo
@@ -315,11 +320,11 class AMISRReader(ProcessingUnit):
315 320 else:
316 321 self.status = 0
317 322 return None
318
323
319 324 else:
320 325 #get the last file - 1
321 326 self.filenameList = [self.filenameList[-2]]
322
327
323 328 new_dirnameList = []
324 329 for dirname in self.dirnameList:
325 330 junk = numpy.array([dirname in x for x in self.filenameList])
@@ -328,27 +333,27 class AMISRReader(ProcessingUnit):
328 333 new_dirnameList.append(dirname)
329 334 self.dirnameList = new_dirnameList
330 335 return 1
331
336
332 337 def searchFilesOnLine(self, path, startDate, endDate, startTime=datetime.time(0,0,0),
333 338 endTime=datetime.time(23,59,59),walk=True):
334
339
335 340 if endDate ==None:
336 341 startDate = datetime.datetime.utcnow().date()
337 342 endDate = datetime.datetime.utcnow().date()
338
343
339 344 self.__setParameters(path=path, startDate=startDate, endDate=endDate,startTime = startTime,endTime=endTime, walk=walk)
340
345
341 346 self.__checkPath()
342
347
343 348 self.__findDataForDates(online=True)
344
349
345 350 self.dirnameList = [self.dirnameList[-1]]
346
351
347 352 self.__selectDataForTimes(online=True)
348
353
349 354 return
350
351
355
356
352 357 def searchFilesOffLine(self,
353 358 path,
354 359 startDate,
@@ -356,20 +361,20 class AMISRReader(ProcessingUnit):
356 361 startTime=datetime.time(0,0,0),
357 362 endTime=datetime.time(23,59,59),
358 363 walk=True):
359
364
360 365 self.__setParameters(path, startDate, endDate, startTime, endTime, walk)
361
366
362 367 self.__checkPath()
363
368
364 369 self.__findDataForDates()
365
370
366 371 self.__selectDataForTimes()
367
372
368 373 for i in range(len(self.filenameList)):
369 374 print("%s" %(self.filenameList[i]))
370
371 return
372
375
376 return
377
373 378 def __setNextFileOffline(self):
374 379 idFile = self.fileIndex
375 380
@@ -378,12 +383,13 class AMISRReader(ProcessingUnit):
378 383 if not(idFile < len(self.filenameList)):
379 384 self.flagNoMoreFiles = 1
380 385 print("No more Files")
386 self.dataOut.error = True
381 387 return 0
382 388
383 389 filename = self.filenameList[idFile]
384 390
385 391 amisrFilePointer = h5py.File(filename,'r')
386
392
387 393 break
388 394
389 395 self.flagIsNewFile = 1
@@ -395,8 +401,8 class AMISRReader(ProcessingUnit):
395 401 print("Setting the file: %s"%self.filename)
396 402
397 403 return 1
398
399
404
405
400 406 def __setNextFileOnline(self):
401 407 filename = self.filenameList[0]
402 408 if self.__filename_online != None:
@@ -411,54 +417,56 class AMISRReader(ProcessingUnit):
411 417 self.__selectDataForTimes(online=True)
412 418 filename = self.filenameList[0]
413 419 wait += 1
414
420
415 421 self.__filename_online = filename
416
422
417 423 self.amisrFilePointer = h5py.File(filename,'r')
418 424 self.flagIsNewFile = 1
419 425 self.filename = filename
420 426 print("Setting the file: %s"%self.filename)
421 427 return 1
422
423
428
429
424 430 def readData(self):
425 431 buffer = self.amisrFilePointer.get('Raw11/Data/Samples/Data')
426 432 re = buffer[:,:,:,0]
427 433 im = buffer[:,:,:,1]
428 434 dataset = re + im*1j
435
429 436 self.radacTime = self.amisrFilePointer.get('Raw11/Data/RadacHeader/RadacTime')
430 437 timeset = self.radacTime[:,0]
438
431 439 return dataset,timeset
432
440
433 441 def reshapeData(self):
434 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
442 #self.beamCodeByPulse, self.beamCode, self.nblocks, self.nprofiles, self.nsa,
435 443 channels = self.beamCodeByPulse[0,:]
436 444 nchan = self.nchannels
437 445 #self.newProfiles = self.nprofiles/nchan #must be defined on filljroheader
438 446 nblocks = self.nblocks
439 447 nsamples = self.nsa
440
448
441 449 #Dimensions : nChannels, nProfiles, nSamples
442 new_block = numpy.empty((nblocks, nchan, self.newProfiles, nsamples), dtype="complex64")
450 new_block = numpy.empty((nblocks, nchan, numpy.int_(self.newProfiles), nsamples), dtype="complex64")
443 451 ############################################
444
452
445 453 for thisChannel in range(nchan):
446 454 new_block[:,thisChannel,:,:] = self.dataset[:,numpy.where(channels==self.beamCode[0][thisChannel])[0],:]
447 455
448
456
449 457 new_block = numpy.transpose(new_block, (1,0,2,3))
450 458 new_block = numpy.reshape(new_block, (nchan,-1, nsamples))
451
452 return new_block
453
459
460 return new_block
461
454 462 def updateIndexes(self):
455
463
456 464 pass
457
465
458 466 def fillJROHeader(self):
459
467
460 468 #fill radar controller header
461 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
469 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ipp=self.__ippKm,
462 470 txA=self.__txA,
463 471 txB=0,
464 472 nWindows=1,
@@ -469,161 +477,173 class AMISRReader(ProcessingUnit):
469 477 nCode=self.__nCode, nBaud=self.__nBaud,
470 478 code = self.__code,
471 479 fClock=1)
472
473
474
480
475 481 #fill system header
476 482 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
477 483 nProfiles=self.newProfiles,
478 484 nChannels=len(self.__channelList),
479 485 adcResolution=14,
480 pciDioBusWith=32)
481
486 pciDioBusWidth=32)
487
482 488 self.dataOut.type = "Voltage"
483
489
484 490 self.dataOut.data = None
485
491
486 492 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
487
493
488 494 # self.dataOut.nChannels = 0
489
495
490 496 # self.dataOut.nHeights = 0
491
497
492 498 self.dataOut.nProfiles = self.newProfiles*self.nblocks
493
499
494 500 #self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
495 501 ranges = numpy.reshape(self.rangeFromFile.value,(-1))
496 502 self.dataOut.heightList = ranges/1000.0 #km
497
498
503
504
499 505 self.dataOut.channelList = self.__channelList
500
506
501 507 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
502
508
503 509 # self.dataOut.channelIndexList = None
504
510
505 511 self.dataOut.flagNoData = True
506
507 #Set to TRUE if the data is discontinuous
512
513 #Set to TRUE if the data is discontinuous
508 514 self.dataOut.flagDiscontinuousBlock = False
509
515
510 516 self.dataOut.utctime = None
511
517
512 518 #self.dataOut.timeZone = -5 #self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
513 519 if self.timezone == 'lt':
514 520 self.dataOut.timeZone = time.timezone / 60. #get the timezone in minutes
515 else:
521 else:
516 522 self.dataOut.timeZone = 0 #by default time is UTC
517 523
518 524 self.dataOut.dstFlag = 0
519
525
520 526 self.dataOut.errorCount = 0
521
527
522 528 self.dataOut.nCohInt = 1
523
529
524 530 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
525
531
526 532 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
527
533
528 534 self.dataOut.flagShiftFFT = False
529
535
530 536 self.dataOut.ippSeconds = self.ippSeconds
531
532 #Time interval between profiles
537
538 #Time interval between profiles
533 539 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
534
540
535 541 self.dataOut.frequency = self.__frequency
536
537 542 self.dataOut.realtime = self.online
538 543 pass
539
544
540 545 def readNextFile(self,online=False):
541
546
542 547 if not(online):
543 548 newFile = self.__setNextFileOffline()
544 549 else:
545 newFile = self.__setNextFileOnline()
546
550 newFile = self.__setNextFileOnline()
551
547 552 if not(newFile):
548 553 return 0
549
550 554 #if self.__firstFile:
551 555 self.readAMISRHeader(self.amisrFilePointer)
556
552 557 self.createBuffers()
558
553 559 self.fillJROHeader()
560
554 561 #self.__firstFile = False
555
556
557
562
563
564
558 565 self.dataset,self.timeset = self.readData()
559
566
560 567 if self.endDate!=None:
561 568 endDateTime_Reader = datetime.datetime.combine(self.endDate,self.endTime)
562 569 time_str = self.amisrFilePointer.get('Time/RadacTimeString')
563 startDateTimeStr_File = time_str[0][0].split('.')[0]
570 startDateTimeStr_File = time_str[0][0].decode('UTF-8').split('.')[0]
564 571 junk = time.strptime(startDateTimeStr_File, '%Y-%m-%d %H:%M:%S')
565 572 startDateTime_File = datetime.datetime(junk.tm_year,junk.tm_mon,junk.tm_mday,junk.tm_hour, junk.tm_min, junk.tm_sec)
566 573 if self.timezone == 'lt':
567 574 startDateTime_File = startDateTime_File - datetime.timedelta(minutes = 300)
568 575 if (startDateTime_File>endDateTime_Reader):
569 576 return 0
570
577
571 578 self.jrodataset = self.reshapeData()
572 579 #----self.updateIndexes()
573 580 self.profileIndex = 0
574
581
575 582 return 1
576
577
583
584
578 585 def __hasNotDataInBuffer(self):
579 586 if self.profileIndex >= (self.newProfiles*self.nblocks):
580 587 return 1
581 588 return 0
582
583
589
590
584 591 def getData(self):
585
592
586 593 if self.flagNoMoreFiles:
587 594 self.dataOut.flagNoData = True
588 595 return 0
589
596
590 597 if self.__hasNotDataInBuffer():
591 598 if not (self.readNextFile(self.online)):
592 599 return 0
593 600
594
595 if self.dataset is None: # setear esta condicion cuando no hayan datos por leers
596 self.dataOut.flagNoData = True
601
602 if self.dataset is None: # setear esta condicion cuando no hayan datos por leer
603 self.dataOut.flagNoData = True
597 604 return 0
598
605
599 606 #self.dataOut.data = numpy.reshape(self.jrodataset[self.profileIndex,:],(1,-1))
600
607
601 608 self.dataOut.data = self.jrodataset[:,self.profileIndex,:]
602
609
610 #print("R_t",self.timeset)
611
603 612 #self.dataOut.utctime = self.jrotimeset[self.profileIndex]
604 613 #verificar basic header de jro data y ver si es compatible con este valor
605 614 #self.dataOut.utctime = self.timeset + (self.profileIndex * self.ippSeconds * self.nchannels)
606 615 indexprof = numpy.mod(self.profileIndex, self.newProfiles)
607 616 indexblock = self.profileIndex/self.newProfiles
608 #print indexblock, indexprof
609 self.dataOut.utctime = self.timeset[indexblock] + (indexprof * self.ippSeconds * self.nchannels)
617 #print (indexblock, indexprof)
618 diffUTC = 1.8e4 #UTC diference from peru in seconds --Joab
619 diffUTC = 0
620 t_comp = (indexprof * self.ippSeconds * self.nchannels) + diffUTC #
621 #cambio posible 18/02/2020
622
623
624
625 #print("utc :",indexblock," __ ",t_comp)
626 #print(numpy.shape(self.timeset))
627 self.dataOut.utctime = self.timeset[numpy.int_(indexblock)] + t_comp
628 #self.dataOut.utctime = self.timeset[self.profileIndex] + t_comp
629 #print(self.dataOut.utctime)
610 630 self.dataOut.profileIndex = self.profileIndex
611 631 self.dataOut.flagNoData = False
612 632 # if indexprof == 0:
613 633 # print self.dataOut.utctime
614
634
615 635 self.profileIndex += 1
616
636
617 637 return self.dataOut.data
618
619
638
639
620 640 def run(self, **kwargs):
621 641 '''
622 642 This method will be called many times so here you should put all your code
623 643 '''
624
644
625 645 if not self.isConfig:
626 646 self.setup(**kwargs)
627 647 self.isConfig = True
628
648
629 649 self.getData()
@@ -183,7 +183,7 class ParamReader(JRODataReader,ProcessingUnit):
183 183 except IOError:
184 184 traceback.print_exc()
185 185 raise IOError("The file %s can't be opened" %(filename))
186
186
187 187 #In case has utctime attribute
188 188 grp2 = grp1['utctime']
189 189 # thisUtcTime = grp2.value[0] - 5*3600 #To convert to local time
@@ -497,7 +497,7 class ParamWriter(Operation):
497 497 setType = None
498 498
499 499 def __init__(self):
500
500
501 501 Operation.__init__(self)
502 502 return
503 503
@@ -530,9 +530,9 class ParamWriter(Operation):
530 530 dsDict['variable'] = self.dataList[i]
531 531 #--------------------- Conditionals ------------------------
532 532 #There is no data
533
533
534 534 if dataAux is None:
535
535
536 536 return 0
537 537
538 538 if isinstance(dataAux, (int, float, numpy.integer, numpy.float)):
@@ -704,7 +704,7 class ParamWriter(Operation):
704 704 return False
705 705
706 706 def setNextFile(self):
707
707
708 708 ext = self.ext
709 709 path = self.path
710 710 setFile = self.setFile
@@ -717,7 +717,7 class ParamWriter(Operation):
717 717
718 718 if os.path.exists(fullpath):
719 719 filesList = os.listdir( fullpath )
720 filesList = [k for k in filesList if 'M' in k]
720 ##filesList = [k for k in filesList if 'M' in k]
721 721 if len( filesList ) > 0:
722 722 filesList = sorted( filesList, key=str.lower )
723 723 filen = filesList[-1]
@@ -785,7 +785,7 class ParamWriter(Operation):
785 785 for j in range(dsInfo['dsNumber']):
786 786 dsInfo = dsList[i]
787 787 tableName = dsInfo['dsName']
788
788
789 789
790 790 if dsInfo['nDim'] == 3:
791 791 shape = dsInfo['shape'].astype(int)
@@ -954,7 +954,7 class ParamWriter(Operation):
954 954
955 955 self.dataOut = dataOut
956 956 if not(self.isConfig):
957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
957 self.setup(dataOut, path=path, blocksPerFile=blocksPerFile,
958 958 metadataList=metadataList, dataList=dataList, mode=mode,
959 959 setType=setType)
960 960
@@ -963,7 +963,7 class ParamWriter(Operation):
963 963
964 964 self.putData()
965 965 return
966
966
967 967
968 968 @MPDecorator
969 969 class ParameterReader(Reader, ProcessingUnit):
@@ -992,43 +992,43 class ParameterReader(Reader, ProcessingUnit):
992 992
993 993 self.set_kwargs(**kwargs)
994 994 if not self.ext.startswith('.'):
995 self.ext = '.{}'.format(self.ext)
995 self.ext = '.{}'.format(self.ext)
996 996
997 997 if self.online:
998 998 log.log("Searching files in online mode...", self.name)
999 999
1000 1000 for nTries in range(self.nTries):
1001 1001 fullpath = self.searchFilesOnLine(self.path, self.startDate,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1002 self.endDate, self.expLabel, self.ext, self.walk,
1003 1003 self.filefmt, self.folderfmt)
1004 1004
1005 1005 try:
1006 1006 fullpath = next(fullpath)
1007 1007 except:
1008 1008 fullpath = None
1009
1009
1010 1010 if fullpath:
1011 1011 break
1012 1012
1013 1013 log.warning(
1014 1014 'Waiting {} sec for a valid file in {}: try {} ...'.format(
1015 self.delay, self.path, nTries + 1),
1015 self.delay, self.path, nTries + 1),
1016 1016 self.name)
1017 1017 time.sleep(self.delay)
1018 1018
1019 1019 if not(fullpath):
1020 1020 raise schainpy.admin.SchainError(
1021 'There isn\'t any valid file in {}'.format(self.path))
1021 'There isn\'t any valid file in {}'.format(self.path))
1022 1022
1023 1023 pathname, filename = os.path.split(fullpath)
1024 1024 self.year = int(filename[1:5])
1025 1025 self.doy = int(filename[5:8])
1026 self.set = int(filename[8:11]) - 1
1026 self.set = int(filename[8:11]) - 1
1027 1027 else:
1028 1028 log.log("Searching files in {}".format(self.path), self.name)
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1029 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
1030 1030 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
1031
1031
1032 1032 self.setNextFile()
1033 1033
1034 1034 return
@@ -1036,11 +1036,11 class ParameterReader(Reader, ProcessingUnit):
1036 1036 def readFirstHeader(self):
1037 1037 '''Read metadata and data'''
1038 1038
1039 self.__readMetadata()
1039 self.__readMetadata()
1040 1040 self.__readData()
1041 1041 self.__setBlockList()
1042 1042 self.blockIndex = 0
1043
1043
1044 1044 return
1045 1045
1046 1046 def __setBlockList(self):
@@ -1099,7 +1099,7 class ParameterReader(Reader, ProcessingUnit):
1099 1099 else:
1100 1100 data = gp[name].value
1101 1101 listMetaname.append(name)
1102 listMetadata.append(data)
1102 listMetadata.append(data)
1103 1103 elif self.metadata:
1104 1104 metadata = json.loads(self.metadata)
1105 1105 listShapes = {}
@@ -1115,7 +1115,7 class ParameterReader(Reader, ProcessingUnit):
1115 1115
1116 1116 self.listShapes = listShapes
1117 1117 self.listMetaname = listMetaname
1118 self.listMeta = listMetadata
1118 self.listMeta = listMetadata
1119 1119
1120 1120 return
1121 1121
@@ -1123,7 +1123,7 class ParameterReader(Reader, ProcessingUnit):
1123 1123
1124 1124 listdataname = []
1125 1125 listdata = []
1126
1126
1127 1127 if 'Data' in self.fp:
1128 1128 grp = self.fp['Data']
1129 1129 for item in list(grp.items()):
@@ -1137,7 +1137,7 class ParameterReader(Reader, ProcessingUnit):
1137 1137 for i in range(dim):
1138 1138 array.append(grp[name]['table{:02d}'.format(i)].value)
1139 1139 array = numpy.array(array)
1140
1140
1141 1141 listdata.append(array)
1142 1142 elif self.metadata:
1143 1143 metadata = json.loads(self.metadata)
@@ -1160,7 +1160,7 class ParameterReader(Reader, ProcessingUnit):
1160 1160 self.listDataname = listdataname
1161 1161 self.listData = listdata
1162 1162 return
1163
1163
1164 1164 def getData(self):
1165 1165
1166 1166 for i in range(len(self.listMeta)):
@@ -1230,7 +1230,7 class ParameterWriter(Operation):
1230 1230 lastTime = None
1231 1231
1232 1232 def __init__(self):
1233
1233
1234 1234 Operation.__init__(self)
1235 1235 return
1236 1236
@@ -1257,7 +1257,7 class ParameterWriter(Operation):
1257 1257 dsDict['nDim'] = len(dataAux.shape)
1258 1258 dsDict['shape'] = dataAux.shape
1259 1259 dsDict['dsNumber'] = dataAux.shape[0]
1260
1260
1261 1261 dsList.append(dsDict)
1262 1262 tableList.append((self.dataList[i], dsDict['nDim']))
1263 1263
@@ -1274,7 +1274,7 class ParameterWriter(Operation):
1274 1274 self.lastTime = currentTime
1275 1275 self.currentDay = dataDay
1276 1276 return False
1277
1277
1278 1278 timeDiff = currentTime - self.lastTime
1279 1279
1280 1280 #Si el dia es diferente o si la diferencia entre un dato y otro supera la hora
@@ -1292,7 +1292,7 class ParameterWriter(Operation):
1292 1292
1293 1293 self.dataOut = dataOut
1294 1294 if not(self.isConfig):
1295 self.setup(path=path, blocksPerFile=blocksPerFile,
1295 self.setup(path=path, blocksPerFile=blocksPerFile,
1296 1296 metadataList=metadataList, dataList=dataList,
1297 1297 setType=setType)
1298 1298
@@ -1301,9 +1301,9 class ParameterWriter(Operation):
1301 1301
1302 1302 self.putData()
1303 1303 return
1304
1304
1305 1305 def setNextFile(self):
1306
1306
1307 1307 ext = self.ext
1308 1308 path = self.path
1309 1309 setFile = self.setFile
@@ -1369,17 +1369,17 class ParameterWriter(Operation):
1369 1369 return
1370 1370
1371 1371 def writeData(self, fp):
1372
1372
1373 1373 grp = fp.create_group("Data")
1374 1374 dtsets = []
1375 1375 data = []
1376
1376
1377 1377 for dsInfo in self.dsList:
1378 1378 if dsInfo['nDim'] == 0:
1379 1379 ds = grp.create_dataset(
1380 dsInfo['variable'],
1380 dsInfo['variable'],
1381 1381 (self.blocksPerFile, ),
1382 chunks=True,
1382 chunks=True,
1383 1383 dtype=numpy.float64)
1384 1384 dtsets.append(ds)
1385 1385 data.append((dsInfo['variable'], -1))
@@ -1387,7 +1387,7 class ParameterWriter(Operation):
1387 1387 sgrp = grp.create_group(dsInfo['variable'])
1388 1388 for i in range(dsInfo['dsNumber']):
1389 1389 ds = sgrp.create_dataset(
1390 'table{:02d}'.format(i),
1390 'table{:02d}'.format(i),
1391 1391 (self.blocksPerFile, ) + dsInfo['shape'][1:],
1392 1392 chunks=True)
1393 1393 dtsets.append(ds)
@@ -1395,7 +1395,7 class ParameterWriter(Operation):
1395 1395 fp.flush()
1396 1396
1397 1397 log.log('Creating file: {}'.format(fp.filename), self.name)
1398
1398
1399 1399 self.ds = dtsets
1400 1400 self.data = data
1401 1401 self.firsttime = True
@@ -4,8 +4,8 Author : Sergio Cortez
4 4 Jan 2018
5 5 Abstract:
6 6 Base class for processing units and operations. A decorator provides multiprocessing features and interconnect the processes created.
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
7 The argument (kwargs) sent from the controller is parsed and filtered via the decorator for each processing unit or operation instantiated.
8 The decorator handle also the methods inside the processing unit to be called from the main script (not as operations) (OPERATION -> type ='self').
9 9
10 10 Based on:
11 11 $Author: murco $
@@ -33,14 +33,14 class ProcessingUnit(object):
33 33
34 34 """
35 35 Update - Jan 2018 - MULTIPROCESSING
36 All the "call" methods present in the previous base were removed.
36 All the "call" methods present in the previous base were removed.
37 37 The majority of operations are independant processes, thus
38 the decorator is in charge of communicate the operation processes
38 the decorator is in charge of communicate the operation processes
39 39 with the proccessing unit via IPC.
40 40
41 41 The constructor does not receive any argument. The remaining methods
42 42 are related with the operations to execute.
43
43
44 44
45 45 """
46 46 proc_type = 'processing'
@@ -62,7 +62,7 class ProcessingUnit(object):
62 62
63 63 def addOperation(self, conf, operation):
64 64 """
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
65 This method is used in the controller, and update the dictionary containing the operations to execute. The dict
66 66 posses the id of the operation process (IPC purposes)
67 67
68 68 Agrega un objeto del tipo "Operation" (opObj) a la lista de objetos "self.objectList" y retorna el
@@ -79,7 +79,7 class ProcessingUnit(object):
79 79
80 80 self.operations.append(
81 81 (operation, conf.type, conf.id, conf.getKwargs()))
82
82
83 83 if 'plot' in self.name.lower():
84 84 self.plots.append(operation.CODE)
85 85
@@ -181,7 +181,7 class Operation(object):
181 181 return
182 182
183 183 class InputQueue(Thread):
184
184
185 185 '''
186 186 Class to hold input data for Proccessing Units and external Operations,
187 187 '''
@@ -212,26 +212,26 class InputQueue(Thread):
212 212 def get(self):
213 213
214 214 if not self.islocked and self.size/1000000 > 512:
215 self.lock.n.value += 1
215 self.lock.n.value += 1
216 216 self.islocked = True
217 217 self.lock.clear()
218 218 elif self.islocked and self.size/1000000 <= 512:
219 219 self.islocked = False
220 220 self.lock.n.value -= 1
221 221 if self.lock.n.value == 0:
222 self.lock.set()
223
222 self.lock.set()
223
224 224 obj = self.queue.get()
225 225 self.size -= sys.getsizeof(obj)
226 226 return pickle.loads(obj)
227 227
228
228
229 229 def MPDecorator(BaseClass):
230 230 """
231 231 Multiprocessing class decorator
232 232
233 233 This function add multiprocessing features to a BaseClass. Also, it handle
234 the communication beetween processes (readers, procUnits and operations).
234 the communication beetween processes (readers, procUnits and operations).
235 235 """
236 236
237 237 class MPClass(BaseClass, Process):
@@ -248,11 +248,11 def MPDecorator(BaseClass):
248 248 self.t = time.time()
249 249 self.name = BaseClass.__name__
250 250 self.__doc__ = BaseClass.__doc__
251
251
252 252 if 'plot' in self.name.lower() and not self.name.endswith('_'):
253 253 self.name = '{}{}'.format(self.CODE.upper(), 'Plot')
254
255 self.start_time = time.time()
254
255 self.start_time = time.time()
256 256 self.id = args[0]
257 257 self.inputId = args[1]
258 258 self.project_id = args[2]
@@ -269,21 +269,21 def MPDecorator(BaseClass):
269 269 '''
270 270
271 271 self.queue.start()
272
272
273 273 def listen(self):
274 274 '''
275 275 This function waits for objects
276 276 '''
277
278 return self.queue.get()
277
278 return self.queue.get()
279 279
280 280 def set_publisher(self):
281 281 '''
282 This function create a zmq socket for publishing objects.
282 This function create a zmq socket for publishing objects.
283 283 '''
284 284
285 285 time.sleep(0.5)
286
286
287 287 c = zmq.Context()
288 288 self.sender = c.socket(zmq.PUB)
289 289 self.sender.connect(
@@ -293,12 +293,11 def MPDecorator(BaseClass):
293 293 '''
294 294 This function publish an object, to an specific topic.
295 295 It blocks publishing when receiver queue is full to avoid data loss
296 '''
297
296 '''
297
298 298 if self.inputId is None:
299 299 self.lock.wait()
300 300 self.sender.send_multipart([str(id).encode(), pickle.dumps(data)])
301
302 301 def runReader(self):
303 302 '''
304 303 Run fuction for read units
@@ -308,13 +307,13 def MPDecorator(BaseClass):
308 307 try:
309 308 BaseClass.run(self, **self.kwargs)
310 309 except:
311 err = traceback.format_exc()
310 err = traceback.format_exc()
312 311 if 'No more files' in err:
313 312 log.warning('No more files to read', self.name)
314 313 else:
315 314 self.err_queue.put('{}|{}'.format(self.name, err))
316 self.dataOut.error = True
317
315 self.dataOut.error = True
316
318 317 for op, optype, opId, kwargs in self.operations:
319 318 if optype == 'self' and not self.dataOut.flagNoData:
320 319 op(**kwargs)
@@ -327,8 +326,7 def MPDecorator(BaseClass):
327 326 continue
328 327
329 328 self.publish(self.dataOut, self.id)
330
331 if self.dataOut.error:
329 if self.dataOut.error:
332 330 break
333 331
334 332 time.sleep(0.5)
@@ -339,7 +337,7 def MPDecorator(BaseClass):
339 337 '''
340 338
341 339 while True:
342 self.dataIn = self.listen()
340 self.dataIn = self.listen()
343 341
344 342 if self.dataIn.flagNoData and self.dataIn.error is None:
345 343 continue
@@ -352,23 +350,23 def MPDecorator(BaseClass):
352 350 elif self.dataIn.error:
353 351 self.dataOut.error = self.dataIn.error
354 352 self.dataOut.flagNoData = True
355
353
356 354 for op, optype, opId, kwargs in self.operations:
357 355 if optype == 'self' and not self.dataOut.flagNoData:
358 356 op(**kwargs)
359 357 elif optype == 'other' and not self.dataOut.flagNoData:
360 358 self.dataOut = op.run(self.dataOut, **kwargs)
361 elif optype == 'external' and not self.dataOut.flagNoData:
359 elif optype == 'external' and not self.dataOut.flagNoData:
362 360 self.publish(self.dataOut, opId)
363
361
364 362 self.publish(self.dataOut, self.id)
365 363 for op, optype, opId, kwargs in self.operations:
366 if optype == 'external' and self.dataOut.error:
364 if optype == 'external' and self.dataOut.error:
367 365 self.publish(self.dataOut, opId)
368
366
369 367 if self.dataOut.error:
370 368 break
371
369
372 370 time.sleep(0.5)
373 371
374 372 def runOp(self):
@@ -376,7 +374,7 def MPDecorator(BaseClass):
376 374 Run function for external operations (this operations just receive data
377 375 ex: plots, writers, publishers)
378 376 '''
379
377
380 378 while True:
381 379
382 380 dataOut = self.listen()
@@ -388,21 +386,20 def MPDecorator(BaseClass):
388 386 self.err_queue.put('{}|{}'.format(self.name, traceback.format_exc()))
389 387 dataOut.error = True
390 388 else:
391 break
389 break
392 390
393 391 def run(self):
394 392 if self.typeProc is "ProcUnit":
395 393
396 394 if self.inputId is not None:
397 395 self.subscribe()
398
396
399 397 self.set_publisher()
400 398
401 399 if 'Reader' not in BaseClass.__name__:
402 400 self.runProc()
403 401 else:
404 402 self.runReader()
405
406 403 elif self.typeProc is "Operation":
407 404
408 405 self.subscribe()
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -291,16 +291,16 class SpectraProc(ProcessingUnit):
291 291 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
292 292 self.dataOut.channelList = range(len(channelIndexList))
293 293 self.__selectPairsByChannel(channelIndexList)
294
294
295 295 return 1
296
297
296
297
298 298 def selectFFTs(self, minFFT, maxFFT ):
299 299 """
300 Selecciona un bloque de datos en base a un grupo de valores de puntos FFTs segun el rango
300 Selecciona un bloque de datos en base a un grupo de valores de puntos FFTs segun el rango
301 301 minFFT<= FFT <= maxFFT
302 302 """
303
303
304 304 if (minFFT > maxFFT):
305 305 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minFFT, maxFFT))
306 306
@@ -330,20 +330,20 class SpectraProc(ProcessingUnit):
330 330 self.selectFFTsByIndex(minIndex, maxIndex)
331 331
332 332 return 1
333
334
333
334
335 335 def setH0(self, h0, deltaHeight = None):
336
336
337 337 if not deltaHeight:
338 338 deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
339
339
340 340 nHeights = self.dataOut.nHeights
341
341
342 342 newHeiRange = h0 + numpy.arange(nHeights)*deltaHeight
343
343
344 344 self.dataOut.heightList = newHeiRange
345
346
345
346
347 347 def selectHeights(self, minHei, maxHei):
348 348 """
349 349 Selecciona un bloque de datos en base a un grupo de valores de alturas segun el rango
@@ -360,7 +360,7 class SpectraProc(ProcessingUnit):
360 360 1 si el metodo se ejecuto con exito caso contrario devuelve 0
361 361 """
362 362
363
363
364 364 if (minHei > maxHei):
365 365 raise ValueError("Error selecting heights: Height range (%d,%d) is not valid" % (minHei, maxHei))
366 366
@@ -388,7 +388,7 class SpectraProc(ProcessingUnit):
388 388 maxIndex = len(heights)
389 389
390 390 self.selectHeightsByIndex(minIndex, maxIndex)
391
391
392 392
393 393 return 1
394 394
@@ -436,7 +436,7 class SpectraProc(ProcessingUnit):
436 436
437 437 def selectFFTsByIndex(self, minIndex, maxIndex):
438 438 """
439
439
440 440 """
441 441
442 442 if (minIndex < 0) or (minIndex > maxIndex):
@@ -459,7 +459,7 class SpectraProc(ProcessingUnit):
459 459 self.dataOut.data_spc = data_spc
460 460 self.dataOut.data_cspc = data_cspc
461 461 self.dataOut.data_dc = data_dc
462
462
463 463 self.dataOut.ippSeconds = self.dataOut.ippSeconds*(self.dataOut.nFFTPoints / numpy.shape(data_cspc)[1])
464 464 self.dataOut.nFFTPoints = numpy.shape(data_cspc)[1]
465 465 self.dataOut.profilesPerBlock = numpy.shape(data_cspc)[1]
@@ -552,7 +552,7 class SpectraProc(ProcessingUnit):
552 552 xx_inv = numpy.linalg.inv(xx)
553 553 xx_aux = xx_inv[0, :]
554 554
555 for ich in range(num_chan):
555 for ich in range(num_chan):
556 556 yy = jspectra[ich, ind_vel, :]
557 557 jspectra[ich, freq_dc, :] = numpy.dot(xx_aux, yy)
558 558
@@ -574,12 +574,12 class SpectraProc(ProcessingUnit):
574 574 return 1
575 575
576 576 def removeInterference2(self):
577
577
578 578 cspc = self.dataOut.data_cspc
579 579 spc = self.dataOut.data_spc
580 Heights = numpy.arange(cspc.shape[2])
580 Heights = numpy.arange(cspc.shape[2])
581 581 realCspc = numpy.abs(cspc)
582
582
583 583 for i in range(cspc.shape[0]):
584 584 LinePower= numpy.sum(realCspc[i], axis=0)
585 585 Threshold = numpy.amax(LinePower)-numpy.sort(LinePower)[len(Heights)-int(len(Heights)*0.1)]
@@ -587,17 +587,17 class SpectraProc(ProcessingUnit):
587 587 InterferenceSum = numpy.sum( realCspc[i,:,SelectedHeights], axis=0 )
588 588 InterferenceThresholdMin = numpy.sort(InterferenceSum)[int(len(InterferenceSum)*0.98)]
589 589 InterferenceThresholdMax = numpy.sort(InterferenceSum)[int(len(InterferenceSum)*0.99)]
590
591
590
591
592 592 InterferenceRange = numpy.where( ([InterferenceSum > InterferenceThresholdMin]))# , InterferenceSum < InterferenceThresholdMax]) )
593 593 #InterferenceRange = numpy.where( ([InterferenceRange < InterferenceThresholdMax]))
594 594 if len(InterferenceRange)<int(cspc.shape[1]*0.3):
595 595 cspc[i,InterferenceRange,:] = numpy.NaN
596
597
598
596
597
598
599 599 self.dataOut.data_cspc = cspc
600
600
601 601 def removeInterference(self, interf = 2,hei_interf = None, nhei_interf = None, offhei_interf = None):
602 602
603 603 jspectra = self.dataOut.data_spc
@@ -931,7 +931,7 class IncohInt(Operation):
931 931 if n is not None:
932 932 self.n = int(n)
933 933 else:
934
934
935 935 self.__integrationtime = int(timeInterval)
936 936 self.n = None
937 937 self.__byTime = True
@@ -1032,7 +1032,7 class IncohInt(Operation):
1032 1032 def run(self, dataOut, n=None, timeInterval=None, overlapping=False):
1033 1033 if n == 1:
1034 1034 return
1035
1035
1036 1036 dataOut.flagNoData = True
1037 1037
1038 1038 if not self.isConfig:
@@ -1048,9 +1048,9 class IncohInt(Operation):
1048 1048
1049 1049 dataOut.data_spc = avgdata_spc
1050 1050 dataOut.data_cspc = avgdata_cspc
1051 dataOut.data_dc = avgdata_dc
1051 dataOut.data_dc = avgdata_dc
1052 1052 dataOut.nIncohInt *= self.n
1053 1053 dataOut.utctime = avgdatatime
1054 1054 dataOut.flagNoData = False
1055 1055
1056 return dataOut No newline at end of file
1056 return dataOut
@@ -8,8 +8,8 from time import time
8 8
9 9
10 10 @MPDecorator
11 class VoltageProc(ProcessingUnit):
12
11 class VoltageProc(ProcessingUnit):
12
13 13 def __init__(self):
14 14
15 15 ProcessingUnit.__init__(self)
@@ -115,7 +115,7 class VoltageProc(ProcessingUnit):
115 115 self.dataOut.data = data
116 116 # self.dataOut.channelList = [self.dataOut.channelList[i] for i in channelIndexList]
117 117 self.dataOut.channelList = range(len(channelIndexList))
118
118
119 119 return 1
120 120
121 121 def selectHeights(self, minHei=None, maxHei=None):
@@ -229,7 +229,7 class VoltageProc(ProcessingUnit):
229 229 """
230 230 Si la data es obtenida por bloques, dimension = [nChannels, nProfiles, nHeis]
231 231 """
232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
232 buffer = self.dataOut.data[:, :, 0:int(self.dataOut.nHeights-r)]
233 233 buffer = buffer.reshape(self.dataOut.nChannels, self.dataOut.nProfiles, int(self.dataOut.nHeights/window), window)
234 234 buffer = numpy.sum(buffer,3)
235 235
@@ -497,8 +497,8 class CohInt(Operation):
497 497 # print self.__bufferStride[self.__profIndexStride - 1]
498 498 # raise
499 499 return self.__bufferStride[self.__profIndexStride - 1]
500
501
500
501
502 502 return None, None
503 503
504 504 def integrate(self, data, datatime=None):
@@ -520,7 +520,7 class CohInt(Operation):
520 520 avgdatatime = self.__initime
521 521
522 522 deltatime = datatime - self.__lastdatatime
523
523
524 524 if not self.__withOverlapping:
525 525 self.__initime = datatime
526 526 else:
@@ -546,7 +546,7 class CohInt(Operation):
546 546 avgdatatime = (times - 1) * timeInterval + dataOut.utctime
547 547 self.__dataReady = True
548 548 return avgdata, avgdatatime
549
549
550 550 def run(self, dataOut, n=None, timeInterval=None, stride=None, overlapping=False, byblock=False, **kwargs):
551 551
552 552 if not self.isConfig:
@@ -560,12 +560,12 class CohInt(Operation):
560 560 avgdata, avgdatatime = self.integrateByBlock(dataOut)
561 561 dataOut.nProfiles /= self.n
562 562 else:
563 if stride is None:
563 if stride is None:
564 564 avgdata, avgdatatime = self.integrate(dataOut.data, dataOut.utctime)
565 565 else:
566 566 avgdata, avgdatatime = self.integrateByStride(dataOut.data, dataOut.utctime)
567 567
568
568
569 569 # dataOut.timeInterval *= n
570 570 dataOut.flagNoData = True
571 571
@@ -606,7 +606,6 class Decoder(Operation):
606 606
607 607 self.nCode = len(code)
608 608 self.nBaud = len(code[0])
609
610 609 if (osamp != None) and (osamp >1):
611 610 self.osamp = osamp
612 611 self.code = numpy.repeat(code, repeats=self.osamp, axis=1)
@@ -621,7 +620,7 class Decoder(Operation):
621 620
622 621 #Frequency
623 622 __codeBuffer = numpy.zeros((self.nCode, self.__nHeis), dtype=numpy.complex)
624
623
625 624 __codeBuffer[:,0:self.nBaud] = self.code
626 625
627 626 self.fft_code = numpy.conj(numpy.fft.fft(__codeBuffer, axis=1))
@@ -670,11 +669,11 class Decoder(Operation):
670 669 junk = junk.flatten()
671 670 code_block = numpy.reshape(junk, (self.nCode*repetitions, self.nBaud))
672 671 profilesList = range(self.__nProfiles)
673
674 for i in range(self.__nChannels):
675 for j in profilesList:
676 self.datadecTime[i,j,:] = numpy.correlate(data[i,j,:], code_block[j,:], mode='full')[self.nBaud-1:]
677 return self.datadecTime
672
673 for i in range(self.__nChannels):
674 for j in profilesList:
675 self.datadecTime[i,j,:] = numpy.correlate(data[i,j,:], code_block[j,:], mode='full')[self.nBaud-1:]
676 return self.datadecTime
678 677
679 678 def __convolutionByBlockInFreq(self, data):
680 679
@@ -691,7 +690,7 class Decoder(Operation):
691 690
692 691 return data
693 692
694
693
695 694 def run(self, dataOut, code=None, nCode=None, nBaud=None, mode = 0, osamp=None, times=None):
696 695
697 696 if dataOut.flagDecodeData:
@@ -722,7 +721,7 class Decoder(Operation):
722 721
723 722 self.__nProfiles = dataOut.nProfiles
724 723 datadec = None
725
724
726 725 if mode == 3:
727 726 mode = 0
728 727
@@ -1105,9 +1104,9 class SplitProfiles(Operation):
1105 1104
1106 1105 if shape[2] % n != 0:
1107 1106 raise ValueError("Could not split the data, n=%d has to be multiple of %d" %(n, shape[2]))
1108
1107
1109 1108 new_shape = shape[0], shape[1]*n, int(shape[2]/n)
1110
1109
1111 1110 dataOut.data = numpy.reshape(dataOut.data, new_shape)
1112 1111 dataOut.flagNoData = False
1113 1112
@@ -167,12 +167,12 class Remote(Thread):
167 167
168 168 self.mutex.acquire()
169 169 # init = time.time()
170 #
170 #
171 171 # while(self.bussy):
172 172 # sleep(0.1)
173 173 # if time.time() - init > 2*self.period:
174 174 # return 0
175
175
176 176 self.fileList = fileList
177 177 self.mutex.release()
178 178 return 1
@@ -195,7 +195,7 class Remote(Thread):
195 195
196 196 if self.stopFlag:
197 197 break
198
198
199 199 # self.bussy = True
200 200 self.mutex.acquire()
201 201
@@ -399,19 +399,19 class SSHClient(Remote):
399 399
400 400 """
401 401 This method is used to set SSH parameters and establish a connection to a remote server
402
402
403 403 Inputs:
404 server - remote server IP Address
405
406 username - remote server Username
407
404 server - remote server IP Address
405
406 username - remote server Username
407
408 408 password - remote server password
409
409
410 410 remotefolder - remote server current working directory
411
411
412 412 Return: void
413
414 Affects:
413
414 Affects:
415 415 self.status - in case of error or fail connection this parameter is set to 0 else 1
416 416
417 417 """
@@ -483,10 +483,10 class SSHClient(Remote):
483 483 def __execute(self, command):
484 484 """
485 485 __execute a command on remote server
486
486
487 487 Input:
488 488 command - Exmaple 'ls -l'
489
489
490 490 Return:
491 491 0 in error case else 1
492 492 """
@@ -508,10 +508,10 class SSHClient(Remote):
508 508 def mkdir(self, remotefolder):
509 509 """
510 510 mkdir is used to make a new directory in remote server
511
511
512 512 Input:
513 513 remotefolder - directory name
514
514
515 515 Return:
516 516 0 in error case else 1
517 517 """
@@ -529,14 +529,14 class SSHClient(Remote):
529 529 def cd(self, remotefolder):
530 530 """
531 531 cd is used to change remote working directory on server
532
532
533 533 Input:
534 534 remotefolder - current working directory
535
535
536 536 Affects:
537 537 self.remotefolder
538
539 Return:
538
539 Return:
540 540 0 in case of error else 1
541 541 """
542 542 if not self.status:
@@ -580,8 +580,8 class SendToServer(ProcessingUnit):
580 580 ProcessingUnit.__init__(self, **kwargs)
581 581
582 582 self.isConfig = False
583 self.clientObj = None
584
583 self.clientObj = None
584
585 585 def setup(self, server, username, password, remotefolder, localfolder, ext='.png', period=60, protocol='ftp', **kwargs):
586 586
587 587 self.clientObj = None
@@ -641,11 +641,11 class SendToServer(ProcessingUnit):
641 641 self.init = time.time()
642 642 self.setup(**kwargs)
643 643 self.isConfig = True
644
644
645 645 if not self.clientObj.is_alive():
646 646 print("[Remote Server]: Restarting connection ")
647 647 self.setup(**kwargs)
648
648
649 649 if time.time() - self.init >= self.period:
650 650 fullfilenameList = self.findFiles()
651 651
@@ -706,9 +706,9 class FTP(object):
706 706 try:
707 707 self.ftp = ftplib.FTP(self.server)
708 708 self.ftp.login(self.username,self.password)
709 self.ftp.cwd(self.remotefolder)
709 self.ftp.cwd(self.remotefolder)
710 710 # print 'Connect to FTP Server: Successfully'
711
711
712 712 except ftplib.all_errors:
713 713 print('Error FTP Service')
714 714 self.status = 1
@@ -1005,4 +1005,4 class SendByFTP(Operation):
1005 1005
1006 1006 self.counter = 0
1007 1007
1008 self.status = 1 No newline at end of file
1008 self.status = 1
@@ -47,7 +47,7 PLOT_CODES = {
47 47 def get_plot_code(s):
48 48 label = s.split('_')[0]
49 49 codes = [key for key in PLOT_CODES if key in label]
50 if codes:
50 if codes:
51 51 return PLOT_CODES[codes[0]]
52 52 else:
53 53 return 24
@@ -69,7 +69,7 class PublishData(Operation):
69 69 self.counter = 0
70 70 self.delay = kwargs.get('delay', 0)
71 71 self.cnt = 0
72 self.verbose = verbose
72 self.verbose = verbose
73 73 context = zmq.Context()
74 74 self.zmq_socket = context.socket(zmq.PUSH)
75 75 server = kwargs.get('server', 'zmq.pipe')
@@ -85,7 +85,7 class PublishData(Operation):
85 85
86 86 def publish_data(self):
87 87 self.dataOut.finished = False
88
88
89 89 if self.verbose:
90 90 log.log(
91 91 'Sending {} - {}'.format(self.dataOut.type, self.dataOut.datatime),
@@ -103,12 +103,12 class PublishData(Operation):
103 103 time.sleep(self.delay)
104 104
105 105 def close(self):
106
106
107 107 self.dataOut.finished = True
108 108 self.zmq_socket.send_pyobj(self.dataOut)
109 109 time.sleep(0.1)
110 110 self.zmq_socket.close()
111
111
112 112
113 113 class ReceiverData(ProcessingUnit):
114 114
@@ -195,7 +195,7 class SendToFTP(Operation):
195 195 self.ftp.close()
196 196 self.ftp = None
197 197 self.ready = False
198 return
198 return
199 199
200 200 try:
201 201 self.ftp.login(self.username, self.password)
@@ -244,8 +244,8 class SendToFTP(Operation):
244 244 def upload(self, src, dst):
245 245
246 246 log.log('Uploading {} -> {} '.format(
247 src.split('/')[-1], dst.split('/')[-1]),
248 self.name,
247 src.split('/')[-1], dst.split('/')[-1]),
248 self.name,
249 249 nl=False
250 250 )
251 251
@@ -273,7 +273,7 class SendToFTP(Operation):
273 273 fp.close()
274 274 log.success('OK', tag='')
275 275 return 1
276
276
277 277 def send_files(self):
278 278
279 279 for x, pattern in enumerate(self.patterns):
@@ -282,35 +282,35 class SendToFTP(Operation):
282 282 srcname = self.find_files(local, ext)
283 283 src = os.path.join(local, srcname)
284 284 if os.path.getmtime(src) < time.time() - 30*60:
285 log.warning('Skipping old file {}'.format(srcname))
285 log.warning('Skipping old file {}'.format(srcname))
286 286 continue
287 287
288 288 if srcname is None or srcname == self.latest[x]:
289 log.warning('File alreday uploaded {}'.format(srcname))
289 log.warning('File alreday uploaded {}'.format(srcname))
290 290 continue
291
291
292 292 if 'png' in ext:
293 293 dstname = self.getftpname(srcname, int(exp_code), int(sub_exp_code))
294 294 else:
295 dstname = srcname
296
295 dstname = srcname
296
297 297 dst = os.path.join(remote, dstname)
298 298
299 299 if self.upload(src, dst):
300 300 self.times[x] = time.time()
301 301 self.latest[x] = srcname
302 else:
302 else:
303 303 self.ready = False
304 break
304 break
305 305
306 306 def run(self, dataOut, server, username, password, timeout=10, **kwargs):
307 307
308 308 if not self.isConfig:
309 309 self.setup(
310 server=server,
311 username=username,
312 password=password,
313 timeout=timeout,
310 server=server,
311 username=username,
312 password=password,
313 timeout=timeout,
314 314 **kwargs
315 315 )
316 316 self.isConfig = True
General Comments 0
You need to be logged in to leave comments. Login now