##// END OF EJS Templates
add multiSchain (@jchavez)
jespinoza -
r892:30b3788062ca
parent child
Show More
@@ -0,0 +1,82
1 import argparse
2
3 from schainpy.controller import Project, multiSchain
4
5 desc = "HF_EXAMPLE"
6
7 def fiber(cursor, skip, q, dt):
8
9 controllerObj = Project()
10
11 controllerObj.setup(id='191', name='test01', description=desc)
12
13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
14 path='/data/workspace/data/julia/',
15 startDate=dt,
16 endDate=dt,
17 startTime="00:00:00",
18 endTime="23:59:59",
19 online=0,
20 #set=1426485881,
21 delay=10,
22 walk=1,
23 queue=q,
24 cursor=cursor,
25 skip=skip,
26 #timezone=-5*3600
27 )
28
29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
30 #
31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
33 #
34 # procUnitConfObj2 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
35
36 # opObj11 = procUnitConfObj2.addOperation(name='SpectralMoments', optype='other')
37
38 #
39 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
40 # opObj11.addParameter(name='id', value='1000', format='int')
41 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
42 # opObj11.addParameter(name='channelList', value='0', format='intlist')
43 # opObj11.addParameter(name='zmin', value='-120', format='float')
44 # opObj11.addParameter(name='zmax', value='-70', format='float')
45 # opObj11.addParameter(name='save', value='1', format='int')
46 # opObj11.addParameter(name='figpath', value=figpath, format='str')
47
48 # opObj11 = procUnitConfObj1.addOperation(name='RTIPlot', optype='other')
49 # opObj11.addParameter(name='id', value='2000', format='int')
50 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca', format='str')
51 # opObj11.addParameter(name='showprofile', value='0', format='int')
52 # opObj11.addParameter(name='channelList', value='0', format='intlist')
53 # # opObj11.addParameter(name='xmin', value='0', format='float')
54 # opObj11.addParameter(name='xmin', value='0', format='float')
55 # opObj11.addParameter(name='xmax', value='24', format='float')
56 #
57 # opObj11.addParameter(name='zmin', value='-110', format='float')
58 # opObj11.addParameter(name='zmax', value='-70', format='float')
59 # opObj11.addParameter(name='save', value='0', format='int')
60 # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
61
62 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
63 opObj12.addParameter(name='zeromq', value=1, format='int')
64
65 # print "Escribiendo el archivo XML"
66 # controllerObj.writeXml(filename)
67 # print "Leyendo el archivo XML"
68 # controllerObj.readXml(filename)
69
70 controllerObj.createObjects()
71 controllerObj.connectObjects()
72
73 # timeit.timeit('controllerObj.run()', number=2)
74
75 controllerObj.run()
76
77
78 if __name__ == '__main__':
79 parser = argparse.ArgumentParser(description='Set number of parallel processes')
80 parser.add_argument('--nProcess', default=2, type=int)
81 args = parser.parse_args()
82 multiSchain(fiber, nProcess=args.nProcess, startDate='2016/08/19', endDate='2016/08/20')
@@ -7,6 +7,8 import sys
7 import ast
7 import ast
8 import datetime
8 import datetime
9 import traceback
9 import traceback
10 from multiprocessing import Process, Queue, cpu_count
11
10 import schainpy
12 import schainpy
11 import schainpy.admin
13 import schainpy.admin
12
14
@@ -23,6 +25,51 def prettify(elem):
23 reparsed = minidom.parseString(rough_string)
25 reparsed = minidom.parseString(rough_string)
24 return reparsed.toprettyxml(indent=" ")
26 return reparsed.toprettyxml(indent=" ")
25
27
28 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None):
29 skip = 0
30 cursor = 0
31 nFiles = None
32 processes = []
33
34 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
35 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
36 days = (dt2 - dt1).days
37 print days
38 for day in range(days+1):
39 skip = 0
40 cursor = 0
41 q = Queue()
42 processes = []
43 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
44 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
45 firstProcess.start()
46 nFiles = q.get()
47 firstProcess.terminate()
48 skip = int(math.ceil(nFiles/nProcess))
49 try:
50 while True:
51 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
52 processes[cursor].start()
53 if nFiles < cursor*skip:
54 break
55 cursor += 1
56 except KeyboardInterrupt:
57 for process in processes:
58 process.terminate()
59 process.join()
60 for process in processes:
61 process.join()
62 #process.terminate()
63 sleep(3)
64
65 try:
66 while True:
67 pass
68 except KeyboardInterrupt:
69 for process in processes:
70 process.terminate()
71 process.join()
72
26 class ParameterConf():
73 class ParameterConf():
27
74
28 id = None
75 id = None
@@ -51,6 +98,9 class ParameterConf():
51
98
52 return self.__formated_value
99 return self.__formated_value
53
100
101 if format == 'obj':
102 return value
103
54 if format == 'str':
104 if format == 'str':
55 self.__formated_value = str(value)
105 self.__formated_value = str(value)
56 return self.__formated_value
106 return self.__formated_value
@@ -171,6 +221,9 class ParameterConf():
171
221
172 self.id = str(id)
222 self.id = str(id)
173 self.name = name
223 self.name = name
224 if format == 'obj':
225 self.value = value
226 else:
174 self.value = str(value)
227 self.value = str(value)
175 self.format = str.lower(format)
228 self.format = str.lower(format)
176
229
@@ -698,7 +751,7 class ReadUnitConf(ProcUnitConf):
698
751
699 return self.ELEMENTNAME
752 return self.ELEMENTNAME
700
753
701 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, **kwargs):
754 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
702
755
703 #Compatible with old signal chain version
756 #Compatible with old signal chain version
704 if datatype==None and name==None:
757 if datatype==None and name==None:
@@ -725,7 +778,7 class ReadUnitConf(ProcUnitConf):
725
778
726 self.inputId = '0'
779 self.inputId = '0'
727 self.parentId = parentId
780 self.parentId = parentId
728
781 self.queue = queue
729 self.addRunOperation(**kwargs)
782 self.addRunOperation(**kwargs)
730
783
731 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
784 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
@@ -84,7 +84,8 class PlotData(Operation, Process):
84 print 'plotting...{}'.format(self.CODE)
84 print 'plotting...{}'.format(self.CODE)
85
85
86 self.plot()
86 self.plot()
87 self.figure.suptitle('{} {}'.format(self.title, self.CODE.upper()))
87 self.figure.suptitle('{} {} - Date:{}'.format(self.title, self.CODE.upper(),
88 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')))
88
89
89 if self.save:
90 if self.save:
90 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
91 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
@@ -234,17 +235,17 class PlotSpectraData(PlotData):
234 ax.ax_profile.set_ylim(self.ymin, self.ymax)
235 ax.ax_profile.set_ylim(self.ymin, self.ymax)
235 ax.ax_profile.set_xlabel('dB')
236 ax.ax_profile.set_xlabel('dB')
236 ax.ax_profile.grid(b=True, axis='x')
237 ax.ax_profile.grid(b=True, axis='x')
238 ax.plot_noise = ax.ax_profile.plot(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y,
239 color="k", linestyle="dashed", lw=2)[0]
237 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
240 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
238 noise = 10*numpy.log10(self.data['rti'][self.max_time][n]/self.dataOut.normFactor)
239 ax.ax_profile.vlines(noise, self.ymin, self.ymax, colors="k", linestyle="dashed", lw=2)
240 else:
241 else:
241 ax.plot.set_array(z[n].T.ravel())
242 ax.plot.set_array(z[n].T.ravel())
242 ax.set_title('{} {}'.format(self.titles[n],
243 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
244 size=8)
245 if self.showprofile:
243 if self.showprofile:
246 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
244 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
245 ax.plot_noise.set_data(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y)
247
246
247 ax.set_title('{} - Noise: {:.2f} dB'.format(self.titles[n], self.data['noise'][self.max_time][n]),
248 size=8)
248
249
249 class PlotRTIData(PlotData):
250 class PlotRTIData(PlotData):
250
251
@@ -608,6 +608,9 class JRODataReader(JRODataIO):
608 set=None,
608 set=None,
609 expLabel='',
609 expLabel='',
610 ext='.r',
610 ext='.r',
611 queue=None,
612 cursor=None,
613 skip=None,
611 walk=True):
614 walk=True):
612
615
613 self.filenameList = []
616 self.filenameList = []
@@ -635,7 +638,21 class JRODataReader(JRODataIO):
635 fileList = glob.glob1(thisPath, "*%s" %ext)
638 fileList = glob.glob1(thisPath, "*%s" %ext)
636 fileList.sort()
639 fileList.sort()
637
640
638 for file in fileList:
641 skippedFileList = []
642
643 if cursor is not None and skip is not None:
644 # if cursor*skip > len(fileList):
645 if skip == 0:
646 if queue is not None:
647 queue.put(len(fileList))
648 skippedFileList = []
649 else:
650 skippedFileList = fileList[cursor*skip: cursor*skip + skip]
651
652 else:
653 skippedFileList = fileList
654
655 for file in skippedFileList:
639
656
640 filename = os.path.join(thisPath,file)
657 filename = os.path.join(thisPath,file)
641
658
@@ -1246,7 +1263,10 class JRODataReader(JRODataIO):
1246 nTxs = 1,
1263 nTxs = 1,
1247 realtime=False,
1264 realtime=False,
1248 blocksize=None,
1265 blocksize=None,
1249 blocktime=None):
1266 blocktime=None,
1267 queue=None,
1268 skip=None,
1269 cursor=None):
1250
1270
1251 if path == None:
1271 if path == None:
1252 raise ValueError, "[Reading] The path is not valid"
1272 raise ValueError, "[Reading] The path is not valid"
@@ -1282,7 +1302,8 class JRODataReader(JRODataIO):
1282 pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1302 pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1283 startTime=startTime, endTime=endTime,
1303 startTime=startTime, endTime=endTime,
1284 set=set, expLabel=expLabel, ext=ext,
1304 set=set, expLabel=expLabel, ext=ext,
1285 walk=walk)
1305 walk=walk, cursor=cursor,
1306 skip=skip, queue=queue)
1286
1307
1287 if not(pathList):
1308 if not(pathList):
1288 # print "[Reading] No *%s files in %s (%s - %s)"%(ext, path,
1309 # print "[Reading] No *%s files in %s (%s - %s)"%(ext, path,
@@ -1720,4 +1741,3 class JRODataWriter(JRODataIO):
1720 self.isConfig = True
1741 self.isConfig = True
1721
1742
1722 self.putData()
1743 self.putData()
1723
@@ -279,7 +279,7 class ReceiverData(ProcessingUnit, Process):
279 self.data['times'] = []
279 self.data['times'] = []
280 for plottype in self.plottypes:
280 for plottype in self.plottypes:
281 self.data[plottype] = {}
281 self.data[plottype] = {}
282
282 self.data['noise'] = {}
283 self.isConfig = True
283 self.isConfig = True
284
284
285 def event_monitor(self, monitor):
285 def event_monitor(self, monitor):
@@ -325,8 +325,8 class ReceiverData(ProcessingUnit, Process):
325
325
326 if plottype == 'spc':
326 if plottype == 'spc':
327 z = self.dataOut.data_spc/self.dataOut.normFactor
327 z = self.dataOut.data_spc/self.dataOut.normFactor
328 zdB = 10*numpy.log10(z)
328 self.data[plottype] = 10*numpy.log10(z)
329 self.data[plottype] = zdB
329 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
330 if plottype == 'rti':
330 if plottype == 'rti':
331 self.data[plottype][t] = self.dataOut.getPower()
331 self.data[plottype][t] = self.dataOut.getPower()
332 if plottype == 'snr':
332 if plottype == 'snr':
General Comments 0
You need to be logged in to leave comments. Login now