##// END OF EJS Templates
add multiSchain (@jchavez)
jespinoza -
r892:30b3788062ca
parent child
Show More
@@ -0,0 +1,82
1 import argparse
2
3 from schainpy.controller import Project, multiSchain
4
5 desc = "HF_EXAMPLE"
6
7 def fiber(cursor, skip, q, dt):
8
9 controllerObj = Project()
10
11 controllerObj.setup(id='191', name='test01', description=desc)
12
13 readUnitConfObj = controllerObj.addReadUnit(datatype='SpectraReader',
14 path='/data/workspace/data/julia/',
15 startDate=dt,
16 endDate=dt,
17 startTime="00:00:00",
18 endTime="23:59:59",
19 online=0,
20 #set=1426485881,
21 delay=10,
22 walk=1,
23 queue=q,
24 cursor=cursor,
25 skip=skip,
26 #timezone=-5*3600
27 )
28
29 # #opObj11 = readUnitConfObj.addOperation(name='printNumberOfBlock')
30 #
31 procUnitConfObj2 = controllerObj.addProcUnit(datatype='Spectra', inputId=readUnitConfObj.getId())
32 # opObj11 = procUnitConfObj2.addParameter(name='pairsList', value='(0,1)', format='pairslist')
33 #
34 # procUnitConfObj2 = controllerObj.addProcUnit(datatype='ParametersProc', inputId=readUnitConfObj.getId())
35
36 # opObj11 = procUnitConfObj2.addOperation(name='SpectralMoments', optype='other')
37
38 #
39 # opObj11 = procUnitConfObj1.addOperation(name='SpectraPlot', optype='other')
40 # opObj11.addParameter(name='id', value='1000', format='int')
41 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca_Spc', format='str')
42 # opObj11.addParameter(name='channelList', value='0', format='intlist')
43 # opObj11.addParameter(name='zmin', value='-120', format='float')
44 # opObj11.addParameter(name='zmax', value='-70', format='float')
45 # opObj11.addParameter(name='save', value='1', format='int')
46 # opObj11.addParameter(name='figpath', value=figpath, format='str')
47
48 # opObj11 = procUnitConfObj1.addOperation(name='RTIPlot', optype='other')
49 # opObj11.addParameter(name='id', value='2000', format='int')
50 # opObj11.addParameter(name='wintitle', value='HF_Jicamarca', format='str')
51 # opObj11.addParameter(name='showprofile', value='0', format='int')
52 # opObj11.addParameter(name='channelList', value='0', format='intlist')
53 # # opObj11.addParameter(name='xmin', value='0', format='float')
54 # opObj11.addParameter(name='xmin', value='0', format='float')
55 # opObj11.addParameter(name='xmax', value='24', format='float')
56 #
57 # opObj11.addParameter(name='zmin', value='-110', format='float')
58 # opObj11.addParameter(name='zmax', value='-70', format='float')
59 # opObj11.addParameter(name='save', value='0', format='int')
60 # opObj11.addParameter(name='figpath', value='/tmp/', format='str')
61
62 opObj12 = procUnitConfObj2.addOperation(name='PublishData', optype='other')
63 opObj12.addParameter(name='zeromq', value=1, format='int')
64
65 # print "Escribiendo el archivo XML"
66 # controllerObj.writeXml(filename)
67 # print "Leyendo el archivo XML"
68 # controllerObj.readXml(filename)
69
70 controllerObj.createObjects()
71 controllerObj.connectObjects()
72
73 # timeit.timeit('controllerObj.run()', number=2)
74
75 controllerObj.run()
76
77
78 if __name__ == '__main__':
79 parser = argparse.ArgumentParser(description='Set number of parallel processes')
80 parser.add_argument('--nProcess', default=2, type=int)
81 args = parser.parse_args()
82 multiSchain(fiber, nProcess=args.nProcess, startDate='2016/08/19', endDate='2016/08/20')
@@ -1,1261 +1,1314
1 1 '''
2 2 Created on September , 2012
3 3 @author:
4 4 '''
5 5
6 6 import sys
7 7 import ast
8 8 import datetime
9 9 import traceback
10 from multiprocessing import Process, Queue, cpu_count
11
10 12 import schainpy
11 13 import schainpy.admin
12 14
13 15 from xml.etree.ElementTree import ElementTree, Element, SubElement, tostring
14 16 from xml.dom import minidom
15 17
16 18 from schainpy.model import *
17 19 from time import sleep
18 20
19 21 def prettify(elem):
20 22 """Return a pretty-printed XML string for the Element.
21 23 """
22 24 rough_string = tostring(elem, 'utf-8')
23 25 reparsed = minidom.parseString(rough_string)
24 26 return reparsed.toprettyxml(indent=" ")
25 27
28 def multiSchain(child, nProcess=cpu_count(), startDate=None, endDate=None):
29 skip = 0
30 cursor = 0
31 nFiles = None
32 processes = []
33
34 dt1 = datetime.datetime.strptime(startDate, '%Y/%m/%d')
35 dt2 = datetime.datetime.strptime(endDate, '%Y/%m/%d')
36 days = (dt2 - dt1).days
37 print days
38 for day in range(days+1):
39 skip = 0
40 cursor = 0
41 q = Queue()
42 processes = []
43 dt = (dt1 + datetime.timedelta(day)).strftime('%Y/%m/%d')
44 firstProcess = Process(target=child, args=(cursor, skip, q, dt))
45 firstProcess.start()
46 nFiles = q.get()
47 firstProcess.terminate()
48 skip = int(math.ceil(nFiles/nProcess))
49 try:
50 while True:
51 processes.append(Process(target=child, args=(cursor, skip, q, dt)))
52 processes[cursor].start()
53 if nFiles < cursor*skip:
54 break
55 cursor += 1
56 except KeyboardInterrupt:
57 for process in processes:
58 process.terminate()
59 process.join()
60 for process in processes:
61 process.join()
62 #process.terminate()
63 sleep(3)
64
65 try:
66 while True:
67 pass
68 except KeyboardInterrupt:
69 for process in processes:
70 process.terminate()
71 process.join()
72
26 73 class ParameterConf():
27 74
28 75 id = None
29 76 name = None
30 77 value = None
31 78 format = None
32 79
33 80 __formated_value = None
34 81
35 82 ELEMENTNAME = 'Parameter'
36 83
37 84 def __init__(self):
38 85
39 86 self.format = 'str'
40 87
41 88 def getElementName(self):
42 89
43 90 return self.ELEMENTNAME
44 91
45 92 def getValue(self):
46 93
47 94 value = self.value
48 95 format = self.format
49 96
50 97 if self.__formated_value != None:
51 98
52 99 return self.__formated_value
53 100
101 if format == 'obj':
102 return value
103
54 104 if format == 'str':
55 105 self.__formated_value = str(value)
56 106 return self.__formated_value
57 107
58 108 if value == '':
59 109 raise ValueError, "%s: This parameter value is empty" %self.name
60 110
61 111 if format == 'list':
62 112 strList = value.split(',')
63 113
64 114 self.__formated_value = strList
65 115
66 116 return self.__formated_value
67 117
68 118 if format == 'intlist':
69 119 """
70 120 Example:
71 121 value = (0,1,2)
72 122 """
73 123
74 124 new_value = ast.literal_eval(value)
75 125
76 126 if type(new_value) not in (tuple, list):
77 127 new_value = [int(new_value)]
78 128
79 129 self.__formated_value = new_value
80 130
81 131 return self.__formated_value
82 132
83 133 if format == 'floatlist':
84 134 """
85 135 Example:
86 136 value = (0.5, 1.4, 2.7)
87 137 """
88 138
89 139 new_value = ast.literal_eval(value)
90 140
91 141 if type(new_value) not in (tuple, list):
92 142 new_value = [float(new_value)]
93 143
94 144 self.__formated_value = new_value
95 145
96 146 return self.__formated_value
97 147
98 148 if format == 'date':
99 149 strList = value.split('/')
100 150 intList = [int(x) for x in strList]
101 151 date = datetime.date(intList[0], intList[1], intList[2])
102 152
103 153 self.__formated_value = date
104 154
105 155 return self.__formated_value
106 156
107 157 if format == 'time':
108 158 strList = value.split(':')
109 159 intList = [int(x) for x in strList]
110 160 time = datetime.time(intList[0], intList[1], intList[2])
111 161
112 162 self.__formated_value = time
113 163
114 164 return self.__formated_value
115 165
116 166 if format == 'pairslist':
117 167 """
118 168 Example:
119 169 value = (0,1),(1,2)
120 170 """
121 171
122 172 new_value = ast.literal_eval(value)
123 173
124 174 if type(new_value) not in (tuple, list):
125 175 raise ValueError, "%s has to be a tuple or list of pairs" %value
126 176
127 177 if type(new_value[0]) not in (tuple, list):
128 178 if len(new_value) != 2:
129 179 raise ValueError, "%s has to be a tuple or list of pairs" %value
130 180 new_value = [new_value]
131 181
132 182 for thisPair in new_value:
133 183 if len(thisPair) != 2:
134 184 raise ValueError, "%s has to be a tuple or list of pairs" %value
135 185
136 186 self.__formated_value = new_value
137 187
138 188 return self.__formated_value
139 189
140 190 if format == 'multilist':
141 191 """
142 192 Example:
143 193 value = (0,1,2),(3,4,5)
144 194 """
145 195 multiList = ast.literal_eval(value)
146 196
147 197 if type(multiList[0]) == int:
148 198 multiList = ast.literal_eval("(" + value + ")")
149 199
150 200 self.__formated_value = multiList
151 201
152 202 return self.__formated_value
153 203
154 204 if format == 'bool':
155 205 value = int(value)
156 206
157 207 if format == 'int':
158 208 value = float(value)
159 209
160 210 format_func = eval(format)
161 211
162 212 self.__formated_value = format_func(value)
163 213
164 214 return self.__formated_value
165 215
166 216 def updateId(self, new_id):
167 217
168 218 self.id = str(new_id)
169 219
170 220 def setup(self, id, name, value, format='str'):
171 221
172 222 self.id = str(id)
173 223 self.name = name
174 self.value = str(value)
224 if format == 'obj':
225 self.value = value
226 else:
227 self.value = str(value)
175 228 self.format = str.lower(format)
176 229
177 230 self.getValue()
178 231
179 232 return 1
180 233
181 234 def update(self, name, value, format='str'):
182 235
183 236 self.name = name
184 237 self.value = str(value)
185 238 self.format = format
186 239
187 240 def makeXml(self, opElement):
188 241
189 242 parmElement = SubElement(opElement, self.ELEMENTNAME)
190 243 parmElement.set('id', str(self.id))
191 244 parmElement.set('name', self.name)
192 245 parmElement.set('value', self.value)
193 246 parmElement.set('format', self.format)
194 247
195 248 def readXml(self, parmElement):
196 249
197 250 self.id = parmElement.get('id')
198 251 self.name = parmElement.get('name')
199 252 self.value = parmElement.get('value')
200 253 self.format = str.lower(parmElement.get('format'))
201 254
202 255 #Compatible with old signal chain version
203 256 if self.format == 'int' and self.name == 'idfigure':
204 257 self.name = 'id'
205 258
206 259 def printattr(self):
207 260
208 261 print "Parameter[%s]: name = %s, value = %s, format = %s" %(self.id, self.name, self.value, self.format)
209 262
210 263 class OperationConf():
211 264
212 265 id = None
213 266 name = None
214 267 priority = None
215 268 type = None
216 269
217 270 parmConfObjList = []
218 271
219 272 ELEMENTNAME = 'Operation'
220 273
221 274 def __init__(self):
222 275
223 276 self.id = '0'
224 277 self.name = None
225 278 self.priority = None
226 279 self.type = 'self'
227 280
228 281
229 282 def __getNewId(self):
230 283
231 284 return int(self.id)*10 + len(self.parmConfObjList) + 1
232 285
233 286 def updateId(self, new_id):
234 287
235 288 self.id = str(new_id)
236 289
237 290 n = 1
238 291 for parmObj in self.parmConfObjList:
239 292
240 293 idParm = str(int(new_id)*10 + n)
241 294 parmObj.updateId(idParm)
242 295
243 296 n += 1
244 297
245 298 def getElementName(self):
246 299
247 300 return self.ELEMENTNAME
248 301
249 302 def getParameterObjList(self):
250 303
251 304 return self.parmConfObjList
252 305
253 306 def getParameterObj(self, parameterName):
254 307
255 308 for parmConfObj in self.parmConfObjList:
256 309
257 310 if parmConfObj.name != parameterName:
258 311 continue
259 312
260 313 return parmConfObj
261 314
262 315 return None
263 316
264 317 def getParameterObjfromValue(self, parameterValue):
265 318
266 319 for parmConfObj in self.parmConfObjList:
267 320
268 321 if parmConfObj.getValue() != parameterValue:
269 322 continue
270 323
271 324 return parmConfObj.getValue()
272 325
273 326 return None
274 327
275 328 def getParameterValue(self, parameterName):
276 329
277 330 parameterObj = self.getParameterObj(parameterName)
278 331
279 332 # if not parameterObj:
280 333 # return None
281 334
282 335 value = parameterObj.getValue()
283 336
284 337 return value
285 338
286 339
287 340 def getKwargs(self):
288 341
289 342 kwargs = {}
290 343
291 344 for parmConfObj in self.parmConfObjList:
292 345 if self.name == 'run' and parmConfObj.name == 'datatype':
293 346 continue
294 347
295 348 kwargs[parmConfObj.name] = parmConfObj.getValue()
296 349
297 350 return kwargs
298 351
299 352 def setup(self, id, name, priority, type):
300 353
301 354 self.id = str(id)
302 355 self.name = name
303 356 self.type = type
304 357 self.priority = priority
305 358
306 359 self.parmConfObjList = []
307 360
308 361 def removeParameters(self):
309 362
310 363 for obj in self.parmConfObjList:
311 364 del obj
312 365
313 366 self.parmConfObjList = []
314 367
315 368 def addParameter(self, name, value, format='str'):
316 369
317 370 id = self.__getNewId()
318 371
319 372 parmConfObj = ParameterConf()
320 373 if not parmConfObj.setup(id, name, value, format):
321 374 return None
322 375
323 376 self.parmConfObjList.append(parmConfObj)
324 377
325 378 return parmConfObj
326 379
327 380 def changeParameter(self, name, value, format='str'):
328 381
329 382 parmConfObj = self.getParameterObj(name)
330 383 parmConfObj.update(name, value, format)
331 384
332 385 return parmConfObj
333 386
334 387 def makeXml(self, procUnitElement):
335 388
336 389 opElement = SubElement(procUnitElement, self.ELEMENTNAME)
337 390 opElement.set('id', str(self.id))
338 391 opElement.set('name', self.name)
339 392 opElement.set('type', self.type)
340 393 opElement.set('priority', str(self.priority))
341 394
342 395 for parmConfObj in self.parmConfObjList:
343 396 parmConfObj.makeXml(opElement)
344 397
345 398 def readXml(self, opElement):
346 399
347 400 self.id = opElement.get('id')
348 401 self.name = opElement.get('name')
349 402 self.type = opElement.get('type')
350 403 self.priority = opElement.get('priority')
351 404
352 405 #Compatible with old signal chain version
353 406 #Use of 'run' method instead 'init'
354 407 if self.type == 'self' and self.name == 'init':
355 408 self.name = 'run'
356 409
357 410 self.parmConfObjList = []
358 411
359 412 parmElementList = opElement.iter(ParameterConf().getElementName())
360 413
361 414 for parmElement in parmElementList:
362 415 parmConfObj = ParameterConf()
363 416 parmConfObj.readXml(parmElement)
364 417
365 418 #Compatible with old signal chain version
366 419 #If an 'plot' OPERATION is found, changes name operation by the value of its type PARAMETER
367 420 if self.type != 'self' and self.name == 'Plot':
368 421 if parmConfObj.format == 'str' and parmConfObj.name == 'type':
369 422 self.name = parmConfObj.value
370 423 continue
371 424
372 425 self.parmConfObjList.append(parmConfObj)
373 426
374 427 def printattr(self):
375 428
376 429 print "%s[%s]: name = %s, type = %s, priority = %s" %(self.ELEMENTNAME,
377 430 self.id,
378 431 self.name,
379 432 self.type,
380 433 self.priority)
381 434
382 435 for parmConfObj in self.parmConfObjList:
383 436 parmConfObj.printattr()
384 437
385 438 def createObject(self, plotter_queue=None):
386 439
387 440 if self.type == 'self':
388 441 raise ValueError, "This operation type cannot be created"
389 442
390 443 if self.type == 'plotter':
391 444 #Plotter(plotter_name)
392 445 if not plotter_queue:
393 446 raise ValueError, "plotter_queue is not defined. Use:\nmyProject = Project()\nmyProject.setPlotterQueue(plotter_queue)"
394 447
395 448 opObj = Plotter(self.name, plotter_queue)
396 449
397 450 if self.type == 'external' or self.type == 'other':
398 451 className = eval(self.name)
399 452 kwargs = self.getKwargs()
400 453 opObj = className(**kwargs)
401 454
402 455 return opObj
403 456
404 457
405 458 class ProcUnitConf():
406 459
407 460 id = None
408 461 name = None
409 462 datatype = None
410 463 inputId = None
411 464 parentId = None
412 465
413 466 opConfObjList = []
414 467
415 468 procUnitObj = None
416 469 opObjList = []
417 470
418 471 ELEMENTNAME = 'ProcUnit'
419 472
420 473 def __init__(self):
421 474
422 475 self.id = None
423 476 self.datatype = None
424 477 self.name = None
425 478 self.inputId = None
426 479
427 480 self.opConfObjList = []
428 481
429 482 self.procUnitObj = None
430 483 self.opObjDict = {}
431 484
432 485 def __getPriority(self):
433 486
434 487 return len(self.opConfObjList)+1
435 488
436 489 def __getNewId(self):
437 490
438 491 return int(self.id)*10 + len(self.opConfObjList) + 1
439 492
440 493 def getElementName(self):
441 494
442 495 return self.ELEMENTNAME
443 496
444 497 def getId(self):
445 498
446 499 return self.id
447 500
448 501 def updateId(self, new_id, parentId=parentId):
449 502
450 503
451 504 new_id = int(parentId)*10 + (int(self.id) % 10)
452 505 new_inputId = int(parentId)*10 + (int(self.inputId) % 10)
453 506
454 507 #If this proc unit has not inputs
455 508 if self.inputId == '0':
456 509 new_inputId = 0
457 510
458 511 n = 1
459 512 for opConfObj in self.opConfObjList:
460 513
461 514 idOp = str(int(new_id)*10 + n)
462 515 opConfObj.updateId(idOp)
463 516
464 517 n += 1
465 518
466 519 self.parentId = str(parentId)
467 520 self.id = str(new_id)
468 521 self.inputId = str(new_inputId)
469 522
470 523
471 524 def getInputId(self):
472 525
473 526 return self.inputId
474 527
475 528 def getOperationObjList(self):
476 529
477 530 return self.opConfObjList
478 531
479 532 def getOperationObj(self, name=None):
480 533
481 534 for opConfObj in self.opConfObjList:
482 535
483 536 if opConfObj.name != name:
484 537 continue
485 538
486 539 return opConfObj
487 540
488 541 return None
489 542
490 543 def getOpObjfromParamValue(self, value=None):
491 544
492 545 for opConfObj in self.opConfObjList:
493 546 if opConfObj.getParameterObjfromValue(parameterValue=value) != value:
494 547 continue
495 548 return opConfObj
496 549 return None
497 550
498 551 def getProcUnitObj(self):
499 552
500 553 return self.procUnitObj
501 554
502 555 def setup(self, id, name, datatype, inputId, parentId=None):
503 556
504 557 #Compatible with old signal chain version
505 558 if datatype==None and name==None:
506 559 raise ValueError, "datatype or name should be defined"
507 560
508 561 if name==None:
509 562 if 'Proc' in datatype:
510 563 name = datatype
511 564 else:
512 565 name = '%sProc' %(datatype)
513 566
514 567 if datatype==None:
515 568 datatype = name.replace('Proc','')
516 569
517 570 self.id = str(id)
518 571 self.name = name
519 572 self.datatype = datatype
520 573 self.inputId = inputId
521 574 self.parentId = parentId
522 575
523 576 self.opConfObjList = []
524 577
525 578 self.addOperation(name='run', optype='self')
526 579
527 580 def removeOperations(self):
528 581
529 582 for obj in self.opConfObjList:
530 583 del obj
531 584
532 585 self.opConfObjList = []
533 586 self.addOperation(name='run')
534 587
535 588 def addParameter(self, **kwargs):
536 589 '''
537 590 Add parameters to "run" operation
538 591 '''
539 592 opObj = self.opConfObjList[0]
540 593
541 594 opObj.addParameter(**kwargs)
542 595
543 596 return opObj
544 597
545 598 def addOperation(self, name, optype='self'):
546 599
547 600 id = self.__getNewId()
548 601 priority = self.__getPriority()
549 602
550 603 opConfObj = OperationConf()
551 604 opConfObj.setup(id, name=name, priority=priority, type=optype)
552 605
553 606 self.opConfObjList.append(opConfObj)
554 607
555 608 return opConfObj
556 609
557 610 def makeXml(self, projectElement):
558 611
559 612 procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
560 613 procUnitElement.set('id', str(self.id))
561 614 procUnitElement.set('name', self.name)
562 615 procUnitElement.set('datatype', self.datatype)
563 616 procUnitElement.set('inputId', str(self.inputId))
564 617
565 618 for opConfObj in self.opConfObjList:
566 619 opConfObj.makeXml(procUnitElement)
567 620
568 621 def readXml(self, upElement):
569 622
570 623 self.id = upElement.get('id')
571 624 self.name = upElement.get('name')
572 625 self.datatype = upElement.get('datatype')
573 626 self.inputId = upElement.get('inputId')
574 627
575 628 if self.ELEMENTNAME == "ReadUnit":
576 629 self.datatype = self.datatype.replace("Reader", "")
577 630
578 631 if self.ELEMENTNAME == "ProcUnit":
579 632 self.datatype = self.datatype.replace("Proc", "")
580 633
581 634 if self.inputId == 'None':
582 635 self.inputId = '0'
583 636
584 637 self.opConfObjList = []
585 638
586 639 opElementList = upElement.iter(OperationConf().getElementName())
587 640
588 641 for opElement in opElementList:
589 642 opConfObj = OperationConf()
590 643 opConfObj.readXml(opElement)
591 644 self.opConfObjList.append(opConfObj)
592 645
593 646 def printattr(self):
594 647
595 648 print "%s[%s]: name = %s, datatype = %s, inputId = %s" %(self.ELEMENTNAME,
596 649 self.id,
597 650 self.name,
598 651 self.datatype,
599 652 self.inputId)
600 653
601 654 for opConfObj in self.opConfObjList:
602 655 opConfObj.printattr()
603 656
604 657
605 658 def getKwargs(self):
606 659
607 660 opObj = self.opConfObjList[0]
608 661 kwargs = opObj.getKwargs()
609 662
610 663 return kwargs
611 664
612 665 def createObjects(self, plotter_queue=None):
613 666
614 667 className = eval(self.name)
615 668 kwargs = self.getKwargs()
616 669 procUnitObj = className(**kwargs)
617 670
618 671 for opConfObj in self.opConfObjList:
619 672
620 673 if opConfObj.type == 'self':
621 674 continue
622 675
623 676 opObj = opConfObj.createObject(plotter_queue)
624 677
625 678 self.opObjDict[opConfObj.id] = opObj
626 679 procUnitObj.addOperation(opObj, opConfObj.id)
627 680
628 681 self.procUnitObj = procUnitObj
629 682
630 683 return procUnitObj
631 684
632 685 def run(self):
633 686
634 687 is_ok = False
635 688
636 689 for opConfObj in self.opConfObjList:
637 690
638 691 kwargs = {}
639 692 for parmConfObj in opConfObj.getParameterObjList():
640 693 if opConfObj.name == 'run' and parmConfObj.name == 'datatype':
641 694 continue
642 695
643 696 kwargs[parmConfObj.name] = parmConfObj.getValue()
644 697
645 698 #ini = time.time()
646 699
647 700 #print "\tRunning the '%s' operation with %s" %(opConfObj.name, opConfObj.id)
648 701 sts = self.procUnitObj.call(opType = opConfObj.type,
649 702 opName = opConfObj.name,
650 703 opId = opConfObj.id,
651 704 )
652 705
653 706 # total_time = time.time() - ini
654 707 #
655 708 # if total_time > 0.002:
656 709 # print "%s::%s took %f seconds" %(self.name, opConfObj.name, total_time)
657 710
658 711 is_ok = is_ok or sts
659 712
660 713 return is_ok
661 714
662 715 def close(self):
663 716
664 717 for opConfObj in self.opConfObjList:
665 718 if opConfObj.type == 'self':
666 719 continue
667 720
668 721 opObj = self.procUnitObj.getOperationObj(opConfObj.id)
669 722 opObj.close()
670 723
671 724 self.procUnitObj.close()
672 725
673 726 return
674 727
675 728 class ReadUnitConf(ProcUnitConf):
676 729
677 730 path = None
678 731 startDate = None
679 732 endDate = None
680 733 startTime = None
681 734 endTime = None
682 735
683 736 ELEMENTNAME = 'ReadUnit'
684 737
685 738 def __init__(self):
686 739
687 740 self.id = None
688 741 self.datatype = None
689 742 self.name = None
690 743 self.inputId = None
691 744
692 745 self.parentId = None
693 746
694 747 self.opConfObjList = []
695 748 self.opObjList = []
696 749
697 750 def getElementName(self):
698 751
699 752 return self.ELEMENTNAME
700 753
701 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, **kwargs):
754 def setup(self, id, name, datatype, path, startDate="", endDate="", startTime="", endTime="", parentId=None, queue=None, **kwargs):
702 755
703 756 #Compatible with old signal chain version
704 757 if datatype==None and name==None:
705 758 raise ValueError, "datatype or name should be defined"
706 759
707 760 if name==None:
708 761 if 'Reader' in datatype:
709 762 name = datatype
710 763 else:
711 764 name = '%sReader' %(datatype)
712 765
713 766 if datatype==None:
714 767 datatype = name.replace('Reader','')
715 768
716 769 self.id = id
717 770 self.name = name
718 771 self.datatype = datatype
719 772
720 773 self.path = os.path.abspath(path)
721 774 self.startDate = startDate
722 775 self.endDate = endDate
723 776 self.startTime = startTime
724 777 self.endTime = endTime
725 778
726 779 self.inputId = '0'
727 780 self.parentId = parentId
728
781 self.queue = queue
729 782 self.addRunOperation(**kwargs)
730 783
731 784 def update(self, datatype, path, startDate, endDate, startTime, endTime, parentId=None, name=None, **kwargs):
732 785
733 786 #Compatible with old signal chain version
734 787 if datatype==None and name==None:
735 788 raise ValueError, "datatype or name should be defined"
736 789
737 790 if name==None:
738 791 if 'Reader' in datatype:
739 792 name = datatype
740 793 else:
741 794 name = '%sReader' %(datatype)
742 795
743 796 if datatype==None:
744 797 datatype = name.replace('Reader','')
745 798
746 799 self.datatype = datatype
747 800 self.name = name
748 801 self.path = path
749 802 self.startDate = startDate
750 803 self.endDate = endDate
751 804 self.startTime = startTime
752 805 self.endTime = endTime
753 806
754 807 self.inputId = '0'
755 808 self.parentId = parentId
756 809
757 810 self.updateRunOperation(**kwargs)
758 811
759 812 def removeOperations(self):
760 813
761 814 for obj in self.opConfObjList:
762 815 del obj
763 816
764 817 self.opConfObjList = []
765 818
766 819 def addRunOperation(self, **kwargs):
767 820
768 821 opObj = self.addOperation(name = 'run', optype = 'self')
769 822
770 823 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
771 824 opObj.addParameter(name='path' , value=self.path, format='str')
772 825 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
773 826 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
774 827 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
775 828 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
776 829
777 830 for key, value in kwargs.items():
778 831 opObj.addParameter(name=key, value=value, format=type(value).__name__)
779 832
780 833 return opObj
781 834
782 835 def updateRunOperation(self, **kwargs):
783 836
784 837 opObj = self.getOperationObj(name = 'run')
785 838 opObj.removeParameters()
786 839
787 840 opObj.addParameter(name='datatype' , value=self.datatype, format='str')
788 841 opObj.addParameter(name='path' , value=self.path, format='str')
789 842 opObj.addParameter(name='startDate' , value=self.startDate, format='date')
790 843 opObj.addParameter(name='endDate' , value=self.endDate, format='date')
791 844 opObj.addParameter(name='startTime' , value=self.startTime, format='time')
792 845 opObj.addParameter(name='endTime' , value=self.endTime, format='time')
793 846
794 847 for key, value in kwargs.items():
795 848 opObj.addParameter(name=key, value=value, format=type(value).__name__)
796 849
797 850 return opObj
798 851
799 852 # def makeXml(self, projectElement):
800 853 #
801 854 # procUnitElement = SubElement(projectElement, self.ELEMENTNAME)
802 855 # procUnitElement.set('id', str(self.id))
803 856 # procUnitElement.set('name', self.name)
804 857 # procUnitElement.set('datatype', self.datatype)
805 858 # procUnitElement.set('inputId', str(self.inputId))
806 859 #
807 860 # for opConfObj in self.opConfObjList:
808 861 # opConfObj.makeXml(procUnitElement)
809 862
810 863 def readXml(self, upElement):
811 864
812 865 self.id = upElement.get('id')
813 866 self.name = upElement.get('name')
814 867 self.datatype = upElement.get('datatype')
815 868 self.inputId = upElement.get('inputId')
816 869
817 870 if self.ELEMENTNAME == "ReadUnit":
818 871 self.datatype = self.datatype.replace("Reader", "")
819 872
820 873 if self.inputId == 'None':
821 874 self.inputId = '0'
822 875
823 876 self.opConfObjList = []
824 877
825 878 opElementList = upElement.iter(OperationConf().getElementName())
826 879
827 880 for opElement in opElementList:
828 881 opConfObj = OperationConf()
829 882 opConfObj.readXml(opElement)
830 883 self.opConfObjList.append(opConfObj)
831 884
832 885 if opConfObj.name == 'run':
833 886 self.path = opConfObj.getParameterValue('path')
834 887 self.startDate = opConfObj.getParameterValue('startDate')
835 888 self.endDate = opConfObj.getParameterValue('endDate')
836 889 self.startTime = opConfObj.getParameterValue('startTime')
837 890 self.endTime = opConfObj.getParameterValue('endTime')
838 891
839 892 class Project():
840 893
841 894 id = None
842 895 name = None
843 896 description = None
844 897 filename = None
845 898
846 899 procUnitConfObjDict = None
847 900
848 901 ELEMENTNAME = 'Project'
849 902
850 903 plotterQueue = None
851 904
852 905 def __init__(self, plotter_queue=None):
853 906
854 907 self.id = None
855 908 self.name = None
856 909 self.description = None
857 910
858 911 self.plotterQueue = plotter_queue
859 912
860 913 self.procUnitConfObjDict = {}
861 914
862 915 def __getNewId(self):
863 916
864 917 idList = self.procUnitConfObjDict.keys()
865 918
866 919 id = int(self.id)*10
867 920
868 921 while True:
869 922 id += 1
870 923
871 924 if str(id) in idList:
872 925 continue
873 926
874 927 break
875 928
876 929 return str(id)
877 930
878 931 def getElementName(self):
879 932
880 933 return self.ELEMENTNAME
881 934
882 935 def getId(self):
883 936
884 937 return self.id
885 938
886 939 def updateId(self, new_id):
887 940
888 941 self.id = str(new_id)
889 942
890 943 keyList = self.procUnitConfObjDict.keys()
891 944 keyList.sort()
892 945
893 946 n = 1
894 947 newProcUnitConfObjDict = {}
895 948
896 949 for procKey in keyList:
897 950
898 951 procUnitConfObj = self.procUnitConfObjDict[procKey]
899 952 idProcUnit = str(int(self.id)*10 + n)
900 953 procUnitConfObj.updateId(idProcUnit, parentId = self.id)
901 954
902 955 newProcUnitConfObjDict[idProcUnit] = procUnitConfObj
903 956 n += 1
904 957
905 958 self.procUnitConfObjDict = newProcUnitConfObjDict
906 959
907 960 def setup(self, id, name, description):
908 961
909 962 self.id = str(id)
910 963 self.name = name
911 964 self.description = description
912 965
913 966 def update(self, name, description):
914 967
915 968 self.name = name
916 969 self.description = description
917 970
918 971 def addReadUnit(self, id=None, datatype=None, name=None, **kwargs):
919 972
920 973 if id is None:
921 974 idReadUnit = self.__getNewId()
922 975 else:
923 976 idReadUnit = str(id)
924 977
925 978 readUnitConfObj = ReadUnitConf()
926 979 readUnitConfObj.setup(idReadUnit, name, datatype, parentId=self.id, **kwargs)
927 980
928 981 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
929 982
930 983 return readUnitConfObj
931 984
932 985 def addProcUnit(self, inputId='0', datatype=None, name=None):
933 986
934 987 idProcUnit = self.__getNewId()
935 988
936 989 procUnitConfObj = ProcUnitConf()
937 990 procUnitConfObj.setup(idProcUnit, name, datatype, inputId, parentId=self.id)
938 991
939 992 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
940 993
941 994 return procUnitConfObj
942 995
943 996 def removeProcUnit(self, id):
944 997
945 998 if id in self.procUnitConfObjDict.keys():
946 999 self.procUnitConfObjDict.pop(id)
947 1000
948 1001 def getReadUnitId(self):
949 1002
950 1003 readUnitConfObj = self.getReadUnitObj()
951 1004
952 1005 return readUnitConfObj.id
953 1006
954 1007 def getReadUnitObj(self):
955 1008
956 1009 for obj in self.procUnitConfObjDict.values():
957 1010 if obj.getElementName() == "ReadUnit":
958 1011 return obj
959 1012
960 1013 return None
961 1014
962 1015 def getProcUnitObj(self, id=None, name=None):
963 1016
964 1017 if id != None:
965 1018 return self.procUnitConfObjDict[id]
966 1019
967 1020 if name != None:
968 1021 return self.getProcUnitObjByName(name)
969 1022
970 1023 return None
971 1024
972 1025 def getProcUnitObjByName(self, name):
973 1026
974 1027 for obj in self.procUnitConfObjDict.values():
975 1028 if obj.name == name:
976 1029 return obj
977 1030
978 1031 return None
979 1032
980 1033 def procUnitItems(self):
981 1034
982 1035 return self.procUnitConfObjDict.items()
983 1036
984 1037 def makeXml(self):
985 1038
986 1039 projectElement = Element('Project')
987 1040 projectElement.set('id', str(self.id))
988 1041 projectElement.set('name', self.name)
989 1042 projectElement.set('description', self.description)
990 1043
991 1044 for procUnitConfObj in self.procUnitConfObjDict.values():
992 1045 procUnitConfObj.makeXml(projectElement)
993 1046
994 1047 self.projectElement = projectElement
995 1048
996 1049 def writeXml(self, filename=None):
997 1050
998 1051 if filename == None:
999 1052 if self.filename:
1000 1053 filename = self.filename
1001 1054 else:
1002 1055 filename = "schain.xml"
1003 1056
1004 1057 if not filename:
1005 1058 print "filename has not been defined. Use setFilename(filename) for do it."
1006 1059 return 0
1007 1060
1008 1061 abs_file = os.path.abspath(filename)
1009 1062
1010 1063 if not os.access(os.path.dirname(abs_file), os.W_OK):
1011 1064 print "No write permission on %s" %os.path.dirname(abs_file)
1012 1065 return 0
1013 1066
1014 1067 if os.path.isfile(abs_file) and not(os.access(abs_file, os.W_OK)):
1015 1068 print "File %s already exists and it could not be overwriten" %abs_file
1016 1069 return 0
1017 1070
1018 1071 self.makeXml()
1019 1072
1020 1073 ElementTree(self.projectElement).write(abs_file, method='xml')
1021 1074
1022 1075 self.filename = abs_file
1023 1076
1024 1077 return 1
1025 1078
1026 1079 def readXml(self, filename = None):
1027 1080
1028 1081 if not filename:
1029 1082 print "filename is not defined"
1030 1083 return 0
1031 1084
1032 1085 abs_file = os.path.abspath(filename)
1033 1086
1034 1087 if not os.path.isfile(abs_file):
1035 1088 print "%s file does not exist" %abs_file
1036 1089 return 0
1037 1090
1038 1091 self.projectElement = None
1039 1092 self.procUnitConfObjDict = {}
1040 1093
1041 1094 try:
1042 1095 self.projectElement = ElementTree().parse(abs_file)
1043 1096 except:
1044 1097 print "Error reading %s, verify file format" %filename
1045 1098 return 0
1046 1099
1047 1100 self.project = self.projectElement.tag
1048 1101
1049 1102 self.id = self.projectElement.get('id')
1050 1103 self.name = self.projectElement.get('name')
1051 1104 self.description = self.projectElement.get('description')
1052 1105
1053 1106 readUnitElementList = self.projectElement.iter(ReadUnitConf().getElementName())
1054 1107
1055 1108 for readUnitElement in readUnitElementList:
1056 1109 readUnitConfObj = ReadUnitConf()
1057 1110 readUnitConfObj.readXml(readUnitElement)
1058 1111
1059 1112 if readUnitConfObj.parentId == None:
1060 1113 readUnitConfObj.parentId = self.id
1061 1114
1062 1115 self.procUnitConfObjDict[readUnitConfObj.getId()] = readUnitConfObj
1063 1116
1064 1117 procUnitElementList = self.projectElement.iter(ProcUnitConf().getElementName())
1065 1118
1066 1119 for procUnitElement in procUnitElementList:
1067 1120 procUnitConfObj = ProcUnitConf()
1068 1121 procUnitConfObj.readXml(procUnitElement)
1069 1122
1070 1123 if procUnitConfObj.parentId == None:
1071 1124 procUnitConfObj.parentId = self.id
1072 1125
1073 1126 self.procUnitConfObjDict[procUnitConfObj.getId()] = procUnitConfObj
1074 1127
1075 1128 self.filename = abs_file
1076 1129
1077 1130 return 1
1078 1131
1079 1132 def printattr(self):
1080 1133
1081 1134 print "Project[%s]: name = %s, description = %s" %(self.id,
1082 1135 self.name,
1083 1136 self.description)
1084 1137
1085 1138 for procUnitConfObj in self.procUnitConfObjDict.values():
1086 1139 procUnitConfObj.printattr()
1087 1140
1088 1141 def createObjects(self):
1089 1142
1090 1143 for procUnitConfObj in self.procUnitConfObjDict.values():
1091 1144 procUnitConfObj.createObjects(self.plotterQueue)
1092 1145
1093 1146 def __connect(self, objIN, thisObj):
1094 1147
1095 1148 thisObj.setInput(objIN.getOutputObj())
1096 1149
1097 1150 def connectObjects(self):
1098 1151
1099 1152 for thisPUConfObj in self.procUnitConfObjDict.values():
1100 1153
1101 1154 inputId = thisPUConfObj.getInputId()
1102 1155
1103 1156 if int(inputId) == 0:
1104 1157 continue
1105 1158
1106 1159 #Get input object
1107 1160 puConfINObj = self.procUnitConfObjDict[inputId]
1108 1161 puObjIN = puConfINObj.getProcUnitObj()
1109 1162
1110 1163 #Get current object
1111 1164 thisPUObj = thisPUConfObj.getProcUnitObj()
1112 1165
1113 1166 self.__connect(puObjIN, thisPUObj)
1114 1167
1115 1168 def __handleError(self, procUnitConfObj, send_email=True):
1116 1169
1117 1170 import socket
1118 1171
1119 1172 err = traceback.format_exception(sys.exc_info()[0],
1120 1173 sys.exc_info()[1],
1121 1174 sys.exc_info()[2])
1122 1175
1123 1176 print "***** Error occurred in %s *****" %(procUnitConfObj.name)
1124 1177 print "***** %s" %err[-1]
1125 1178
1126 1179 message = "".join(err)
1127 1180
1128 1181 sys.stderr.write(message)
1129 1182
1130 1183 if not send_email:
1131 1184 return
1132 1185
1133 1186 subject = "SChain v%s: Error running %s\n" %(schainpy.__version__, procUnitConfObj.name)
1134 1187
1135 1188 subtitle = "%s: %s\n" %(procUnitConfObj.getElementName() ,procUnitConfObj.name)
1136 1189 subtitle += "Hostname: %s\n" %socket.gethostbyname(socket.gethostname())
1137 1190 subtitle += "Working directory: %s\n" %os.path.abspath("./")
1138 1191 subtitle += "Configuration file: %s\n" %self.filename
1139 1192 subtitle += "Time: %s\n" %str(datetime.datetime.now())
1140 1193
1141 1194 readUnitConfObj = self.getReadUnitObj()
1142 1195 if readUnitConfObj:
1143 1196 subtitle += "\nInput parameters:\n"
1144 1197 subtitle += "[Data path = %s]\n" %readUnitConfObj.path
1145 1198 subtitle += "[Data type = %s]\n" %readUnitConfObj.datatype
1146 1199 subtitle += "[Start date = %s]\n" %readUnitConfObj.startDate
1147 1200 subtitle += "[End date = %s]\n" %readUnitConfObj.endDate
1148 1201 subtitle += "[Start time = %s]\n" %readUnitConfObj.startTime
1149 1202 subtitle += "[End time = %s]\n" %readUnitConfObj.endTime
1150 1203
1151 1204 adminObj = schainpy.admin.SchainNotify()
1152 1205 adminObj.sendAlert(message=message,
1153 1206 subject=subject,
1154 1207 subtitle=subtitle,
1155 1208 filename=self.filename)
1156 1209
1157 1210 def isPaused(self):
1158 1211 return 0
1159 1212
1160 1213 def isStopped(self):
1161 1214 return 0
1162 1215
1163 1216 def runController(self):
1164 1217 """
1165 1218 returns 0 when this process has been stopped, 1 otherwise
1166 1219 """
1167 1220
1168 1221 if self.isPaused():
1169 1222 print "Process suspended"
1170 1223
1171 1224 while True:
1172 1225 sleep(0.1)
1173 1226
1174 1227 if not self.isPaused():
1175 1228 break
1176 1229
1177 1230 if self.isStopped():
1178 1231 break
1179 1232
1180 1233 print "Process reinitialized"
1181 1234
1182 1235 if self.isStopped():
1183 1236 print "Process stopped"
1184 1237 return 0
1185 1238
1186 1239 return 1
1187 1240
1188 1241 def setFilename(self, filename):
1189 1242
1190 1243 self.filename = filename
1191 1244
1192 1245 def setPlotterQueue(self, plotter_queue):
1193 1246
1194 1247 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1195 1248
1196 1249 def getPlotterQueue(self):
1197 1250
1198 1251 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1199 1252
1200 1253 def useExternalPlotter(self):
1201 1254
1202 1255 raise NotImplementedError, "Use schainpy.controller_api.ControllerThread instead Project class"
1203 1256
1204 1257 def run(self):
1205 1258
1206 1259 print
1207 1260 print "*"*60
1208 1261 print " Starting SIGNAL CHAIN PROCESSING v%s " %schainpy.__version__
1209 1262 print "*"*60
1210 1263 print
1211 1264
1212 1265 keyList = self.procUnitConfObjDict.keys()
1213 1266 keyList.sort()
1214 1267
1215 1268 while(True):
1216 1269
1217 1270 is_ok = False
1218 1271
1219 1272 for procKey in keyList:
1220 1273 # print "Running the '%s' process with %s" %(procUnitConfObj.name, procUnitConfObj.id)
1221 1274
1222 1275 procUnitConfObj = self.procUnitConfObjDict[procKey]
1223 1276
1224 1277 try:
1225 1278 sts = procUnitConfObj.run()
1226 1279 is_ok = is_ok or sts
1227 1280 except KeyboardInterrupt:
1228 1281 is_ok = False
1229 1282 break
1230 1283 except ValueError, e:
1231 1284 sleep(0.5)
1232 1285 self.__handleError(procUnitConfObj, send_email=True)
1233 1286 is_ok = False
1234 1287 break
1235 1288 except:
1236 1289 sleep(0.5)
1237 1290 self.__handleError(procUnitConfObj)
1238 1291 is_ok = False
1239 1292 break
1240 1293
1241 1294 #If every process unit finished so end process
1242 1295 if not(is_ok):
1243 1296 # print "Every process unit have finished"
1244 1297 break
1245 1298
1246 1299 if not self.runController():
1247 1300 break
1248 1301
1249 1302 #Closing every process
1250 1303 for procKey in keyList:
1251 1304 procUnitConfObj = self.procUnitConfObjDict[procKey]
1252 1305 procUnitConfObj.close()
1253 1306
1254 1307 print "Process finished"
1255 1308
1256 1309 def start(self):
1257 1310
1258 1311 self.writeXml()
1259 1312 self.createObjects()
1260 1313 self.connectObjects()
1261 1314 self.run()
@@ -1,376 +1,377
1 1
2 2 import os
3 3 import zmq
4 4 import time
5 5 import numpy
6 6 import datetime
7 7 import numpy as np
8 8 import matplotlib.pyplot as plt
9 9 from mpl_toolkits.axes_grid1 import make_axes_locatable
10 10 from matplotlib.ticker import FuncFormatter, LinearLocator
11 11 from multiprocessing import Process
12 12
13 13 from schainpy.model.proc.jroproc_base import Operation
14 14
15 15 #plt.ion()
16 16
17 17 func = lambda x, pos: ('%s') %(datetime.datetime.utcfromtimestamp(x).strftime('%H:%M'))
18 18
19 19 d1970 = datetime.datetime(1970,1,1)
20 20
21 21 class PlotData(Operation, Process):
22 22
23 23 CODE = 'Figure'
24 24 colormap = 'jet'
25 25 __MAXNUMX = 80
26 26 __MAXNUMY = 80
27 27 __missing = 1E30
28 28
29 29 def __init__(self, **kwargs):
30 30
31 31 Operation.__init__(self)
32 32 Process.__init__(self)
33 33 self.mp = False
34 34 self.dataOut = None
35 35 self.isConfig = False
36 36 self.figure = None
37 37 self.axes = []
38 38 self.localtime = kwargs.pop('localtime', True)
39 39 self.show = kwargs.get('show', True)
40 40 self.save = kwargs.get('save', False)
41 41 self.colormap = kwargs.get('colormap', self.colormap)
42 42 self.showprofile = kwargs.get('showprofile', False)
43 43 self.title = kwargs.get('wintitle', '')
44 44 self.xaxis = kwargs.get('xaxis', 'time')
45 45 self.zmin = kwargs.get('zmin', None)
46 46 self.zmax = kwargs.get('zmax', None)
47 47 self.xmin = kwargs.get('xmin', None)
48 48 self.xmax = kwargs.get('xmax', None)
49 49 self.xrange = kwargs.get('xrange', 24)
50 50 self.ymin = kwargs.get('ymin', None)
51 51 self.ymax = kwargs.get('ymax', None)
52 52
53 53 def fill_gaps(self, x_buffer, y_buffer, z_buffer):
54 54
55 55 if x_buffer.shape[0] < 2:
56 56 return x_buffer, y_buffer, z_buffer
57 57
58 58 deltas = x_buffer[1:] - x_buffer[0:-1]
59 59 x_median = np.median(deltas)
60 60
61 61 index = np.where(deltas > 5*x_median)
62 62
63 63 if len(index[0]) != 0:
64 64 z_buffer[::,index[0],::] = self.__missing
65 65 z_buffer = np.ma.masked_inside(z_buffer,
66 66 0.99*self.__missing,
67 67 1.01*self.__missing)
68 68
69 69 return x_buffer, y_buffer, z_buffer
70 70
71 71 def decimate(self):
72 72
73 73 dx = int(len(self.x)/self.__MAXNUMX) + 1
74 74 dy = int(len(self.y)/self.__MAXNUMY) + 1
75 75
76 76 x = self.x[::dx]
77 77 y = self.y[::dy]
78 78 z = self.z[::, ::dx, ::dy]
79 79
80 80 return x, y, z
81 81
82 82 def __plot(self):
83 83
84 84 print 'plotting...{}'.format(self.CODE)
85 85
86 86 self.plot()
87 self.figure.suptitle('{} {}'.format(self.title, self.CODE.upper()))
87 self.figure.suptitle('{} {} - Date:{}'.format(self.title, self.CODE.upper(),
88 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')))
88 89
89 90 if self.save:
90 91 figname = os.path.join(self.save, '{}_{}.png'.format(self.CODE,
91 92 datetime.datetime.utcfromtimestamp(self.times[-1]).strftime('%y%m%d_%H%M%S')))
92 93 print 'Saving figure: {}'.format(figname)
93 94 self.figure.savefig(figname)
94 95
95 96 self.figure.canvas.draw()
96 97
97 98 def plot(self):
98 99
99 100 print 'plotting...{}'.format(self.CODE.upper())
100 101 return
101 102
102 103 def run(self):
103 104
104 105 print '[Starting] {}'.format(self.name)
105 106 context = zmq.Context()
106 107 receiver = context.socket(zmq.SUB)
107 108 receiver.setsockopt(zmq.SUBSCRIBE, '')
108 109 receiver.setsockopt(zmq.CONFLATE, True)
109 110 receiver.connect("ipc:///tmp/zmq.plots")
110 111
111 112 while True:
112 113 try:
113 114 #if True:
114 115 self.data = receiver.recv_pyobj(flags=zmq.NOBLOCK)
115 116 self.dataOut = self.data['dataOut']
116 117 self.times = self.data['times']
117 118 self.times.sort()
118 119 self.min_time = self.times[0]
119 120 self.max_time = self.times[-1]
120 121
121 122 if self.isConfig is False:
122 123 self.setup()
123 124 self.isConfig = True
124 125
125 126 self.__plot()
126 127
127 128 if 'ENDED' in self.data:
128 129 #self.setup()
129 130 #self.__plot()
130 131 pass
131 132
132 133 except zmq.Again as e:
133 134 print 'Waiting for data...'
134 135 plt.pause(5)
135 136 #time.sleep(3)
136 137
137 138 def close(self):
138 139 if self.dataOut:
139 140 self._plot()
140 141
141 142
142 143 class PlotSpectraData(PlotData):
143 144
144 145 CODE = 'spc'
145 146 colormap = 'jro'
146 147
147 148 def setup(self):
148 149
149 150 ncolspan = 1
150 151 colspan = 1
151 152 self.ncols = int(numpy.sqrt(self.dataOut.nChannels)+0.9)
152 153 self.nrows = int(self.dataOut.nChannels*1./self.ncols + 0.9)
153 154 self.width = 3.6*self.ncols
154 155 self.height = 3.2*self.nrows
155 156 if self.showprofile:
156 157 ncolspan = 3
157 158 colspan = 2
158 159 self.width += 1.2*self.ncols
159 160
160 161 self.ylabel = 'Range [Km]'
161 162 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
162 163
163 164 if self.figure is None:
164 165 self.figure = plt.figure(figsize=(self.width, self.height),
165 166 edgecolor='k',
166 167 facecolor='w')
167 168 else:
168 169 self.figure.clf()
169 170
170 171 n = 0
171 172 for y in range(self.nrows):
172 173 for x in range(self.ncols):
173 174 if n>=self.dataOut.nChannels:
174 175 break
175 176 ax = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan), 1, colspan)
176 177 if self.showprofile:
177 178 ax.ax_profile = plt.subplot2grid((self.nrows, self.ncols*ncolspan), (y, x*ncolspan+colspan), 1, 1)
178 179
179 180 ax.firsttime = True
180 181 self.axes.append(ax)
181 182 n += 1
182 183
183 184 self.figure.subplots_adjust(wspace=0.9, hspace=0.5)
184 185 self.figure.show()
185 186
186 187 def plot(self):
187 188
188 189 if self.xaxis == "frequency":
189 190 x = self.dataOut.getFreqRange(1)/1000.
190 191 xlabel = "Frequency (kHz)"
191 192 elif self.xaxis == "time":
192 193 x = self.dataOut.getAcfRange(1)
193 194 xlabel = "Time (ms)"
194 195 else:
195 196 x = self.dataOut.getVelRange(1)
196 197 xlabel = "Velocity (m/s)"
197 198
198 199 y = self.dataOut.getHeiRange()
199 200 z = self.data[self.CODE]
200 201
201 202 for n, ax in enumerate(self.axes):
202 203
203 204 if ax.firsttime:
204 205 self.xmax = self.xmax if self.xmax else np.nanmax(x)
205 206 self.xmin = self.xmin if self.xmin else -self.xmax
206 207 self.ymin = self.ymin if self.ymin else np.nanmin(y)
207 208 self.ymax = self.ymax if self.ymax else np.nanmax(y)
208 209 self.zmin = self.zmin if self.zmin else np.nanmin(z)
209 210 self.zmax = self.zmax if self.zmax else np.nanmax(z)
210 211 ax.plot = ax.pcolormesh(x, y, z[n].T,
211 212 vmin=self.zmin,
212 213 vmax=self.zmax,
213 214 cmap=plt.get_cmap(self.colormap)
214 215 )
215 216 divider = make_axes_locatable(ax)
216 217 cax = divider.new_horizontal(size='3%', pad=0.05)
217 218 self.figure.add_axes(cax)
218 219 plt.colorbar(ax.plot, cax)
219 220
220 221 ax.set_xlim(self.xmin, self.xmax)
221 222 ax.set_ylim(self.ymin, self.ymax)
222 223
223 224 ax.xaxis.set_major_locator(LinearLocator(5))
224 225 #ax.yaxis.set_major_locator(LinearLocator(4))
225 226
226 227 ax.set_ylabel(self.ylabel)
227 228 ax.set_xlabel(xlabel)
228 229
229 230 ax.firsttime = False
230 231
231 232 if self.showprofile:
232 233 ax.plot_profile= ax.ax_profile.plot(self.data['rti'][self.max_time][n], y)[0]
233 234 ax.ax_profile.set_xlim(self.zmin, self.zmax)
234 235 ax.ax_profile.set_ylim(self.ymin, self.ymax)
235 236 ax.ax_profile.set_xlabel('dB')
236 237 ax.ax_profile.grid(b=True, axis='x')
238 ax.plot_noise = ax.ax_profile.plot(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y,
239 color="k", linestyle="dashed", lw=2)[0]
237 240 [tick.set_visible(False) for tick in ax.ax_profile.get_yticklabels()]
238 noise = 10*numpy.log10(self.data['rti'][self.max_time][n]/self.dataOut.normFactor)
239 ax.ax_profile.vlines(noise, self.ymin, self.ymax, colors="k", linestyle="dashed", lw=2)
240 241 else:
241 242 ax.plot.set_array(z[n].T.ravel())
242 ax.set_title('{} {}'.format(self.titles[n],
243 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
244 size=8)
245 243 if self.showprofile:
246 244 ax.plot_profile.set_data(self.data['rti'][self.max_time][n], y)
245 ax.plot_noise.set_data(numpy.repeat(self.data['noise'][self.max_time][n], len(y)), y)
247 246
247 ax.set_title('{} - Noise: {:.2f} dB'.format(self.titles[n], self.data['noise'][self.max_time][n]),
248 size=8)
248 249
249 250 class PlotRTIData(PlotData):
250 251
251 252 CODE = 'rti'
252 253 colormap = 'jro'
253 254
254 255 def setup(self):
255 256
256 257 self.ncols = 1
257 258 self.nrows = self.dataOut.nChannels
258 259 self.width = 10
259 260 self.height = 2.2*self.nrows
260 261 self.ylabel = 'Range [Km]'
261 262 self.titles = ['Channel {}'.format(x) for x in self.dataOut.channelList]
262 263
263 264 if self.figure is None:
264 265 self.figure = plt.figure(figsize=(self.width, self.height),
265 266 edgecolor='k',
266 267 facecolor='w')
267 268 else:
268 269 self.figure.clf()
269 270
270 271 for n in range(self.nrows):
271 272 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
272 273 ax.firsttime = True
273 274 self.axes.append(ax)
274 275
275 276 self.figure.subplots_adjust(hspace=0.5)
276 277 self.figure.show()
277 278
278 279 def plot(self):
279 280
280 281 self.x = np.array(self.times)
281 282 self.y = self.dataOut.getHeiRange()
282 283 self.z = []
283 284
284 285 for ch in range(self.nrows):
285 286 self.z.append([self.data[self.CODE][t][ch] for t in self.times])
286 287
287 288 self.z = np.array(self.z)
288 289
289 290 for n, ax in enumerate(self.axes):
290 291
291 292 x, y, z = self.fill_gaps(*self.decimate())
292 293
293 294 if ax.firsttime:
294 295 self.ymin = self.ymin if self.ymin else np.nanmin(self.y)
295 296 self.ymax = self.ymax if self.ymax else np.nanmax(self.y)
296 297 self.zmin = self.zmin if self.zmin else np.nanmin(self.z)
297 298 zmax = self.zmax if self.zmax else np.nanmax(self.z)
298 299 plot = ax.pcolormesh(x, y, z[n].T,
299 300 vmin=self.zmin,
300 301 vmax=self.zmax,
301 302 cmap=plt.get_cmap(self.colormap)
302 303 )
303 304 divider = make_axes_locatable(ax)
304 305 cax = divider.new_horizontal(size='2%', pad=0.05)
305 306 self.figure.add_axes(cax)
306 307 plt.colorbar(plot, cax)
307 308 ax.set_ylim(self.ymin, self.ymax)
308 309 if self.xaxis=='time':
309 310 ax.xaxis.set_major_formatter(FuncFormatter(func))
310 311 ax.xaxis.set_major_locator(LinearLocator(6))
311 312
312 313 ax.yaxis.set_major_locator(LinearLocator(4))
313 314
314 315 ax.set_ylabel(self.ylabel)
315 316
316 317 if self.xmin is None:
317 318 print 'is none'
318 319 xmin = self.min_time
319 320 else:
320 321
321 322 xmin = (datetime.datetime.combine(self.dataOut.datatime.date(),
322 323 datetime.time(self.xmin, 0, 0))-d1970).total_seconds()
323 324
324 325 xmax = xmin+self.xrange*60*60
325 326
326 327 ax.set_xlim(xmin, xmax)
327 328 ax.firsttime = False
328 329 else:
329 330 ax.collections.remove(ax.collections[0])
330 331 plot = ax.pcolormesh(x, y, z[n].T,
331 332 vmin=self.zmin,
332 333 vmax=self.zmax,
333 334 cmap=plt.get_cmap(self.colormap)
334 335 )
335 336 ax.set_title('{} {}'.format(self.titles[n],
336 337 datetime.datetime.utcfromtimestamp(self.max_time).strftime('%y/%m/%d %H:%M:%S')),
337 338 size=8)
338 339
339 340
340 341 class PlotCOHData(PlotRTIData):
341 342
342 343 CODE = 'coh'
343 344
344 345 def setup(self):
345 346
346 347 self.ncols = 1
347 348 self.nrows = self.dataOut.nPairs
348 349 self.width = 10
349 350 self.height = 2.2*self.nrows
350 351 self.ylabel = 'Range [Km]'
351 352 self.titles = ['Channels {}'.format(x) for x in self.dataOut.pairsList]
352 353
353 354 if self.figure is None:
354 355 self.figure = plt.figure(figsize=(self.width, self.height),
355 356 edgecolor='k',
356 357 facecolor='w')
357 358 else:
358 359 self.figure.clf()
359 360
360 361 for n in range(self.nrows):
361 362 ax = self.figure.add_subplot(self.nrows, self.ncols, n+1)
362 363 ax.firsttime = True
363 364 self.axes.append(ax)
364 365
365 366 self.figure.subplots_adjust(hspace=0.5)
366 367 self.figure.show()
367 368
368 369 class PlotSNRData(PlotRTIData):
369 370
370 371 CODE = 'coh'
371 372
372 373
373 374 class PlotPHASEData(PlotCOHData):
374 375
375 376 CODE = 'phase'
376 377 colormap = 'seismic'
This diff has been collapsed as it changes many lines, (966 lines changed) Show them Hide them
@@ -1,1723 +1,1743
1 1 '''
2 2 Created on Jul 2, 2014
3 3
4 4 @author: roj-idl71
5 5 '''
6 6 import os
7 7 import sys
8 8 import glob
9 9 import time
10 10 import numpy
11 11 import fnmatch
12 12 import time, datetime
13 13 #import h5py
14 14 import traceback
15 15
16 16 try:
17 17 from gevent import sleep
18 18 except:
19 19 from time import sleep
20
20
21 21 from schainpy.model.data.jroheaderIO import PROCFLAG, BasicHeader, SystemHeader, RadarControllerHeader, ProcessingHeader
22 22 from schainpy.model.data.jroheaderIO import get_dtype_index, get_numpy_dtype, get_procflag_dtype, get_dtype_width
23 23
24 24 LOCALTIME = True
25 25
26 26 def isNumber(cad):
27 27 """
28 28 Chequea si el conjunto de caracteres que componen un string puede ser convertidos a un numero.
29 29
30 Excepciones:
30 Excepciones:
31 31 Si un determinado string no puede ser convertido a numero
32 32 Input:
33 33 str, string al cual se le analiza para determinar si convertible a un numero o no
34
34
35 35 Return:
36 36 True : si el string es uno numerico
37 37 False : no es un string numerico
38 38 """
39 39 try:
40 40 float( cad )
41 41 return True
42 42 except:
43 43 return False
44 44
45 45 def isFileInEpoch(filename, startUTSeconds, endUTSeconds):
46 46 """
47 47 Esta funcion determina si un archivo de datos se encuentra o no dentro del rango de fecha especificado.
48
48
49 49 Inputs:
50 50 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
51
51
52 52 startUTSeconds : fecha inicial del rango seleccionado. La fecha esta dada en
53 53 segundos contados desde 01/01/1970.
54 54 endUTSeconds : fecha final del rango seleccionado. La fecha esta dada en
55 55 segundos contados desde 01/01/1970.
56
56
57 57 Return:
58 58 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
59 59 fecha especificado, de lo contrario retorna False.
60
60
61 61 Excepciones:
62 62 Si el archivo no existe o no puede ser abierto
63 63 Si la cabecera no puede ser leida.
64
64
65 65 """
66 66 basicHeaderObj = BasicHeader(LOCALTIME)
67
67
68 68 try:
69 69 fp = open(filename,'rb')
70 70 except IOError:
71 71 print "The file %s can't be opened" %(filename)
72 72 return 0
73
73
74 74 sts = basicHeaderObj.read(fp)
75 75 fp.close()
76
76
77 77 if not(sts):
78 78 print "Skipping the file %s because it has not a valid header" %(filename)
79 79 return 0
80
80
81 81 if not ((startUTSeconds <= basicHeaderObj.utc) and (endUTSeconds > basicHeaderObj.utc)):
82 82 return 0
83
83
84 84 return 1
85 85
86 86 def isTimeInRange(thisTime, startTime, endTime):
87
87
88 88 if endTime >= startTime:
89 89 if (thisTime < startTime) or (thisTime > endTime):
90 90 return 0
91
91
92 92 return 1
93 93 else:
94 94 if (thisTime < startTime) and (thisTime > endTime):
95 95 return 0
96
96
97 97 return 1
98
98
99 99 def isFileInTimeRange(filename, startDate, endDate, startTime, endTime):
100 100 """
101 101 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
102
102
103 103 Inputs:
104 104 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
105
105
106 106 startDate : fecha inicial del rango seleccionado en formato datetime.date
107
107
108 108 endDate : fecha final del rango seleccionado en formato datetime.date
109
109
110 110 startTime : tiempo inicial del rango seleccionado en formato datetime.time
111
111
112 112 endTime : tiempo final del rango seleccionado en formato datetime.time
113
113
114 114 Return:
115 115 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
116 116 fecha especificado, de lo contrario retorna False.
117
117
118 118 Excepciones:
119 119 Si el archivo no existe o no puede ser abierto
120 120 Si la cabecera no puede ser leida.
121
121
122 122 """
123
124
123
124
125 125 try:
126 126 fp = open(filename,'rb')
127 127 except IOError:
128 128 print "The file %s can't be opened" %(filename)
129 129 return None
130
130
131 131 firstBasicHeaderObj = BasicHeader(LOCALTIME)
132 132 systemHeaderObj = SystemHeader()
133 133 radarControllerHeaderObj = RadarControllerHeader()
134 134 processingHeaderObj = ProcessingHeader()
135
135
136 136 lastBasicHeaderObj = BasicHeader(LOCALTIME)
137
137
138 138 sts = firstBasicHeaderObj.read(fp)
139
139
140 140 if not(sts):
141 141 print "[Reading] Skipping the file %s because it has not a valid header" %(filename)
142 142 return None
143
143
144 144 if not systemHeaderObj.read(fp):
145 145 return None
146
146
147 147 if not radarControllerHeaderObj.read(fp):
148 148 return None
149
149
150 150 if not processingHeaderObj.read(fp):
151 151 return None
152
152
153 153 filesize = os.path.getsize(filename)
154
154
155 155 offset = processingHeaderObj.blockSize + 24 #header size
156
156
157 157 if filesize <= offset:
158 158 print "[Reading] %s: This file has not enough data" %filename
159 159 return None
160
160
161 161 fp.seek(-offset, 2)
162
162
163 163 sts = lastBasicHeaderObj.read(fp)
164
164
165 165 fp.close()
166
166
167 167 thisDatetime = lastBasicHeaderObj.datatime
168 168 thisTime_last_block = thisDatetime.time()
169
169
170 170 thisDatetime = firstBasicHeaderObj.datatime
171 171 thisDate = thisDatetime.date()
172 172 thisTime_first_block = thisDatetime.time()
173
173
174 174 #General case
175 175 # o>>>>>>>>>>>>>><<<<<<<<<<<<<<o
176 176 #-----------o----------------------------o-----------
177 177 # startTime endTime
178
178
179 179 if endTime >= startTime:
180 180 if (thisTime_last_block < startTime) or (thisTime_first_block > endTime):
181 181 return None
182
182
183 183 return thisDatetime
184
184
185 185 #If endTime < startTime then endTime belongs to the next day
186
187
186
187
188 188 #<<<<<<<<<<<o o>>>>>>>>>>>
189 189 #-----------o----------------------------o-----------
190 190 # endTime startTime
191
191
192 192 if (thisDate == startDate) and (thisTime_last_block < startTime):
193 193 return None
194
194
195 195 if (thisDate == endDate) and (thisTime_first_block > endTime):
196 196 return None
197
197
198 198 if (thisTime_last_block < startTime) and (thisTime_first_block > endTime):
199 199 return None
200
200
201 201 return thisDatetime
202 202
203 203 def isFolderInDateRange(folder, startDate=None, endDate=None):
204 204 """
205 205 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
206
206
207 207 Inputs:
208 208 folder : nombre completo del directorio.
209 209 Su formato deberia ser "/path_root/?YYYYDDD"
210
210
211 211 siendo:
212 212 YYYY : Anio (ejemplo 2015)
213 213 DDD : Dia del anio (ejemplo 305)
214
214
215 215 startDate : fecha inicial del rango seleccionado en formato datetime.date
216
216
217 217 endDate : fecha final del rango seleccionado en formato datetime.date
218
218
219 219 Return:
220 220 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
221 221 fecha especificado, de lo contrario retorna False.
222 222 Excepciones:
223 223 Si el directorio no tiene el formato adecuado
224 224 """
225
225
226 226 basename = os.path.basename(folder)
227
227
228 228 if not isRadarFolder(basename):
229 229 print "The folder %s has not the rigth format" %folder
230 230 return 0
231
231
232 232 if startDate and endDate:
233 233 thisDate = getDateFromRadarFolder(basename)
234
234
235 235 if thisDate < startDate:
236 236 return 0
237
237
238 238 if thisDate > endDate:
239 239 return 0
240
240
241 241 return 1
242 242
243 243 def isFileInDateRange(filename, startDate=None, endDate=None):
244 244 """
245 245 Retorna 1 si el archivo de datos se encuentra dentro del rango de horas especificado.
246
246
247 247 Inputs:
248 248 filename : nombre completo del archivo de datos en formato Jicamarca (.r)
249
249
250 250 Su formato deberia ser "?YYYYDDDsss"
251
251
252 252 siendo:
253 253 YYYY : Anio (ejemplo 2015)
254 254 DDD : Dia del anio (ejemplo 305)
255 255 sss : set
256
256
257 257 startDate : fecha inicial del rango seleccionado en formato datetime.date
258
258
259 259 endDate : fecha final del rango seleccionado en formato datetime.date
260
260
261 261 Return:
262 262 Boolean : Retorna True si el archivo de datos contiene datos en el rango de
263 263 fecha especificado, de lo contrario retorna False.
264 264 Excepciones:
265 265 Si el archivo no tiene el formato adecuado
266 266 """
267
267
268 268 basename = os.path.basename(filename)
269
269
270 270 if not isRadarFile(basename):
271 271 print "The filename %s has not the rigth format" %filename
272 272 return 0
273
273
274 274 if startDate and endDate:
275 275 thisDate = getDateFromRadarFile(basename)
276
276
277 277 if thisDate < startDate:
278 278 return 0
279
279
280 280 if thisDate > endDate:
281 281 return 0
282
282
283 283 return 1
284 284
285 285 def getFileFromSet(path, ext, set):
286 286 validFilelist = []
287 287 fileList = os.listdir(path)
288
288
289 289 # 0 1234 567 89A BCDE
290 290 # H YYYY DDD SSS .ext
291
291
292 292 for thisFile in fileList:
293 293 try:
294 294 year = int(thisFile[1:5])
295 295 doy = int(thisFile[5:8])
296 296 except:
297 297 continue
298
298
299 299 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
300 300 continue
301
301
302 302 validFilelist.append(thisFile)
303 303
304 304 myfile = fnmatch.filter(validFilelist,'*%4.4d%3.3d%3.3d*'%(year,doy,set))
305
305
306 306 if len(myfile)!= 0:
307 307 return myfile[0]
308 308 else:
309 309 filename = '*%4.4d%3.3d%3.3d%s'%(year,doy,set,ext.lower())
310 310 print 'the filename %s does not exist'%filename
311 311 print '...going to the last file: '
312
312
313 313 if validFilelist:
314 314 validFilelist = sorted( validFilelist, key=str.lower )
315 315 return validFilelist[-1]
316 316
317 317 return None
318 318
319 319 def getlastFileFromPath(path, ext):
320 320 """
321 321 Depura el fileList dejando solo los que cumplan el formato de "PYYYYDDDSSS.ext"
322 al final de la depuracion devuelve el ultimo file de la lista que quedo.
323
324 Input:
322 al final de la depuracion devuelve el ultimo file de la lista que quedo.
323
324 Input:
325 325 fileList : lista conteniendo todos los files (sin path) que componen una determinada carpeta
326 ext : extension de los files contenidos en una carpeta
327
326 ext : extension de los files contenidos en una carpeta
327
328 328 Return:
329 329 El ultimo file de una determinada carpeta, no se considera el path.
330 330 """
331 331 validFilelist = []
332 332 fileList = os.listdir(path)
333
333
334 334 # 0 1234 567 89A BCDE
335 335 # H YYYY DDD SSS .ext
336
336
337 337 for thisFile in fileList:
338
338
339 339 year = thisFile[1:5]
340 340 if not isNumber(year):
341 341 continue
342
342
343 343 doy = thisFile[5:8]
344 344 if not isNumber(doy):
345 345 continue
346
346
347 347 year = int(year)
348 348 doy = int(doy)
349
349
350 350 if (os.path.splitext(thisFile)[-1].lower() != ext.lower()):
351 351 continue
352
352
353 353 validFilelist.append(thisFile)
354 354
355 355 if validFilelist:
356 356 validFilelist = sorted( validFilelist, key=str.lower )
357 357 return validFilelist[-1]
358 358
359 359 return None
360 360
361 361 def checkForRealPath(path, foldercounter, year, doy, set, ext):
362 362 """
363 363 Por ser Linux Case Sensitive entonces checkForRealPath encuentra el nombre correcto de un path,
364 364 Prueba por varias combinaciones de nombres entre mayusculas y minusculas para determinar
365 365 el path exacto de un determinado file.
366
366
367 367 Example :
368 368 nombre correcto del file es .../.../D2009307/P2009307367.ext
369
369
370 370 Entonces la funcion prueba con las siguientes combinaciones
371 371 .../.../y2009307367.ext
372 372 .../.../Y2009307367.ext
373 373 .../.../x2009307/y2009307367.ext
374 374 .../.../x2009307/Y2009307367.ext
375 375 .../.../X2009307/y2009307367.ext
376 376 .../.../X2009307/Y2009307367.ext
377 siendo para este caso, la ultima combinacion de letras, identica al file buscado
378
377 siendo para este caso, la ultima combinacion de letras, identica al file buscado
378
379 379 Return:
380 Si encuentra la cobinacion adecuada devuelve el path completo y el nombre del file
381 caso contrario devuelve None como path y el la ultima combinacion de nombre en mayusculas
382 para el filename
380 Si encuentra la cobinacion adecuada devuelve el path completo y el nombre del file
381 caso contrario devuelve None como path y el la ultima combinacion de nombre en mayusculas
382 para el filename
383 383 """
384 384 fullfilename = None
385 385 find_flag = False
386 386 filename = None
387
387
388 388 prefixDirList = [None,'d','D']
389 389 if ext.lower() == ".r": #voltage
390 390 prefixFileList = ['d','D']
391 391 elif ext.lower() == ".pdata": #spectra
392 392 prefixFileList = ['p','P']
393 393 else:
394 394 return None, filename
395
396 #barrido por las combinaciones posibles
395
396 #barrido por las combinaciones posibles
397 397 for prefixDir in prefixDirList:
398 398 thispath = path
399 399 if prefixDir != None:
400 400 #formo el nombre del directorio xYYYYDDD (x=d o x=D)
401 401 if foldercounter == 0:
402 thispath = os.path.join(path, "%s%04d%03d" % ( prefixDir, year, doy ))
402 thispath = os.path.join(path, "%s%04d%03d" % ( prefixDir, year, doy ))
403 403 else:
404 404 thispath = os.path.join(path, "%s%04d%03d_%02d" % ( prefixDir, year, doy , foldercounter))
405 405 for prefixFile in prefixFileList: #barrido por las dos combinaciones posibles de "D"
406 406 filename = "%s%04d%03d%03d%s" % ( prefixFile, year, doy, set, ext ) #formo el nombre del file xYYYYDDDSSS.ext
407 407 fullfilename = os.path.join( thispath, filename ) #formo el path completo
408
408
409 409 if os.path.exists( fullfilename ): #verifico que exista
410 410 find_flag = True
411 411 break
412 412 if find_flag:
413 413 break
414 414
415 415 if not(find_flag):
416 416 return None, filename
417 417
418 418 return fullfilename, filename
419 419
420 420 def isRadarFolder(folder):
421 421 try:
422 422 year = int(folder[1:5])
423 423 doy = int(folder[5:8])
424 424 except:
425 425 return 0
426
426
427 427 return 1
428 428
429 429 def isRadarFile(file):
430 430 try:
431 431 year = int(file[1:5])
432 432 doy = int(file[5:8])
433 433 set = int(file[8:11])
434 434 except:
435 435 return 0
436
436
437 437 return 1
438 438
439 439 def getDateFromRadarFile(file):
440 440 try:
441 441 year = int(file[1:5])
442 442 doy = int(file[5:8])
443 443 set = int(file[8:11])
444 444 except:
445 445 return None
446 446
447 447 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy-1)
448 448 return thisDate
449 449
450 450 def getDateFromRadarFolder(folder):
451 451 try:
452 452 year = int(folder[1:5])
453 453 doy = int(folder[5:8])
454 454 except:
455 455 return None
456 456
457 457 thisDate = datetime.date(year, 1, 1) + datetime.timedelta(doy-1)
458 458 return thisDate
459
459
460 460 class JRODataIO:
461
461
462 462 c = 3E8
463
463
464 464 isConfig = False
465
465
466 466 basicHeaderObj = None
467
467
468 468 systemHeaderObj = None
469
469
470 470 radarControllerHeaderObj = None
471
471
472 472 processingHeaderObj = None
473
473
474 474 dtype = None
475
475
476 476 pathList = []
477
477
478 478 filenameList = []
479
479
480 480 filename = None
481
481
482 482 ext = None
483
483
484 484 flagIsNewFile = 1
485
485
486 486 flagDiscontinuousBlock = 0
487
487
488 488 flagIsNewBlock = 0
489
489
490 490 fp = None
491
491
492 492 firstHeaderSize = 0
493
493
494 494 basicHeaderSize = 24
495
495
496 496 versionFile = 1103
497
497
498 498 fileSize = None
499
499
500 500 # ippSeconds = None
501
501
502 502 fileSizeByHeader = None
503
503
504 504 fileIndex = None
505
505
506 506 profileIndex = None
507
507
508 508 blockIndex = None
509
509
510 510 nTotalBlocks = None
511
511
512 512 maxTimeStep = 30
513
513
514 514 lastUTTime = None
515
515
516 516 datablock = None
517
517
518 518 dataOut = None
519
519
520 520 blocksize = None
521
521
522 522 getByBlock = False
523
523
524 524 def __init__(self):
525
525
526 526 raise NotImplementedError
527
527
528 528 def run(self):
529
529
530 530 raise NotImplementedError
531 531
532 532 def getDtypeWidth(self):
533
533
534 534 dtype_index = get_dtype_index(self.dtype)
535 535 dtype_width = get_dtype_width(dtype_index)
536
536
537 537 return dtype_width
538
538
539 539 class JRODataReader(JRODataIO):
540
541
540
541
542 542 online = 0
543
543
544 544 realtime = 0
545
545
546 546 nReadBlocks = 0
547
547
548 548 delay = 10 #number of seconds waiting a new file
549
549
550 550 nTries = 3 #quantity tries
551
551
552 552 nFiles = 3 #number of files for searching
553
553
554 554 path = None
555
555
556 556 foldercounter = 0
557
557
558 558 flagNoMoreFiles = 0
559
559
560 560 datetimeList = []
561
561
562 562 __isFirstTimeOnline = 1
563
563
564 564 __printInfo = True
565
565
566 566 profileIndex = None
567
567
568 568 nTxs = 1
569
569
570 570 txIndex = None
571
571
572 572 #Added--------------------
573
573
574 574 selBlocksize = None
575
575
576 576 selBlocktime = None
577
578
577
578
579 579 def __init__(self):
580
580
581 581 """
582 582 This class is used to find data files
583
583
584 584 Example:
585 585 reader = JRODataReader()
586 586 fileList = reader.findDataFiles()
587
587
588 588 """
589 589 pass
590
590
591 591
592 592 def createObjByDefault(self):
593 593 """
594 594
595 595 """
596 596 raise NotImplementedError
597 597
598 598 def getBlockDimension(self):
599
599
600 600 raise NotImplementedError
601 601
602 602 def __searchFilesOffLine(self,
603 603 path,
604 604 startDate=None,
605 605 endDate=None,
606 606 startTime=datetime.time(0,0,0),
607 607 endTime=datetime.time(23,59,59),
608 608 set=None,
609 609 expLabel='',
610 610 ext='.r',
611 queue=None,
612 cursor=None,
613 skip=None,
611 614 walk=True):
612
615
613 616 self.filenameList = []
614 617 self.datetimeList = []
615
618
616 619 pathList = []
617
620
618 621 dateList, pathList = self.findDatafiles(path, startDate, endDate, expLabel, ext, walk, include_path=True)
619
622
620 623 if dateList == []:
621 624 # print "[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" %(startDate, endDate, ext, path)
622 625 return None, None
623
626
624 627 if len(dateList) > 1:
625 628 print "[Reading] Data found for date range [%s - %s]: total days = %d" %(startDate, endDate, len(dateList))
626 629 else:
627 630 print "[Reading] Data found for date range [%s - %s]: date = %s" %(startDate, endDate, dateList[0])
628
631
629 632 filenameList = []
630 633 datetimeList = []
631
634
632 635 for thisPath in pathList:
633 636 # thisPath = pathList[pathDict[file]]
634
637
635 638 fileList = glob.glob1(thisPath, "*%s" %ext)
636 639 fileList.sort()
637
638 for file in fileList:
639
640
641 skippedFileList = []
642
643 if cursor is not None and skip is not None:
644 # if cursor*skip > len(fileList):
645 if skip == 0:
646 if queue is not None:
647 queue.put(len(fileList))
648 skippedFileList = []
649 else:
650 skippedFileList = fileList[cursor*skip: cursor*skip + skip]
651
652 else:
653 skippedFileList = fileList
654
655 for file in skippedFileList:
656
640 657 filename = os.path.join(thisPath,file)
641
658
642 659 if not isFileInDateRange(filename, startDate, endDate):
643 660 continue
644
661
645 662 thisDatetime = isFileInTimeRange(filename, startDate, endDate, startTime, endTime)
646
663
647 664 if not(thisDatetime):
648 665 continue
649
666
650 667 filenameList.append(filename)
651 668 datetimeList.append(thisDatetime)
652
669
653 670 if not(filenameList):
654 671 print "[Reading] Time range selected invalid [%s - %s]: No *%s files in %s)" %(startTime, endTime, ext, path)
655 672 return None, None
656
673
657 674 print "[Reading] %d file(s) was(were) found in time range: %s - %s" %(len(filenameList), startTime, endTime)
658 675 print
659
676
660 677 for i in range(len(filenameList)):
661 678 print "[Reading] %s -> [%s]" %(filenameList[i], datetimeList[i].ctime())
662 679
663 680 self.filenameList = filenameList
664 681 self.datetimeList = datetimeList
665
682
666 683 return pathList, filenameList
667 684
668 685 def __searchFilesOnLine(self, path, expLabel = "", ext = None, walk=True, set=None):
669
686
670 687 """
671 688 Busca el ultimo archivo de la ultima carpeta (determinada o no por startDateTime) y
672 689 devuelve el archivo encontrado ademas de otros datos.
673
674 Input:
690
691 Input:
675 692 path : carpeta donde estan contenidos los files que contiene data
676
693
677 694 expLabel : Nombre del subexperimento (subfolder)
678
695
679 696 ext : extension de los files
680
697
681 698 walk : Si es habilitado no realiza busquedas dentro de los ubdirectorios (doypath)
682 699
683 700 Return:
684 701 directory : eL directorio donde esta el file encontrado
685 702 filename : el ultimo file de una determinada carpeta
686 703 year : el anho
687 704 doy : el numero de dia del anho
688 705 set : el set del archivo
689
690
706
707
691 708 """
692 709 if not os.path.isdir(path):
693 710 return None, None, None, None, None, None
694
711
695 712 dirList = []
696
713
697 714 if not walk:
698 715 fullpath = path
699 716 foldercounter = 0
700 717 else:
701 718 #Filtra solo los directorios
702 719 for thisPath in os.listdir(path):
703 720 if not os.path.isdir(os.path.join(path,thisPath)):
704 721 continue
705 722 if not isRadarFolder(thisPath):
706 723 continue
707
724
708 725 dirList.append(thisPath)
709
726
710 727 if not(dirList):
711 728 return None, None, None, None, None, None
712
729
713 730 dirList = sorted( dirList, key=str.lower )
714
731
715 732 doypath = dirList[-1]
716 733 foldercounter = int(doypath.split('_')[1]) if len(doypath.split('_'))>1 else 0
717 734 fullpath = os.path.join(path, doypath, expLabel)
718
719
735
736
720 737 print "[Reading] %s folder was found: " %(fullpath )
721 738
722 739 if set == None:
723 740 filename = getlastFileFromPath(fullpath, ext)
724 741 else:
725 742 filename = getFileFromSet(fullpath, ext, set)
726 743
727 744 if not(filename):
728 745 return None, None, None, None, None, None
729
746
730 747 print "[Reading] %s file was found" %(filename)
731
748
732 749 if not(self.__verifyFile(os.path.join(fullpath, filename))):
733 750 return None, None, None, None, None, None
734 751
735 752 year = int( filename[1:5] )
736 753 doy = int( filename[5:8] )
737 set = int( filename[8:11] )
738
754 set = int( filename[8:11] )
755
739 756 return fullpath, foldercounter, filename, year, doy, set
740
757
741 758 def __setNextFileOffline(self):
742
759
743 760 idFile = self.fileIndex
744 761
745 762 while (True):
746 763 idFile += 1
747 764 if not(idFile < len(self.filenameList)):
748 765 self.flagNoMoreFiles = 1
749 766 # print "[Reading] No more Files"
750 767 return 0
751 768
752 769 filename = self.filenameList[idFile]
753 770
754 771 if not(self.__verifyFile(filename)):
755 772 continue
756 773
757 774 fileSize = os.path.getsize(filename)
758 775 fp = open(filename,'rb')
759 776 break
760 777
761 778 self.flagIsNewFile = 1
762 779 self.fileIndex = idFile
763 780 self.filename = filename
764 781 self.fileSize = fileSize
765 782 self.fp = fp
766 783
767 784 # print "[Reading] Setting the file: %s"%self.filename
768 785
769 786 return 1
770 787
771 788 def __setNextFileOnline(self):
772 789 """
773 790 Busca el siguiente file que tenga suficiente data para ser leida, dentro de un folder especifico, si
774 791 no encuentra un file valido espera un tiempo determinado y luego busca en los posibles n files
775 siguientes.
776
777 Affected:
792 siguientes.
793
794 Affected:
778 795 self.flagIsNewFile
779 796 self.filename
780 797 self.fileSize
781 798 self.fp
782 799 self.set
783 800 self.flagNoMoreFiles
784 801
785 Return:
802 Return:
786 803 0 : si luego de una busqueda del siguiente file valido este no pudo ser encontrado
787 804 1 : si el file fue abierto con exito y esta listo a ser leido
788
789 Excepciones:
805
806 Excepciones:
790 807 Si un determinado file no puede ser abierto
791 808 """
792 809 nFiles = 0
793 fileOk_flag = False
810 fileOk_flag = False
794 811 firstTime_flag = True
795 812
796 813 self.set += 1
797
814
798 815 if self.set > 999:
799 816 self.set = 0
800 self.foldercounter += 1
801
817 self.foldercounter += 1
818
802 819 #busca el 1er file disponible
803 820 fullfilename, filename = checkForRealPath( self.path, self.foldercounter, self.year, self.doy, self.set, self.ext )
804 821 if fullfilename:
805 822 if self.__verifyFile(fullfilename, False):
806 823 fileOk_flag = True
807 824
808 825 #si no encuentra un file entonces espera y vuelve a buscar
809 if not(fileOk_flag):
826 if not(fileOk_flag):
810 827 for nFiles in range(self.nFiles+1): #busco en los siguientes self.nFiles+1 files posibles
811 828
812 if firstTime_flag: #si es la 1era vez entonces hace el for self.nTries veces
829 if firstTime_flag: #si es la 1era vez entonces hace el for self.nTries veces
813 830 tries = self.nTries
814 831 else:
815 832 tries = 1 #si no es la 1era vez entonces solo lo hace una vez
816
817 for nTries in range( tries ):
833
834 for nTries in range( tries ):
818 835 if firstTime_flag:
819 print "\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % ( self.delay, filename, nTries+1 )
836 print "\t[Reading] Waiting %0.2f sec for the next file: \"%s\" , try %03d ..." % ( self.delay, filename, nTries+1 )
820 837 sleep( self.delay )
821 838 else:
822 839 print "\t[Reading] Searching the next \"%s%04d%03d%03d%s\" file ..." % (self.optchar, self.year, self.doy, self.set, self.ext)
823
840
824 841 fullfilename, filename = checkForRealPath( self.path, self.foldercounter, self.year, self.doy, self.set, self.ext )
825 842 if fullfilename:
826 843 if self.__verifyFile(fullfilename):
827 844 fileOk_flag = True
828 845 break
829
846
830 847 if fileOk_flag:
831 848 break
832 849
833 850 firstTime_flag = False
834 851
835 852 print "\t[Reading] Skipping the file \"%s\" due to this file doesn't exist" % filename
836 853 self.set += 1
837
854
838 855 if nFiles == (self.nFiles-1): #si no encuentro el file buscado cambio de carpeta y busco en la siguiente carpeta
839 856 self.set = 0
840 857 self.doy += 1
841 858 self.foldercounter = 0
842 859
843 860 if fileOk_flag:
844 861 self.fileSize = os.path.getsize( fullfilename )
845 862 self.filename = fullfilename
846 863 self.flagIsNewFile = 1
847 if self.fp != None: self.fp.close()
864 if self.fp != None: self.fp.close()
848 865 self.fp = open(fullfilename, 'rb')
849 866 self.flagNoMoreFiles = 0
850 867 # print '[Reading] Setting the file: %s' % fullfilename
851 868 else:
852 869 self.fileSize = 0
853 870 self.filename = None
854 871 self.flagIsNewFile = 0
855 872 self.fp = None
856 873 self.flagNoMoreFiles = 1
857 874 # print '[Reading] No more files to read'
858 875
859 876 return fileOk_flag
860
877
861 878 def setNextFile(self):
862 879 if self.fp != None:
863 880 self.fp.close()
864 881
865 882 if self.online:
866 883 newFile = self.__setNextFileOnline()
867 884 else:
868 885 newFile = self.__setNextFileOffline()
869 886
870 887 if not(newFile):
871 888 print '[Reading] No more files to read'
872 889 return 0
873 890
874 891 print '[Reading] Setting the file: %s' % self.filename
875
892
876 893 self.__readFirstHeader()
877 894 self.nReadBlocks = 0
878 895 return 1
879 896
880 897 def __waitNewBlock(self):
881 898 """
882 Return 1 si se encontro un nuevo bloque de datos, 0 de otra forma.
883
899 Return 1 si se encontro un nuevo bloque de datos, 0 de otra forma.
900
884 901 Si el modo de lectura es OffLine siempre retorn 0
885 902 """
886 903 if not self.online:
887 904 return 0
888
905
889 906 if (self.nReadBlocks >= self.processingHeaderObj.dataBlocksPerFile):
890 907 return 0
891
908
892 909 currentPointer = self.fp.tell()
893
910
894 911 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
895
912
896 913 for nTries in range( self.nTries ):
897
914
898 915 self.fp.close()
899 916 self.fp = open( self.filename, 'rb' )
900 917 self.fp.seek( currentPointer )
901 918
902 919 self.fileSize = os.path.getsize( self.filename )
903 920 currentSize = self.fileSize - currentPointer
904 921
905 922 if ( currentSize >= neededSize ):
906 923 self.basicHeaderObj.read(self.fp)
907 924 return 1
908
925
909 926 if self.fileSize == self.fileSizeByHeader:
910 927 # self.flagEoF = True
911 928 return 0
912
929
913 930 print "[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries+1)
914 931 sleep( self.delay )
915
916
917 return 0
932
933
934 return 0
918 935
919 936 def waitDataBlock(self,pointer_location):
920
937
921 938 currentPointer = pointer_location
922
939
923 940 neededSize = self.processingHeaderObj.blockSize #+ self.basicHeaderSize
924
941
925 942 for nTries in range( self.nTries ):
926 943 self.fp.close()
927 944 self.fp = open( self.filename, 'rb' )
928 945 self.fp.seek( currentPointer )
929
946
930 947 self.fileSize = os.path.getsize( self.filename )
931 948 currentSize = self.fileSize - currentPointer
932
949
933 950 if ( currentSize >= neededSize ):
934 951 return 1
935
952
936 953 print "[Reading] Waiting %0.2f seconds for the next block, try %03d ..." % (self.delay, nTries+1)
937 954 sleep( self.delay )
938
955
939 956 return 0
940 957
941 958 def __jumpToLastBlock(self):
942
959
943 960 if not(self.__isFirstTimeOnline):
944 961 return
945
962
946 963 csize = self.fileSize - self.fp.tell()
947 964 blocksize = self.processingHeaderObj.blockSize
948
965
949 966 #salta el primer bloque de datos
950 967 if csize > self.processingHeaderObj.blockSize:
951 968 self.fp.seek(self.fp.tell() + blocksize)
952 969 else:
953 970 return
954
971
955 972 csize = self.fileSize - self.fp.tell()
956 973 neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
957 974 while True:
958
975
959 976 if self.fp.tell()<self.fileSize:
960 977 self.fp.seek(self.fp.tell() + neededsize)
961 978 else:
962 979 self.fp.seek(self.fp.tell() - neededsize)
963 980 break
964
981
965 982 # csize = self.fileSize - self.fp.tell()
966 983 # neededsize = self.processingHeaderObj.blockSize + self.basicHeaderSize
967 984 # factor = int(csize/neededsize)
968 985 # if factor > 0:
969 986 # self.fp.seek(self.fp.tell() + factor*neededsize)
970
987
971 988 self.flagIsNewFile = 0
972 989 self.__isFirstTimeOnline = 0
973 990
974 991 def __setNewBlock(self):
975
992
976 993 if self.fp == None:
977 994 return 0
978
995
979 996 # if self.online:
980 997 # self.__jumpToLastBlock()
981
998
982 999 if self.flagIsNewFile:
983 1000 self.lastUTTime = self.basicHeaderObj.utc
984 1001 return 1
985
1002
986 1003 if self.realtime:
987 1004 self.flagDiscontinuousBlock = 1
988 1005 if not(self.setNextFile()):
989 1006 return 0
990 1007 else:
991 1008 return 1
992
1009
993 1010 currentSize = self.fileSize - self.fp.tell()
994 1011 neededSize = self.processingHeaderObj.blockSize + self.basicHeaderSize
995
1012
996 1013 if (currentSize >= neededSize):
997 1014 self.basicHeaderObj.read(self.fp)
998 1015 self.lastUTTime = self.basicHeaderObj.utc
999 1016 return 1
1000
1017
1001 1018 if self.__waitNewBlock():
1002 1019 self.lastUTTime = self.basicHeaderObj.utc
1003 1020 return 1
1004 1021
1005 1022 if not(self.setNextFile()):
1006 1023 return 0
1007 1024
1008 1025 deltaTime = self.basicHeaderObj.utc - self.lastUTTime #
1009 1026 self.lastUTTime = self.basicHeaderObj.utc
1010
1027
1011 1028 self.flagDiscontinuousBlock = 0
1012 1029
1013 1030 if deltaTime > self.maxTimeStep:
1014 1031 self.flagDiscontinuousBlock = 1
1015 1032
1016 1033 return 1
1017 1034
1018 1035 def readNextBlock(self):
1019
1036
1020 1037 #Skip block out of startTime and endTime
1021 1038 while True:
1022 1039 if not(self.__setNewBlock()):
1023 1040 return 0
1024
1041
1025 1042 if not(self.readBlock()):
1026 1043 return 0
1027
1044
1028 1045 self.getBasicHeader()
1029
1046
1030 1047 if not isTimeInRange(self.dataOut.datatime.time(), self.startTime, self.endTime):
1031
1048
1032 1049 print "[Reading] Block No. %d/%d -> %s [Skipping]" %(self.nReadBlocks,
1033 1050 self.processingHeaderObj.dataBlocksPerFile,
1034 1051 self.dataOut.datatime.ctime())
1035 1052 continue
1036
1053
1037 1054 break
1038
1055
1039 1056 print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1040 1057 self.processingHeaderObj.dataBlocksPerFile,
1041 1058 self.dataOut.datatime.ctime())
1042 1059 return 1
1043 1060
1044 1061 def __readFirstHeader(self):
1045
1062
1046 1063 self.basicHeaderObj.read(self.fp)
1047 1064 self.systemHeaderObj.read(self.fp)
1048 1065 self.radarControllerHeaderObj.read(self.fp)
1049 1066 self.processingHeaderObj.read(self.fp)
1050 1067
1051 1068 self.firstHeaderSize = self.basicHeaderObj.size
1052 1069
1053 1070 datatype = int(numpy.log2((self.processingHeaderObj.processFlags & PROCFLAG.DATATYPE_MASK))-numpy.log2(PROCFLAG.DATATYPE_CHAR))
1054 1071 if datatype == 0:
1055 1072 datatype_str = numpy.dtype([('real','<i1'),('imag','<i1')])
1056 1073 elif datatype == 1:
1057 1074 datatype_str = numpy.dtype([('real','<i2'),('imag','<i2')])
1058 1075 elif datatype == 2:
1059 1076 datatype_str = numpy.dtype([('real','<i4'),('imag','<i4')])
1060 1077 elif datatype == 3:
1061 1078 datatype_str = numpy.dtype([('real','<i8'),('imag','<i8')])
1062 1079 elif datatype == 4:
1063 1080 datatype_str = numpy.dtype([('real','<f4'),('imag','<f4')])
1064 1081 elif datatype == 5:
1065 1082 datatype_str = numpy.dtype([('real','<f8'),('imag','<f8')])
1066 1083 else:
1067 1084 raise ValueError, 'Data type was not defined'
1068 1085
1069 1086 self.dtype = datatype_str
1070 1087 #self.ippSeconds = 2 * 1000 * self.radarControllerHeaderObj.ipp / self.c
1071 1088 self.fileSizeByHeader = self.processingHeaderObj.dataBlocksPerFile * self.processingHeaderObj.blockSize + self.firstHeaderSize + self.basicHeaderSize*(self.processingHeaderObj.dataBlocksPerFile - 1)
1072 1089 # self.dataOut.channelList = numpy.arange(self.systemHeaderObj.numChannels)
1073 1090 # self.dataOut.channelIndexList = numpy.arange(self.systemHeaderObj.numChannels)
1074 1091 self.getBlockDimension()
1075
1092
1076 1093 def __verifyFile(self, filename, msgFlag=True):
1077
1094
1078 1095 msg = None
1079
1096
1080 1097 try:
1081 1098 fp = open(filename, 'rb')
1082 1099 except IOError:
1083
1100
1084 1101 if msgFlag:
1085 1102 print "[Reading] File %s can't be opened" % (filename)
1086
1103
1087 1104 return False
1088
1105
1089 1106 currentPosition = fp.tell()
1090 1107 neededSize = self.processingHeaderObj.blockSize + self.firstHeaderSize
1091 1108
1092 1109 if neededSize == 0:
1093 1110 basicHeaderObj = BasicHeader(LOCALTIME)
1094 1111 systemHeaderObj = SystemHeader()
1095 1112 radarControllerHeaderObj = RadarControllerHeader()
1096 1113 processingHeaderObj = ProcessingHeader()
1097
1114
1098 1115 if not( basicHeaderObj.read(fp) ):
1099 1116 fp.close()
1100 1117 return False
1101
1118
1102 1119 if not( systemHeaderObj.read(fp) ):
1103 1120 fp.close()
1104 1121 return False
1105
1122
1106 1123 if not( radarControllerHeaderObj.read(fp) ):
1107 1124 fp.close()
1108 1125 return False
1109
1126
1110 1127 if not( processingHeaderObj.read(fp) ):
1111 1128 fp.close()
1112 1129 return False
1113
1130
1114 1131 neededSize = processingHeaderObj.blockSize + basicHeaderObj.size
1115 1132 else:
1116 1133 msg = "[Reading] Skipping the file %s due to it hasn't enough data" %filename
1117 1134
1118 1135 fp.close()
1119
1136
1120 1137 fileSize = os.path.getsize(filename)
1121 1138 currentSize = fileSize - currentPosition
1122
1139
1123 1140 if currentSize < neededSize:
1124 1141 if msgFlag and (msg != None):
1125 1142 print msg
1126 1143 return False
1127 1144
1128 1145 return True
1129 1146
1130 1147 def findDatafiles(self, path, startDate=None, endDate=None, expLabel='', ext='.r', walk=True, include_path=False):
1131
1148
1132 1149 path_empty = True
1133
1150
1134 1151 dateList = []
1135 1152 pathList = []
1136
1153
1137 1154 multi_path = path.split(',')
1138
1155
1139 1156 if not walk:
1140
1157
1141 1158 for single_path in multi_path:
1142
1159
1143 1160 if not os.path.isdir(single_path):
1144 1161 continue
1145
1162
1146 1163 fileList = glob.glob1(single_path, "*"+ext)
1147
1164
1148 1165 if not fileList:
1149 1166 continue
1150
1167
1151 1168 path_empty = False
1152
1169
1153 1170 fileList.sort()
1154
1171
1155 1172 for thisFile in fileList:
1156
1173
1157 1174 if not os.path.isfile(os.path.join(single_path, thisFile)):
1158 1175 continue
1159
1176
1160 1177 if not isRadarFile(thisFile):
1161 1178 continue
1162
1179
1163 1180 if not isFileInDateRange(thisFile, startDate, endDate):
1164 1181 continue
1165
1182
1166 1183 thisDate = getDateFromRadarFile(thisFile)
1167
1184
1168 1185 if thisDate in dateList:
1169 1186 continue
1170
1187
1171 1188 dateList.append(thisDate)
1172 1189 pathList.append(single_path)
1173
1190
1174 1191 else:
1175 1192 for single_path in multi_path:
1176
1193
1177 1194 if not os.path.isdir(single_path):
1178 1195 continue
1179
1196
1180 1197 dirList = []
1181
1198
1182 1199 for thisPath in os.listdir(single_path):
1183
1200
1184 1201 if not os.path.isdir(os.path.join(single_path,thisPath)):
1185 1202 continue
1186
1203
1187 1204 if not isRadarFolder(thisPath):
1188 1205 continue
1189
1206
1190 1207 if not isFolderInDateRange(thisPath, startDate, endDate):
1191 1208 continue
1192
1209
1193 1210 dirList.append(thisPath)
1194
1211
1195 1212 if not dirList:
1196 1213 continue
1197
1214
1198 1215 dirList.sort()
1199
1216
1200 1217 for thisDir in dirList:
1201
1218
1202 1219 datapath = os.path.join(single_path, thisDir, expLabel)
1203 1220 fileList = glob.glob1(datapath, "*"+ext)
1204
1221
1205 1222 if not fileList:
1206 1223 continue
1207
1224
1208 1225 path_empty = False
1209
1226
1210 1227 thisDate = getDateFromRadarFolder(thisDir)
1211
1228
1212 1229 pathList.append(datapath)
1213 1230 dateList.append(thisDate)
1214
1231
1215 1232 dateList.sort()
1216
1233
1217 1234 if walk:
1218 1235 pattern_path = os.path.join(multi_path[0], "[dYYYYDDD]", expLabel)
1219 1236 else:
1220 1237 pattern_path = multi_path[0]
1221
1238
1222 1239 if path_empty:
1223 1240 print "[Reading] No *%s files in %s for %s to %s" %(ext, pattern_path, startDate, endDate)
1224 1241 else:
1225 1242 if not dateList:
1226 1243 print "[Reading] Date range selected invalid [%s - %s]: No *%s files in %s)" %(startDate, endDate, ext, path)
1227 1244
1228 1245 if include_path:
1229 1246 return dateList, pathList
1230
1247
1231 1248 return dateList
1232
1249
1233 1250 def setup(self,
1234 1251 path=None,
1235 startDate=None,
1236 endDate=None,
1237 startTime=datetime.time(0,0,0),
1238 endTime=datetime.time(23,59,59),
1239 set=None,
1240 expLabel = "",
1241 ext = None,
1252 startDate=None,
1253 endDate=None,
1254 startTime=datetime.time(0,0,0),
1255 endTime=datetime.time(23,59,59),
1256 set=None,
1257 expLabel = "",
1258 ext = None,
1242 1259 online = False,
1243 1260 delay = 60,
1244 1261 walk = True,
1245 1262 getblock = False,
1246 1263 nTxs = 1,
1247 1264 realtime=False,
1248 1265 blocksize=None,
1249 blocktime=None):
1266 blocktime=None,
1267 queue=None,
1268 skip=None,
1269 cursor=None):
1250 1270
1251 1271 if path == None:
1252 1272 raise ValueError, "[Reading] The path is not valid"
1253 1273
1254 1274 if ext == None:
1255 1275 ext = self.ext
1256 1276
1257 1277 if online:
1258 print "[Reading] Searching files in online mode..."
1259
1278 print "[Reading] Searching files in online mode..."
1279
1260 1280 for nTries in range( self.nTries ):
1261 1281 fullpath, foldercounter, file, year, doy, set = self.__searchFilesOnLine(path=path, expLabel=expLabel, ext=ext, walk=walk, set=set)
1262
1282
1263 1283 if fullpath:
1264 1284 break
1265
1285
1266 1286 print '[Reading] Waiting %0.2f sec for an valid file in %s: try %02d ...' % (self.delay, path, nTries+1)
1267 1287 sleep( self.delay )
1268
1288
1269 1289 if not(fullpath):
1270 1290 print "[Reading] There 'isn't any valid file in %s" % path
1271 1291 return
1272
1292
1273 1293 self.year = year
1274 1294 self.doy = doy
1275 1295 self.set = set - 1
1276 1296 self.path = path
1277 1297 self.foldercounter = foldercounter
1278 1298 last_set = None
1279
1299
1280 1300 else:
1281 1301 print "[Reading] Searching files in offline mode ..."
1282 1302 pathList, filenameList = self.__searchFilesOffLine(path, startDate=startDate, endDate=endDate,
1283 1303 startTime=startTime, endTime=endTime,
1284 1304 set=set, expLabel=expLabel, ext=ext,
1285 walk=walk)
1286
1305 walk=walk, cursor=cursor,
1306 skip=skip, queue=queue)
1307
1287 1308 if not(pathList):
1288 1309 # print "[Reading] No *%s files in %s (%s - %s)"%(ext, path,
1289 1310 # datetime.datetime.combine(startDate,startTime).ctime(),
1290 1311 # datetime.datetime.combine(endDate,endTime).ctime())
1291
1312
1292 1313 # sys.exit(-1)
1293
1314
1294 1315 self.fileIndex = -1
1295 1316 self.pathList = []
1296 1317 self.filenameList = []
1297 1318 return
1298
1319
1299 1320 self.fileIndex = -1
1300 1321 self.pathList = pathList
1301 1322 self.filenameList = filenameList
1302 1323 file_name = os.path.basename(filenameList[-1])
1303 1324 basename, ext = os.path.splitext(file_name)
1304 1325 last_set = int(basename[-3:])
1305
1326
1306 1327 self.online = online
1307 1328 self.realtime = realtime
1308 1329 self.delay = delay
1309 1330 ext = ext.lower()
1310 1331 self.ext = ext
1311 1332 self.getByBlock = getblock
1312 1333 self.nTxs = nTxs
1313 1334 self.startTime = startTime
1314 1335 self.endTime = endTime
1315
1336
1316 1337 #Added-----------------
1317 1338 self.selBlocksize = blocksize
1318 1339 self.selBlocktime = blocktime
1319
1320
1340
1341
1321 1342 if not(self.setNextFile()):
1322 1343 if (startDate!=None) and (endDate!=None):
1323 1344 print "[Reading] No files in range: %s - %s" %(datetime.datetime.combine(startDate,startTime).ctime(), datetime.datetime.combine(endDate,endTime).ctime())
1324 1345 elif startDate != None:
1325 1346 print "[Reading] No files in range: %s" %(datetime.datetime.combine(startDate,startTime).ctime())
1326 1347 else:
1327 1348 print "[Reading] No files"
1328
1349
1329 1350 self.fileIndex = -1
1330 1351 self.pathList = []
1331 1352 self.filenameList = []
1332 1353 return
1333 1354
1334 1355 # self.getBasicHeader()
1335
1356
1336 1357 if last_set != None:
1337 1358 self.dataOut.last_block = last_set * self.processingHeaderObj.dataBlocksPerFile + self.basicHeaderObj.dataBlock
1338 1359 return
1339 1360
1340 1361 def getBasicHeader(self):
1341
1362
1342 1363 self.dataOut.utctime = self.basicHeaderObj.utc + self.basicHeaderObj.miliSecond/1000. + self.profileIndex * self.radarControllerHeaderObj.ippSeconds
1343
1364
1344 1365 self.dataOut.flagDiscontinuousBlock = self.flagDiscontinuousBlock
1345
1366
1346 1367 self.dataOut.timeZone = self.basicHeaderObj.timeZone
1347
1368
1348 1369 self.dataOut.dstFlag = self.basicHeaderObj.dstFlag
1349
1370
1350 1371 self.dataOut.errorCount = self.basicHeaderObj.errorCount
1351
1372
1352 1373 self.dataOut.useLocalTime = self.basicHeaderObj.useLocalTime
1353
1374
1354 1375 self.dataOut.ippSeconds = self.radarControllerHeaderObj.ippSeconds/self.nTxs
1355
1376
1356 1377 # self.dataOut.nProfiles = self.processingHeaderObj.profilesPerBlock*self.nTxs
1357
1358
1378
1379
1359 1380 def getFirstHeader(self):
1360
1381
1361 1382 raise NotImplementedError
1362
1383
1363 1384 def getData(self):
1364
1385
1365 1386 raise NotImplementedError
1366 1387
1367 1388 def hasNotDataInBuffer(self):
1368
1389
1369 1390 raise NotImplementedError
1370 1391
1371 1392 def readBlock(self):
1372
1393
1373 1394 raise NotImplementedError
1374
1395
1375 1396 def isEndProcess(self):
1376
1397
1377 1398 return self.flagNoMoreFiles
1378
1399
1379 1400 def printReadBlocks(self):
1380
1401
1381 1402 print "[Reading] Number of read blocks per file %04d" %self.nReadBlocks
1382
1403
1383 1404 def printTotalBlocks(self):
1384
1405
1385 1406 print "[Reading] Number of read blocks %04d" %self.nTotalBlocks
1386 1407
1387 1408 def printNumberOfBlock(self):
1388
1409
1389 1410 if self.flagIsNewBlock:
1390 1411 print "[Reading] Block No. %d/%d -> %s" %(self.nReadBlocks,
1391 1412 self.processingHeaderObj.dataBlocksPerFile,
1392 1413 self.dataOut.datatime.ctime())
1393 1414
1394 1415 def printInfo(self):
1395
1416
1396 1417 if self.__printInfo == False:
1397 1418 return
1398
1419
1399 1420 self.basicHeaderObj.printInfo()
1400 1421 self.systemHeaderObj.printInfo()
1401 1422 self.radarControllerHeaderObj.printInfo()
1402 1423 self.processingHeaderObj.printInfo()
1403
1424
1404 1425 self.__printInfo = False
1405
1406
1426
1427
1407 1428 def run(self, **kwargs):
1408
1429
1409 1430 if not(self.isConfig):
1410
1431
1411 1432 # self.dataOut = dataOut
1412 1433 self.setup(**kwargs)
1413 1434 self.isConfig = True
1414
1435
1415 1436 self.getData()
1416 1437
1417 1438 class JRODataWriter(JRODataIO):
1418 1439
1419 """
1440 """
1420 1441 Esta clase permite escribir datos a archivos procesados (.r o ,pdata). La escritura
1421 de los datos siempre se realiza por bloques.
1442 de los datos siempre se realiza por bloques.
1422 1443 """
1423
1444
1424 1445 blockIndex = 0
1425
1446
1426 1447 path = None
1427
1448
1428 1449 setFile = None
1429
1450
1430 1451 profilesPerBlock = None
1431
1452
1432 1453 blocksPerFile = None
1433
1454
1434 1455 nWriteBlocks = 0
1435
1456
1436 1457 fileDate = None
1437
1458
1438 1459 def __init__(self, dataOut=None):
1439 1460 raise NotImplementedError
1440 1461
1441 1462
1442 1463 def hasAllDataInBuffer(self):
1443 1464 raise NotImplementedError
1444 1465
1445 1466
1446 1467 def setBlockDimension(self):
1447 1468 raise NotImplementedError
1448 1469
1449
1470
1450 1471 def writeBlock(self):
1451 1472 raise NotImplementedError
1452 1473
1453 1474
1454 1475 def putData(self):
1455 1476 raise NotImplementedError
1456 1477
1457
1478
1458 1479 def getProcessFlags(self):
1459
1480
1460 1481 processFlags = 0
1461
1482
1462 1483 dtype_index = get_dtype_index(self.dtype)
1463 1484 procflag_dtype = get_procflag_dtype(dtype_index)
1464
1485
1465 1486 processFlags += procflag_dtype
1466
1487
1467 1488 if self.dataOut.flagDecodeData:
1468 1489 processFlags += PROCFLAG.DECODE_DATA
1469
1490
1470 1491 if self.dataOut.flagDeflipData:
1471 1492 processFlags += PROCFLAG.DEFLIP_DATA
1472
1493
1473 1494 if self.dataOut.code is not None:
1474 1495 processFlags += PROCFLAG.DEFINE_PROCESS_CODE
1475
1496
1476 1497 if self.dataOut.nCohInt > 1:
1477 1498 processFlags += PROCFLAG.COHERENT_INTEGRATION
1478
1499
1479 1500 if self.dataOut.type == "Spectra":
1480 1501 if self.dataOut.nIncohInt > 1:
1481 1502 processFlags += PROCFLAG.INCOHERENT_INTEGRATION
1482
1503
1483 1504 if self.dataOut.data_dc is not None:
1484 1505 processFlags += PROCFLAG.SAVE_CHANNELS_DC
1485
1506
1486 1507 if self.dataOut.flagShiftFFT:
1487 1508 processFlags += PROCFLAG.SHIFT_FFT_DATA
1488
1509
1489 1510 return processFlags
1490
1511
1491 1512 def setBasicHeader(self):
1492
1513
1493 1514 self.basicHeaderObj.size = self.basicHeaderSize #bytes
1494 1515 self.basicHeaderObj.version = self.versionFile
1495 1516 self.basicHeaderObj.dataBlock = self.nTotalBlocks
1496
1517
1497 1518 utc = numpy.floor(self.dataOut.utctime)
1498 1519 milisecond = (self.dataOut.utctime - utc)* 1000.0
1499
1520
1500 1521 self.basicHeaderObj.utc = utc
1501 1522 self.basicHeaderObj.miliSecond = milisecond
1502 1523 self.basicHeaderObj.timeZone = self.dataOut.timeZone
1503 1524 self.basicHeaderObj.dstFlag = self.dataOut.dstFlag
1504 1525 self.basicHeaderObj.errorCount = self.dataOut.errorCount
1505
1526
1506 1527 def setFirstHeader(self):
1507 1528 """
1508 1529 Obtiene una copia del First Header
1509
1530
1510 1531 Affected:
1511
1532
1512 1533 self.basicHeaderObj
1513 1534 self.systemHeaderObj
1514 1535 self.radarControllerHeaderObj
1515 1536 self.processingHeaderObj self.
1516
1537
1517 1538 Return:
1518 1539 None
1519 1540 """
1520
1541
1521 1542 raise NotImplementedError
1522
1543
1523 1544 def __writeFirstHeader(self):
1524 1545 """
1525 1546 Escribe el primer header del file es decir el Basic header y el Long header (SystemHeader, RadarControllerHeader, ProcessingHeader)
1526
1547
1527 1548 Affected:
1528 1549 __dataType
1529
1550
1530 1551 Return:
1531 1552 None
1532 1553 """
1533
1554
1534 1555 # CALCULAR PARAMETROS
1535
1556
1536 1557 sizeLongHeader = self.systemHeaderObj.size + self.radarControllerHeaderObj.size + self.processingHeaderObj.size
1537 1558 self.basicHeaderObj.size = self.basicHeaderSize + sizeLongHeader
1538
1559
1539 1560 self.basicHeaderObj.write(self.fp)
1540 1561 self.systemHeaderObj.write(self.fp)
1541 1562 self.radarControllerHeaderObj.write(self.fp)
1542 1563 self.processingHeaderObj.write(self.fp)
1543
1564
1544 1565 def __setNewBlock(self):
1545 1566 """
1546 1567 Si es un nuevo file escribe el First Header caso contrario escribe solo el Basic Header
1547
1568
1548 1569 Return:
1549 1570 0 : si no pudo escribir nada
1550 1571 1 : Si escribio el Basic el First Header
1551 """
1572 """
1552 1573 if self.fp == None:
1553 1574 self.setNextFile()
1554
1575
1555 1576 if self.flagIsNewFile:
1556 1577 return 1
1557
1578
1558 1579 if self.blockIndex < self.processingHeaderObj.dataBlocksPerFile:
1559 1580 self.basicHeaderObj.write(self.fp)
1560 1581 return 1
1561
1582
1562 1583 if not( self.setNextFile() ):
1563 1584 return 0
1564
1585
1565 1586 return 1
1566 1587
1567 1588
1568 1589 def writeNextBlock(self):
1569 1590 """
1570 1591 Selecciona el bloque siguiente de datos y los escribe en un file
1571
1572 Return:
1573 0 : Si no hizo pudo escribir el bloque de datos
1592
1593 Return:
1594 0 : Si no hizo pudo escribir el bloque de datos
1574 1595 1 : Si no pudo escribir el bloque de datos
1575 1596 """
1576 1597 if not( self.__setNewBlock() ):
1577 1598 return 0
1578
1599
1579 1600 self.writeBlock()
1580
1601
1581 1602 print "[Writing] Block No. %d/%d" %(self.blockIndex,
1582 1603 self.processingHeaderObj.dataBlocksPerFile)
1583
1584 return 1
1604
1605 return 1
1585 1606
1586 1607 def setNextFile(self):
1587 """
1608 """
1588 1609 Determina el siguiente file que sera escrito
1589 1610
1590 Affected:
1611 Affected:
1591 1612 self.filename
1592 1613 self.subfolder
1593 1614 self.fp
1594 1615 self.setFile
1595 1616 self.flagIsNewFile
1596 1617
1597 1618 Return:
1598 1619 0 : Si el archivo no puede ser escrito
1599 1620 1 : Si el archivo esta listo para ser escrito
1600 1621 """
1601 1622 ext = self.ext
1602 1623 path = self.path
1603
1624
1604 1625 if self.fp != None:
1605 1626 self.fp.close()
1606
1627
1607 1628 timeTuple = time.localtime( self.dataOut.utctime)
1608 1629 subfolder = 'd%4.4d%3.3d' % (timeTuple.tm_year,timeTuple.tm_yday)
1609 1630
1610 1631 fullpath = os.path.join( path, subfolder )
1611 1632 setFile = self.setFile
1612
1633
1613 1634 if not( os.path.exists(fullpath) ):
1614 1635 os.mkdir(fullpath)
1615 1636 setFile = -1 #inicializo mi contador de seteo
1616 1637 else:
1617 1638 filesList = os.listdir( fullpath )
1618 1639 if len( filesList ) > 0:
1619 1640 filesList = sorted( filesList, key=str.lower )
1620 1641 filen = filesList[-1]
1621 1642 # el filename debera tener el siguiente formato
1622 1643 # 0 1234 567 89A BCDE (hex)
1623 1644 # x YYYY DDD SSS .ext
1624 1645 if isNumber( filen[8:11] ):
1625 1646 setFile = int( filen[8:11] ) #inicializo mi contador de seteo al seteo del ultimo file
1626 else:
1647 else:
1627 1648 setFile = -1
1628 1649 else:
1629 1650 setFile = -1 #inicializo mi contador de seteo
1630
1651
1631 1652 setFile += 1
1632
1653
1633 1654 #If this is a new day it resets some values
1634 1655 if self.dataOut.datatime.date() > self.fileDate:
1635 1656 setFile = 0
1636 1657 self.nTotalBlocks = 0
1637
1658
1638 1659 filen = '%s%4.4d%3.3d%3.3d%s' % (self.optchar, timeTuple.tm_year, timeTuple.tm_yday, setFile, ext )
1639 1660
1640 1661 filename = os.path.join( path, subfolder, filen )
1641 1662
1642 1663 fp = open( filename,'wb' )
1643
1664
1644 1665 self.blockIndex = 0
1645
1646 #guardando atributos
1666
1667 #guardando atributos
1647 1668 self.filename = filename
1648 1669 self.subfolder = subfolder
1649 1670 self.fp = fp
1650 1671 self.setFile = setFile
1651 1672 self.flagIsNewFile = 1
1652 1673 self.fileDate = self.dataOut.datatime.date()
1653
1674
1654 1675 self.setFirstHeader()
1655
1676
1656 1677 print '[Writing] Opening file: %s'%self.filename
1657
1678
1658 1679 self.__writeFirstHeader()
1659
1680
1660 1681 return 1
1661
1682
1662 1683 def setup(self, dataOut, path, blocksPerFile, profilesPerBlock=64, set=None, ext=None, datatype=4):
1663 1684 """
1664 Setea el tipo de formato en la cual sera guardada la data y escribe el First Header
1665
1685 Setea el tipo de formato en la cual sera guardada la data y escribe el First Header
1686
1666 1687 Inputs:
1667 1688 path : directory where data will be saved
1668 profilesPerBlock : number of profiles per block
1689 profilesPerBlock : number of profiles per block
1669 1690 set : initial file set
1670 1691 datatype : An integer number that defines data type:
1671 1692 0 : int8 (1 byte)
1672 1693 1 : int16 (2 bytes)
1673 1694 2 : int32 (4 bytes)
1674 1695 3 : int64 (8 bytes)
1675 1696 4 : float32 (4 bytes)
1676 1697 5 : double64 (8 bytes)
1677
1698
1678 1699 Return:
1679 1700 0 : Si no realizo un buen seteo
1680 1 : Si realizo un buen seteo
1701 1 : Si realizo un buen seteo
1681 1702 """
1682
1703
1683 1704 if ext == None:
1684 1705 ext = self.ext
1685
1706
1686 1707 self.ext = ext.lower()
1687
1708
1688 1709 self.path = path
1689
1710
1690 1711 if set is None:
1691 1712 self.setFile = -1
1692 1713 else:
1693 1714 self.setFile = set - 1
1694
1715
1695 1716 self.blocksPerFile = blocksPerFile
1696
1717
1697 1718 self.profilesPerBlock = profilesPerBlock
1698
1719
1699 1720 self.dataOut = dataOut
1700 1721 self.fileDate = self.dataOut.datatime.date()
1701 1722 #By default
1702 1723 self.dtype = self.dataOut.dtype
1703
1724
1704 1725 if datatype is not None:
1705 1726 self.dtype = get_numpy_dtype(datatype)
1706
1727
1707 1728 if not(self.setNextFile()):
1708 1729 print "[Writing] There isn't a next file"
1709 1730 return 0
1710
1731
1711 1732 self.setBlockDimension()
1712
1733
1713 1734 return 1
1714
1735
1715 1736 def run(self, dataOut, **kwargs):
1716
1737
1717 1738 if not(self.isConfig):
1718
1739
1719 1740 self.setup(dataOut, **kwargs)
1720 1741 self.isConfig = True
1721
1722 self.putData()
1723 1742
1743 self.putData()
@@ -1,378 +1,378
1 1 '''
2 2 @author: Juan C. Espinoza
3 3 '''
4 4
5 5 import time
6 6 import json
7 7 import numpy
8 8 import paho.mqtt.client as mqtt
9 9 import zmq
10 10 import cPickle as pickle
11 11 import datetime
12 12 from zmq.utils.monitor import recv_monitor_message
13 13 from functools import wraps
14 14 from threading import Thread
15 15 from multiprocessing import Process
16 16
17 17 from schainpy.model.proc.jroproc_base import Operation, ProcessingUnit
18 18
19 19 MAXNUMX = 100
20 20 MAXNUMY = 100
21 21 throttle_value = 5
22 22
23 23 class PrettyFloat(float):
24 24 def __repr__(self):
25 25 return '%.2f' % self
26 26
27 27 def roundFloats(obj):
28 28 if isinstance(obj, list):
29 29 return map(roundFloats, obj)
30 30 elif isinstance(obj, float):
31 31 return round(obj, 2)
32 32
33 33
34 34 class throttle(object):
35 35 """Decorator that prevents a function from being called more than once every
36 36 time period.
37 37 To create a function that cannot be called more than once a minute, but
38 38 will sleep until it can be called:
39 39 @throttle(minutes=1)
40 40 def foo():
41 41 pass
42 42
43 43 for i in range(10):
44 44 foo()
45 45 print "This function has run %s times." % i
46 46 """
47 47
48 48 def __init__(self, seconds=0, minutes=0, hours=0):
49 49 self.throttle_period = datetime.timedelta(
50 50 seconds=seconds, minutes=minutes, hours=hours
51 51 )
52 52 self.time_of_last_call = datetime.datetime.min
53 53
54 54 def __call__(self, fn):
55 55 @wraps(fn)
56 56 def wrapper(*args, **kwargs):
57 57 now = datetime.datetime.now()
58 58 time_since_last_call = now - self.time_of_last_call
59 59 time_left = self.throttle_period - time_since_last_call
60 60
61 61 if time_left > datetime.timedelta(seconds=0):
62 62 return
63 63
64 64 self.time_of_last_call = datetime.datetime.now()
65 65 return fn(*args, **kwargs)
66 66
67 67 return wrapper
68 68
69 69
70 70 class PublishData(Operation):
71 71 """Clase publish."""
72 72
73 73 def __init__(self, **kwargs):
74 74 """Inicio."""
75 75 Operation.__init__(self, **kwargs)
76 76 self.isConfig = False
77 77 self.client = None
78 78 self.zeromq = None
79 79 self.mqtt = None
80 80
81 81 def on_disconnect(self, client, userdata, rc):
82 82 if rc != 0:
83 83 print("Unexpected disconnection.")
84 84 self.connect()
85 85
86 86 def connect(self):
87 87 print 'trying to connect'
88 88 try:
89 89 self.client.connect(
90 90 host=self.host,
91 91 port=self.port,
92 92 keepalive=60*10,
93 93 bind_address='')
94 94 print "connected"
95 95 self.client.loop_start()
96 96 # self.client.publish(
97 97 # self.topic + 'SETUP',
98 98 # json.dumps(setup),
99 99 # retain=True
100 100 # )
101 101 except:
102 102 print "MQTT Conection error."
103 103 self.client = False
104 104
105 105 def setup(self, port=1883, username=None, password=None, clientId="user", zeromq=1, **kwargs):
106 106 self.counter = 0
107 107 self.topic = kwargs.get('topic', 'schain')
108 108 self.delay = kwargs.get('delay', 0)
109 109 self.plottype = kwargs.get('plottype', 'spectra')
110 110 self.host = kwargs.get('host', "10.10.10.82")
111 111 self.port = kwargs.get('port', 3000)
112 112 self.clientId = clientId
113 113 self.cnt = 0
114 114 self.zeromq = zeromq
115 115 self.mqtt = kwargs.get('plottype', 0)
116 116 self.client = None
117 117 setup = []
118 118 if mqtt is 1:
119 119 print 'mqqt es 1'
120 120 self.client = mqtt.Client(
121 121 client_id=self.clientId + self.topic + 'SCHAIN',
122 122 clean_session=True)
123 123 self.client.on_disconnect = self.on_disconnect
124 124 self.connect()
125 125 for plot in self.plottype:
126 126 setup.append({
127 127 'plot': plot,
128 128 'topic': self.topic + plot,
129 129 'title': getattr(self, plot + '_' + 'title', False),
130 130 'xlabel': getattr(self, plot + '_' + 'xlabel', False),
131 131 'ylabel': getattr(self, plot + '_' + 'ylabel', False),
132 132 'xrange': getattr(self, plot + '_' + 'xrange', False),
133 133 'yrange': getattr(self, plot + '_' + 'yrange', False),
134 134 'zrange': getattr(self, plot + '_' + 'zrange', False),
135 135 })
136 136 if zeromq is 1:
137 137 context = zmq.Context()
138 138 self.zmq_socket = context.socket(zmq.PUSH)
139 139 server = kwargs.get('server', 'zmq.pipe')
140 140
141 141 if 'tcp://' in server:
142 142 address = server
143 143 else:
144 144 address = 'ipc:///tmp/%s' % server
145 145
146 146 self.zmq_socket.connect(address)
147 147 time.sleep(1)
148 148 print 'zeromq configured'
149 149
150 150
151 151 def publish_data(self):
152 152 self.dataOut.finished = False
153 153 if self.mqtt is 1:
154 154 yData = self.dataOut.heightList[:2].tolist()
155 155 if self.plottype == 'spectra':
156 156 data = getattr(self.dataOut, 'data_spc')
157 157 z = data/self.dataOut.normFactor
158 158 zdB = 10*numpy.log10(z)
159 159 xlen, ylen = zdB[0].shape
160 160 dx = int(xlen/MAXNUMX) + 1
161 161 dy = int(ylen/MAXNUMY) + 1
162 162 Z = [0 for i in self.dataOut.channelList]
163 163 for i in self.dataOut.channelList:
164 164 Z[i] = zdB[i][::dx, ::dy].tolist()
165 165 payload = {
166 166 'timestamp': self.dataOut.utctime,
167 167 'data': roundFloats(Z),
168 168 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
169 169 'interval': self.dataOut.getTimeInterval(),
170 170 'type': self.plottype,
171 171 'yData': yData
172 172 }
173 173 # print payload
174 174
175 175 elif self.plottype in ('rti', 'power'):
176 176 data = getattr(self.dataOut, 'data_spc')
177 177 z = data/self.dataOut.normFactor
178 178 avg = numpy.average(z, axis=1)
179 179 avgdB = 10*numpy.log10(avg)
180 180 xlen, ylen = z[0].shape
181 181 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
182 182 AVG = [0 for i in self.dataOut.channelList]
183 183 for i in self.dataOut.channelList:
184 184 AVG[i] = avgdB[i][::dy].tolist()
185 185 payload = {
186 186 'timestamp': self.dataOut.utctime,
187 187 'data': roundFloats(AVG),
188 188 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
189 189 'interval': self.dataOut.getTimeInterval(),
190 190 'type': self.plottype,
191 191 'yData': yData
192 192 }
193 193 elif self.plottype == 'noise':
194 194 noise = self.dataOut.getNoise()/self.dataOut.normFactor
195 195 noisedB = 10*numpy.log10(noise)
196 196 payload = {
197 197 'timestamp': self.dataOut.utctime,
198 198 'data': roundFloats(noisedB.reshape(-1, 1).tolist()),
199 199 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
200 200 'interval': self.dataOut.getTimeInterval(),
201 201 'type': self.plottype,
202 202 'yData': yData
203 203 }
204 204 elif self.plottype == 'snr':
205 205 data = getattr(self.dataOut, 'data_SNR')
206 206 avgdB = 10*numpy.log10(data)
207 207
208 208 ylen = data[0].size
209 209 dy = numpy.floor(ylen/self.__MAXNUMY) + 1
210 210 AVG = [0 for i in self.dataOut.channelList]
211 211 for i in self.dataOut.channelList:
212 212 AVG[i] = avgdB[i][::dy].tolist()
213 213 payload = {
214 214 'timestamp': self.dataOut.utctime,
215 215 'data': roundFloats(AVG),
216 216 'channels': ['Ch %s' % ch for ch in self.dataOut.channelList],
217 217 'type': self.plottype,
218 218 'yData': yData
219 219 }
220 220 else:
221 221 print "Tipo de grafico invalido"
222 222 payload = {
223 223 'data': 'None',
224 224 'timestamp': 'None',
225 225 'type': None
226 226 }
227 227 # print 'Publishing data to {}'.format(self.host)
228 228 self.client.publish(self.topic + self.plottype, json.dumps(payload), qos=0)
229 229
230 230 if self.zeromq is 1:
231 231 print '[Sending] {} - {}'.format(self.dataOut.type, self.dataOut.datatime)
232 232 self.zmq_socket.send_pyobj(self.dataOut)
233 233
234 234 def run(self, dataOut, **kwargs):
235 235 self.dataOut = dataOut
236 236 if not self.isConfig:
237 237 self.setup(**kwargs)
238 238 self.isConfig = True
239 239
240 240 self.publish_data()
241 241 time.sleep(self.delay)
242 242
243 243 def close(self):
244 244 if self.zeromq is 1:
245 245 self.dataOut.finished = True
246 246 self.zmq_socket.send_pyobj(self.dataOut)
247 247
248 248 if self.client:
249 249 self.client.loop_stop()
250 250 self.client.disconnect()
251 251
252 252
253 253 class ReceiverData(ProcessingUnit, Process):
254 254
255 255 def __init__(self, **kwargs):
256 256
257 257 ProcessingUnit.__init__(self, **kwargs)
258 258 Process.__init__(self)
259 259 self.mp = False
260 260 self.isConfig = False
261 261 self.plottypes =[]
262 262 self.connections = 0
263 263 server = kwargs.get('server', 'zmq.pipe')
264 264 if 'tcp://' in server:
265 265 address = server
266 266 else:
267 267 address = 'ipc:///tmp/%s' % server
268 268
269 269 self.address = address
270 270 self.plottypes = [s.strip() for s in kwargs.get('plottypes', 'rti').split(',')]
271 271 self.realtime = kwargs.get('realtime', False)
272 272 global throttle_value
273 273 throttle_value = kwargs.get('throttle', 10)
274 274 self.setup()
275 275
276 276 def setup(self):
277 277
278 278 self.data = {}
279 279 self.data['times'] = []
280 280 for plottype in self.plottypes:
281 281 self.data[plottype] = {}
282
282 self.data['noise'] = {}
283 283 self.isConfig = True
284 284
285 285 def event_monitor(self, monitor):
286 286
287 287 events = {}
288 288
289 289 for name in dir(zmq):
290 290 if name.startswith('EVENT_'):
291 291 value = getattr(zmq, name)
292 292 events[value] = name
293 293
294 294 while monitor.poll():
295 295 evt = recv_monitor_message(monitor)
296 296 if evt['event'] == 32:
297 297 self.connections += 1
298 298 if evt['event'] == 512:
299 299 pass
300 300 if self.connections == 0 and self.started is True:
301 301 self.ended = True
302 302 # send('ENDED')
303 303 evt.update({'description': events[evt['event']]})
304 304
305 305 if evt['event'] == zmq.EVENT_MONITOR_STOPPED:
306 306 break
307 307 monitor.close()
308 308 print("event monitor thread done!")
309 309
310 310 @throttle(seconds=throttle_value)
311 311 def sendData(self, data):
312 312 self.send(data)
313 313
314 314 def send(self, data):
315 315 print '[sending] data=%s size=%s' % (data.keys(), len(data['times']))
316 316 self.sender.send_pyobj(data)
317 317
318 318 def update(self):
319 319
320 320 t = self.dataOut.ltctime
321 321 self.data['times'].append(t)
322 322 self.data['dataOut'] = self.dataOut
323 323
324 324 for plottype in self.plottypes:
325 325
326 326 if plottype == 'spc':
327 327 z = self.dataOut.data_spc/self.dataOut.normFactor
328 zdB = 10*numpy.log10(z)
329 self.data[plottype] = zdB
328 self.data[plottype] = 10*numpy.log10(z)
329 self.data['noise'][t] = 10*numpy.log10(self.dataOut.getNoise()/self.dataOut.normFactor)
330 330 if plottype == 'rti':
331 331 self.data[plottype][t] = self.dataOut.getPower()
332 332 if plottype == 'snr':
333 333 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_SNR)
334 334 if plottype == 'dop':
335 335 self.data[plottype][t] = 10*numpy.log10(self.dataOut.data_DOP)
336 336 if plottype == 'coh':
337 337 self.data[plottype][t] = self.dataOut.getCoherence()
338 338 if plottype == 'phase':
339 339 self.data[plottype][t] = self.dataOut.getCoherence(phase=True)
340 340
341 341 def run(self):
342 342
343 343 print '[Starting] {} from {}'.format(self.name, self.address)
344 344
345 345 self.context = zmq.Context()
346 346 self.receiver = self.context.socket(zmq.PULL)
347 347 self.receiver.bind(self.address)
348 348 monitor = self.receiver.get_monitor_socket()
349 349 self.sender = self.context.socket(zmq.PUB)
350 350
351 351 self.sender.bind("ipc:///tmp/zmq.plots")
352 352
353 353 t = Thread(target=self.event_monitor, args=(monitor,))
354 354 t.start()
355 355
356 356 while True:
357 357 self.dataOut = self.receiver.recv_pyobj()
358 358 print '[Receiving] {} - {}'.format(self.dataOut.type,
359 359 self.dataOut.datatime.ctime())
360 360
361 361 self.update()
362 362
363 363 if self.dataOut.finished is True:
364 364 self.send(self.data)
365 365 self.connections -= 1
366 366 if self.connections==0 and self.started:
367 367 self.ended = True
368 368 self.data['ENDED'] = True
369 369 self.send(self.data)
370 370 self.setup()
371 371 else:
372 372 if self.realtime:
373 373 self.send(self.data)
374 374 else:
375 375 self.sendData(self.data)
376 376 self.started = True
377 377
378 378 return
General Comments 0
You need to be logged in to leave comments. Login now