##// END OF EJS Templates
Fix CLI and remove imports warnings
Juan C. Espinoza -
r1284:f79a273a3a9b
parent child
Show More
@@ -1,236 +1,241
1 1 import click
2 2 import schainpy
3 3 import subprocess
4 4 import os
5 5 import sys
6 6 import glob
7 save_stdout = sys.stdout
8 sys.stdout = open('/dev/null', 'w')
9 7 from multiprocessing import cpu_count
10 8 from schainpy.controller import Project
11 9 from schainpy.model import Operation, ProcessingUnit
12 10 from schainpy.utils import log
13 11 from importlib import import_module
14 12 from pydoc import locate
15 13 from fuzzywuzzy import process
16 14 from schainpy.cli import templates
17 15 import inspect
18 16 try:
19 17 from queue import Queue
20 18 except:
21 19 from Queue import Queue
22 sys.stdout = save_stdout
23 20
24 21
def getProcs():
    """Return the names of the available ProcessingUnit classes in schainpy.model."""
    candidates = dir(schainpy.model)
    procs = check_module(candidates, 'processing')
    # The abstract base class is not a usable proc; drop it when present.
    if 'ProcessingUnit' in procs:
        procs.remove('ProcessingUnit')
    return procs
33 30
def getOperations():
    """Return the names of the available Operation classes in schainpy.model.

    Abstract/base names ('Operation', 'Figure', 'Plot') are excluded.
    """
    module = dir(schainpy.model)
    noProcs = [x for x in module if not x.endswith('Proc')]
    operations = check_module(noProcs, 'operation')
    # Remove each base name independently: the original wrapped all three
    # removes in one try/except, so a missing 'Operation' (or 'Figure')
    # silently skipped the removal of the remaining base names.
    for base in ('Operation', 'Figure', 'Plot'):
        try:
            operations.remove(base)
        except ValueError:
            pass
    return operations
45 42
def getArgs(op):
    """Return the user-facing argument names of operation/proc `op`."""
    cls = locate('schainpy.model.{}'.format(op))
    # MPDecorator-wrapped classes need the queue-style constructor; plain
    # operations take no arguments.
    try:
        instance = cls(1, 2, 3, Queue(), 5, 6)
    except:
        instance = cls()

    if hasattr(instance, '__attrs__'):
        args = instance.__attrs__
    elif hasattr(instance, 'myrun'):
        args = inspect.getfullargspec(instance.myrun).args
    else:
        args = inspect.getfullargspec(instance.run).args

    # Drop bookkeeping parameters that are not user arguments.
    for hidden in ('self', 'dataOut'):
        try:
            args.remove(hidden)
        except Exception:
            pass
    return args
62 67
def getDoc(obj):
    """Return the docstring of operation/proc `obj`."""
    cls = locate('schainpy.model.{}'.format(obj))
    try:
        instance = cls(1, 2, 3, Queue(), 5, 6)
    except:
        instance = cls()
    return instance.__doc__
70 75
def getAll():
    """Return the combined list of operation and processing-unit names."""
    names = getOperations()
    for proc in getProcs():
        names.append(proc)
    return names
75 80
76 81
def print_version(ctx, param, value):
    """Click callback: print the schainpy version and exit the CLI."""
    if not value or ctx.resilient_parsing:
        return
    click.echo(schainpy.__version__)
    ctx.exit()
82 87
83 88
84 89 PREFIX = 'experiment'
85 90
@click.command()
@click.option('--version', '-v', is_flag=True, callback=print_version, help='SChain version', type=str)
@click.argument('command', default='run', required=True)
@click.argument('nextcommand', default=None, required=False, type=str)
def main(command, nextcommand, version):
    """COMMAND LINE INTERFACE FOR SIGNAL CHAIN - JICAMARCA RADIO OBSERVATORY V3.0\n
    Available commands.\n
    xml: runs a schain XML generated file\n
    run: runs any python script starting 'experiment_'\n
    generate: generates a template schain script\n
    list: return a list of available procs and operations\n
    search: return available operations, procs or arguments of the given
    operation/proc\n"""
    # Dispatch to the matching sub-command handler.
    # (Typo fix: help text said "avilable".)
    if command == 'xml':
        runFromXML(nextcommand)
    elif command == 'generate':
        generate()
    elif command == 'test':
        test()
    elif command == 'run':
        runschain(nextcommand)
    elif command == 'search':
        search(nextcommand)
    elif command == 'list':
        cmdlist(nextcommand)
    else:
        log.error('Command {} is not defined'.format(command))
113 118
114 119
def check_module(possible, instance):
    """Filter `possible` down to names whose `proc_type` equals `instance`."""
    def matches(name):
        try:
            candidate = locate('schainpy.model.{}'.format(name))
            return candidate.proc_type == instance
        except Exception:
            # Names that cannot be located or lack proc_type are excluded.
            return False
    return [name for name in clean_modules(possible) if matches(name)]
125 130
126 131
def clean_modules(module):
    """Drop dunder-style and all-uppercase names from the iterable `module`."""
    return [name for name in module
            if not name.endswith('__')
            and not name.startswith('__')
            and not name.isupper()]
132 137
def cmdlist(nextcommand):
    """Handle `schain list {procs|operations}`."""
    if nextcommand is None:
        log.error('Missing argument, available arguments: procs, operations', '')
        return
    if nextcommand == 'procs':
        log.success(
            'Current ProcessingUnits are:\n {}'.format('\n '.join(getProcs())), '')
    elif nextcommand == 'operations':
        log.success('Current Operations are:\n {}'.format(
            '\n '.join(getOperations())), '')
    else:
        log.error('Wrong argument', '')
146 151
def search(nextcommand):
    """Show docstring and arguments of an Operation/ProcessingUnit.

    On an unknown name, suggest close matches via fuzzy search.
    """
    if nextcommand is None:
        log.error('There is no Operation/ProcessingUnit to search', '')
        return
    try:
        args = getArgs(nextcommand)
        doc = getDoc(nextcommand)
        if len(args) == 0:
            log.success('\n{} has no arguments'.format(nextcommand), '')
        else:
            log.success('{}\n{}\n\narguments:\n {}'.format(
                nextcommand, doc, ', '.join(args)), '')
    except Exception as e:
        # Unknown module: list names scoring above 80 in fuzzy similarity.
        log.error('Module `{}` does not exists'.format(nextcommand), '')
        allModules = getAll()
        similar = [t[0] for t in process.extract(nextcommand, allModules, limit=12) if t[1] > 80]
        log.success('Possible modules are: {}'.format(', '.join(similar)), '')
164 169
def runschain(nextcommand):
    """Run an experiment script: the given path, or the sole experiment_* file."""
    if nextcommand is not None:
        try:
            subprocess.call(['python ' + nextcommand], shell=True)
        except Exception as e:
            log.error("I cannot run the file. Does it exists?")
        return
    currentfiles = glob.glob('./{}_*.py'.format(PREFIX))
    if len(currentfiles) > 1:
        log.error('There is more than one file to run')
    elif len(currentfiles) == 1:
        subprocess.call(['python ' + currentfiles[0]], shell=True)
    else:
        log.error('There is no file to run')
180 185
181 186
def basicInputs():
    """Interactively collect the common project parameters via click prompts.

    Returns a dict with keys: name, desc, multiprocess (1=Voltage, 2=Spectra,
    3=both), path, startDate, endDate, startHour, endHour, figpath.
    """
    inputs = {}
    inputs['name'] = click.prompt(
        'Name of the project', default="project", type=str)
    inputs['desc'] = click.prompt(
        'Enter a description', default="A schain project", type=str)
    inputs['multiprocess'] = click.prompt(
        '''Select data type:

    - Voltage (*.r): [1]
    - Spectra (*.pdata): [2]
    - Voltage and Spectra (*.r): [3]

    -->''', type=int)
    inputs['path'] = click.prompt('Data path', default=os.getcwd(
    ), type=click.Path(exists=True, resolve_path=True))
    inputs['startDate'] = click.prompt(
        'Start date', default='1970/01/01', type=str)
    inputs['endDate'] = click.prompt(
        'End date', default='2018/12/31', type=str)
    inputs['startHour'] = click.prompt(
        'Start hour', default='00:00:00', type=str)
    inputs['endHour'] = click.prompt('End hour', default='23:59:59', type=str)
    # Figures are stored in a 'figs' folder next to the data by convention.
    inputs['figpath'] = inputs['path'] + '/figs'
    return inputs
207 212
208 213
def generate():
    """Create an experiment script from a template chosen interactively.

    Prompts for project parameters, renders the matching template and writes
    it to `experiment_<name>.py` in the current directory.
    """
    inputs = basicInputs()

    if inputs['multiprocess'] == 1:
        current = templates.voltage.format(**inputs)
    elif inputs['multiprocess'] == 2:
        current = templates.spectra.format(**inputs)
    elif inputs['multiprocess'] == 3:
        current = templates.voltagespectra.format(**inputs)
    else:
        # BUG FIX: the original fell through with `current` unbound
        # (NameError) on an out-of-range selection.
        log.error('Invalid data type selected')
        return
    scriptname = '{}_{}.py'.format(PREFIX, inputs['name'])
    try:
        # BUG FIX: use a context manager; the original leaked the open handle.
        with open(scriptname, 'w') as script:
            script.write(current)
        log.success('Script {} generated'.format(scriptname))
    except Exception as e:
        log.error('I cannot create the file. Do you have writing permissions?')
225 230
226 231
def test():
    """Placeholder sub-command used to verify the CLI wiring."""
    log.warning('testing')
229 234
230 235
def runFromXML(filename):
    """Load a Project from the XML file `filename` and start it."""
    controller = Project()
    if controller.readXml(filename):
        controller.start()
@@ -1,794 +1,794
1 1
2 2 '''
3 3 Created on Jul 3, 2014
4 4
5 5 @author: roj-idl71
6 6 '''
7 7 # SUBCHANNELS EN VEZ DE CHANNELS
8 8 # BENCHMARKS -> PROBLEMAS CON ARCHIVOS GRANDES -> INCONSTANTE EN EL TIEMPO
9 9 # ACTUALIZACION DE VERSION
10 10 # HEADERS
11 11 # MODULO DE ESCRITURA
12 12 # METADATA
13 13
14 14 import os
15 15 import time
16 16 import datetime
17 17 import numpy
18 18 import timeit
19 19 from fractions import Fraction
20 20 from time import time
21 21 from time import sleep
22 22
23 23 import schainpy.admin
24 24 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
25 25 from schainpy.model.data.jrodata import Voltage
26 26 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
27 27
28 28 import pickle
29 29 try:
30 30 import digital_rf
31 31 except:
32 print('You should install "digital_rf" module if you want to read Digital RF data')
32 pass
33 33
34 34 @MPDecorator
35 35 class DigitalRFReader(ProcessingUnit):
36 36 '''
37 37 classdocs
38 38 '''
39 39
    def __init__(self):
        '''
        Constructor: initialize reader state. Actual configuration happens
        in setup() on the first run() call.
        '''

        ProcessingUnit.__init__(self)

        self.dataOut = Voltage()
        self.__printInfo = True                 # print header info only once
        self.__flagDiscontinuousBlock = False   # set when a gap is found in the data
        self.__bufferIndex = 9999999            # sentinel: forces a block read on first getData()
        self.__codeType = 0
        self.__ippKm = None
        self.__nCode = None
        self.__nBaud = None
        self.__code = None
        self.dtype = None
        self.oldAverage = None                  # running average of read time (seconds)
        self.path = None
59 59
60 60 def close(self):
61 61 print('Average of writing to digital rf format is ', self.oldAverage * 1000)
62 62 return
63 63
    def __getCurrentSecond(self):
        # Current absolute time in seconds: unix sample index / sample rate.
        return self.__thisUnixSample / self.__sample_rate

    thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
69 69
    def __setFileHeader(self):
        '''
        Initialize every header parameter of the dataOut object (no data yet),
        using the metadata read in setup() with sensible fallbacks.
        '''
        ippSeconds = 1.0 * self.__nSamples / self.__sample_rate

        nProfiles = 1.0 / ippSeconds  # Number of profiles in one second

        # Prefer the stored radar-controller header; fall back to a header
        # synthesized from the values collected in setup().
        try:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                self.__radarControllerHeader)
        except:
            self.dataOut.radarControllerHeaderObj = RadarControllerHeader(
                txA=0,
                txB=0,
                nWindows=1,
                nHeights=self.__nSamples,
                firstHeight=self.__firstHeigth,
                deltaHeight=self.__deltaHeigth,
                codeType=self.__codeType,
                nCode=self.__nCode, nBaud=self.__nBaud,
                code=self.__code)

        try:
            self.dataOut.systemHeaderObj = SystemHeader(self.__systemHeader)
        except:
            self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
                                                        nProfiles=nProfiles,
                                                        nChannels=len(
                                                            self.__channelList),
                                                        adcResolution=14)
        self.dataOut.type = "Voltage"

        self.dataOut.data = None

        self.dataOut.dtype = self.dtype

        # self.dataOut.nChannels = 0

        # self.dataOut.nHeights = 0

        self.dataOut.nProfiles = int(nProfiles)

        # NOTE(review): numpy.float is removed in NumPy >= 1.24 — confirm
        # before upgrading the NumPy dependency.
        self.dataOut.heightList = self.__firstHeigth + \
            numpy.arange(self.__nSamples, dtype=numpy.float) * \
            self.__deltaHeigth

        self.dataOut.channelList = list(range(self.__num_subchannels))

        self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()

        # self.dataOut.channelIndexList = None

        self.dataOut.flagNoData = True

        self.dataOut.flagDataAsBlock = False
        # Set to TRUE if the data is discontinuous
        self.dataOut.flagDiscontinuousBlock = False

        self.dataOut.utctime = None

        # timezone like jroheader, difference in minutes between UTC and localtime
        self.dataOut.timeZone = self.__timezone / 60

        self.dataOut.dstFlag = 0

        self.dataOut.errorCount = 0

        try:
            self.dataOut.nCohInt = self.fixed_metadata_dict.get(
                'nCohInt', self.nCohInt)

            # assume the data is already decoded
            self.dataOut.flagDecodeData = self.fixed_metadata_dict.get(
                'flagDecodeData', self.flagDecodeData)

            # assume the data is not flipped
            self.dataOut.flagDeflipData = self.fixed_metadata_dict['flagDeflipData']

            self.dataOut.flagShiftFFT = self.fixed_metadata_dict['flagShiftFFT']

            self.dataOut.useLocalTime = self.fixed_metadata_dict['useLocalTime']
        except:
            pass

        self.dataOut.ippSeconds = ippSeconds

        # Time interval between profiles
        # self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt

        self.dataOut.frequency = self.__frequency

        self.dataOut.realtime = self.__online
164 164 def findDatafiles(self, path, startDate=None, endDate=None):
165 165
166 166 if not os.path.isdir(path):
167 167 return []
168 168
169 169 try:
170 170 digitalReadObj = digital_rf.DigitalRFReader(
171 171 path, load_all_metadata=True)
172 172 except:
173 173 digitalReadObj = digital_rf.DigitalRFReader(path)
174 174
175 175 channelNameList = digitalReadObj.get_channels()
176 176
177 177 if not channelNameList:
178 178 return []
179 179
180 180 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
181 181
182 182 sample_rate = metadata_dict['sample_rate'][0]
183 183
184 184 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
185 185
186 186 try:
187 187 timezone = this_metadata_file['timezone'].value
188 188 except:
189 189 timezone = 0
190 190
191 191 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(
192 192 channelNameList[0]) / sample_rate - timezone
193 193
194 194 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
195 195 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
196 196
197 197 if not startDate:
198 198 startDate = startDatetime.date()
199 199
200 200 if not endDate:
201 201 endDate = endDatatime.date()
202 202
203 203 dateList = []
204 204
205 205 thisDatetime = startDatetime
206 206
207 207 while(thisDatetime <= endDatatime):
208 208
209 209 thisDate = thisDatetime.date()
210 210
211 211 if thisDate < startDate:
212 212 continue
213 213
214 214 if thisDate > endDate:
215 215 break
216 216
217 217 dateList.append(thisDate)
218 218 thisDatetime += datetime.timedelta(1)
219 219
220 220 return dateList
221 221
    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              channelList=None,
              nSamples=None,
              online=False,
              delay=60,
              buffer_size=1024,
              ippKm=None,
              nCohInt=1,
              nCode=1,
              nBaud=1,
              flagDecodeData=False,
              code=numpy.ones((1, 1), dtype=numpy.int),
              **kwargs):
        '''
        Configure the reader: open the digital_rf dataset under `path`,
        read its metadata and compute the selected time range and sample
        bounds, then initialize the dataOut headers.

        Inputs:
            path
            startDate
            endDate
            startTime
            endTime
            set
            expLabel
            ext
            online
            delay
        '''
        # NOTE(review): numpy.int (in the `code` default) and numpy.complex
        # (below) are removed in NumPy >= 1.24 — confirm before upgrading.
        self.path = path
        self.nCohInt = nCohInt
        self.flagDecodeData = flagDecodeData
        self.i = 0
        if not os.path.isdir(path):
            raise ValueError("[Reading] Directory %s does not exist" % path)

        # load_all_metadata is not available in every digital_rf version.
        try:
            self.digitalReadObj = digital_rf.DigitalRFReader(
                path, load_all_metadata=True)
        except:
            self.digitalReadObj = digital_rf.DigitalRFReader(path)

        channelNameList = self.digitalReadObj.get_channels()

        if not channelNameList:
            raise ValueError("[Reading] Directory %s does not have any files" % path)

        if not channelList:
            channelList = list(range(len(channelNameList)))

        ########## Reading metadata ######################

        top_properties = self.digitalReadObj.get_properties(
            channelNameList[channelList[0]])

        self.__num_subchannels = top_properties['num_subchannels']
        self.__sample_rate = 1.0 * \
            top_properties['sample_rate_numerator'] / \
            top_properties['sample_rate_denominator']
        # self.__samples_per_file = top_properties['samples_per_file'][0]
        self.__deltaHeigth = 1e6 * 0.15 / self.__sample_rate  # why 0.15?

        this_metadata_file = self.digitalReadObj.get_digital_metadata(
            channelNameList[channelList[0]])
        metadata_bounds = this_metadata_file.get_bounds()
        self.fixed_metadata_dict = this_metadata_file.read(
            metadata_bounds[0])[metadata_bounds[0]]  # GET FIRST HEADER

        try:
            self.__processingHeader = self.fixed_metadata_dict['processingHeader']
            self.__radarControllerHeader = self.fixed_metadata_dict['radarControllerHeader']
            self.__systemHeader = self.fixed_metadata_dict['systemHeader']
            self.dtype = pickle.loads(self.fixed_metadata_dict['dtype'])
        except:
            pass

        self.__frequency = None

        self.__frequency = self.fixed_metadata_dict.get('frequency', 1)

        self.__timezone = self.fixed_metadata_dict.get('timezone', 18000)

        try:
            nSamples = self.fixed_metadata_dict['nSamples']
        except:
            nSamples = None

        self.__firstHeigth = 0

        try:
            codeType = self.__radarControllerHeader['codeType']
        except:
            codeType = 0

        try:
            if codeType:
                nCode = self.__radarControllerHeader['nCode']
                nBaud = self.__radarControllerHeader['nBaud']
                code = self.__radarControllerHeader['code']
        except:
            pass

        if not ippKm:
            try:
                # seconds to km
                ippKm = self.__radarControllerHeader['ipp']
            except:
                ippKm = None
        ####################################################
        self.__ippKm = ippKm
        startUTCSecond = None
        endUTCSecond = None

        if startDate:
            startDatetime = datetime.datetime.combine(startDate, startTime)
            startUTCSecond = (
                startDatetime - datetime.datetime(1970, 1, 1)).total_seconds() + self.__timezone

        if endDate:
            endDatetime = datetime.datetime.combine(endDate, endTime)
            endUTCSecond = (endDatetime - datetime.datetime(1970,
                                                            1, 1)).total_seconds() + self.__timezone

        start_index, end_index = self.digitalReadObj.get_bounds(
            channelNameList[channelList[0]])

        # Clamp the requested range to the data actually on disk.
        if not startUTCSecond:
            startUTCSecond = start_index / self.__sample_rate

        if start_index > startUTCSecond * self.__sample_rate:
            startUTCSecond = start_index / self.__sample_rate

        if not endUTCSecond:
            endUTCSecond = end_index / self.__sample_rate

        if end_index < endUTCSecond * self.__sample_rate:
            endUTCSecond = end_index / self.__sample_rate
        if not nSamples:
            if not ippKm:
                raise ValueError("[Reading] nSamples or ippKm should be defined")
            nSamples = int(ippKm / (1e6 * 0.15 / self.__sample_rate))
        channelBoundList = []
        channelNameListFiltered = []

        for thisIndexChannel in channelList:
            thisChannelName = channelNameList[thisIndexChannel]
            start_index, end_index = self.digitalReadObj.get_bounds(
                thisChannelName)
            channelBoundList.append((start_index, end_index))
            channelNameListFiltered.append(thisChannelName)

        self.profileIndex = 0
        self.i = 0
        self.__delay = delay

        self.__codeType = codeType
        self.__nCode = nCode
        self.__nBaud = nBaud
        self.__code = code

        self.__datapath = path
        self.__online = online
        self.__channelList = channelList
        self.__channelNameList = channelNameListFiltered
        self.__channelBoundList = channelBoundList
        self.__nSamples = nSamples
        self.__samples_to_read = int(nSamples)  # FIXED: NOW 40
        self.__nChannels = len(self.__channelList)

        self.__startUTCSecond = startUTCSecond
        self.__endUTCSecond = endUTCSecond

        self.__timeInterval = 1.0 * self.__samples_to_read / \
            self.__sample_rate  # Time interval

        if online:
            # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
            startUTCSecond = numpy.floor(endUTCSecond)

        # because the other method adds samples_to_read before reading
        self.__thisUnixSample = int(startUTCSecond * self.__sample_rate) - self.__samples_to_read

        self.__data_buffer = numpy.zeros(
            (self.__num_subchannels, self.__samples_to_read), dtype=numpy.complex)

        self.__setFileHeader()
        self.isConfig = True

        print("[Reading] Digital RF Data was found from %s to %s " % (
            datetime.datetime.utcfromtimestamp(
                self.__startUTCSecond - self.__timezone),
            datetime.datetime.utcfromtimestamp(
                self.__endUTCSecond - self.__timezone)
        ))

        print("[Reading] Starting process from %s to %s" % (datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
                                                            datetime.datetime.utcfromtimestamp(
                                                                endUTCSecond - self.__timezone)
                                                            ))
        self.oldAverage = None
        self.count = 0
        self.executionTime = 0
427 427
    def __reload(self):
        '''
        Refresh the digital_rf reader's bounds while reading online.

        Returns True when the end bound was extended (new data arrived),
        otherwise False.
        '''
        # NOTE(review): indentation reconstructed from a mangled diff — the
        # success print/return are assumed to belong to the end_index branch;
        # confirm against the upstream source.
        print("[Reading] reloading metadata ...")

        try:
            self.digitalReadObj.reload(complete_update=True)
        except:
            # Some digital_rf versions lack reload(); reopen instead.
            self.digitalReadObj = digital_rf.DigitalRFReader(self.path)

        start_index, end_index = self.digitalReadObj.get_bounds(
            self.__channelNameList[self.__channelList[0]])

        if start_index > self.__startUTCSecond * self.__sample_rate:
            self.__startUTCSecond = 1.0 * start_index / self.__sample_rate

        if end_index > self.__endUTCSecond * self.__sample_rate:
            self.__endUTCSecond = 1.0 * end_index / self.__sample_rate
            print()
            print("[Reading] New timerange found [%s, %s] " % (
                datetime.datetime.utcfromtimestamp(
                    self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(
                    self.__endUTCSecond - self.__timezone)
            ))

            return True

        return False
461 461
462 462 def timeit(self, toExecute):
463 463 t0 = time.time()
464 464 toExecute()
465 465 self.executionTime = time.time() - t0
466 466 if self.oldAverage is None:
467 467 self.oldAverage = self.executionTime
468 468 self.oldAverage = (self.executionTime + self.count *
469 469 self.oldAverage) / (self.count + 1.0)
470 470 self.count = self.count + 1.0
471 471 return
472 472
    def __readNextBlock(self, seconds=30, volt_scale=1):
        '''
        Read the next block of samples for every channel/subchannel into
        self.__data_buffer, scaled by `volt_scale`.

        Returns True when a full block was read, False when the selected
        time range is exhausted.
        '''

        # Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
            print ("[Reading] There are no more data into selected time-range")
            if self.__online:
                # Online mode: wait briefly and look for newly arrived data.
                sleep(3)
                self.__reload()
            else:
                return False

            if self.__thisUnixSample + 2 * self.__samples_to_read > self.__endUTCSecond * self.__sample_rate:
                return False
            # Undo the advance so the retry re-reads the same position.
            self.__thisUnixSample -= self.__samples_to_read

        indexChannel = 0

        dataOk = False

        for thisChannelName in self.__channelNameList:  # TODO MULTIPLE CHANNELS?
            for indexSubchannel in range(self.__num_subchannels):
                try:
                    t0 = time()
                    result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
                                                                  self.__samples_to_read,
                                                                  thisChannelName, sub_channel=indexSubchannel)
                    self.executionTime = time() - t0
                    if self.oldAverage is None:
                        self.oldAverage = self.executionTime
                    self.oldAverage = (
                        self.executionTime + self.count * self.oldAverage) / (self.count + 1.0)
                    self.count = self.count + 1.0

                except IOError as e:
                    # read next profile
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s" % datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
                    break

                if result.shape[0] != self.__samples_to_read:
                    self.__flagDiscontinuousBlock = True
                    print("[Reading] %s: Too few samples were found, just %d/%d samples" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                                                            result.shape[0],
                                                                                            self.__samples_to_read))
                    break

                self.__data_buffer[indexSubchannel, :] = result * volt_scale
            indexChannel+=1

            dataOk = True

        self.__utctime = self.__thisUnixSample / self.__sample_rate

        if not dataOk:
            return False

        print("[Reading] %s: %d samples <> %f sec" % (datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                      self.__samples_to_read,
                                                      self.__timeInterval))

        self.__bufferIndex = 0

        return True
541 541
542 542 def __isBufferEmpty(self):
543 543 return self.__bufferIndex > self.__samples_to_read - self.__nSamples # 40960 - 40
544 544
545 545 def getData(self, seconds=30, nTries=5):
546 546 '''
547 547 This method gets the data from files and put the data into the dataOut object
548 548
549 549 In addition, increase el the buffer counter in one.
550 550
551 551 Return:
552 552 data : retorna un perfil de voltages (alturas * canales) copiados desde el
553 553 buffer. Si no hay mas archivos a leer retorna None.
554 554
555 555 Affected:
556 556 self.dataOut
557 557 self.profileIndex
558 558 self.flagDiscontinuousBlock
559 559 self.flagIsNewBlock
560 560 '''
561 561 #print("getdata")
562 562 err_counter = 0
563 563 self.dataOut.flagNoData = True
564 564
565 565 if self.__isBufferEmpty():
566 566 #print("hi")
567 567 self.__flagDiscontinuousBlock = False
568 568
569 569 while True:
570 570 #print ("q ha pasado")
571 571 if self.__readNextBlock():
572 572 break
573 573 if self.__thisUnixSample > self.__endUTCSecond * self.__sample_rate:
574 574 raise schainpy.admin.SchainError('Error')
575 575 return
576 576
577 577 if self.__flagDiscontinuousBlock:
578 578 raise schainpy.admin.SchainError('discontinuous block found')
579 579 return
580 580
581 581 if not self.__online:
582 582 raise schainpy.admin.SchainError('Online?')
583 583 return
584 584
585 585 err_counter += 1
586 586 if err_counter > nTries:
587 587 raise schainpy.admin.SchainError('Max retrys reach')
588 588 return
589 589
590 590 print('[Reading] waiting %d seconds to read a new block' % seconds)
591 591 time.sleep(seconds)
592 592
593 593 self.dataOut.data = self.__data_buffer[:, self.__bufferIndex:self.__bufferIndex + self.__nSamples]
594 594 self.dataOut.utctime = ( self.__thisUnixSample + self.__bufferIndex) / self.__sample_rate
595 595 self.dataOut.flagNoData = False
596 596 self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
597 597 self.dataOut.profileIndex = self.profileIndex
598 598
599 599 self.__bufferIndex += self.__nSamples
600 600 self.profileIndex += 1
601 601
602 602 if self.profileIndex == self.dataOut.nProfiles:
603 603 self.profileIndex = 0
604 604
605 605 return True
606 606
607 607 def printInfo(self):
608 608 '''
609 609 '''
610 610 if self.__printInfo == False:
611 611 return
612 612
613 613 # self.systemHeaderObj.printInfo()
614 614 # self.radarControllerHeaderObj.printInfo()
615 615
616 616 self.__printInfo = False
617 617
    def printNumberOfBlock(self):
        '''
        Intentionally a no-op: block-count reporting is disabled.
        '''
        return
        # print self.profileIndex
623 623
624 624 def run(self, **kwargs):
625 625 '''
626 626 This method will be called many times so here you should put all your code
627 627 '''
628 628
629 629 if not self.isConfig:
630 630 self.setup(**kwargs)
631 631 #self.i = self.i+1
632 632 self.getData(seconds=self.__delay)
633 633
634 634 return
635 635
636 636
637 637 class DigitalRFWriter(Operation):
638 638 '''
639 639 classdocs
640 640 '''
641 641
    def __init__(self, **kwargs):
        '''
        Constructor: initialize writer state; the writer objects themselves
        are created in setup() on the first run() call.
        '''
        Operation.__init__(self, **kwargs)
        self.metadata_dict = {}   # header fields written as digital_rf metadata
        self.dataOut = None
        self.dtype = None
        self.oldAverage = 0       # running average of rf_write() time (seconds)
651 651
    def setHeader(self):
        # Populate self.metadata_dict with the header fields read from dataOut.

        self.metadata_dict['frequency'] = self.dataOut.frequency
        self.metadata_dict['timezone'] = self.dataOut.timeZone
        self.metadata_dict['dtype'] = pickle.dumps(self.dataOut.dtype)
        self.metadata_dict['nProfiles'] = self.dataOut.nProfiles
        self.metadata_dict['heightList'] = self.dataOut.heightList
        self.metadata_dict['channelList'] = self.dataOut.channelList
        self.metadata_dict['flagDecodeData'] = self.dataOut.flagDecodeData
        self.metadata_dict['flagDeflipData'] = self.dataOut.flagDeflipData
        self.metadata_dict['flagShiftFFT'] = self.dataOut.flagShiftFFT
        self.metadata_dict['useLocalTime'] = self.dataOut.useLocalTime
        self.metadata_dict['nCohInt'] = self.dataOut.nCohInt
        self.metadata_dict['type'] = self.dataOut.type
        self.metadata_dict['flagDataAsBlock']= getattr(
            self.dataOut, 'flagDataAsBlock', None)  # check
668 668
669 669 def setup(self, dataOut, path, frequency, fileCadence, dirCadence, metadataCadence, set=0, metadataFile='metadata', ext='.h5'):
670 670 '''
671 671 In this method we should set all initial parameters.
672 672 Input:
673 673 dataOut: Input data will also be outputa data
674 674 '''
675 675 self.setHeader()
676 676 self.__ippSeconds = dataOut.ippSeconds
677 677 self.__deltaH = dataOut.getDeltaH()
678 678 self.__sample_rate = 1e6 * 0.15 / self.__deltaH
679 679 self.__dtype = dataOut.dtype
680 680 if len(dataOut.dtype) == 2:
681 681 self.__dtype = dataOut.dtype[0]
682 682 self.__nSamples = dataOut.systemHeaderObj.nSamples
683 683 self.__nProfiles = dataOut.nProfiles
684 684
685 685 if self.dataOut.type != 'Voltage':
686 686 raise 'Digital RF cannot be used with this data type'
687 687 self.arr_data = numpy.ones((1, dataOut.nFFTPoints * len(
688 688 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
689 689 else:
690 690 self.arr_data = numpy.ones((self.__nSamples, len(
691 691 self.dataOut.channelList)), dtype=[('r', self.__dtype), ('i', self.__dtype)])
692 692
693 693 file_cadence_millisecs = 1000
694 694
695 695 sample_rate_fraction = Fraction(self.__sample_rate).limit_denominator()
696 696 sample_rate_numerator = int(sample_rate_fraction.numerator)
697 697 sample_rate_denominator = int(sample_rate_fraction.denominator)
698 698 start_global_index = dataOut.utctime * self.__sample_rate
699 699
700 700 uuid = 'prueba'
701 701 compression_level = 0
702 702 checksum = False
703 703 is_complex = True
704 704 num_subchannels = len(dataOut.channelList)
705 705 is_continuous = True
706 706 marching_periods = False
707 707
708 708 self.digitalWriteObj = digital_rf.DigitalRFWriter(path, self.__dtype, dirCadence,
709 709 fileCadence, start_global_index,
710 710 sample_rate_numerator, sample_rate_denominator, uuid, compression_level, checksum,
711 711 is_complex, num_subchannels, is_continuous, marching_periods)
712 712 metadata_dir = os.path.join(path, 'metadata')
713 713 os.system('mkdir %s' % (metadata_dir))
714 714 self.digitalMetadataWriteObj = digital_rf.DigitalMetadataWriter(metadata_dir, dirCadence, 1, # 236, file_cadence_millisecs / 1000
715 715 sample_rate_numerator, sample_rate_denominator,
716 716 metadataFile)
717 717 self.isConfig = True
718 718 self.currentSample = 0
719 719 self.oldAverage = 0
720 720 self.count = 0
721 721 return
722 722
    def writeMetadata(self):
        # Write the collected headers as digital_rf metadata at the sample
        # index corresponding to dataOut.utctime.
        start_idx = self.__sample_rate * self.dataOut.utctime

        self.metadata_dict['processingHeader'] = self.dataOut.processingHeaderObj.getAsDict(
        )
        self.metadata_dict['radarControllerHeader'] = self.dataOut.radarControllerHeaderObj.getAsDict(
        )
        self.metadata_dict['systemHeader'] = self.dataOut.systemHeaderObj.getAsDict(
        )
        self.digitalMetadataWriteObj.write(start_idx, self.metadata_dict)
        return
734 734
735 735 def timeit(self, toExecute):
736 736 t0 = time()
737 737 toExecute()
738 738 self.executionTime = time() - t0
739 739 if self.oldAverage is None:
740 740 self.oldAverage = self.executionTime
741 741 self.oldAverage = (self.executionTime + self.count *
742 742 self.oldAverage) / (self.count + 1.0)
743 743 self.count = self.count + 1.0
744 744 return
745 745
746 746 def writeData(self):
747 747 if self.dataOut.type != 'Voltage':
748 748 raise 'Digital RF cannot be used with this data type'
749 749 for channel in self.dataOut.channelList:
750 750 for i in range(self.dataOut.nFFTPoints):
751 751 self.arr_data[1][channel * self.dataOut.nFFTPoints +
752 752 i]['r'] = self.dataOut.data[channel][i].real
753 753 self.arr_data[1][channel * self.dataOut.nFFTPoints +
754 754 i]['i'] = self.dataOut.data[channel][i].imag
755 755 else:
756 756 for i in range(self.dataOut.systemHeaderObj.nSamples):
757 757 for channel in self.dataOut.channelList:
758 758 self.arr_data[i][channel]['r'] = self.dataOut.data[channel][i].real
759 759 self.arr_data[i][channel]['i'] = self.dataOut.data[channel][i].imag
760 760
761 761 def f(): return self.digitalWriteObj.rf_write(self.arr_data)
762 762 self.timeit(f)
763 763
764 764 return
765 765
    def run(self, dataOut, frequency=49.92e6, path=None, fileCadence=1000, dirCadence=36000, metadataCadence=1, **kwargs):
        '''
        Entry point called repeatedly by the controller: configure the writer
        and emit metadata on the first call, then write one data block per call.
        Inputs:
            dataOut: object with the data
        '''
        # print dataOut.__dict__
        self.dataOut = dataOut
        if not self.isConfig:
            self.setup(dataOut, path, frequency, fileCadence,
                       dirCadence, metadataCadence, **kwargs)
            self.writeMetadata()

        self.writeData()

        ## self.currentSample += 1
        # if self.dataOut.flagDataAsBlock or self.currentSample == 1:
        # self.writeMetadata()
        ## if self.currentSample == self.__nProfiles: self.currentSample = 0

        return dataOut  # in version 2.7 this return did not exist
787 787
    def close(self):
        # Close the digital_rf writer (best effort) and report the average
        # write time in milliseconds.
        print('[Writing] - Closing files ')
        print('Average of writing to digital rf format is ', self.oldAverage * 1000)
        try:
            self.digitalWriteObj.close()
        except:
            pass
@@ -1,849 +1,849
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6
7 7 import os, sys
8 8 import time, datetime
9 9 import numpy
10 10 import fnmatch
11 11 import glob
12 12 from time import sleep
13 13
14 14 try:
15 15 import pyfits
16 16 except ImportError as e:
17 print("Fits data cannot be used. Install pyfits module")
17 pass
18 18
19 19 from xml.etree.ElementTree import ElementTree
20 20
21 21 from .jroIO_base import isRadarFolder, isNumber
22 22 from schainpy.model.data.jrodata import Fits
23 23 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit, MPDecorator
24 24 from schainpy.utils import log
25 25
26 26
class PyFits(object):
    # Thin convenience wrapper around the pyfits API used to assemble a FITS
    # file: a primary image HDU (cFImage) plus one binary table of columns
    # (setColF/writeData -> Ctable), combined by CFile and written by wFile.
    name=None
    format=None
    array =None
    data =None
    thdulist=None
    prihdr=None
    hdu=None

    def __init__(self):

        pass

    def setColF(self,name,format,array):
        # Build and remember a float32 pyfits Column (used for the freq axis).
        self.name=name
        self.format=format
        self.array=array
        a1=numpy.array([self.array],dtype=numpy.float32)
        self.col1 = pyfits.Column(name=self.name, format=self.format, array=a1)
        return self.col1

#     def setColP(self,name,format,data):
#         self.name=name
#         self.format=format
#         self.data=data
#         a2=numpy.array([self.data],dtype=numpy.float32)
#         self.col2 = pyfits.Column(name=self.name, format=self.format, array=a2)
#         return self.col2


    def writeData(self,name,format,data):
        # Same as setColF but for power/data columns; returns a pyfits Column.
        self.name=name
        self.format=format
        self.data=data
        a2=numpy.array([self.data],dtype=numpy.float32)
        self.col2 = pyfits.Column(name=self.name, format=self.format, array=a2)
        return self.col2

    def cFImage(self,idblock,year,month,day,hour,minute,second):
        # Primary HDU holding the block id, timestamped via header keywords.
        self.hdu= pyfits.PrimaryHDU(idblock)
        self.hdu.header.set("Year",year)
        self.hdu.header.set("Month",month)
        self.hdu.header.set("Day",day)
        self.hdu.header.set("Hour",hour)
        self.hdu.header.set("Minute",minute)
        self.hdu.header.set("Second",second)
        return self.hdu


    def Ctable(self,colList):
        # NOTE(review): pyfits.new_table was removed in modern astropy.io.fits
        # (BinTableHDU.from_columns is the replacement) — confirm the pinned
        # pyfits version still provides it.
        self.cols=pyfits.ColDefs(colList)
        self.tbhdu = pyfits.new_table(self.cols)
        return self.tbhdu


    def CFile(self,hdu,tbhdu):
        # Combine primary HDU and table HDU into a single HDU list.
        self.thdulist=pyfits.HDUList([hdu,tbhdu])

    def wFile(self,filename):
        # Overwrite semantics: remove any pre-existing file before writing.
        if os.path.isfile(filename):
            os.remove(filename)
        self.thdulist.writeto(filename)
90 90
class ParameterConf:
    """A single ``<Parameter name="..." value="...">`` entry read from a
    metadata XML file."""

    ELEMENTNAME = 'Parameter'

    def __init__(self):
        # Both attributes are populated later by readXml().
        self.name = ''
        self.value = ''

    def readXml(self, parmElement):
        """Populate ``name`` and ``value`` from the element's attributes."""
        for attr in ('name', 'value'):
            setattr(self, attr, parmElement.get(attr))

    def getElementName(self):
        """Return the XML tag name this class maps to."""
        return self.ELEMENTNAME
103 103
class Metadata(object):
    """Parse a metadata XML file and collect every ``<Parameter>`` element
    found anywhere in the document as ParameterConf objects."""

    def __init__(self, filename):
        # One ParameterConf per <Parameter> element found in the file.
        self.parmConfObjList = []
        self.readXml(filename)

    def readXml(self, filename):
        """Parse *filename* and fill ``self.parmConfObjList``."""
        self.projectElement = None
        self.procUnitConfObjDict = {}
        self.projectElement = ElementTree().parse(filename)
        self.project = self.projectElement.tag

        # BUG FIX: Element.getiterator() was deprecated and removed in
        # Python 3.9; Element.iter() is the supported equivalent.
        parmElementList = self.projectElement.iter(ParameterConf().getElementName())

        for parmElement in parmElementList:
            parmConfObj = ParameterConf()
            parmConfObj.readXml(parmElement)
            self.parmConfObjList.append(parmConfObj)
class FitsWriter(Operation):
    """Writes spectra blocks to FITS files, rotating to a new file every
    ``dataBlocksPerFile`` blocks. Files are named ``P<year><doy><set>.fits``
    inside per-day ``d<year><doy>`` subfolders of ``path``."""

    def __init__(self, **kwargs):
        Operation.__init__(self, **kwargs)
        self.isConfig = False
        self.dataBlocksPerFile = None
        self.blockIndex = 0          # blocks written to the current file
        self.flagIsNewFile = 1
        self.fitsObj = None
        self.optchar = 'P'           # filename prefix character
        self.ext = '.fits'
        self.setFile = 0             # per-day file sequence number

    def setFitsHeader(self, dataOut, metadatafile=None):
        """Create the primary HDU with acquisition metadata and write it to
        ``self.filename``, then append the height list as an extension."""

        header_data = pyfits.PrimaryHDU()

        header_data.header['EXPNAME'] = "RADAR DATA"
        header_data.header['DATATYPE'] = "SPECTRA"
        header_data.header['COMMENT'] = ""

        if metadatafile:

            # Copy every <Parameter> from the metadata XML into the header.
            metadata4fits = Metadata(metadatafile)

            for parameter in metadata4fits.parmConfObjList:
                parm_name = parameter.name
                parm_value = parameter.value

                header_data.header[parm_name] = parm_value

        header_data.header['DATETIME'] = time.strftime("%b %d %Y %H:%M:%S", dataOut.datatime.timetuple())
        header_data.header['CHANNELLIST'] = str(dataOut.channelList)
        header_data.header['NCHANNELS'] = dataOut.nChannels
        #header_data.header['HEIGHTS'] = dataOut.heightList
        header_data.header['NHEIGHTS'] = dataOut.nHeights

        header_data.header['IPPSECONDS'] = dataOut.ippSeconds
        header_data.header['NCOHINT'] = dataOut.nCohInt
        header_data.header['NINCOHINT'] = dataOut.nIncohInt
        header_data.header['TIMEZONE'] = dataOut.timeZone
        header_data.header['NBLOCK'] = self.blockIndex

        header_data.writeto(self.filename)

        self.addExtension(dataOut.heightList,'HEIGHTLIST')


    def setup(self, dataOut, path, dataBlocksPerFile=100, metadatafile=None):
        # NOTE(review): run() forwards **kwargs here but this signature takes
        # none — extra keywords would raise TypeError; confirm callers.

        self.path = path
        self.dataOut = dataOut
        self.metadatafile = metadatafile
        self.dataBlocksPerFile = dataBlocksPerFile

    def open(self):
        # Re-open the current file in update mode for appending HDUs.
        self.fitsObj = pyfits.open(self.filename, mode='update')


    def addExtension(self, data, tagname):
        """Append *data* as a named ImageHDU extension and flush to disk."""
        self.open()
        extension = pyfits.ImageHDU(data=data, name=tagname)
        #extension.header['TAG'] = tagname
        self.fitsObj.append(extension)
        self.write()

    def addData(self, data):
        """Append one data block as an ImageHDU, stamp its UTC time, and
        update the NBLOCK count in the primary header."""
        self.open()
        extension = pyfits.ImageHDU(data=data, name=self.fitsObj[0].header['DATATYPE'])
        extension.header['UTCTIME'] = self.dataOut.utctime
        self.fitsObj.append(extension)
        self.blockIndex += 1
        self.fitsObj[0].header['NBLOCK'] = self.blockIndex

        self.write()

    def write(self):
        # Flush pending changes and close; open() is called again per block.

        self.fitsObj.flush(verbose=True)
        self.fitsObj.close()


    def setNextFile(self):
        """Choose the next output filename (creating the day folder if
        needed), reset the block counter and write the file header.

        Returns:
            1 always (kept for symmetry with the reader API).
        """

        ext = self.ext
        path = self.path

        timeTuple = time.localtime( self.dataOut.utctime)
        subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)

        fullpath = os.path.join( path, subfolder )
        if not( os.path.exists(fullpath) ):
            os.mkdir(fullpath)
            self.setFile = -1 #initialize the file-set counter
        else:
            filesList = os.listdir( fullpath )
            if len( filesList ) > 0:
                filesList = sorted( filesList, key=str.lower )
                filen = filesList[-1]

                # Continue numbering from the last file already on disk
                # (characters 8-10 of the name hold the set number).
                if isNumber( filen[8:11] ):
                    self.setFile = int( filen[8:11] ) #initialize the counter from the last file's set number
                else:
                    self.setFile = -1
            else:
                self.setFile = -1 #initialize the file-set counter

        setFile = self.setFile
        setFile += 1

        thisFile = '%s%4.4d%3.3d%3.3d%s' % (self.optchar,
                                            timeTuple.tm_year,
                                            timeTuple.tm_yday,
                                            setFile,
                                            ext )

        filename = os.path.join( path, subfolder, thisFile )

        self.blockIndex = 0
        self.filename = filename
        self.setFile = setFile
        self.flagIsNewFile = 1

        print('Writing the file: %s'%self.filename)

        self.setFitsHeader(self.dataOut, self.metadatafile)

        return 1

    def writeBlock(self):
        self.addData(self.dataOut.data_spc)
        self.flagIsNewFile = 0


    def __setNewBlock(self):
        # Decide whether the current file can take another block, rotating
        # to a new file when dataBlocksPerFile has been reached.

        if self.flagIsNewFile:
            return 1

        if self.blockIndex < self.dataBlocksPerFile:
            return 1

        if not( self.setNextFile() ):
            return 0

        return 1

    def writeNextBlock(self):
        if not( self.__setNewBlock() ):
            return 0
        self.writeBlock()
        return 1

    def putData(self):
        if self.flagIsNewFile:
            self.setNextFile()
        self.writeNextBlock()

    def run(self, dataOut, path, dataBlocksPerFile=100, metadatafile=None, **kwargs):
        """Entry point: configure on first call, then write one block."""
        if not(self.isConfig):
            self.setup(dataOut, path, dataBlocksPerFile=dataBlocksPerFile, metadatafile=metadatafile, **kwargs)
            self.isConfig = True
        self.putData()
285 285
@MPDecorator
class FitsReader(ProcessingUnit):
    """Offline reader for the FITS files produced by FitsWriter: walks per-day
    folders, reads the primary header into a Fits dataOut object, then yields
    one data block (ImageHDU) per call to run()."""

#     __TIMEZONE = time.timezone

    expName = None
    datetimestr = None
    utc = None
    nChannels = None
    nSamples = None
    dataBlocksPerFile = None
    comments = None
    lastUTTime = None
    header_dict = None
    data = None
    data_header_dict = None

    def __init__(self):#, **kwargs):
        ProcessingUnit.__init__(self)#, **kwargs)
        self.isConfig = False
        self.ext = '.fits'
        self.setFile = 0
        self.flagNoMoreFiles = 0
        self.flagIsNewFile = 1
        self.flagDiscontinuousBlock = None
        self.fileIndex = None
        self.filename = None
        self.fileSize = None
        self.fitsObj = None
        self.timeZone = None
        self.nReadBlocks = 0
        self.nTotalBlocks = 0
        self.dataOut = self.createObjByDefault()
        self.maxTimeStep = 10  # should be user-defined via the setup() method
        self.blockIndex = 1

    def createObjByDefault(self):
        # Default output container for this reader.

        dataObj = Fits()

        return dataObj

    def isFileinThisTime(self, filename, startTime, endTime, useLocalTime=False):
        """Return the file's datetime if its header DATETIME falls inside
        [startTime, endTime), else None (also None if it can't be opened)."""
        try:
            fitsObj = pyfits.open(filename,'readonly')
        except:
            print("File %s can't be opened" %(filename))
            return None

        header = fitsObj[0].header
        struct_time = time.strptime(header['DATETIME'], "%b %d %Y %H:%M:%S")
        utc = time.mktime(struct_time) - time.timezone #TIMEZONE should be a parameter of the FITS header

        ltc = utc
        if useLocalTime:
            ltc -= time.timezone
        thisDatetime = datetime.datetime.utcfromtimestamp(ltc)
        thisTime = thisDatetime.time()

        if not ((startTime <= thisTime) and (endTime > thisTime)):
            return None

        return thisDatetime

    def __setNextFileOnline(self):
        # Online (real-time) reading is not supported by this reader.
        raise NotImplementedError

    def __setNextFileOffline(self):
        """Advance to the next file in filenameList; returns 1 on success,
        0 when the list is exhausted (sets flagNoMoreFiles)."""
        idFile = self.fileIndex

        while (True):
            idFile += 1
            if not(idFile < len(self.filenameList)):
                self.flagNoMoreFiles = 1
                print("No more Files")
                return 0

            filename = self.filenameList[idFile]

#             if not(self.__verifyFile(filename)):
#                 continue

            fileSize = os.path.getsize(filename)
            fitsObj = pyfits.open(filename,'readonly')
            break

        self.flagIsNewFile = 1
        self.fileIndex = idFile
        self.filename = filename
        self.fileSize = fileSize
        self.fitsObj = fitsObj
        self.blockIndex = 0
        print("Setting the file: %s"%self.filename)

        return 1

    def __setValuesFromHeader(self):
        # Propagate the header fields parsed by readHeader() into dataOut.

        self.dataOut.header = self.header_dict
        self.dataOut.expName = self.expName

        self.dataOut.timeZone = self.timeZone
        self.dataOut.dataBlocksPerFile = self.dataBlocksPerFile
        self.dataOut.comments = self.comments
#         self.dataOut.timeInterval = self.timeInterval
        self.dataOut.channelList = self.channelList
        self.dataOut.heightList = self.heightList

        self.dataOut.nCohInt = self.nCohInt
        self.dataOut.nIncohInt = self.nIncohInt
        self.dataOut.ipp_sec = self.ippSeconds

    def readHeader(self):
        """Parse the primary HDU header written by FitsWriter."""
        headerObj = self.fitsObj[0]

        self.header_dict = headerObj.header
        if 'EXPNAME' in list(headerObj.header.keys()):
            self.expName = headerObj.header['EXPNAME']

        if 'DATATYPE' in list(headerObj.header.keys()):
            self.dataType = headerObj.header['DATATYPE']

        self.datetimestr = headerObj.header['DATETIME']
        # CHANNELLIST is stored as str(list), e.g. "[0, 1, 2]"; parse it back.
        channelList = headerObj.header['CHANNELLIST']
        channelList = channelList.split('[')
        channelList = channelList[1].split(']')
        channelList = channelList[0].split(',')
        channelList = [int(ch) for ch in channelList]
        self.channelList = channelList
        self.nChannels = headerObj.header['NCHANNELS']
        self.nHeights = headerObj.header['NHEIGHTS']
        self.ippSeconds = headerObj.header['IPPSECONDS']
        self.nCohInt = headerObj.header['NCOHINT']
        self.nIncohInt = headerObj.header['NINCOHINT']
        self.dataBlocksPerFile = headerObj.header['NBLOCK']
        self.timeZone = headerObj.header['TIMEZONE']

#         self.timeInterval = self.ippSeconds * self.nCohInt * self.nIncohInt

        if 'COMMENT' in list(headerObj.header.keys()):
            self.comments = headerObj.header['COMMENT']

        self.readHeightList()

    def readHeightList(self):
        # The HEIGHTLIST extension follows the primary HDU; advance past it.
        self.blockIndex = self.blockIndex + 1
        obj = self.fitsObj[self.blockIndex]
        self.heightList = obj.data
        self.blockIndex = self.blockIndex + 1

    def readExtension(self):
        obj = self.fitsObj[self.blockIndex]
        self.heightList = obj.data
        self.blockIndex = self.blockIndex + 1

    def setNextFile(self):
        """Open the next file and re-read its header; returns 1/0."""

        if self.online:
            newFile = self.__setNextFileOnline()
        else:
            newFile = self.__setNextFileOffline()

        if not(newFile):
            return 0

        self.readHeader()
        self.__setValuesFromHeader()
        self.nReadBlocks = 0
#         self.blockIndex = 1
        return 1

    def searchFilesOffLine(self,
                           path,
                           startDate,
                           endDate,
                           startTime=datetime.time(0,0,0),
                           endTime=datetime.time(23,59,59),
                           set=None,
                           expLabel='',
                           ext='.fits',
                           walk=True):
        """Collect (pathList, filenameList) for files in the date/time range.

        With walk=True, scans per-day radar folders under *path*; otherwise
        looks in *path* directly. Returns (None, None) when nothing matches.
        """

        pathList = []

        if not walk:
            pathList.append(path)

        else:
            dirList = []
            for thisPath in os.listdir(path):
                if not os.path.isdir(os.path.join(path,thisPath)):
                    continue
                if not isRadarFolder(thisPath):
                    continue

                dirList.append(thisPath)

            if not(dirList):
                return None, None

            thisDate = startDate

            while(thisDate <= endDate):
                year = thisDate.timetuple().tm_year
                doy = thisDate.timetuple().tm_yday

                # Folder names carry year+doy right after a one-char prefix.
                matchlist = fnmatch.filter(dirList, '?' + '%4.4d%3.3d' % (year,doy) + '*')
                if len(matchlist) == 0:
                    thisDate += datetime.timedelta(1)
                    continue
                for match in matchlist:
                    pathList.append(os.path.join(path,match,expLabel))

                thisDate += datetime.timedelta(1)

            if pathList == []:
                print("Any folder was found for the date range: %s-%s" %(startDate, endDate))
                return None, None

            print("%d folder(s) was(were) found for the date range: %s - %s" %(len(pathList), startDate, endDate))

        filenameList = []
        datetimeList = []

        for i in range(len(pathList)):

            thisPath = pathList[i]

            fileList = glob.glob1(thisPath, "*%s" %ext)
            fileList.sort()

            for thisFile in fileList:

                filename = os.path.join(thisPath,thisFile)
                thisDatetime = self.isFileinThisTime(filename, startTime, endTime)

                if not(thisDatetime):
                    continue

                filenameList.append(filename)
                datetimeList.append(thisDatetime)

        if not(filenameList):
            print("Any file was found for the time range %s - %s" %(startTime, endTime))
            return None, None

        print("%d file(s) was(were) found for the time range: %s - %s" %(len(filenameList), startTime, endTime))
        print()

        for i in range(len(filenameList)):
            print("%s -> [%s]" %(filenameList[i], datetimeList[i].ctime()))

        self.filenameList = filenameList
        self.datetimeList = datetimeList

        return pathList, filenameList

    def setup(self, path=None,
              startDate=None,
              endDate=None,
              startTime=datetime.time(0,0,0),
              endTime=datetime.time(23,59,59),
              set=0,
              expLabel = "",
              ext = None,
              online = False,
              delay = 60,
              walk = True):
        """Search the files to read and open the first one.

        Raises ValueError on a missing path; exits the process when no file
        matches the requested range.
        """

        if path == None:
            raise ValueError("The path is not valid")

        if ext == None:
            ext = self.ext

        if not(online):
            print("Searching files in offline mode ...")
            pathList, filenameList = self.searchFilesOffLine(path, startDate=startDate, endDate=endDate,
                                                             startTime=startTime, endTime=endTime,
                                                             set=set, expLabel=expLabel, ext=ext,
                                                             walk=walk)

            if not(pathList):
                print("No *%s files into the folder %s \nfor the range: %s - %s"%(ext, path,
                      datetime.datetime.combine(startDate,startTime).ctime(),
                      datetime.datetime.combine(endDate,endTime).ctime()))

                sys.exit(-1)

            self.fileIndex = -1
            self.pathList = pathList
            self.filenameList = filenameList

        self.online = online
        self.delay = delay
        ext = ext.lower()
        self.ext = ext

        if not(self.setNextFile()):
            if (startDate!=None) and (endDate!=None):
                print("No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime()))
            elif startDate != None:
                print("No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime()))
            else:
                print("No files")

            sys.exit(-1)



    def readBlock(self):
        """Read the HDU at blockIndex into self.data and advance counters."""
        dataObj = self.fitsObj[self.blockIndex]

        self.data = dataObj.data
        self.data_header_dict = dataObj.header
        self.utc = self.data_header_dict['UTCTIME']

        self.flagIsNewFile = 0
        self.blockIndex += 1
        self.nTotalBlocks += 1
        self.nReadBlocks += 1

        return 1

    def __jumpToLastBlock(self):
        raise NotImplementedError

    def __waitNewBlock(self):
        """
        Return 1 if a new data block was found, 0 otherwise.

        Always returns 0 when the reading mode is offline.

        NOTE(review): this body references self.fp, self.processingHeaderObj,
        self.nTries and self.__rdBasicHeader, none of which are set anywhere
        in this class — it looks copied from a raw-binary reader and would
        fail if the (unimplemented) online path ever reached it. Confirm.
        """
        if not self.online:
            return 0

        if (self.nReadBlocks >= self.dataBlocksPerFile):
            return 0

        currentPointer = self.fp.tell()

        neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize

        for nTries in range( self.nTries ):

            self.fp.close()
            self.fp = open( self.filename, 'rb' )
            self.fp.seek( currentPointer )

            self.fileSize = os.path.getsize( self.filename )
            currentSize = self.fileSize - currentPointer

            if ( currentSize >= neededSize ):
                self.__rdBasicHeader()
                return 1

            print("\tWaiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries+1))
            sleep( self.delay )


        return 0

    def __setNewBlock(self):
        # Move to the next block, rotating files when the current one is
        # exhausted; flags a discontinuity when the time gap exceeds
        # maxTimeStep.

        if self.online:
            self.__jumpToLastBlock()

        if self.flagIsNewFile:
            return 1

        self.lastUTTime = self.utc

        if self.online:
            if self.__waitNewBlock():
                return 1

        if self.nReadBlocks < self.dataBlocksPerFile:
            return 1

        if not(self.setNextFile()):
            return 0

        deltaTime = self.utc - self.lastUTTime

        self.flagDiscontinuousBlock = 0

        if deltaTime > self.maxTimeStep:
            self.flagDiscontinuousBlock = 1

        return 1


    def readNextBlock(self):
        if not(self.__setNewBlock()):
            return 0

        if not(self.readBlock()):
            return 0

        return 1

    def printInfo(self):

        pass

    def getData(self):
        """Copy the next block into dataOut; sets flagNoData on exhaustion.

        NOTE(review): returns (status, message) tuples on the early paths but
        nothing (None) on success — callers appear to rely on dataOut state,
        not the return value. Confirm before normalizing.
        """

        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            return (0, 'No more files')

        self.flagDiscontinuousBlock = 0
        self.flagIsNewBlock = 0

        if not(self.readNextBlock()):
            return (1, 'Error reading data')

        if self.data is None:
            self.dataOut.flagNoData = True
            return (0, 'No more data')

        self.dataOut.data = self.data
        self.dataOut.data_header = self.data_header_dict
        self.dataOut.utctime = self.utc

#         self.dataOut.header = self.header_dict
#         self.dataOut.expName = self.expName
#         self.dataOut.nChannels = self.nChannels
#         self.dataOut.timeZone = self.timeZone
#         self.dataOut.dataBlocksPerFile = self.dataBlocksPerFile
#         self.dataOut.comments = self.comments
#         # self.dataOut.timeInterval = self.timeInterval
#         self.dataOut.channelList = self.channelList
#         self.dataOut.heightList = self.heightList
        self.dataOut.flagNoData = False
#         return self.dataOut.data

    def run(self, **kwargs):
        """Entry point: configure on first call, then read one block."""

        if not(self.isConfig):
            self.setup(**kwargs)
            self.isConfig = True

        self.getData()
730 730
@MPDecorator
class SpectraHeisWriter(Operation):
    """Writes Heis spectra to FITS files (one file per block) under a
    per-run ``F<year><doy>_<timestamp>`` subfolder of ``wrpath``."""
    # set = None
    setFile = None
    idblock = None
    doypath = None
    subfolder = None

    def __init__(self):#, **kwargs):
        Operation.__init__(self)#, **kwargs)
        self.wrObj = PyFits()
        # self.dataOut = dataOut
        self.nTotalBlocks=0
        # self.set = None
        self.setFile = None
        self.idblock = 0
        self.wrpath = None
        self.doypath = None
        self.subfolder = None
        self.isConfig = False

    def isNumber(str):
        """
        Check whether a string can be converted to a number.

        NOTE(review): defined without ``self`` and shadowing the builtin
        ``str`` — calling it as ``self.isNumber(x)`` would pass the instance
        as the argument. It appears unused within this class; confirm before
        fixing (e.g. making it a @staticmethod).

        Input:
            str, the string to analyze for numeric convertibility

        Return:
            True : if the string is numeric
            False : if it is not a numeric string
        """
        try:
            float( str )
            return True
        except:
            return False

    def setup(self, dataOut, wrpath):
        # Create the output root folder on first use.

        if not(os.path.exists(wrpath)):
            os.mkdir(wrpath)

        self.wrpath = wrpath
#         self.setFile = 0
        self.dataOut = dataOut

    def putData(self):
        """Build and write one FITS file for the current data block:
        primary HDU with the block id and UTC timestamp, plus a table with
        the frequency axis and one dB-power column per channel."""
        name= time.localtime( self.dataOut.utctime)
        ext=".fits"

        if self.doypath == None:
            # One subfolder per run, keyed by year/doy and wall-clock time.
            self.subfolder = 'F%4.4d%3.3d_%d' % (name.tm_year,name.tm_yday,time.mktime(datetime.datetime.now().timetuple()))
            self.doypath = os.path.join( self.wrpath, self.subfolder )
            os.mkdir(self.doypath)

        if self.setFile == None:
#             self.set = self.dataOut.set
            self.setFile = 0
#         if self.set != self.dataOut.set:
##             self.set = self.dataOut.set
#             self.setFile = 0

        #make the filename
        thisFile = 'D%4.4d%3.3d_%3.3d%s' % (name.tm_year,name.tm_yday,self.setFile,ext)

        filename = os.path.join(self.wrpath,self.subfolder, thisFile)

        idblock = numpy.array([self.idblock],dtype="int64")
        header=self.wrObj.cFImage(idblock=idblock,
                                  year=time.gmtime(self.dataOut.utctime).tm_year,
                                  month=time.gmtime(self.dataOut.utctime).tm_mon,
                                  day=time.gmtime(self.dataOut.utctime).tm_mday,
                                  hour=time.gmtime(self.dataOut.utctime).tm_hour,
                                  minute=time.gmtime(self.dataOut.utctime).tm_min,
                                  second=time.gmtime(self.dataOut.utctime).tm_sec)

        # Frequency axis derived from the height spacing (c in m/s).
        c=3E8
        deltaHeight = self.dataOut.heightList[1] - self.dataOut.heightList[0]
        freq=numpy.arange(-1*self.dataOut.nHeights/2.,self.dataOut.nHeights/2.)*(c/(2*deltaHeight*1000))

        colList = []

        colFreq=self.wrObj.setColF(name="freq", format=str(self.dataOut.nFFTPoints)+'E', array=freq)

        colList.append(colFreq)

        nchannel=self.dataOut.nChannels

        for i in range(nchannel):
            # Power per channel in dB.
            col = self.wrObj.writeData(name="PCh"+str(i+1),
                                       format=str(self.dataOut.nFFTPoints)+'E',
                                       data=10*numpy.log10(self.dataOut.data_spc[i,:]))

            colList.append(col)

        data=self.wrObj.Ctable(colList=colList)

        self.wrObj.CFile(header,data)

        self.wrObj.wFile(filename)

        #update the setFile
        self.setFile += 1
        self.idblock += 1

        return 1

    def run(self, dataOut, **kwargs):
        """Entry point: configure on first call, then write one block."""

        if not(self.isConfig):

            self.setup(dataOut, **kwargs)
            self.isConfig = True

        self.putData()
        return dataOut
@@ -1,597 +1,595
1 1 '''
2 2 Created on Aug 1, 2017
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import json
11 11 import glob
12 12 import datetime
13 13
14 14 import numpy
15 15 import h5py
16 16
17 17 import schainpy.admin
18 18 from schainpy.model.io.jroIO_base import LOCALTIME, Reader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation, MPDecorator
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
22 22
23 23 try:
24 24 import madrigal.cedar
25 25 except:
26 log.warning(
27 'You should install "madrigal library" module if you want to read/write Madrigal data'
28 )
26 pass
29 27
30 28 try:
31 29 basestring
32 30 except:
33 31 basestring = str
34 32
35 33 DEF_CATALOG = {
36 34 'principleInvestigator': 'Marco Milla',
37 35 'expPurpose': '',
38 36 'cycleTime': '',
39 37 'correlativeExp': '',
40 38 'sciRemarks': '',
41 39 'instRemarks': ''
42 40 }
43 41
44 42 DEF_HEADER = {
45 43 'kindatDesc': '',
46 44 'analyst': 'Jicamarca User',
47 45 'comments': '',
48 46 'history': ''
49 47 }
50 48
51 49 MNEMONICS = {
52 50 10: 'jro',
53 51 11: 'jbr',
54 52 840: 'jul',
55 53 13: 'jas',
56 54 1000: 'pbr',
57 55 1001: 'hbr',
58 56 1002: 'obr',
59 57 400: 'clr'
60 58
61 59 }
62 60
63 61 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
64 62
def load_json(obj):
    '''
    Parse json as string instead of unicode.

    Accepts either a JSON string or an already-deserialized structure and
    returns the same structure with every text value (and every dict key)
    coerced to plain ``str``. Dicts are processed recursively; lists and
    tuples become lists with string values coerced. Any other value is
    returned unchanged.
    '''

    if isinstance(obj, str):
        iterable = json.loads(obj)
    else:
        iterable = obj

    if isinstance(iterable, dict):
        # Uses str directly instead of the module-level py2 `basestring`
        # shim (identical on Python 3) and iterates items() without an
        # intermediate list copy.
        return {str(k): load_json(v) if isinstance(v, dict) else str(v) if isinstance(v, str) else v
                for k, v in iterable.items()}
    elif isinstance(iterable, (list, tuple)):
        return [str(v) if isinstance(v, str) else v for v in iterable]

    return iterable
82 80
83 81 @MPDecorator
84 82 class MADReader(Reader, ProcessingUnit):
85 83
86 84 def __init__(self):
87 85
88 86 ProcessingUnit.__init__(self)
89 87
90 88 self.dataOut = Parameters()
91 89 self.counter_records = 0
92 90 self.nrecords = None
93 91 self.flagNoMoreFiles = 0
94 92 self.filename = None
95 93 self.intervals = set()
96 94 self.datatime = datetime.datetime(1900,1,1)
97 95 self.format = None
98 96 self.filefmt = "***%Y%m%d*******"
99 97
100 98 def setup(self, **kwargs):
101 99
102 100 self.set_kwargs(**kwargs)
103 101 self.oneDDict = load_json(self.oneDDict)
104 102 self.twoDDict = load_json(self.twoDDict)
105 103 self.ind2DList = load_json(self.ind2DList)
106 104 self.independentParam = self.ind2DList[0]
107 105
108 106 if self.path is None:
109 107 raise ValueError('The path is not valid')
110 108
111 109 self.open_file = open
112 110 self.open_mode = 'rb'
113 111
114 112 if self.format is None:
115 113 raise ValueError('The format is not valid choose simple or hdf5')
116 114 elif self.format.lower() in ('simple', 'txt'):
117 115 self.ext = '.txt'
118 116 elif self.format.lower() in ('cedar',):
119 117 self.ext = '.001'
120 118 else:
121 119 self.ext = '.hdf5'
122 120 self.open_file = h5py.File
123 121 self.open_mode = 'r'
124 122
125 123 if self.online:
126 124 log.log("Searching files in online mode...", self.name)
127 125
128 126 for nTries in range(self.nTries):
129 127 fullpath = self.searchFilesOnLine(self.path, self.startDate,
130 128 self.endDate, self.expLabel, self.ext, self.walk,
131 129 self.filefmt, self.folderfmt)
132 130
133 131 try:
134 132 fullpath = next(fullpath)
135 133 except:
136 134 fullpath = None
137 135
138 136 if fullpath:
139 137 break
140 138
141 139 log.warning(
142 140 'Waiting {} sec for a valid file in {}: try {} ...'.format(
143 141 self.delay, self.path, nTries + 1),
144 142 self.name)
145 143 time.sleep(self.delay)
146 144
147 145 if not(fullpath):
148 146 raise schainpy.admin.SchainError(
149 147 'There isn\'t any valid file in {}'.format(self.path))
150 148
151 149 else:
152 150 log.log("Searching files in {}".format(self.path), self.name)
153 151 self.filenameList = self.searchFilesOffLine(self.path, self.startDate,
154 152 self.endDate, self.expLabel, self.ext, self.walk, self.filefmt, self.folderfmt)
155 153
156 154 self.setNextFile()
157 155
158 156 def readFirstHeader(self):
159 157 '''Read header and data'''
160 158
161 159 self.parseHeader()
162 160 self.parseData()
163 161 self.blockIndex = 0
164 162
165 163 return
166 164
167 165 def parseHeader(self):
168 166 '''
169 167 '''
170 168
171 169 self.output = {}
172 170 self.version = '2'
173 171 s_parameters = None
174 172 if self.ext == '.txt':
175 173 self.parameters = [s.strip().lower() for s in self.fp.readline().decode().strip().split(' ') if s]
176 174 elif self.ext == '.hdf5':
177 175 self.metadata = self.fp['Metadata']
178 176 if '_record_layout' in self.metadata:
179 177 s_parameters = [s[0].lower().decode() for s in self.metadata['Independent Spatial Parameters']]
180 178 self.version = '3'
181 179 self.parameters = [s[0].lower().decode() for s in self.metadata['Data Parameters']]
182 180
183 181 log.success('Parameters found: {}'.format(self.parameters),
184 182 'MADReader')
185 183 if s_parameters:
186 184 log.success('Spatial parameters found: {}'.format(s_parameters),
187 185 'MADReader')
188 186
189 187 for param in list(self.oneDDict.keys()):
190 188 if param.lower() not in self.parameters:
191 189 log.warning(
192 190 'Parameter {} not found will be ignored'.format(
193 191 param),
194 192 'MADReader')
195 193 self.oneDDict.pop(param, None)
196 194
197 195 for param, value in list(self.twoDDict.items()):
198 196 if param.lower() not in self.parameters:
199 197 log.warning(
200 198 'Parameter {} not found, it will be ignored'.format(
201 199 param),
202 200 'MADReader')
203 201 self.twoDDict.pop(param, None)
204 202 continue
205 203 if isinstance(value, list):
206 204 if value[0] not in self.output:
207 205 self.output[value[0]] = []
208 206 self.output[value[0]].append([])
209 207
    def parseData(self):
        '''
        Load the data payload and derive record/range/time bookkeeping
        (self.data, self.nrecords, self.ranges, self.counter_records).
        '''

        if self.ext == '.txt':
            self.data = numpy.genfromtxt(self.fp, missing_values=('missing'))
            self.nrecords = self.data.shape[0]
            self.ranges = numpy.unique(self.data[:,self.parameters.index(self.independentParam.lower())])
            self.counter_records = 0
        elif self.ext == '.hdf5':
            self.data = self.fp['Data']
            self.ranges = numpy.unique(self.data['Table Layout'][self.independentParam.lower()])
            self.times = numpy.unique(self.data['Table Layout']['ut1_unix'])
            # 'recno' first/last entries give the record span of this file
            self.counter_records = int(self.data['Table Layout']['recno'][0])
            self.nrecords = int(self.data['Table Layout']['recno'][-1])
225 223
    def readNextBlock(self):
        '''
        Advance to the next record block inside the requested time range.

        Records whose timestamp falls outside [startDate/startTime,
        endDate/endTime] are skipped; when the current file is exhausted,
        setNextFile() is called to continue with the next one.

        Returns 1 once a valid block has been read.
        '''

        while True:
            self.flagDiscontinuousBlock = 0
            if self.counter_records == self.nrecords:
                self.setNextFile()

            self.readBlock()

            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'MADReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'MADReader')

        return 1
254 252
    def readBlock(self):
        '''
        Collect every record sharing the current timestamp into self.buffer,
        update self.datatime / self.counter_records / self.intervals, and
        raise flagDiscontinuousBlock when the date jumps between blocks.
        '''
        dum = []
        if self.ext == '.txt':
            # First 6 columns are year, month, day, hour, minute, second
            dt = self.data[self.counter_records][:6].astype(int)
            if datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5]).date() > self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
            # Gather all consecutive rows with this same timestamp
            while True:
                dt = self.data[self.counter_records][:6].astype(int)
                datatime = datetime.datetime(dt[0], dt[1], dt[2], dt[3], dt[4], dt[5])
                if datatime == self.datatime:
                    dum.append(self.data[self.counter_records])
                    self.counter_records += 1
                    if self.counter_records == self.nrecords:
                        break
                    continue
                # Timestamp changed: remember the sampling interval and stop
                self.intervals.add((datatime-self.datatime).seconds)
                break
        elif self.ext == '.hdf5':
            datatime = datetime.datetime.utcfromtimestamp(
                self.times[self.counter_records])
            # One HDF5 "record" is every row with the current record number
            dum = self.data['Table Layout'][self.data['Table Layout']['recno']==self.counter_records]
            self.intervals.add((datatime-self.datatime).seconds)
            if datatime.date()>self.datatime.date():
                self.flagDiscontinuousBlock = 1
            self.datatime = datatime
            self.counter_records += 1

        self.buffer = numpy.array(dum)
        return
287 285
    def set_output(self):
        '''
        Storing data from buffer to dataOut object.

        1D parameters become scalar attributes; 2D parameters become
        per-range arrays aligned to self.ranges (NaN where a range is
        missing from the current block).
        '''

        parameters = [None for __ in self.parameters]

        # 1D parameters: a single value taken from the first buffered row
        for param, attr in list(self.oneDDict.items()):
            x = self.parameters.index(param.lower())
            setattr(self.dataOut, attr, self.buffer[0][x])

        # 2D parameters: scatter the block values onto the full range grid
        for param, value in list(self.twoDDict.items()):
            dummy = numpy.zeros(self.ranges.shape) + numpy.nan
            if self.ext == '.txt':
                x = self.parameters.index(param.lower())
                y = self.parameters.index(self.independentParam.lower())
                ranges = self.buffer[:,y]
                #if self.ranges.size == ranges.size:
                #    continue
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[:,x]
            else:
                ranges = self.buffer[self.independentParam.lower()]
                index = numpy.where(numpy.in1d(self.ranges, ranges))[0]
                dummy[index] = self.buffer[param.lower()]

            if isinstance(value, str):
                if value not in self.independentParam:
                    setattr(self.dataOut, value, dummy.reshape(1,-1))
            elif isinstance(value, list):
                # list value = (attribute name, row index) into self.output
                self.output[value[0]][value[1]] = dummy
                parameters[value[1]] = param
        for key, value in list(self.output.items()):
            setattr(self.dataOut, key, numpy.array(value))

        self.dataOut.parameters = [s for s in parameters if s]
        self.dataOut.heightList = self.ranges
        self.dataOut.utctime = (self.datatime - datetime.datetime(1970, 1, 1)).total_seconds()
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.paramInterval = min(self.intervals)
        self.dataOut.useLocalTime = False
        self.dataOut.flagNoData = False
        self.dataOut.nrecords = self.nrecords
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
332 330
333 331 def getData(self):
334 332 '''
335 333 Storing data from databuffer to dataOut object
336 334 '''
337 335
338 336 if not self.readNextBlock():
339 337 self.dataOut.flagNoData = True
340 338 return 0
341 339
342 340 self.set_output()
343 341
344 342 return 1
345 343
346 344 def run(self, **kwargs):
347 345
348 346 if not(self.isConfig):
349 347 self.setup(**kwargs)
350 348 self.isConfig = True
351 349
352 350 self.getData()
353 351
354 352 return
355 353
356 354 @MPDecorator
357 355 class MADWriter(Operation):
358 356 '''Writing module for Madrigal files
359 357
360 358 type: external
361 359
362 360 Inputs:
363 361 path path where files will be created
364 362 oneDDict json of one-dimensional parameters in record where keys
365 363 are Madrigal codes (integers or mnemonics) and values the corresponding
366 364 dataOut attribute e.g: {
367 365 'gdlatr': 'lat',
368 366 'gdlonr': 'lon',
369 367 'gdlat2':'lat',
370 368 'glon2':'lon'}
371 369 ind2DList list of independent spatial two-dimensional parameters e.g:
372 370 ['heigthList']
373 371 twoDDict json of two-dimensional parameters in record where keys
374 372 are Madrigal codes (integers or mnemonics) and values the corresponding
 375 373         dataOut attribute; if it is a multidimensional array, specify it as a tuple
376 374 ('attr', pos) e.g: {
377 375 'gdalt': 'heightList',
378 376 'vn1p2': ('data_output', 0),
379 377 'vn2p2': ('data_output', 1),
380 378 'vn3': ('data_output', 2),
381 379 'snl': ('data_SNR', 'db')
382 380 }
383 381 metadata json of madrigal metadata (kinst, kindat, catalog and header)
384 382 format hdf5, cedar
385 383 blocks number of blocks per file'''
386 384
387 385 __attrs__ = ['path', 'oneDDict', 'ind2DList', 'twoDDict','metadata', 'format', 'blocks']
388 386 missing = -32767
389 387
390 388 def __init__(self):
391 389
392 390 Operation.__init__(self)
393 391 self.dataOut = Parameters()
394 392 self.counter = 0
395 393 self.path = None
396 394 self.fp = None
397 395
398 396 def run(self, dataOut, path, oneDDict, ind2DList='[]', twoDDict='{}',
399 397 metadata='{}', format='cedar', **kwargs):
400 398
401 399 if not self.isConfig:
402 400 self.setup(path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs)
403 401 self.isConfig = True
404 402
405 403 self.dataOut = dataOut
406 404 self.putData()
407 405 return 1
408 406
    def setup(self, path, oneDDict, ind2DList, twoDDict, metadata, format, **kwargs):
        '''
        Configure Operation: parse the JSON parameter maps, read the
        Madrigal metadata (kinst/kindat/catalog/header) and choose the
        output extension from `format` ('cedar' -> .dat, 'hdf5' -> .hdf5).
        '''

        self.path = path
        self.blocks = kwargs.get('blocks', None)  # records per file (None = unlimited)
        self.counter = 0
        self.oneDDict = load_json(oneDDict)
        self.twoDDict = load_json(twoDDict)
        self.ind2DList = load_json(ind2DList)
        meta = load_json(metadata)
        self.kinst = meta.get('kinst')
        self.kindat = meta.get('kindat')
        self.catalog = meta.get('catalog', DEF_CATALOG)
        self.header = meta.get('header', DEF_HEADER)
        if format == 'cedar':
            self.ext = '.dat'
            self.extra_args = {}
        elif format == 'hdf5':
            self.ext = '.hdf5'
            self.extra_args = {'ind2DList': self.ind2DList}

        # 2D keys excluding the independent (range/altitude) parameters
        self.keys = [k.lower() for k in self.twoDDict]
        if 'range' in self.keys:
            self.keys.remove('range')
        if 'gdalt' in self.keys:
            self.keys.remove('gdalt')
437 435
    def setFile(self):
        '''
        Create new cedar file object named after the instrument mnemonic
        and the current block's UTC timestamp.

        Returns 1 on success, None when the cedar object cannot be built.
        '''

        self.mnemonic = MNEMONICS[self.kinst] #TODO get mnemonic from madrigal
        date = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)

        filename = '{}{}{}'.format(self.mnemonic,
                                   date.strftime('%Y%m%d_%H%M%S'),
                                   self.ext)

        self.fullname = os.path.join(self.path, filename)

        # Overwrite any stale file left over from a previous run
        if os.path.isfile(self.fullname) :
            log.warning(
                'Destination file {} already exists, previous file deleted.'.format(
                    self.fullname),
                'MADWriter')
            os.remove(self.fullname)

        try:
            log.success(
                'Creating file: {}'.format(self.fullname),
                'MADWriter')
            if not os.path.exists(self.path):
                os.makedirs(self.path)
            self.fp = madrigal.cedar.MadrigalCedarFile(self.fullname, True)
        except ValueError as e:
            log.error(
                'Impossible to create a cedar object with "madrigal.cedar.MadrigalCedarFile"',
                'MADWriter')
            return

        return 1
473 471
    def writeBlock(self):
        '''
        Add data records to cedar file taking data from oneDDict and twoDDict
        attributes.
        Allowed parameters in: parcodes.tab
        '''

        startTime = datetime.datetime.utcfromtimestamp(self.dataOut.utctime)
        endTime = startTime + datetime.timedelta(seconds=self.dataOut.paramInterval)
        heights = self.dataOut.heightList

        # For plain cedar output, replace NaNs with the Madrigal missing value
        if self.ext == '.dat':
            for key, value in list(self.twoDDict.items()):
                if isinstance(value, str):
                    data = getattr(self.dataOut, value)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing
                elif isinstance(value, (tuple, list)):
                    attr, key = value
                    data = getattr(self.dataOut, attr)
                    invalid = numpy.isnan(data)
                    data[invalid] = self.missing

        # Build one flat per-height array per 2D key
        out = {}
        for key, value in list(self.twoDDict.items()):
            key = key.lower()
            if isinstance(value, str):
                if 'db' in value.lower():
                    # '_db' suffix: average over channels and convert to dB
                    tmp = getattr(self.dataOut, value.replace('_db', ''))
                    SNRavg = numpy.average(tmp, axis=0)
                    tmp = 10*numpy.log10(SNRavg)
                else:
                    tmp = getattr(self.dataOut, value)
                out[key] = tmp.flatten()[:len(heights)]
            elif isinstance(value, (tuple, list)):
                attr, x = value
                data = getattr(self.dataOut, attr)
                out[key] = data[int(x)][:len(heights)]

        # Keep only heights where at least one parameter is valid
        a = numpy.array([out[k] for k in self.keys])
        nrows = numpy.array([numpy.isnan(a[:, x]).all() for x in range(len(heights))])
        index = numpy.where(nrows == False)[0]

        rec = madrigal.cedar.MadrigalDataRecord(
            self.kinst,
            self.kindat,
            startTime.year,
            startTime.month,
            startTime.day,
            startTime.hour,
            startTime.minute,
            startTime.second,
            startTime.microsecond/10000,
            endTime.year,
            endTime.month,
            endTime.day,
            endTime.hour,
            endTime.minute,
            endTime.second,
            endTime.microsecond/10000,
            list(self.oneDDict.keys()),
            list(self.twoDDict.keys()),
            len(index),
            **self.extra_args
        )

        # Setting 1d values
        for key in self.oneDDict:
            rec.set1D(key, getattr(self.dataOut, self.oneDDict[key]))

        # Setting 2d values
        nrec = 0
        for n in index:
            for key in out:
                rec.set2D(key, nrec, out[key][n])
            nrec += 1

        self.fp.append(rec)
        # HDF5 records are buffered; flush every 500 appended records
        if self.ext == '.hdf5' and self.counter % 500 == 0 and self.counter > 0:
            self.fp.dump()
        if self.counter % 20 == 0 and self.counter > 0:
            log.log(
                'Writing {} records'.format(
                    self.counter),
                'MADWriter')
559 557
    def setHeader(self):
        '''
        Create and add catalog and header to the cedar file, flushing any
        pending records and closing the file.
        '''

        log.success('Closing file {}'.format(self.fullname), 'MADWriter')

        if self.ext == '.dat':
            self.fp.write()
        else:
            self.fp.dump()
            self.fp.close()

        header = madrigal.cedar.CatalogHeaderCreator(self.fullname)
        header.createCatalog(**self.catalog)
        header.createHeader(**self.header)
        header.write()
577 575
578 576 def putData(self):
579 577
580 578 if self.dataOut.flagNoData:
581 579 return 0
582 580
583 581 if self.dataOut.flagDiscontinuousBlock or self.counter == self.blocks:
584 582 if self.counter > 0:
585 583 self.setHeader()
586 584 self.counter = 0
587 585
588 586 if self.counter == 0:
589 587 self.setFile()
590 588
591 589 self.writeBlock()
592 590 self.counter += 1
593 591
594 592 def close(self):
595 593
596 594 if self.counter > 0:
597 595 self.setHeader() No newline at end of file
@@ -1,600 +1,600
1 1 '''
2 2 Created on Jul 3, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import datetime
8 8 import numpy
9 9
10 10 try:
11 11 from gevent import sleep
12 12 except:
13 13 from time import sleep
14 14
15 15 from schainpy.model.data.jroheaderIO import RadarControllerHeader, SystemHeader
16 16 from schainpy.model.data.jrodata import Voltage
17 17 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
18 18
19 19 try:
20 20 import digital_rf_hdf5
21 21 except:
22 print('You should install "digital_rf_hdf5" module if you want to read USRP data')
22 pass
23 23
24 24 class USRPReader(ProcessingUnit):
25 25 '''
 26  26     Reader for USRP voltage data stored in Digital RF HDF5 format.
27 27 '''
28 28
29 29 def __init__(self, **kwargs):
30 30 '''
31 31 Constructor
32 32 '''
33 33
34 34 ProcessingUnit.__init__(self, **kwargs)
35 35
36 36 self.dataOut = Voltage()
37 37 self.__printInfo = True
38 38 self.__flagDiscontinuousBlock = False
39 39 self.__bufferIndex = 9999999
40 40
41 41 self.__ippKm = None
42 42 self.__codeType = 0
43 43 self.__nCode = None
44 44 self.__nBaud = None
45 45 self.__code = None
46 46
    def __getCurrentSecond(self):
        # Current absolute time in seconds: sample counter / sample rate
        return self.__thisUnixSample/self.__sample_rate

    thisSecond = property(__getCurrentSecond, "I'm the 'thisSecond' property.")
52 52
53 53 def __setFileHeader(self):
54 54 '''
55 55 In this method will be initialized every parameter of dataOut object (header, no data)
56 56 '''
57 57 ippSeconds = 1.0*self.__nSamples/self.__sample_rate
58 58
59 59 nProfiles = 1.0/ippSeconds #Number of profiles in one second
60 60
61 61 self.dataOut.radarControllerHeaderObj = RadarControllerHeader(ippKm=self.__ippKm,
62 62 txA=0,
63 63 txB=0,
64 64 nWindows=1,
65 65 nHeights=self.__nSamples,
66 66 firstHeight=self.__firstHeigth,
67 67 deltaHeight=self.__deltaHeigth,
68 68 codeType=self.__codeType,
69 69 nCode=self.__nCode, nBaud=self.__nBaud,
70 70 code = self.__code)
71 71
72 72 self.dataOut.systemHeaderObj = SystemHeader(nSamples=self.__nSamples,
73 73 nProfiles=nProfiles,
74 74 nChannels=len(self.__channelList),
75 75 adcResolution=14)
76 76
77 77 self.dataOut.type = "Voltage"
78 78
79 79 self.dataOut.data = None
80 80
81 81 self.dataOut.dtype = numpy.dtype([('real','<i8'),('imag','<i8')])
82 82
83 83 # self.dataOut.nChannels = 0
84 84
85 85 # self.dataOut.nHeights = 0
86 86
87 87 self.dataOut.nProfiles = nProfiles
88 88
89 89 self.dataOut.heightList = self.__firstHeigth + numpy.arange(self.__nSamples, dtype = numpy.float)*self.__deltaHeigth
90 90
91 91 self.dataOut.channelList = self.__channelList
92 92
93 93 self.dataOut.blocksize = self.dataOut.getNChannels() * self.dataOut.getNHeights()
94 94
95 95 # self.dataOut.channelIndexList = None
96 96
97 97 self.dataOut.flagNoData = True
98 98
99 99 #Set to TRUE if the data is discontinuous
100 100 self.dataOut.flagDiscontinuousBlock = False
101 101
102 102 self.dataOut.utctime = None
103 103
104 104 self.dataOut.timeZone = self.__timezone/60 #timezone like jroheader, difference in minutes between UTC and localtime
105 105
106 106 self.dataOut.dstFlag = 0
107 107
108 108 self.dataOut.errorCount = 0
109 109
110 110 self.dataOut.nCohInt = 1
111 111
112 112 self.dataOut.flagDecodeData = False #asumo que la data esta decodificada
113 113
114 114 self.dataOut.flagDeflipData = False #asumo que la data esta sin flip
115 115
116 116 self.dataOut.flagShiftFFT = False
117 117
118 118 self.dataOut.ippSeconds = ippSeconds
119 119
120 120 #Time interval between profiles
121 121 #self.dataOut.timeInterval = self.dataOut.ippSeconds * self.dataOut.nCohInt
122 122
123 123 self.dataOut.frequency = self.__frequency
124 124
125 125 self.dataOut.realtime = self.__online
126 126
127 127 def findDatafiles(self, path, startDate=None, endDate=None):
128 128
129 129 if not os.path.isdir(path):
130 130 return []
131 131
132 132 try:
133 133 digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
134 134 except:
135 135 digitalReadObj = digital_rf_hdf5.read_hdf5(path)
136 136
137 137 channelNameList = digitalReadObj.get_channels()
138 138
139 139 if not channelNameList:
140 140 return []
141 141
142 142 metadata_dict = digitalReadObj.get_rf_file_metadata(channelNameList[0])
143 143
144 144 sample_rate = metadata_dict['sample_rate'][0]
145 145
146 146 this_metadata_file = digitalReadObj.get_metadata(channelNameList[0])
147 147
148 148 try:
149 149 timezone = this_metadata_file['timezone'].value
150 150 except:
151 151 timezone = 0
152 152
153 153 startUTCSecond, endUTCSecond = digitalReadObj.get_bounds(channelNameList[0])/sample_rate - timezone
154 154
155 155 startDatetime = datetime.datetime.utcfromtimestamp(startUTCSecond)
156 156 endDatatime = datetime.datetime.utcfromtimestamp(endUTCSecond)
157 157
158 158 if not startDate:
159 159 startDate = startDatetime.date()
160 160
161 161 if not endDate:
162 162 endDate = endDatatime.date()
163 163
164 164 dateList = []
165 165
166 166 thisDatetime = startDatetime
167 167
168 168 while(thisDatetime<=endDatatime):
169 169
170 170 thisDate = thisDatetime.date()
171 171
172 172 if thisDate < startDate:
173 173 continue
174 174
175 175 if thisDate > endDate:
176 176 break
177 177
178 178 dateList.append(thisDate)
179 179 thisDatetime += datetime.timedelta(1)
180 180
181 181 return dateList
182 182
183 183 def setup(self, path = None,
184 184 startDate = None,
185 185 endDate = None,
186 186 startTime = datetime.time(0,0,0),
187 187 endTime = datetime.time(23,59,59),
188 188 channelList = None,
189 189 nSamples = None,
190 190 ippKm = 60,
191 191 online = False,
192 192 delay = 60,
193 193 buffer_size = 1024,
194 194 **kwargs):
195 195 '''
196 196 In this method we should set all initial parameters.
197 197
198 198 Inputs:
199 199 path
200 200 startDate
201 201 endDate
202 202 startTime
203 203 endTime
204 204 set
205 205 expLabel
206 206 ext
207 207 online
208 208 delay
209 209 '''
210 210
211 211 if not os.path.isdir(path):
212 212 raise ValueError("[Reading] Directory %s does not exist" %path)
213 213
214 214 try:
215 215 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path, load_all_metadata=True)
216 216 except:
217 217 self.digitalReadObj = digital_rf_hdf5.read_hdf5(path)
218 218
219 219 channelNameList = self.digitalReadObj.get_channels()
220 220
221 221 if not channelNameList:
222 222 raise ValueError("[Reading] Directory %s does not have any files" %path)
223 223
224 224 if not channelList:
225 225 channelList = list(range(len(channelNameList)))
226 226
227 227 ########## Reading metadata ######################
228 228
229 229 metadata_dict = self.digitalReadObj.get_rf_file_metadata(channelNameList[channelList[0]])
230 230
231 231 self.__sample_rate = metadata_dict['sample_rate'][0]
232 232 # self.__samples_per_file = metadata_dict['samples_per_file'][0]
233 233 self.__deltaHeigth = 1e6*0.15/self.__sample_rate
234 234
235 235 this_metadata_file = self.digitalReadObj.get_metadata(channelNameList[channelList[0]])
236 236
237 237 self.__frequency = None
238 238 try:
239 239 self.__frequency = this_metadata_file['center_frequencies'].value
240 240 except:
241 241 self.__frequency = this_metadata_file['fc'].value
242 242
243 243 if not self.__frequency:
244 244 raise ValueError("Center Frequency is not defined in metadata file")
245 245
246 246 try:
247 247 self.__timezone = this_metadata_file['timezone'].value
248 248 except:
249 249 self.__timezone = 0
250 250
251 251 self.__firstHeigth = 0
252 252
253 253 try:
254 254 codeType = this_metadata_file['codeType'].value
255 255 except:
256 256 codeType = 0
257 257
258 258 nCode = 1
259 259 nBaud = 1
260 260 code = numpy.ones((nCode, nBaud), dtype=numpy.int)
261 261
262 262 if codeType:
263 263 nCode = this_metadata_file['nCode'].value
264 264 nBaud = this_metadata_file['nBaud'].value
265 265 code = this_metadata_file['code'].value
266 266
267 267 if not ippKm:
268 268 try:
269 269 #seconds to km
270 270 ippKm = 1e6*0.15*this_metadata_file['ipp'].value
271 271 except:
272 272 ippKm = None
273 273
274 274 ####################################################
275 275 startUTCSecond = None
276 276 endUTCSecond = None
277 277
278 278 if startDate:
279 279 startDatetime = datetime.datetime.combine(startDate, startTime)
280 280 startUTCSecond = (startDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
281 281
282 282 if endDate:
283 283 endDatetime = datetime.datetime.combine(endDate, endTime)
284 284 endUTCSecond = (endDatetime-datetime.datetime(1970,1,1)).total_seconds() + self.__timezone
285 285
286 286 start_index, end_index = self.digitalReadObj.get_bounds(channelNameList[channelList[0]])
287 287
288 288 if not startUTCSecond:
289 289 startUTCSecond = start_index/self.__sample_rate
290 290
291 291 if start_index > startUTCSecond*self.__sample_rate:
292 292 startUTCSecond = start_index/self.__sample_rate
293 293
294 294 if not endUTCSecond:
295 295 endUTCSecond = end_index/self.__sample_rate
296 296
297 297 if end_index < endUTCSecond*self.__sample_rate:
298 298 endUTCSecond = end_index/self.__sample_rate
299 299
300 300 if not nSamples:
301 301 if not ippKm:
302 302 raise ValueError("[Reading] nSamples or ippKm should be defined")
303 303
304 304 nSamples = int(ippKm / (1e6*0.15/self.__sample_rate))
305 305
306 306 channelBoundList = []
307 307 channelNameListFiltered = []
308 308
309 309 for thisIndexChannel in channelList:
310 310 thisChannelName = channelNameList[thisIndexChannel]
311 311 start_index, end_index = self.digitalReadObj.get_bounds(thisChannelName)
312 312 channelBoundList.append((start_index, end_index))
313 313 channelNameListFiltered.append(thisChannelName)
314 314
315 315 self.profileIndex = 0
316 316
317 317 self.__delay = delay
318 318 self.__ippKm = ippKm
319 319 self.__codeType = codeType
320 320 self.__nCode = nCode
321 321 self.__nBaud = nBaud
322 322 self.__code = code
323 323
324 324 self.__datapath = path
325 325 self.__online = online
326 326 self.__channelList = channelList
327 327 self.__channelNameList = channelNameListFiltered
328 328 self.__channelBoundList = channelBoundList
329 329 self.__nSamples = nSamples
330 330 self.__samples_to_read = int(buffer_size*nSamples)
331 331 self.__nChannels = len(self.__channelList)
332 332
333 333 self.__startUTCSecond = startUTCSecond
334 334 self.__endUTCSecond = endUTCSecond
335 335
336 336 self.__timeInterval = 1.0 * self.__samples_to_read/self.__sample_rate #Time interval
337 337
338 338 if online:
339 339 # self.__thisUnixSample = int(endUTCSecond*self.__sample_rate - 4*self.__samples_to_read)
340 340 startUTCSecond = numpy.floor(endUTCSecond)
341 341
342 342 self.__thisUnixSample = int(startUTCSecond*self.__sample_rate) - self.__samples_to_read
343 343
344 344 self.__data_buffer = numpy.zeros((self.__nChannels, self.__samples_to_read), dtype = numpy.complex)
345 345
346 346 self.__setFileHeader()
347 347 self.isConfig = True
348 348
349 349 print("[Reading] USRP Data was found from %s to %s " %(
350 350 datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
351 351 datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
352 352 ))
353 353
354 354 print("[Reading] Starting process from %s to %s" %(datetime.datetime.utcfromtimestamp(startUTCSecond - self.__timezone),
355 355 datetime.datetime.utcfromtimestamp(endUTCSecond - self.__timezone)
356 356 ))
357 357
    def __reload(self):
        '''
        In online mode, refresh the digital_rf metadata and data bounds.
        Returns True when the available time range has grown, False when it
        has not, and None when not running online.
        '''

        if not self.__online:
            return

        # print
        # print "%s not in range [%s, %s]" %(
        #         datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
        #         datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
        #         datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
        #     )
        print("[Reading] reloading metadata ...")

        try:
            self.digitalReadObj.reload(complete_update=True)
        except:
            self.digitalReadObj.reload()

        start_index, end_index = self.digitalReadObj.get_bounds(self.__channelNameList[self.__channelList[0]])

        if start_index > self.__startUTCSecond*self.__sample_rate:
            self.__startUTCSecond = 1.0*start_index/self.__sample_rate

        if end_index > self.__endUTCSecond*self.__sample_rate:
            self.__endUTCSecond = 1.0*end_index/self.__sample_rate
            print()
            print("[Reading] New timerange found [%s, %s] " %(
                datetime.datetime.utcfromtimestamp(self.__startUTCSecond - self.__timezone),
                datetime.datetime.utcfromtimestamp(self.__endUTCSecond - self.__timezone)
                ))

            return True

        return False
392 392
    def __readNextBlock(self, seconds=30, volt_scale = 218776):
        '''
        Read the next chunk of samples for every channel into the internal
        buffer, scaling to volts. Returns True on success, False when no
        more data is available or a discontinuity truncated the read.
        '''

        #Set the next data
        self.__flagDiscontinuousBlock = False
        self.__thisUnixSample += self.__samples_to_read

        if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
            print("[Reading] There are no more data into selected time-range")

            self.__reload()

            if self.__thisUnixSample + 2*self.__samples_to_read > self.__endUTCSecond*self.__sample_rate:
                # Roll the cursor back so a later retry re-reads this chunk
                self.__thisUnixSample -= self.__samples_to_read
                return False

        indexChannel = 0

        dataOk = False

        for thisChannelName in self.__channelNameList:

            try:
                result = self.digitalReadObj.read_vector_c81d(self.__thisUnixSample,
                                                              self.__samples_to_read,
                                                              thisChannelName)

            except IOError as e:
                #read next profile
                self.__flagDiscontinuousBlock = True
                print("[Reading] %s" %datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone), e)
                break

            if result.shape[0] != self.__samples_to_read:
                self.__flagDiscontinuousBlock = True
                print("[Reading] %s: Too few samples were found, just %d/%d samples" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                                                       result.shape[0],
                                                                                       self.__samples_to_read))
                break

            self.__data_buffer[indexChannel,:] = result*volt_scale

            indexChannel += 1

            dataOk = True

        self.__utctime = self.__thisUnixSample/self.__sample_rate

        if not dataOk:
            return False

        print("[Reading] %s: %d samples <> %f sec" %(datetime.datetime.utcfromtimestamp(self.thisSecond - self.__timezone),
                                                     self.__samples_to_read,
                                                     self.__timeInterval))

        self.__bufferIndex = 0

        return True
452 452
453 453 def __isBufferEmpty(self):
454 454
455 455 if self.__bufferIndex <= self.__samples_to_read - self.__nSamples:
456 456 return False
457 457
458 458 return True
459 459
    def getData(self, seconds=30, nTries=5):

        '''
        This method gets the data from files and put the data into the dataOut object

        In addition, it increases the buffer counter by one.

        Return:
            data : returns one voltage profile (heights * channels) copied from
            the buffer. Returns None if there are no more files to read.

        Affected:
            self.dataOut
            self.profileIndex
            self.flagDiscontinuousBlock
            self.flagIsNewBlock
        '''

        err_counter = 0
        self.dataOut.flagNoData = True

        if self.__isBufferEmpty():

            self.__flagDiscontinuousBlock = False

            # Keep trying until a block is read or we run out of data/retries
            while True:
                if self.__readNextBlock():
                    break

                if self.__thisUnixSample > self.__endUTCSecond*self.__sample_rate:
                    return False

                if self.__flagDiscontinuousBlock:
                    print('[Reading] discontinuous block found ... continue with the next block')
                    continue

                if not self.__online:
                    return False

                err_counter += 1
                if err_counter > nTries:
                    return False

                print('[Reading] waiting %d seconds to read a new block' %seconds)
                sleep(seconds)

        # Hand one profile (nSamples per channel) to dataOut
        self.dataOut.data = self.__data_buffer[:,self.__bufferIndex:self.__bufferIndex+self.__nSamples]
        self.dataOut.utctime = (self.__thisUnixSample + self.__bufferIndex)/self.__sample_rate
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.__flagDiscontinuousBlock
        self.dataOut.profileIndex = self.profileIndex

        self.__bufferIndex += self.__nSamples
        self.profileIndex += 1

        if self.profileIndex == self.dataOut.nProfiles:
            self.profileIndex = 0

        return True
519 519
520 520 def printInfo(self):
521 521 '''
522 522 '''
523 523 if self.__printInfo == False:
524 524 return
525 525
526 526 # self.systemHeaderObj.printInfo()
527 527 # self.radarControllerHeaderObj.printInfo()
528 528
529 529 self.__printInfo = False
530 530
531 531 def printNumberOfBlock(self):
532 532 '''
533 533 '''
534 534
535 535 print(self.profileIndex)
536 536
537 537 def run(self, **kwargs):
538 538 '''
539 539 This method will be called many times so here you should put all your code
540 540 '''
541 541
542 542 if not self.isConfig:
543 543 self.setup(**kwargs)
544 544
545 545 self.getData(seconds=self.__delay)
546 546
547 547 return
548 548
549 549 class USRPWriter(Operation):
550 550 '''
 551 551     Writer operation for USRP data (setup stores the input; writing is not implemented yet).
552 552 '''
553 553
554 554 def __init__(self, **kwargs):
555 555 '''
556 556 Constructor
557 557 '''
558 558 Operation.__init__(self, **kwargs)
559 559 self.dataOut = None
560 560
561 561 def setup(self, dataIn, path, blocksPerFile, set=0, ext=None):
562 562 '''
563 563 In this method we should set all initial parameters.
564 564
565 565 Input:
566 566 dataIn : Input data will also be outputa data
567 567
568 568 '''
569 569 self.dataOut = dataIn
570 570
571 571
572 572
573 573
574 574
575 575 self.isConfig = True
576 576
577 577 return
578 578
579 579 def run(self, dataIn, **kwargs):
580 580 '''
581 581 This method will be called many times so here you should put all your code
582 582
583 583 Inputs:
584 584
585 585 dataIn : object with the data
586 586
587 587 '''
588 588
589 589 if not self.isConfig:
590 590 self.setup(dataIn, **kwargs)
591 591
592 592
if __name__ == '__main__':

    # Quick manual test: read USRP data continuously from a fixed path
    readObj = USRPReader()

    while True:
        readObj.run(path='/Volumes/DATA/haystack/passive_radar/')
        # readObj.printInfo()
        readObj.printNumberOfBlock()
@@ -1,350 +1,348
1 1 '''
2 2 Created on Jan 15, 2018
3 3
4 4 @author: Juan C. Espinoza
5 5 '''
6 6
7 7 import os
8 8 import sys
9 9 import time
10 10 import glob
11 11 import datetime
12 12 import tarfile
13 13
14 14 import numpy
15 15
16 16 from .utils import folder_in_range
17 17
18 18 from schainpy.model.io.jroIO_base import JRODataReader
19 19 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
20 20 from schainpy.model.data.jrodata import Parameters
21 21 from schainpy.utils import log
22 22
23 23 try:
24 24 from netCDF4 import Dataset
25 25 except:
26 log.warning(
27 'You should install "netCDF4" module if you want to read/write NCDF files'
28 )
26 pass
29 27
30 28 UT1970 = datetime.datetime(1970, 1, 1) - datetime.timedelta(seconds=time.timezone)
31 29
32 30
class PXReader(JRODataReader, ProcessingUnit):
    '''
    Reader for PX-family NetCDF files (plain ``.nc`` files or ``.tgz``
    bundles).  Works either over a closed date range (offline) or by
    polling a folder for newly arriving files (online mode).

    The variables read (Elevation, Azimuth, GateWidth, MaximumRange)
    suggest radar scan data -- TODO confirm against the producer.
    '''

    def __init__(self, **kwargs):

        ProcessingUnit.__init__(self, **kwargs)

        self.dataOut = Parameters()      # output object filled by set_output()
        self.counter_records = 0         # records delivered so far
        self.nrecords = None             # total records; never computed in this class
        self.flagNoMoreFiles = 0         # set to 1 once the file list is exhausted
        self.isConfig = False
        self.filename = None             # currently open file (None until first open)
        self.intervals = set()
        self.ext = ('.nc', '.tgz')       # accepted file extensions
        self.online_mode = False         # True once live polling has started

    def setup(self,
              path=None,
              startDate=None,
              endDate=None,
              format=None,
              startTime=datetime.time(0, 0, 0),
              endTime=datetime.time(23, 59, 59),
              walk=False,
              **kwargs):
        '''
        Configure the reader and build the initial file list.

        kwargs:
            nTries - retries when polling online (default 10)
            online - keep waiting for new files (default False)
            delay  - seconds between online retries (default 60)
            ext    - substring file names must contain; note it is stored
                     as ``self.ele`` (attribute/kwarg name mismatch)
        '''

        self.path = path
        self.startDate = startDate
        self.endDate = endDate
        self.startTime = startTime
        self.endTime = endTime
        self.datatime = datetime.datetime(1900,1,1)
        self.walk = walk
        self.nTries = kwargs.get('nTries', 10)
        self.online = kwargs.get('online', False)
        self.delay = kwargs.get('delay', 60)
        self.ele = kwargs.get('ext', '')

        if self.path is None:
            raise ValueError('The path is not valid')

        self.search_files(path, startDate, endDate, startTime, endTime, walk)
        self.cursor = 0
        self.counter_records = 0

        if not self.files:
            # NOTE(review): raising a Warning class as an exception is
            # unusual; callers must catch Warning, not ValueError.
            raise Warning('There is no files matching these date in the folder: {}. \n Check startDate and endDate'.format(path))

    def search_files(self, path, startDate, endDate, startTime, endTime, walk):
        '''
        Searching for NCDF files in path
        Creating a list of files to procces included in [startDate,endDate]

        Input:
            path - Path to find files
        '''

        log.log('Searching files {} in {} '.format(self.ext, path), 'PXReader')
        if walk:
            # one sub-folder per day, named YYYYMMDD (checked below)
            paths = [os.path.join(path, p) for p in os.listdir(path) if os.path.isdir(os.path.join(path, p))]
            paths.sort()
        else:
            paths = [path]

        fileList0 = []

        for subpath in paths:
            if not folder_in_range(subpath.split('/')[-1], startDate, endDate, '%Y%m%d'):
                continue
            # keep files with an accepted extension whose name contains self.ele
            fileList0 += [os.path.join(subpath, s) for s in glob.glob1(subpath, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]

        fileList0.sort()
        if self.online:
            # online: start from the most recent file only
            fileList0 = fileList0[-1:]

        self.files = {}

        # widen the range by one day on each side before per-file filtering
        startDate = startDate - datetime.timedelta(1)
        endDate = endDate + datetime.timedelta(1)

        for fullname in fileList0:
            thisFile = fullname.split('/')[-1]
            # Filename layout assumed: chars 3-11 hold yyyymmdd and chars
            # 12-18 hold HHMMSS (e.g. 'XXXyyyymmdd_HHMMSS...') -- TODO confirm
            year = thisFile[3:7]
            if not year.isdigit():
                continue

            month = thisFile[7:9]
            if not month.isdigit():
                continue

            day = thisFile[9:11]
            if not day.isdigit():
                continue

            year, month, day = int(year), int(month), int(day)
            dateFile = datetime.date(year, month, day)
            timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))

            if (startDate > dateFile) or (endDate < dateFile):
                continue

            # group all files sharing the same timestamp
            dt = datetime.datetime.combine(dateFile, timeFile)
            if dt not in self.files:
                self.files[dt] = []
            self.files[dt].append(fullname)

        self.dates = list(self.files.keys())
        self.dates.sort()

        return

    def search_files_online(self):
        '''
        Searching for NCDF files in online mode path
        Creating a list of files to procces included in [startDate,endDate]

        Input:
            path - Path to find files
        '''

        self.files = {}

        # poll up to nTries times, sleeping self.delay seconds between tries
        for n in range(self.nTries):

            if self.walk:
                # watch the newest daily sub-folder
                paths = [os.path.join(self.path, p) for p in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, p))]
                paths.sort()
                path = paths[-1]
            else:
                path = self.path

            new_files = [os.path.join(path, s) for s in glob.glob1(path, '*') if os.path.splitext(s)[-1] in self.ext and '{}'.format(self.ele) in s]
            new_files.sort()

            for fullname in new_files:
                thisFile = fullname.split('/')[-1]
                # same filename layout assumption as in search_files()
                year = thisFile[3:7]
                if not year.isdigit():
                    continue

                month = thisFile[7:9]
                if not month.isdigit():
                    continue

                day = thisFile[9:11]
                if not day.isdigit():
                    continue

                year, month, day = int(year), int(month), int(day)
                dateFile = datetime.date(year, month, day)
                timeFile = datetime.time(int(thisFile[12:14]), int(thisFile[14:16]), int(thisFile[16:18]))

                dt = datetime.datetime.combine(dateFile, timeFile)

                # skip files not newer than the last one delivered
                if self.dt >= dt:
                    continue

                if dt not in self.files:
                    self.dt = dt
                    self.files[dt] = []

                self.files[dt].append(fullname)
                break

            if self.files:
                break
            else:
                log.warning('Waiting {} seconds for the next file, try {} ...'.format(self.delay, n + 1), 'PXReader')
                time.sleep(self.delay)

        if not self.files:
            return 0

        self.dates = list(self.files.keys())
        self.dates.sort()
        self.cursor = 0

        return 1

    def parseFile(self):
        '''
        Copy every global attribute of the open NetCDF file into a header
        dict and load the variable named by its 'TypeName' attribute.
        '''

        header = {}

        for attr in self.fp.ncattrs():
            header[str(attr)] = getattr(self.fp, attr)

        self.header.append(header)

        self.data[header['TypeName']] = numpy.array(self.fp.variables[header['TypeName']])

    def setNextFile(self):
        '''
        Open next files for the current datetime
        '''

        cursor = self.cursor
        if not self.online_mode:
            if cursor == len(self.dates):
                # offline list exhausted: switch to polling, or stop
                if self.online:
                    cursor = 0
                    self.dt = self.dates[cursor]
                    self.online_mode = True
                    if not self.search_files_online():
                        log.success('No more files', 'PXReader')
                        return 0
                else:
                    log.success('No more files', 'PXReader')
                    self.flagNoMoreFiles = 1
                    return 0
        else:
            if not self.search_files_online():
                return 0
            cursor = self.cursor

        self.data = {}
        self.header = []

        for fullname in self.files[self.dates[cursor]]:

            log.log('Opening: {}'.format(fullname), 'PXReader')

            if os.path.splitext(fullname)[-1] == '.tgz':
                # NOTE(review): archives are extracted into /tmp and the
                # tarfile handle is never closed; extracted files are not
                # cleaned up either.
                tar = tarfile.open(fullname, 'r:gz')
                tar.extractall('/tmp')
                files = [os.path.join('/tmp', member.name) for member in tar.getmembers()]
            else:
                files = [fullname]

            for filename in files:
                if self.filename is not None:
                    # close the previously open dataset before reopening
                    self.fp.close()

                self.filename = filename
                self.filedate = self.dates[cursor]
                self.fp = Dataset(self.filename, 'r')
                self.parseFile()

        self.counter_records += 1
        self.cursor += 1
        return 1

    def readNextFile(self):
        # Advance to the next record whose timestamp falls inside the
        # configured window (the filter is skipped in online mode).

        while True:
            self.flagDiscontinuousBlock = 0
            if not self.setNextFile():
                return 0

            self.datatime = datetime.datetime.utcfromtimestamp(self.header[0]['Time'])

            if self.online:
                break

            if (self.datatime < datetime.datetime.combine(self.startDate, self.startTime)) or \
               (self.datatime > datetime.datetime.combine(self.endDate, self.endTime)):
                log.warning(
                    'Reading Record No. {}/{} -> {} [Skipping]'.format(
                        self.counter_records,
                        self.nrecords,
                        self.datatime.ctime()),
                    'PXReader')
                continue
            break

        log.log(
            'Reading Record No. {}/{} -> {}'.format(
                self.counter_records,
                self.nrecords,
                self.datatime.ctime()),
            'PXReader')

        return 1


    def set_output(self):
        '''
        Storing data from buffer to dataOut object
        '''

        self.data['Elevation'] = numpy.array(self.fp.variables['Elevation'])
        self.data['Azimuth'] = numpy.array(self.fp.variables['Azimuth'])
        self.dataOut.range = numpy.array(self.fp.variables['GateWidth'])
        self.dataOut.data = self.data
        # one header entry was appended per file opened for this datetime
        self.dataOut.units = [h['Unit-value'] for h in self.header]
        self.dataOut.parameters = [h['TypeName'] for h in self.header]
        self.dataOut.missing = self.header[0]['MissingData']
        self.dataOut.max_range = self.header[0]['MaximumRange-value']
        self.dataOut.elevation = self.header[0]['Elevation']
        self.dataOut.azimuth = self.header[0]['Azimuth']
        self.dataOut.latitude = self.header[0]['Latitude']
        self.dataOut.longitude = self.header[0]['Longitude']
        self.dataOut.utctime = self.header[0]['Time']
        self.dataOut.utctimeInit = self.dataOut.utctime
        self.dataOut.useLocalTime = True
        self.dataOut.flagNoData = False
        self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock

        log.log('Parameters found: {}'.format(','.join(self.dataOut.parameters)),
                'PXReader')

    def getData(self):
        '''
        Storing data from databuffer to dataOut object
        '''
        if self.flagNoMoreFiles:
            self.dataOut.flagNoData = True
            log.error('No file left to process', 'PXReader')
            return 0

        if not self.readNextFile():
            self.dataOut.flagNoData = True
            return 0

        self.set_output()

        return 1
@@ -1,1008 +1,1008
1 1 '''
2 2 @author: Daniel Suarez
3 3 '''
4 4 import os
5 5 import glob
6 6 import ftplib
7 7
8 8 try:
9 9 import paramiko
10 10 import scp
11 11 except:
12 print("You should install paramiko and scp libraries \nif you want to use SSH protocol to upload files to the server")
12 pass
13 13
14 14 import time
15 15
16 16 import threading
17 17 Thread = threading.Thread
18 18
19 19 # try:
20 20 # from gevent import sleep
21 21 # except:
22 22 from time import sleep
23 23
24 24 from schainpy.model.proc.jroproc_base import ProcessingUnit, Operation
25 25
class Remote(Thread):
    """
    Remote is a parent class used to define the behaviour of FTP and SSH class. These clases are
    used to upload or download files remotely.

    Non-standard Python modules used:
        None

    Written by:
        "Miguel Urco":mailto:miguel.urco@jro.igp.gob.pe  Jun. 03, 2015
    """

    # Class-level defaults; instances rebind all of these in
    # __init__/open().  NOTE(review): fileList is a mutable class
    # attribute and would be shared between instances if __init__ did
    # not rebind it per instance.
    server = None
    username = None
    password = None
    remotefolder = None

    period = 60       # seconds between upload rounds in run()
    fileList = []
    bussy = False

    def __init__(self, server, username, password, remotefolder, period=60):

        Thread.__init__(self)

        self.setDaemon(True)

        self.status = 0

        self.__server = server
        self.__username = username
        self.__password = password
        self.__remotefolder = remotefolder

        self.period = period

        self.fileList = []
        self.bussy = False

        self.stopFlag = False

        # Probe the connection once at construction time; run() reopens
        # it on every upload round.
        print("[Remote Server] Opening server: %s" %self.__server)
        if self.open(self.__server, self.__username, self.__password, self.__remotefolder):
            print("[Remote Server] %s server was opened successfully" %self.__server)

        self.close()

        # Serializes access to fileList between updateFileList()
        # (producer) and run() (consumer).
        self.mutex = threading.Lock()

    def stop(self):
        # Ask the worker loop to exit and wait up to 10 s for it.
        self.stopFlag = True
        self.join(10)

    def open(self):
        """
        Connect to server and create a connection class (FTP or SSH) to remote server.

        NOTE: subclasses redefine this with the full
        (server, username, password, remotefolder) signature used by
        __init__ and run() above.
        """
        raise NotImplementedError("Implement this method in child class")

    def close(self):
        """
        Close connection to server
        """
        raise NotImplementedError("Implement this method in child class")

    def mkdir(self, remotefolder):
        """
        Create a folder remotely
        """
        raise NotImplementedError("Implement this method in child class")

    def cd(self, remotefolder):
        """
        Change working directory in remote server
        """
        raise NotImplementedError("Implement this method in child class")

    def download(self, filename, localfolder=None):
        """
        Download a file from server to local host
        """
        raise NotImplementedError("Implement this method in child class")

    def sendFile(self, fullfilename):
        """
        sendFile method is used to upload a local file to the current directory in remote server

        Inputs:
            fullfilename - full path name of local file to store in remote directory

        Returns:
            0 in error case else 1
        """
        raise NotImplementedError("Implement this method in child class")

    def upload(self, fullfilename, remotefolder=None):
        """
        upload method is used to upload a local file to remote directory. This method changes
        working directory before sending a file.

        Inputs:
            fullfilename - full path name of local file to store in remote directory

            remotefolder - remote directory

        Returns:
            0 in error case else 1
        """
        print("[Remote Server] Uploading %s to %s:%s" %(fullfilename, self.server, self.remotefolder))

        if not self.status:
            return 0

        if remotefolder == None:
            remotefolder = self.remotefolder

        if not self.cd(remotefolder):
            return 0

        if not self.sendFile(fullfilename):
            print("[Remote Server] Error uploading file %s" %fullfilename)
            return 0

        print("[Remote Server] upload finished successfully")

        return 1

    def delete(self, filename):
        """
        Remove a file from remote server
        """
        pass

    def updateFileList(self, fileList):
        """
        Replace the list of files to be uploaded by run().

        Returns 0 when the list is unchanged, 1 when it was updated.
        """

        if fileList == self.fileList:
            return 0

        self.mutex.acquire()
        # init = time.time()
        #
        # while(self.bussy):
        #     sleep(0.1)
        #     if time.time() - init > 2*self.period:
        #         return 0

        self.fileList = fileList
        self.mutex.release()
        return 1

    def run(self):
        # Worker loop: every `period` seconds reopen the connection,
        # upload the current fileList, then close it again.

        if not self.status:
            # the constructor could not open the connection even once
            print("Finishing FTP service")
            return

        if not self.cd(self.remotefolder):
            raise ValueError("Could not access to the new remote directory: %s" %self.remotefolder)

        while True:

            # sleep in 1-second steps so stopFlag is honoured promptly
            for i in range(self.period):
                if self.stopFlag:
                    break
                sleep(1)

            if self.stopFlag:
                break

            # self.bussy = True
            self.mutex.acquire()

            print("[Remote Server] Opening %s" %self.__server)
            if not self.open(self.__server, self.__username, self.__password, self.__remotefolder):
                self.mutex.release()
                continue

            for thisFile in self.fileList:
                self.upload(thisFile, self.remotefolder)

            print("[Remote Server] Closing %s" %self.__server)
            self.close()

            self.mutex.release()
            # self.bussy = False

        print("[Remote Server] Thread stopped successfully")
217 217
class FTPClient(Remote):
    """
    FTP implementation of the Remote uploader thread (see Remote above).
    """

    __ftpClientObj = None

    def __init__(self, server, username, password, remotefolder, period=60):
        """
        Create an FTP uploader; the connection itself is made in open().
        """
        Remote.__init__(self, server, username, password, remotefolder, period)

    def open(self, server, username, password, remotefolder):

        """
        This method is used to set FTP parameters and establish a connection to remote server

        Inputs:
            server - remote server IP Address

            username - remote server Username

            password - remote server password

            remotefolder - remote server current working directory

        Return:
            Boolean - Returns 1 if a connection has been established, 0 otherwise

        Affects:
            self.status - in case of error or fail connection this parameter is set to 0 else 1

        """

        if server == None:
            raise ValueError("FTP server should be defined")

        if username == None:
            raise ValueError("FTP username should be defined")

        if password == None:
            raise ValueError("FTP password should be defined")

        if remotefolder == None:
            raise ValueError("FTP remote folder should be defined")

        try:
            ftpClientObj = ftplib.FTP(server)
        except ftplib.all_errors as e:
            print("[FTP Server]: FTP server connection fail: %s" %server)
            print("[FTP Server]:", e)
            self.status = 0
            return 0

        try:
            ftpClientObj.login(username, password)
        except ftplib.all_errors:
            print("[FTP Server]: FTP username or password are incorrect")
            self.status = 0
            return 0

        if remotefolder == None:
            remotefolder = ftpClientObj.pwd()
        else:
            try:
                ftpClientObj.cwd(remotefolder)
            except ftplib.all_errors:
                # fall back to the server's default directory
                print("[FTP Server]: FTP remote folder is invalid: %s" %remotefolder)
                remotefolder = ftpClientObj.pwd()

        self.server = server
        self.username = username
        self.password = password
        self.remotefolder = remotefolder
        self.__ftpClientObj = ftpClientObj
        self.status = 1

        return 1

    def close(self):
        """
        Close connection to remote server
        """
        if not self.status:
            return 0

        self.__ftpClientObj.close()

    def mkdir(self, remotefolder):
        """
        mkdir is used to make a new directory in remote server

        Input:
            remotefolder - directory name

        Return:
            0 in error case else 1
        """
        if not self.status:
            return 0

        try:
            # BUG FIX: the original called mkd(dirname) with an undefined
            # name, raising NameError (not caught by ftplib.all_errors)
            # whenever cd() had to create a missing folder.
            self.__ftpClientObj.mkd(remotefolder)
        except ftplib.all_errors:
            print("[FTP Server]: Error creating remote folder: %s" %remotefolder)
            return 0

        return 1

    def cd(self, remotefolder):
        """
        cd is used to change remote working directory on server

        Input:
            remotefolder - current working directory

        Affects:
            self.remotefolder

        Return:
            0 in case of error else 1
        """
        if not self.status:
            return 0

        if remotefolder == self.remotefolder:
            return 1

        try:
            self.__ftpClientObj.cwd(remotefolder)
        except ftplib.all_errors:
            # directory may not exist yet: try to create it, then retry
            print('[FTP Server]: Error changing to %s' %remotefolder)
            print('[FTP Server]: Trying to create remote folder')

            if not self.mkdir(remotefolder):
                print('[FTP Server]: Remote folder could not be created')
                return 0

            try:
                self.__ftpClientObj.cwd(remotefolder)
            except ftplib.all_errors:
                return 0

        self.remotefolder = remotefolder

        return 1

    def sendFile(self, fullfilename):
        """
        Upload one local file into the current remote directory and try
        to make it world-readable.

        Inputs:
            fullfilename - full path name of the local file

        Returns:
            0 in error case else 1
        """
        if not self.status:
            return 0

        filename = os.path.basename(fullfilename)

        command = "STOR %s" %filename

        # Context manager guarantees the local handle is closed even when
        # the transfer fails (the original leaked it on error).
        with open(fullfilename, 'rb') as fp:
            try:
                self.__ftpClientObj.storbinary(command, fp)
            except ftplib.all_errors as e:
                print("[FTP Server]:", e)
                return 0

        # best-effort chmod; failure is reported but not fatal
        try:
            self.__ftpClientObj.sendcmd('SITE CHMOD 755 ' + filename)
        except ftplib.all_errors as e:
            print("[FTP Server]:", e)

        return 1
387 387
class SSHClient(Remote):
    """
    SSH/SCP implementation of the Remote uploader thread (see Remote above).
    """

    __sshClientObj = None
    __scpClientObj = None

    def __init__(self, server, username, password, remotefolder, period=60):
        """
        Create an SSH uploader; the connection itself is made in open().
        """
        Remote.__init__(self, server, username, password, remotefolder, period)

    def open(self, server, username, password, remotefolder, port=22):

        """
        This method is used to set SSH parameters and establish a connection to a remote server

        Inputs:
            server - remote server IP Address

            username - remote server Username

            password - remote server password

            remotefolder - remote server current working directory

        Return: void

        Affects:
            self.status - in case of error or fail connection this parameter is set to 0 else 1

        """
        import socket

        if server == None:
            raise ValueError("SSH server should be defined")

        if username == None:
            raise ValueError("SSH username should be defined")

        if password == None:
            raise ValueError("SSH password should be defined")

        if remotefolder == None:
            raise ValueError("SSH remote folder should be defined")

        sshClientObj = paramiko.SSHClient()

        sshClientObj.load_system_host_keys()
        sshClientObj.set_missing_host_key_policy(paramiko.WarningPolicy())

        self.status = 0
        try:
            sshClientObj.connect(server, username=username, password=password, port=port)
        except paramiko.AuthenticationException as e:
            # print "SSH username or password are incorrect: %s"
            print("[SSH Server]:", e)
            return 0
        except paramiko.SSHException as e:
            # BUG FIX: the original caught a bare `SSHException`, a name
            # not defined in this module (NameError at handling time).
            print("[SSH Server]:", e)
            return 0
        except socket.error as e:
            # BUG FIX: `e` was printed here without being bound.
            self.status = 0
            print("[SSH Server]:", e)
            return 0

        self.status = 1
        scpClientObj = scp.SCPClient(sshClientObj.get_transport(), socket_timeout=30)

        if remotefolder == None:
            remotefolder = self.pwd()

        self.server = server
        self.username = username
        self.password = password
        self.__sshClientObj = sshClientObj
        self.__scpClientObj = scpClientObj
        self.status = 1

        if not self.cd(remotefolder):
            # (removed an unreachable `return 0` that followed this raise)
            raise ValueError("[SSH Server]: Could not access to remote folder: %s" %remotefolder)

        self.remotefolder = remotefolder

        return 1

    def close(self):
        """
        Close connection to remote server
        """
        if not self.status:
            return 0

        self.__scpClientObj.close()
        self.__sshClientObj.close()

    def __execute(self, command):
        """
        __execute a command on remote server

        Input:
            command - Exmaple 'ls -l'

        Return:
            0 in error case else 1 (or the first stdout line, newline
            stripped, for commands that produce output)
        """
        if not self.status:
            return 0

        stdin, stdout, stderr = self.__sshClientObj.exec_command(command)

        # anything on stderr is treated as failure
        result = stderr.readlines()
        if len(result) > 1:
            return 0

        result = stdout.readlines()
        if len(result) > 1:
            return result[0][:-1]

        return 1

    def mkdir(self, remotefolder):
        """
        mkdir is used to make a new directory in remote server

        Input:
            remotefolder - directory name

        Return:
            0 in error case else 1
        """

        command = 'mkdir %s' %remotefolder

        return self.__execute(command)

    def pwd(self):
        # current working directory on the remote host
        command = 'pwd'

        return self.__execute(command)

    def cd(self, remotefolder):
        """
        cd is used to change remote working directory on server

        Input:
            remotefolder - current working directory

        Affects:
            self.remotefolder

        Return:
            0 in case of error else 1
        """
        if not self.status:
            return 0

        if remotefolder == self.remotefolder:
            return 1

        chk_command = "cd %s; pwd" %remotefolder
        mkdir_command = "mkdir %s" %remotefolder

        # create the folder if it does not exist yet
        if not self.__execute(chk_command):
            if not self.__execute(mkdir_command):
                self.remotefolder = None
                return 0

        self.remotefolder = remotefolder

        return 1

    def sendFile(self, fullfilename):
        # Upload via SCP, then make the remote copy group-writable.
        if not self.status:
            return 0

        try:
            self.__scpClientObj.put(fullfilename, remote_path=self.remotefolder)
        except scp.ScpError as e:
            print("[SSH Server]", str(e))
            return 0

        remotefile = os.path.join(self.remotefolder, os.path.split(fullfilename)[-1])
        command = 'chmod 775 %s' %remotefile

        return self.__execute(command)
575 575
class SendToServer(ProcessingUnit):
    """
    Periodically upload recently generated files to a remote server.

    A background client thread (FTP or SSH) performs the transfers; this
    unit only rescans the local folder(s) every `period` seconds and
    hands the resulting file list to the client.
    """

    def __init__(self, **kwargs):

        ProcessingUnit.__init__(self, **kwargs)

        self.isConfig = False
        self.clientObj = None

    def setup(self, server, username, password, remotefolder, localfolder, ext='.png', period=60, protocol='ftp', **kwargs):
        """Create the uploader thread for the requested protocol and start it."""

        self.clientObj = None
        self.localfolder = localfolder
        self.ext = ext
        self.period = period

        proto = str.lower(protocol)

        if proto == 'ftp':
            self.clientObj = FTPClient(server, username, password, remotefolder, period)

        if proto == 'ssh':
            self.clientObj = SSHClient(server, username, password, remotefolder, period)

        if not self.clientObj:
            raise ValueError("%s has been chosen as remote access protocol but it is not valid" %protocol)

        self.clientObj.start()

    def findFiles(self):
        """
        Scan the local folder(s) and return files matching the configured
        extension that were modified within the last 30 minutes.
        """
        if type(self.localfolder) == list:
            folders = self.localfolder
        else:
            folders = [self.localfolder]

        # drop duplicate folder entries
        folders = list(set(folders))

        found = []

        for folder in folders:

            print("[Remote Server]: Searching files on %s" %folder)

            for name in glob.glob1(folder, '*%s' %self.ext):

                fullname = os.path.join(folder, name)

                if fullname in found:
                    continue

                # Only files modified in the last 30 minutes are considered
                if os.path.getmtime(fullname) < time.time() - 30*60:
                    continue

                found.append(fullname)

        return found

    def run(self, **kwargs):
        """One step: lazy setup, watchdog restart, periodic file push."""
        if not self.isConfig:
            self.init = time.time()
            self.setup(**kwargs)
            self.isConfig = True

        # restart the uploader thread if it died
        if not self.clientObj.is_alive():
            print("[Remote Server]: Restarting connection ")
            self.setup(**kwargs)

        # every `period` seconds hand a fresh file list to the client
        if time.time() - self.init >= self.period:
            filesToSend = self.findFiles()

            if self.clientObj.updateFileList(filesToSend):
                print("[Remote Server]: Sending the next files ", str(filesToSend))
                self.init = time.time()

    def close(self):
        """Stop the uploader thread."""
        print("[Remote Server] Stopping thread")
        self.clientObj.stop()
659 659
660 660
661 661 class FTP(object):
662 662 """
663 663 Ftp is a public class used to define custom File Transfer Protocol from "ftplib" python module
664 664
665 665 Non-standard Python modules used: None
666 666
667 667 Written by "Daniel Suarez":mailto:daniel.suarez@jro.igp.gob.pe Oct. 26, 2010
668 668 """
669 669
    def __init__(self,server = None, username=None, password=None, remotefolder=None):
        """
        This method is used to setting parameters for FTP and establishing connection to remote server

        Inputs:
            server - remote server IP Address

            username - remote server Username

            password - remote server password

            remotefolder - remote server current working directory

        Return: void

        Affects:
            self.status - in Error Case or Connection Failed this parameter is set to 1 else 0
                          (NOTE: inverted convention compared with the Remote class above)

            self.folderList - sub-folder list of remote folder

            self.fileList - file list of remote folder


        """

        # Fall back to the built-in defaults only when no parameter at
        # all was supplied.
        if ((server == None) and (username==None) and (password==None) and (remotefolder==None)):
            server, username, password, remotefolder = self.parmsByDefault()

        self.server = server
        self.username = username
        self.password = password
        self.remotefolder = remotefolder
        self.file = None
        self.ftp = None
        self.status = 0

        try:
            self.ftp = ftplib.FTP(self.server)
            self.ftp.login(self.username,self.password)
            self.ftp.cwd(self.remotefolder)
            # print 'Connect to FTP Server: Successfully'

        except ftplib.all_errors:
            print('Error FTP Service')
            self.status = 1
            return



        self.dirList = []

        try:
            self.dirList = self.ftp.nlst()

        except ftplib.error_perm as resp:
            if str(resp) == "550 No files found":
                print("no files in this directory")
                self.status = 1
                return

        except ftplib.all_errors:
            print('Error Displaying Dir-Files')
            self.status = 1
            return

        self.fileList = []
        self.folderList = []
        # entries with an extension are treated as files (only for test)
        for f in self.dirList:
            name, ext = os.path.splitext(f)
            if ext != '':
                self.fileList.append(f)
                # print 'filename: %s - size: %d'%(f,self.ftp.size(f))
743 743
744 744 def parmsByDefault(self):
745 745 server = 'jro-app.igp.gob.pe'
746 746 username = 'wmaster'
747 747 password = 'mst2010vhf'
748 748 remotefolder = '/home/wmaster/graficos'
749 749
750 750 return server, username, password, remotefolder
751 751
752 752
753 753 def mkd(self,dirname):
754 754 """
755 755 mkd is used to make directory in remote server
756 756
757 757 Input:
758 758 dirname - directory name
759 759
760 760 Return:
761 761 1 in error case else 0
762 762 """
763 763 try:
764 764 self.ftp.mkd(dirname)
765 765 except:
766 766 print('Error creating remote folder:%s'%dirname)
767 767 return 1
768 768
769 769 return 0
770 770
771 771
772 772 def delete(self,filename):
773 773 """
774 774 delete is used to delete file in current working directory of remote server
775 775
776 776 Input:
777 777 filename - filename to delete in remote folder
778 778
779 779 Return:
780 780 1 in error case else 0
781 781 """
782 782
783 783 try:
784 784 self.ftp.delete(filename)
785 785 except:
786 786 print('Error deleting remote file:%s'%filename)
787 787 return 1
788 788
789 789 return 0
790 790
791 791 def download(self,filename,localfolder):
792 792 """
793 793 download is used to downloading file from remote folder into local folder
794 794
795 795 Inputs:
796 796 filename - filename to donwload
797 797
798 798 localfolder - directory local to store filename
799 799
800 800 Returns:
801 801 self.status - 1 in error case else 0
802 802 """
803 803
804 804 self.status = 0
805 805
806 806
807 807 if not(filename in self.fileList):
808 808 print('filename:%s not exists'%filename)
809 809 self.status = 1
810 810 return self.status
811 811
812 812 newfilename = os.path.join(localfolder,filename)
813 813
814 814 self.file = open(newfilename, 'wb')
815 815
816 816 try:
817 817 print('Download: ' + filename)
818 818 self.ftp.retrbinary('RETR ' + filename, self.__handleDownload)
819 819 print('Download Complete')
820 820 except ftplib.all_errors:
821 821 print('Error Downloading ' + filename)
822 822 self.status = 1
823 823 return self.status
824 824
825 825 self.file.close()
826 826
827 827 return self.status
828 828
829 829
830 830 def __handleDownload(self,block):
831 831 """
832 832 __handleDownload is used to handle writing file
833 833 """
834 834 self.file.write(block)
835 835
836 836
837 837 def upload(self,filename,remotefolder=None):
838 838 """
839 839 upload is used to uploading local file to remote directory
840 840
841 841 Inputs:
842 842 filename - full path name of local file to store in remote directory
843 843
844 844 remotefolder - remote directory
845 845
846 846 Returns:
847 847 self.status - 1 in error case else 0
848 848 """
849 849
850 850 if remotefolder == None:
851 851 remotefolder = self.remotefolder
852 852
853 853 self.status = 0
854 854
855 855 try:
856 856 self.ftp.cwd(remotefolder)
857 857
858 858 self.file = open(filename, 'rb')
859 859
860 860 (head, tail) = os.path.split(filename)
861 861
862 862 command = "STOR " + tail
863 863
864 864 print('Uploading: ' + tail)
865 865 self.ftp.storbinary(command, self.file)
866 866 print('Upload Completed')
867 867
868 868 except ftplib.all_errors:
869 869 print('Error Uploading ' + tail)
870 870 self.status = 1
871 871 return self.status
872 872
873 873 self.file.close()
874 874
875 875 #back to initial directory in __init__()
876 876 self.ftp.cwd(self.remotefolder)
877 877
878 878 return self.status
879 879
880 880
    def dir(self, remotefolder):
        """
        dir is used to change working directory of remote server and get folder and file list

        Input:
            remotefolder - current working directory

        Affects:
            self.fileList - file list of working directory

        Return:
            infoList - list with filenames and size of file in bytes

            self.folderList - folder list
        """

        self.remotefolder = remotefolder
        print('Change to ' + self.remotefolder)
        try:
            self.ftp.cwd(remotefolder)
        except ftplib.all_errors:
            # Could not enter the directory: signal failure with (None, None)
            print('Error Change to ' + self.remotefolder)
            infoList = None
            self.folderList = None
            return infoList,self.folderList

        self.dirList = []

        try:
            self.dirList = self.ftp.nlst()

        except ftplib.error_perm as resp:
            # Some servers answer an empty directory with a 550 error
            # instead of an empty listing
            if str(resp) == "550 No files found":
                print("no files in this directory")
                infoList = None
                self.folderList = None
                return infoList,self.folderList
        except ftplib.all_errors:
            print('Error Displaying Dir-Files')
            infoList = None
            self.folderList = None
            return infoList,self.folderList

        infoList = []
        self.fileList = []
        self.folderList = []
        for f in self.dirList:
            # Entries with an extension are treated as files, the rest as folders
            name,ext = os.path.splitext(f)
            if ext != '':
                self.fileList.append(f)
                # infoList pairs each filename with its size in bytes
                value = (f,self.ftp.size(f))
                infoList.append(value)

            if ext == '':
                self.folderList.append(f)

        return infoList,self.folderList
938 938
939 939
940 940 def close(self):
941 941 """
942 942 close is used to close and end FTP connection
943 943
944 944 Inputs: None
945 945
946 946 Return: void
947 947
948 948 """
949 949 self.ftp.close()
950 950
class SendByFTP(Operation):
    # Operation that periodically uploads locally generated files to a
    # remote FTP server.  Each batch of transfers runs in a child process
    # that is killed after 3 seconds, so a stalled FTP session cannot
    # block the processing chain.

    def __init__(self, **kwargs):
        Operation.__init__(self, **kwargs)
        # status: 1 = last FTP interaction OK (or none attempted yet),
        #         0 = upload skipped/failed/killed
        self.status = 1
        # counter: run() calls since the last upload attempt
        self.counter = 0

    def error_print(self, ValueError):
        # NOTE(review): the parameter name shadows the builtin ValueError;
        # it is just the error object/message to report.
        print(ValueError, 'Error FTP')
        print("don't worry the program is running...")

    def worker_ftp(self, server, username, password, remotefolder, filenameList):
        # Runs in the child process: open one FTP session, upload every
        # file in filenameList, then close the session.
        self.ftpClientObj = FTP(server, username, password, remotefolder)
        for filename in filenameList:
            self.ftpClientObj.upload(filename)
        self.ftpClientObj.close()

    def ftp_thread(self, server, username, password, remotefolder):
        # Launch worker_ftp in a subprocess with a 3-second watchdog;
        # terminate it (and set status=0) if it is still alive after that.
        if not(self.status):
            return

        import multiprocessing

        p = multiprocessing.Process(target=self.worker_ftp, args=(server, username, password, remotefolder, self.filenameList,))
        p.start()

        # Wait at most 3 seconds for the whole upload batch
        p.join(3)

        if p.is_alive():
            p.terminate()
            p.join()
            print('killing ftp process...')
            self.status = 0
            return

        self.status = 1
        return

    def filterByExt(self, ext, localfolder):
        # Collect local files matching the glob pattern `ext`; when nothing
        # matches, status=0 so ftp_thread() becomes a no-op.
        fnameList = glob.glob1(localfolder,ext)
        self.filenameList = [os.path.join(localfolder,x) for x in fnameList]

        if len(self.filenameList) == 0:
            self.status = 0

    def run(self, dataOut, ext, localfolder, remotefolder, server, username, password, period=1):
        # Called once per dataOut; only every `period` calls are the
        # matching files collected and uploaded.
        self.counter += 1
        if self.counter >= period:
            self.filterByExt(ext, localfolder)

            self.ftp_thread(server, username, password, remotefolder)

            self.counter = 0

        self.status = 1
General Comments 0
You need to be logged in to leave comments. Login now