##// END OF EJS Templates
v2.9.2 :: Update 'upload_file' function - Add empty resource
eynilupu -
r14:a8d46058b456
parent child
Show More
1 NO CONTENT: modified file, binary diff hidden
@@ -1,1022 +1,1037
1 1 from ckanapi import RemoteCKAN
2 2 from datetime import datetime
3 3 from tqdm import tqdm
4 4 from CKAN_JRO import logic_download
5 5 #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
6 6 import sys
7 7 import platform
8 8 import os
9 9 import tempfile
10 10 import shutil
11 11 import zipfile
12 12 import concurrent.futures
13 13 import requests
14 14 import json
15 15 #import pathlib
16 16 import uuid
17 17
# Py2/Py3 compatibility shim: expose a 'urlparse' name under both majors.
if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    import urlparse
class JROAPI():
    """
    PURPOSE:
        Manage and retrieve data from the repository through its APIs.

    PREREQUISITES:
        - Step 1: Have "pip [Python 2]" or "pip3 [Python 3]" installed.
        - Step 2: Install the following as administrator:
          In Python 2:
            - pip install ckanapi==4.5
            - pip install requests
            - pip install futures
            - pip install tqdm
          In Python > 3:
            - pip3 install ckanapi==4.5
            - pip3 install requests
            - pip3 install tqdm

    AVAILABLE FUNCTIONS:
        - action
        - upload_file
        - upload_multiple_files
        - upload_multiple_files_advance
        - show
        - search
        - create
        - patch
        - delete
        - download_files

    EXAMPLES:
        #1:
        with JROAPI('http://demo.example.com', Authorization='#########') as <access_name>:
            ... some operation(s) ...
        #2:
        <access_name> = JROAPI('http://example.com', Authorization='#########')
        ... some operation(s) ...
        <access_name>.ckan.close()

    REPORTING A PROBLEM:
        Send an e-mail to eynilupu@igp.gob.pe detailing the following:
        1) An address where you can be contacted
        2) A description of the problem
        3) In which step or section did you find the problem?
        4) What result were you expecting?
    """
69 69 def __init__(self, url, Authorization=None, secure=True):
70 70 #-------- Check Secure -------#
71 71 self.verify = secure
72 72 if not secure and isinstance(secure, bool):
73 73 session = requests.Session()
74 74 session.verify = False
75 75 else:
76 76 session = None
77 77 #------------------------------#
78 78 self.url = url
79 79 ua = 'CKAN_JRO/2.9.2 (+'+str(self.url)+')'
80 80 #ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'
81 81 self.ckan = RemoteCKAN(self.url, apikey=Authorization, user_agent=ua, session=session)
82 82 #self.ckan = RemoteCKAN(self.url, apikey=Authorization)
83 83 self.Authorization = Authorization
84 84 # Change for --> self.separator = os.sep
85 85 if platform.system() == 'Windows':
86 86 self.separator = '\\'
87 87 else:
88 88 self.separator = '/'
89 89
90 90 self.chunk_size = 1024
91 91 self.list = []
92 92 self.dict = {}
93 93 self.str = ''
94 94 self.check = 1
95 95 self.cont = 0
96 96
    def __enter__(self):
        # Context-manager entry: hand back the client itself.
        return self
99 99
    def __exit__(self, *args):
        # Context-manager exit: close the underlying CKAN connection.
        self.ckan.close()
102 102
103 103 def action(self, action, **kwargs):
104 104 """
105 105 FINALIDAD:
106 106 Funcion para llamar a las APIs disponibles
107 107
108 108 APIs DISPONIBLES:
109 109 CONSULTAR: "GUIA DE SCRIPT.pdf"
110 110
111 111 EJEMPLO:
112 112 <access_name>.action(<consuming API>, param_1 = <class 'param_1'>, ...)
113 113 """
114 114 #--------------- CASE: PACKAGE SEARCH ---------------#
115 115 if kwargs is not None:
116 116 if action == 'package_search':
117 117 self.list = ['facet_mincount', 'facet_limit', 'facet_field']
118 118 for facet in self.list:
119 119 if facet in kwargs:
120 120 kwargs[facet.replace('_', '.')] = kwargs[facet]
121 121 kwargs.pop(facet)
122 122 #----------------------------------------------------#
123 123 try:
124 124 return getattr(self.ckan.action, action)(**kwargs)
125 125 except:
126 126 _, exc_value, _ = sys.exc_info()
127 127 return exc_value
128
    def upload_file(self, dataset_id, file_date, file_type, file_path=False, url_or_path=False, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption (original note: "Agregar si es
        # interruptido por teclado").
        '''
        PURPOSE:
            Create a single resource (optionally with an attached file) in the
            ROJ repository. Exactly one of "file_path" (local upload) or
            "url_or_path" (remote link) must be given as a string.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.upload_file(dataset_id = <class 'str'>, file_date = <class 'str'>, file_type = <class 'str'>, file_path = <class 'str'>, url_or_path = <class 'str'>, param_1 = <class 'param_1'>, ...)
        '''
        # Reserved keys that callers must not override through **kwargs.
        #self.list = ['package_id', 'upload', 'voc_file_type', 'name'] #file_date
        self.list = ['package_id', 'upload', 'voc_file_type'] #file_date
        # NOTE(review): self.dict is instance state and is never cleared here,
        # so keys may persist between successive upload_file calls — confirm
        # whether that accumulation is intentional.
        for key1, value1 in kwargs.items():
            if not key1 in self.list:
                self.dict[key1] = value1

        #---------------------------#
        if not 'others' in kwargs:
            self.dict['others'] = ''
        else:
            if isinstance(kwargs['others'], list):
                self.dict['others'] = json.dumps(kwargs['others'])
        #---------------------------#

        # Exactly one source must be provided; both given is an error.
        if isinstance(file_path, str) and isinstance(url_or_path, str):
            return 'ERROR:: Choose one: "file_path" or "url_or_path" parameters'

        if isinstance(file_path, str):
            if not os.path.isfile(file_path):
                return 'File "%s" not exist' % (file_path)

            self.dict['upload'] = open(file_path, 'rb')
            self.dict['name'] = os.path.basename(file_path)
        elif isinstance(url_or_path, str):
            self.dict['url'] = url_or_path
            if not 'name' in self.dict:
                self.dict['name'] = os.path.basename(url_or_path)
        else:
            return 'ERROR: Verify "file_path" or "url_or_path" parameters: <class "str"> or choose one'

        #if not 'format' in self.dict:
        #    self.str = ''.join(pathlib.Path(file_path).suffixes)
        #    if len(self.str) > 0:
        #        self.dict['format'] = self.str.upper()[1:]

        #-------------------------PACKAGE SHOW-----------------------#
        # Fetch the dataset's current resources to detect name collisions.
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value

        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())

        # Duplicate names abort unless the caller opted into repetition.
        if self.dict['name'].lower() in resources_name:
            if not ignore_repetition:
                return 'ERROR:: "%s" resource already exist in this dataset' % (self.dict['name'])
            print('WARRING:: "'+ str(self.dict['name']) +'" resource already exist in this dataset')
        #------------------------------------------------------------#
        try:
            return getattr(self.ckan.action, 'resource_create')(package_id=dataset_id, file_date=file_date, voc_file_type=file_type, **self.dict)
        except:
            _, exc_value, _ = sys.exc_info()
            return exc_value
183 198
    def upload_multiple_files_advance(self, dataset_id, path_files, file_date, file_type, max_size=100, max_count=500, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption (original note: "Agregar si es
        # interruptido por teclado").
        '''
        PURPOSE:
            Upload multiple files to the ROJ repository, grouped into blocks
            bounded by "max_size" (MB) and "max_count" (files), one
            'package_revise' call per block.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.upload_multiple_files_advance(dataset_id = <class 'str'>, path_files = <class 'list of strings'>, file_date = <class 'str'>, file_type = <class 'str'>, param_1 = <class 'param_1'>, ...)
        '''
        #-------------------------PACKAGE SHOW-----------------------#
        # Fetch the dataset's current resources to detect name collisions.
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value
        #------------------------------------------------------------#
        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())
        #------------------------------------------------------------#
        # Extra kwargs are forwarded to every resource except reserved keys.
        self.list = ['package_id', 'upload', 'voc_file_type', 'name']
        for key1, value1 in kwargs.items():
            if not key1 in self.list:
                self.dict[key1] = value1
        #------------------------------------------------------------#
        if not 'others' in kwargs:
            self.dict['others'] = ''
        else:
            if isinstance(kwargs['others'], list):
                self.dict['others'] = json.dumps(kwargs['others'])
        #------------------------------------------------------------#
        total_list = []
        #---------------CASE : "path" or "path_list"-----------------#
        # path_files may be an explicit list of file paths, or a directory
        # whose regular files are all uploaded (sorted by name).
        if type(path_files) is list:
            if len(path_files) != 0:
                path_files.sort()
                for u in path_files:
                    if os.path.isfile(u):
                        if os.path.basename(u).lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (os.path.basename(u))
                            print('WARRING:: "'+ str(os.path.basename(u)) +'" file was ignored because already exist in this dataset')
                        else:
                            total_list.append({'name':os.path.basename(u), 'size': os.stat(u).st_size, 'upload':open(u, 'rb')})
                    else:
                        return 'File "%s" does not exist' % (u)
            else:
                return 'ERROR:: "path_list is empty"'

        elif type(path_files) is str:
            if os.path.isdir(path_files):
                path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
                path_order.sort()
                if path_order:
                    for name in path_order:
                        if name.lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (name)
                            print('WARRING:: "'+ name +'" file was ignored because already exist in this dataset')
                        else:
                            total_list.append({'name':name, 'size': os.stat(os.path.join(path_files, name)).st_size, 'upload':open(os.path.join(path_files, name), 'rb')})
                else:
                    return "ERROR:: There aren't files in this directory"
            else:
                return 'ERROR:: Directory "%s" does not exist' % (path_files)
        else:
            return 'ERROR:: "path_files" must be a str or list'
        #------------------------------------------------------------#
        # package_revise matches by id (UUIDv4) or by name, whichever parses.
        try:
            uuid.UUID(str(dataset_id), version=4)
            package_id_or_name = '"id": "' + str(dataset_id) + '"'
        except ValueError:
            package_id_or_name = '"name": "' + str(dataset_id) + '"'
        #------------------------------------------------------------#
        # Greedily pack files into blocks of at most max_size MB and
        # max_count files each; a file that would overflow starts a new block.
        blocks = [[]]
        size_file = 0
        count_file = 0
        inter_num = 0
        for value in total_list:
            if value['size'] > 1024 * 1024 * float(max_size):
                return 'ERROR:: The size of the "%s" file is %sMB aprox, please change "max_size" value' % (value['name'], str(round(value['size']/(1024 * 1024), 2)))
            if not 1 <= int(max_count) <= 999:
                return 'ERROR:: The count of the number of files must be between 1 and 999, please change "max_count" value'

            size_file = size_file + value['size']
            count_file = count_file + 1
            if size_file <= 1024 * 1024 * float(max_size) and count_file <= int(max_count):
                del value['size']
                blocks[inter_num].append(value)
            else:
                inter_num = inter_num + 1
                size_file = value['size']
                count_file = 1
                blocks.append([])
                del value['size']
                blocks[inter_num].append(value)
        #------------------------------------------------------------#
        if len(blocks[0]) > 0:
            print('BLOCK(S) IN TOTAL:: {}'.format(len(blocks)))
            for count1, block in enumerate(blocks):
                print('---- BLOCK N°{} ----'.format(count1 + 1))
                resource_extend = []
                files_dict = {}
                for count2, value2 in enumerate(block):
                    value2['file_date'] = file_date
                    value2['voc_file_type'] = file_type
                    value2.update(self.dict)

                    #if not 'format' in value2:
                    #    format = ''.join(pathlib.Path(value2['name']).suffixes)
                    #    if len(format) > 0:
                    #        value2['format'] = format.upper()[1:]

                    # Negative index addresses the resource counted from the
                    # end of the extended list, matching package_revise syntax.
                    files_dict['update__resources__-'+ str(len(block)-count2) +'__upload'] = (value2['name'], value2['upload'])
                    del value2['upload']
                    resource_extend.append(value2)

                print('BLOCK N°{} :: "{}" file(s) found >> uploading'.format(count1 + 1, len(block)))
                try:
                    result = self.ckan.call_action(
                        'package_revise',
                        {'match': '{'+ str(package_id_or_name) +'}', 'update__resources__extend': json.dumps(resource_extend)},
                        files=files_dict
                    )
                    print('BLOCK N°{} :: Uploaded file(s) successfully'.format(count1 + 1))
                    if len(blocks) == count1 + 1:
                        return result
                except:
                    print('ERROR :: Use the "print" for more information')
                    _, exc_value, _ = sys.exc_info()
                    return exc_value
        else:
            return "ERROR:: No file(s) found to upload"
321 336
    def upload_multiple_files(self, dataset_id, path_files, date_files, type_files, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption (original note: "Agregar si es
        # interruptido por teclado").
        '''
        PURPOSE:
            Upload multiple files to the ROJ repository, one 'resource_create'
            call per file. Per-file parameters may be given as lists (one entry
            per file) or as scalars shared by every file.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.upload_multiple_files(dataset_id = <class 'str'>, path_files = <class 'str'> or <class 'list of strings'>, date_files = <class 'str'> or <class 'list of strings'>, type_files = <class 'str'> or <class 'list of strings'>, param_1 = <class 'param_1'>, ...)
        '''
        #-------------------------PACKAGE SHOW-----------------------#
        # Fetch the dataset's current resources to detect name collisions.
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value
        #------------------------------------------------------------#
        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())
        #------------------------------------------------------------#

        # params_dict holds per-file (list) values; params_no_dict scalars.
        params_dict = {'upload':[], 'name':[]}
        #if not 'format' in kwargs:
        #    params_dict.update({'format':[]})
        #---------------CASE : "path" or "path_list"-----------------#
        if type(path_files) is list:
            if len(path_files) != 0:
                path_files.sort()
                for u in path_files:
                    if os.path.isfile(u):
                        if os.path.basename(u).lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (os.path.basename(u))
                            print('WARRING:: "'+ str(os.path.basename(u)) +'" file was ignored because already exist in this dataset')
                        else:
                            params_dict['upload'].append(open(u, 'rb'))
                            params_dict['name'].append(os.path.basename(u))
                            #if not 'format' in kwargs:
                            #    format = ''.join(pathlib.Path(u).suffixes)
                            #    if len(format) > 0:
                            #        params_dict['format'].append(format.upper()[1:])
                            #    else:
                            #        params_dict['format'].append('')
                    else:
                        return 'File "%s" does not exist' % (u)
            else:
                return 'ERROR:: "path_list is empty"'
        elif type(path_files) is str:
            if os.path.isdir(path_files):
                path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
                path_order.sort()
                if path_order:
                    for name in path_order:
                        if name.lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (name)
                            print('WARRING:: "'+ str(name) +'" file was ignored because already exist in this dataset')
                        else:
                            params_dict['upload'].append(open(os.path.join(path_files, name), 'rb'))
                            params_dict['name'].append(name)
                            #if not 'format' in kwargs:
                            #    format = ''.join(pathlib.Path(name).suffixes)
                            #    if len(format) > 0:
                            #        params_dict['format'].append(format.upper()[1:])
                            #    else:
                            #        params_dict['format'].append('')
                else:
                    return "ERROR:: There aren't files in this directory"
            else:
                return 'ERROR:: Directory "%s" does not exist' % (path_files)
        else:
            return 'ERROR:: "path_files" must be a str or list'
        #------------------------------------------------------------#
        params_no_dict = {'package_id': dataset_id}
        # Lists are paired element-wise with files; scalars apply to all.
        if type(date_files) is list:
            params_dict['file_date'] = date_files
        else:
            params_no_dict['file_date'] = date_files

        if type(type_files) is list:
            params_dict['voc_file_type'] = type_files
        else:
            params_no_dict['voc_file_type'] = type_files

        for key1, value1 in kwargs.items():
            if not key1 in params_dict and not key1 in params_no_dict and key1 != 'others':
                if type(value1) is list:
                    params_dict[key1] = value1
                else:
                    params_no_dict[key1] = value1
        #------------------------------------------#
        if not 'others' in kwargs:
            params_no_dict['others'] = ''
        else:
            if isinstance(kwargs['others'], tuple):
                params_dict['others'] = [json.dumps(w) for w in kwargs['others']]
            elif isinstance(kwargs['others'], list):
                params_no_dict['others'] = json.dumps(kwargs['others'])
            elif isinstance(kwargs['others'], str):
                params_no_dict['others'] = kwargs['others']
            else:
                return 'ERROR:: "others" must be a tuple, list or str'
        #------------------------------------------#
        # Every per-file list must have the same length as the file list.
        len_params_dict = []
        for value2 in params_dict.values():
            len_params_dict.append(len(value2))

        if len(list(set(len_params_dict))) > 1:
            return 'ERROR:: All lists must be the same length: %s' % (len(params_dict['name']))
        #------------------------------------------------------------#
        print('"{}" file(s) found >> uploading'.format(len(params_dict['name'])))
        # NOTE(review): self.list is instance state and is never reset here, so
        # results accumulate across repeated calls — confirm if intentional.
        for v in range(len(params_dict['name'])):
            try:
                send = {}
                for key_dict, value_dict in params_dict.items():
                    send[key_dict] = value_dict[v]
                for key_no_dict, value_no_dict in params_no_dict.items():
                    send[key_no_dict] = value_no_dict

                self.list.append(getattr(self.ckan.action, 'resource_create')(**send))
                print('File #{} :: "{}" was uploaded successfully'.format(v+1, params_dict['name'][v]))
            except:
                _, exc_value, _ = sys.exc_info()
                self.list.append(exc_value)
                print('File #{} :: Error uploading "{}" file'.format(v+1, params_dict['name'][v]))
        return self.list
        #------------------------------------------------------------#
453 468
454 469 def show(self, type_option, id, **kwargs):
455 470 '''
456 471 FINALIDAD:
457 472 Funcion personalizada para una busqueda en especifico.
458 473
459 474 PARAMETROS DISPONIBLES:
460 475 CONSULTAR: "GUIA DE SCRIPT.pdf"
461 476
462 477 ESTRUCTURA:
463 478 <access_name>.show(type_option = <class 'str'>, id = <class 'str'>, param_1 = <class 'param_1'>, ...)
464 479 '''
465 480 if type(type_option) is str:
466 481 try:
467 482 if type_option == 'dataset':
468 483 return getattr(self.ckan.action, 'package_show')(id=id, **kwargs)
469 484 elif type_option == 'resource':
470 485 return getattr(self.ckan.action, 'resource_show')(id=id, **kwargs)
471 486 elif type_option == 'project':
472 487 return getattr(self.ckan.action, 'organization_show')(id=id, **kwargs)
473 488 elif type_option == 'collaborator':
474 489 return getattr(self.ckan.action, 'package_collaborator_list_for_user')(id=id, **kwargs)
475 490 elif type_option == 'member':
476 491 return getattr(self.ckan.action, 'organization_list_for_user')(id=id, **kwargs)
477 492 elif type_option == 'vocabulary':
478 493 return getattr(self.ckan.action, 'vocabulary_show')(id=id, **kwargs)
479 494 elif type_option == 'tag':
480 495 if not 'vocabulary_id' in kwargs:
481 496 print('Missing "vocabulary_id" value: assume it is a free tag')
482 497 return getattr(self.ckan.action, 'tag_show')(id=id, **kwargs)
483 498 elif type_option == 'user':
484 499 return getattr(self.ckan.action, 'user_show')(id=id, **kwargs)
485 500 elif type_option == 'job':
486 501 return getattr(self.ckan.action, 'job_show')(id=id, **kwargs)
487 502 else:
488 503 return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
489 504 except:
490 505 _, exc_value, _ = sys.exc_info()
491 506 return exc_value
492 507 else:
493 508 return 'ERROR:: "type_option" must be a str'
494 509
    def search(self, type_option, query=None, **kwargs):
        '''
        PURPOSE:
            Customized searches that satisfy some criterion over datasets,
            resources or tags.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.search(type_option = <class 'str'>, query = <class 'dict'>, param_1 = <class 'param_1'>, ...)
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    # 'fq'/'fq_list'/'include_private' are managed internally;
                    # facet_* kwargs are translated to Solr dotted names.
                    key_replace = ['fq', 'fq_list', 'include_private']
                    key_point = ['facet_mincount', 'facet_limit', 'facet_field']
                    for key1, value1 in kwargs.items():
                        if not key1 in key_replace:
                            if key1 in key_point:
                                self.dict[key1.replace('_', '.')] = value1
                            else:
                                self.dict[key1] = value1

                    if query is not None:
                        if type(query) is dict:
                            self.dict['fq_list'] = []
                            #NUM_RESOURCES_MIN / NUM_RESOURCES_MAX
                            #----------------------------------------------------#
                            # Date fields are validated as exact YYYY-MM-DD
                            # strings before being turned into fq clauses.
                            if 'dataset_start_date' in query:
                                if type(query['dataset_start_date']) is str:
                                    try:
                                        datetime.strptime(query['dataset_start_date'], '%Y-%m-%d')
                                        if len(query['dataset_start_date']) != 10:
                                            return '"dataset_start_date", must be: <YYYY-MM-DD>'
                                        self.dict['fq_list'].append('dataset_start_date:"'+query['dataset_start_date']+'"')
                                        self.list.append('dataset_start_date')
                                    except:
                                        return '"dataset_start_date" incorrect: "%s"' % (query['dataset_start_date'])
                                else:
                                    return '"dataset_start_date" must be <str>'
                            #----------------------------------------------------#
                            if 'dataset_end_date' in query:
                                if type(query['dataset_end_date']) is str:
                                    try:
                                        datetime.strptime(query['dataset_end_date'], '%Y-%m-%d')
                                        if len(query['dataset_end_date']) != 10:
                                            return '"dataset_end_date", must be: <YYYY-MM-DD>'

                                        if 'dataset_start_date' in query:
                                            if query['dataset_start_date'] > query['dataset_end_date']:
                                                return '"dataset_end_date" must be greater than "dataset_start_date"'

                                        self.dict['fq_list'].append('dataset_end_date:"'+query['dataset_end_date']+'"')
                                        self.list.append('dataset_end_date')
                                    except:
                                        return '"dataset_end_date" incorrect: "%s"' % (query['dataset_end_date'])
                                else:
                                    return '"dataset_end_date" must be <str>'
                            #----------------------------------------------------#
                            # Remaining query entries become generic fq clauses
                            # (keys already consumed above are skipped).
                            for key, value in query.items():
                                if value is not None and not key in self.list:
                                    self.dict['fq_list'].append(str(key)+':"'+str(value)+'"')
                        else:
                            return '"query" must be <dict>'

                    return getattr(self.ckan.action, 'package_search')(include_private=True, **self.dict)

                elif type_option == 'resource':
                    for key1, value1 in kwargs.items():
                        if key1 != 'fields':
                            self.dict[key1] = value1

                    if query is not None:
                        if type(query) is dict:
                            #----------------------------------------------------#
                            if 'file_date_min' in query:
                                if type(query['file_date_min']) is str:
                                    try:
                                        datetime.strptime(query['file_date_min'], '%Y-%m-%d')
                                        if len(query['file_date_min']) != 10:
                                            return '"file_date_min", must be: <YYYY-MM-DD>'
                                    except:
                                        return '"file_date_min" incorrect: "%s"' % (query['file_date_min'])
                                else:
                                    return '"file_date_min" must be <str>'
                            #----------------------------------------------------#
                            if 'file_date_max' in query:
                                if type(query['file_date_max']) is str:
                                    try:
                                        datetime.strptime(query['file_date_max'], '%Y-%m-%d')
                                        if len(query['file_date_max']) != 10:
                                            return '"file_date_max", must be: <YYYY-MM-DD>'

                                        if 'file_date_min' in query:
                                            if query['file_date_min'] > query['file_date_max']:
                                                return '"file_date_max" must be greater than "file_date_min"'
                                    except:
                                        return '"file_date_max" incorrect: "%s"' % (query['file_date_max'])
                                else:
                                    return '"file_date_max" must be <str>'
                            #----------------------------------------------------#
                            self.dict['query'] = query
                        else:
                            return '"query" must be <dict>'
                    return getattr(self.ckan.action, 'resources_search')(**self.dict)

                elif type_option == 'tag':
                    for key1, value1 in kwargs.items():
                        if key1 != 'fields':
                            self.dict[key1] = value1

                    if not 'vocabulary_id' in kwargs:
                        print('Missing "vocabulary_id" value: tags that don’t belong to any vocabulary')
                    else:
                        print('Only tags that belong to "{}" vocabulary'.format(kwargs['vocabulary_id']))

                    if query is not None:
                        if type(query) is dict:
                            if 'search' in query:
                                if type(query['search']) is list or type(query['search']) is str:
                                    self.dict['query'] = query['search']
                                else:
                                    return '"search" must be <list> or <str>'
                        else:
                            return '"query" must be <dict>'
                    return getattr(self.ckan.action, 'tag_search')(**self.dict)

                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)

            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be <str>'
630 645
631 646 def create(self, type_option, select=None, **kwargs):
632 647 '''
633 648 FINALIDAD:
634 649 Funcion personalizada para crear.
635 650
636 651 PARAMETROS DISPONIBLES:
637 652 CONSULTAR: "GUIA DE SCRIPT.pdf"
638 653
639 654 ESTRUCTURA:
640 655 <access_name>.create(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
641 656 '''
642 657 if type(type_option) is str:
643 658 try:
644 659 if type_option == 'dataset':
645 660 return getattr(self.ckan.action, 'package_create')(**kwargs)
646 661 elif type_option == 'project':
647 662 return getattr(self.ckan.action, 'organization_create')(**kwargs)
648 663 elif type_option == 'member':
649 664 return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
650 665 elif type_option == 'collaborator':
651 666 return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
652 667 elif type_option == 'vocabulary':
653 668 return getattr(self.ckan.action, 'vocabulary_create')(**kwargs)
654 669 elif type_option == 'tag':
655 670 return getattr(self.ckan.action, 'tag_create')(**kwargs)
656 671 elif type_option == 'user':
657 672 return getattr(self.ckan.action, 'user_create')(**kwargs)
658 673 elif type_option == 'views':
659 674 if 'resource' == select:
660 675 self.list = ['package']
661 676 for key1, value1 in kwargs.items():
662 677 if not key1 in self.list:
663 678 self.dict[key1] = value1
664 679 return getattr(self.ckan.action, 'resource_create_default_resource_views')(**self.dict)
665 680 elif 'dataset' == select:
666 681 return getattr(self.ckan.action, 'package_create_default_resource_views')(**kwargs)
667 682 else:
668 683 return 'ERROR:: "select = %s" is not accepted' % (select)
669 684 else:
670 685 return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
671 686 except:
672 687 _, exc_value, _ = sys.exc_info()
673 688 return exc_value
674 689 else:
675 690 return 'ERROR:: "type_option" must be <str>'
676 691
677 692 def patch(self, type_option, **kwargs):
678 693 '''
679 694 FINALIDAD:
680 695 Funciones personalizadas para actualizar
681 696
682 697 PARAMETROS DISPONIBLES:
683 698 CONSULTAR: "GUIA DE SCRIPT.pdf"
684 699
685 700 ESTRUCTURA:
686 701 <access_name>.patch(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
687 702 '''
688 703 if type(type_option) is str:
689 704 try:
690 705 if type_option == 'dataset':
691 706 return getattr(self.ckan.action, 'package_patch')(**kwargs)
692 707 elif type_option == 'project':
693 708 return getattr(self.ckan.action, 'organization_patch')(**kwargs)
694 709 elif type_option == 'resource':
695 710 return getattr(self.ckan.action, 'resource_patch')(**kwargs)
696 711 elif type_option == 'member':
697 712 return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
698 713 elif type_option == 'collaborator':
699 714 return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
700 715 else:
701 716 return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
702 717 except:
703 718 _, exc_value, _ = sys.exc_info()
704 719 return exc_value
705 720 else:
706 721 return 'ERROR:: "type_option" must be <str>'
707 722
708 723 def delete(self, type_option, select=None, **kwargs):
709 724 '''
710 725 FINALIDAD:
711 726 Función personalizada para eliminar y/o purgar.
712 727
713 728 PARAMETROS DISPONIBLES:
714 729 CONSULTAR: "GUIA DE SCRIPT.pdf"
715 730
716 731 ESTRUCTURA:
717 732 <access_name>.delete(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
718 733 '''
719 734 if type(type_option) is str:
720 735 try:
721 736 if type_option == 'dataset':
722 737 if select is None:
723 738 return 'ERROR:: "select" must not be "None"'
724 739 else:
725 740 if 'delete' == select:
726 741 return getattr(self.ckan.action, 'package_delete')(**kwargs)
727 742 elif 'purge' == select:
728 743 return getattr(self.ckan.action, 'dataset_purge')(**kwargs)
729 744 else:
730 745 return 'ERROR:: "select = %s" is not accepted' % (select)
731 746 elif type_option == 'project':
732 747 if select is None:
733 748 return 'ERROR:: "select" must not be "None"'
734 749 else:
735 750 if 'delete' == select:
736 751 return getattr(self.ckan.action, 'organization_delete')(**kwargs)
737 752 elif 'purge' == select:
738 753 return getattr(self.ckan.action, 'organization_purge')(**kwargs)
739 754 else:
740 755 return 'ERROR:: "select = %s" is not accepted' % (select)
741 756 elif type_option == 'resource':
742 757 return getattr(self.ckan.action, 'resource_delete')(**kwargs)
743 758 elif type_option == 'vocabulary':
744 759 return getattr(self.ckan.action, 'vocabulary_delete')(**kwargs)
745 760 elif type_option == 'tag':
746 761 return getattr(self.ckan.action, 'tag_delete')(**kwargs)
747 762 elif type_option == 'user':
748 763 return getattr(self.ckan.action, 'user_delete')(**kwargs)
749 764 else:
750 765 return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
751 766 except:
752 767 _, exc_value, _ = sys.exc_info()
753 768 return exc_value
754 769 else:
755 770 return 'ERROR:: "type_option" must be <str>'
756 771
757 772 def f_status_note(self, total, result, path):
758 773 file_txt = open(path+'status_note.txt', 'w')
759 774 file_txt = open(path+'status_note.txt', 'a')
760 775
761 776 file_txt.write('DOWNLOADED FILE(S): "%s"' % (len(result['name'])))
762 777 file_txt.write(''+ os.linesep)
763 778 for u in result['name']:
764 779 file_txt.write(' - '+ u + os.linesep)
765 780 file_txt.write(''+ os.linesep)
766 781
767 782 file_txt.write('FAILED FILE(S): "%s"' % (len(total['name'])-len(result['name'])))
768 783 file_txt.write(''+ os.linesep)
769 784 if len(total['name'])-len(result['name']) != 0:
770 785 for u in total['name']:
771 786 if not u in result['name']:
772 787 file_txt.write(' - '+ u + os.linesep)
773 788 else:
774 789 file_txt.write(' "None"'+ os.linesep)
775 790
776 791 def f_name(self, name_dataset, ext, tempdir):
777 792 while self.check:
778 793 self.str = ''
779 794 if self.cont == 0:
780 795 if os.path.exists(tempdir + name_dataset + ext):
781 796 self.str = name_dataset+'('+str(self.cont+1)+')'+ext
782 797 else:
783 798 self.check = self.check * 0
784 799 self.str = name_dataset + ext
785 800 else:
786 801 if not os.path.exists(tempdir + name_dataset+'('+str(self.cont)+')'+ext):
787 802 self.check = self.check * 0
788 803 self.str = name_dataset+'('+str(self.cont)+')'+ ext
789 804 self.cont = self.cont+1
790 805 return self.str
791 806
792 807 def f_zipdir(self, path, ziph, zip_name):
793 808 for root, _, files in os.walk(path):
794 809 print('.....')
795 810 print('Creating: "{}" >>'.format(zip_name))
796 811 for __file in tqdm(iterable=files, total=len(files)):
797 812 new_dir = os.path.relpath(os.path.join(root, __file), os.path.join(path, '..'))
798 813 ziph.write(os.path.join(root, __file), new_dir)
799 814 print('Created >>')
800 815
    def download_by_step(self, response, tempdir_name):
        # Download one resource dict (keys 'url' and 'name') into
        # *tempdir_name*, streaming in chunks of self.chunk_size bytes.
        # Network errors are deliberately swallowed: the caller
        # (download_files) detects failures by comparing the files that
        # actually landed on disk against the requested list.
        try:
            # ---------- REPLACE URL --------- #
            # If this client targets a host other than www.igp.gob.pe but the
            # stored resource URL still points at www.igp.gob.pe, rewrite the
            # scheme+netloc so the request goes through the configured host.
            if urlparse(self.url).netloc != 'www.igp.gob.pe' and urlparse(response['url']).netloc == 'www.igp.gob.pe':
                response['url'] = response['url'].replace(urlparse(response['url']).scheme + '://' + urlparse(response['url']).netloc,
                                                          urlparse(self.url).scheme + '://' + urlparse(self.url).netloc)
            #----------------------------------#
            with requests.get(response['url'], stream=True, headers={'Authorization': self.Authorization}, verify=self.verify) as resp:
                if resp.status_code == 200:
                    with open(tempdir_name+response['name'], 'wb') as file:
                        for chunk in resp.iter_content(chunk_size = self.chunk_size):
                            if chunk:  # filter out keep-alive chunks
                                file.write(chunk)
        except requests.exceptions.RequestException:
            pass
816 831
    def download_files(self, **kwargs):
        '''
        PURPOSE:
            Custom function to download the existing files of a dataset.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.download_files(id = <class 'str'>, param_1 = <class 'param_1'>, ...)
        '''
        dict_local = {}
        #------ 'zip' flag: pack the downloaded folder into a .zip ------#
        if 'zip' in kwargs:
            if type(kwargs['zip']) is not bool:
                return 'ERROR:: "zip" must be: <class "bool">'
            else:
                dict_local['zip'] = kwargs['zip']
        else:
            dict_local['zip'] = False
        #------ 'status_note' flag: write a download summary file -------#
        if 'status_note' in kwargs:
            if type(kwargs['status_note']) is not bool:
                return 'ERROR:: "status_note" must be: <class "bool">'
            else:
                dict_local['status_note'] = kwargs['status_note']
        else:
            dict_local['status_note'] = False
        #------ 'path': destination directory (must already exist) ------#
        if 'path' in kwargs:
            if type(kwargs['path']) is str:
                if os.path.isdir(kwargs['path']) == False:
                    return 'ERROR:: "path" does not exist'
                else:
                    # Normalize to a trailing separator.
                    if kwargs['path'][-1:] != self.separator:
                        dict_local['path'] = kwargs['path']+self.separator
                    else:
                        dict_local['path'] = kwargs['path']

                    # Probe write permission by creating and removing a
                    # throwaway timestamped file.
                    txt = dict_local['path']+datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
                    if int(platform.python_version()[0]) == 3:
                        try:
                            file_txt = open(txt, 'w')
                            file_txt.close()
                            os.remove(txt)
                        except PermissionError:
                            return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
                    else:
                        # Python 2 has no PermissionError, hence the broad except.
                        try:
                            file_txt = open(txt, 'w')
                            file_txt.close()
                            os.remove(txt)
                        except:
                            return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
            else:
                return 'ERROR:: "path" must be: <class "str">'
        else:
            dict_local['path'] = ''
        #---- Forward every non-local kwarg to the 'url_resources' call ----#
        # NOTE(review): self.dict persists on the instance, so keys set by a
        # previous download_files() call are re-sent here — confirm whether
        # that accumulation is intended.
        for key, value in kwargs.items():
            if not key in dict_local:
                self.dict[key] = value
        try:
            response = getattr(self.ckan.action, 'url_resources')(**self.dict)
        except:
            _, exc_value, _ = sys.exc_info()
            return exc_value

        if len(response) != 0:
            #--------------TEMP PATH---------------#
            # zip mode: download into a temp dir that is packed and removed;
            # plain mode: create a collision-free folder under 'path'.
            if dict_local['zip']:
                tempdir = tempfile.mkdtemp(prefix=kwargs['id']+'-')+self.separator
                os.mkdir(tempdir+kwargs['id'])
                dir_name = tempdir + kwargs['id'] + self.separator
            else:
                dir = self.f_name(kwargs['id'], '', dict_local['path'])
                os.mkdir(dict_local['path'] + dir)
                dir_name = dict_local['path'] + dir + self.separator
            #-----------DOWNLOAD FILES-------------#
            print('.....')
            print('Downloading "{}" file(s) >>'.format(len(response)))
            name_total = {'name': []}
            # One worker per resource; download_by_step swallows network
            # errors, so success is measured by listing files on disk below.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for u in tqdm(iterable=response, total=len(response)):
                    name_total['name'].append(u['name'])
                    executor.submit(self.download_by_step, u, dir_name)
            name_check = {}
            name_check['name'] = [f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))]
            print('"{}" downloaded file(s) successfully >>'.format(len(name_check['name'])))
            #--------------------------------------#
            if len(name_check['name']) != 0:
                #----------Status Note---------#
                if dict_local['status_note']:
                    print('.....')
                    print('Creating: "status_note.txt" >>')
                    self.f_status_note(name_total, name_check, dir_name)
                    print('Created>>')
                #----------ZIP CREATE----------#
                if dict_local['zip']:
                    zip_name = self.f_name(kwargs['id'], '.zip', dict_local['path'])
                    ziph = zipfile.ZipFile(dict_local['path'] + zip_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
                    self.f_zipdir(dir_name, ziph, zip_name)
                    ziph.close()
                    #Delete Temporal Path
                    if os.path.exists(tempdir[:-1]):
                        shutil.rmtree(tempdir[:-1])
                #------------------------------#
                print('.....')
                return 'DOWNLOAD FINISHED'
            else:
                #Delete Temporal Path (nothing was downloaded)
                if dict_local['zip']:
                    if os.path.exists(tempdir[:-1]):
                        shutil.rmtree(tempdir[:-1])
                else:
                    if os.path.exists(dir_name[:-1]):
                        shutil.rmtree(dir_name[:-1])
                return 'NO FILES WERE DOWNLOADED'
        else:
            return 'FILES NOT FOUND'
937 952
    def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs):
        '''
        PURPOSE:
            Advanced custom function to download the existing files of one
            or more dataset(s), delegating to the patched ckanapi CLI dump
            machinery in logic_download.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.download_files_advance(id_or_name= <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...)
        '''
        #------------------ PATH ----------------------#
        # Validate the destination directory and probe write permission by
        # creating and removing a throwaway timestamped file.
        if isinstance(path, str):
            if os.path.isdir(path):
                if not path.endswith(os.sep):
                    path = path + os.sep
                test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
                try:
                    file_txt = open(test_txt, 'w')
                    file_txt.close()
                    os.remove(test_txt)
                except:
                    return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path)
            else:
                return 'ERROR:: "path" does not exist'
        else:
            return 'ERROR:: "path" must be: <class "str">'

        #------------------ PROCESSES -----------------#
        if not isinstance(processes, int):
            return 'ERROR:: "processes" must be: <class "int">'

        #------------------ ID OR NAME ----------------#
        # Normalize to a list of strings for the CLI-style worker layer.
        if isinstance(id_or_name, str):
            id_or_name = [id_or_name]
        elif isinstance(id_or_name, list):
            id_or_name = list(map(str, id_or_name))
        else:
            return 'ERROR:: dataset "id_or_name" must be: <class "str" or "list">'
        #----------------------------------------------#
        # This dict mirrors the docopt arguments of "ckanapi dump datasets";
        # the commented-out keys are kept for reference.
        # NOTE(review): '--log' is a hard-coded machine-specific path; it is
        # currently unused (logging is commented out in logic_download), but
        # should be parameterized if logging is ever re-enabled.
        arguments = {
            '--apikey': self.Authorization,
            '--ckan-user': None,
            '--config': None,
            '--datapackages': path,
            '--datastore-fields': False,
            '--get-request': False,
            '--insecure': not self.verify,
            '--log': '/home/soporte/DUMP/download.txt',
            '--processes': str(processes),
            '--quiet': False,
            '--remote': self.url,
            '--worker': False,
            #'--all': False,
            #'--gzip': False,
            #'--output': None,
            #'--max-records': None,
            #'--output-json': False,
            #'--output-jsonl': False,
            #'--create-only': False,
            #'--help': False,
            #'--input': None,
            #'--input-json': False,
            #'--start-record': '1',
            #'--update-only': False,
            #'--upload-logo': False,
            #'--upload-resources': False,
            #'--version': False,
            'ID_OR_NAME': id_or_name,
            'datasets': True,
            'dump': True,
            #'ACTION_NAME': None,
            #'KEY:JSON': [],
            #'KEY=STRING': [],
            #'KEY@FILE': [],
            #'action': False,
            #'delete': False,
            #'groups': False,
            #'load': False,
            #'organizations': False,
            #'related': False,
            #'search': False,
            #'users': False
        }
        return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs)
@@ -1,228 +1,226
1 1 #from ckanapi.datapackage import populate_schema_from_datastore
2 2 from ckanapi.cli import workers, dump
3 3 from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
4 4 from datetime import datetime
5 5 import sys
6 6 import json
7 7 import os
8 8 import requests
9 9 import six
10 10
11 11 if sys.version_info.major == 3:
12 12 from urllib.parse import urlparse
13 13 else:
14 14 import urlparse
15 15
16 16 DL_CHUNK_SIZE = 100 * 1024
17 17
18 print()
19
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Patched copy of ckanapi's CLI dump loop.

    Dumps the datasets named in arguments['ID_OR_NAME'] through a pool of
    worker subprocesses and, when '--datapackages' is set, materializes each
    record as a datapackage on disk via create_datapackage_change().
    Extra **kwargs are forwarded to the 'url_resources' API action used to
    pre-filter which resources get downloaded.

    Returns 1 on worker failure or broken pipe, 2 on interrupt, an exception
    instance if the 'url_resources' call fails, and None otherwise.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    # Child-process mode: this process is itself one of the dump workers.
    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)
    '''
    log = None
    if arguments['--log']:
        log = open(arguments['--log'], 'a')
    '''
    jsonl_output = stdout
    if arguments['--datapackages']:
        # Records go to disk as datapackages; discard the JSONL stream.
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']

    if names and isinstance(names[0], dict):
        names = [rec.get('name',rec.get('id')) for rec in names]
    '''
    if arguments['--datapackages']:
        arguments['--datastore-fields'] = True
    '''
    #-- Pre-fetch, per dataset, the list of resources allowed for download --#
    filtered_urls = {}
    for name in names:
        try:
            response = getattr(ckan.action, 'url_resources')(id=name, **kwargs)
        except:
            _, exc_value, _ = sys.exc_info()
            return exc_value
        filtered_urls[name] = response
    #----------------------------#

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # Respect the server-advertised cap on parallel connections.
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    # Feed each dataset name to the pool as one JSON line.
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # A worker crashed before producing output.
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Dataset Name: {4}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    record.get('name', '') if record else '',
                ).encode('utf-8'))
            '''
            if log:
                log.write(compact_json([
                    timestamp,
                    finished,
                    error,
                    record.get('name', '') if record else None,
                ]) + b'\n')
            '''
            datapackages_path = arguments['--datapackages']
            if datapackages_path:
                create_datapackage_change(record, filtered_urls[record.get('name', '')], datapackages_path, stderr, arguments['--apikey'], arguments['--remote'], arguments['--insecure'])
            # Emit completed records in submission order, buffering any that
            # finished out of order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
def create_datapackage_change(record, filtered_url, base_path, stderr, apikey, host_url, insecure):
    """Materialize one CKAN dataset *record* as a datapackage directory.

    Creates '<base_path>/<name>/data/', downloads every eligible resource
    into it via create_resource_change(), and writes a 'datapackage.json'
    descriptor.  *filtered_url* is the 'url_resources' allow-list: a resource
    is skipped unless its {'name', 'url'} pair appears there.

    Returns (datapackage_dir, datapackage_dict, json_path).
    """
    resource_formats_to_ignore = ['API', 'api']
    #-- Collision-free directory named after the dataset --#
    datapackage_dir = name_no_repetition(record.get('name', ''), base_path)
    #----------------------------------------#
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        # Skip API-type resources, resources outside the allow-list, and
        # resources with an empty URL.
        if resource['format'] in resource_formats_to_ignore:
            continue

        if not {'name': resource['name'], 'url': resource['url']} in filtered_url:
            continue

        if len(resource['url']) == 0:
            continue

        # Collision-free file name inside the 'data' subfolder.
        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)

        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            continue
        '''
        #----------------------------------------#
        dres = {'path': os.path.join('data', filename),
                'description': cres.get('description', ''),
                'format': cres.get('format', ''),
                'name': cres.get('name', ''),
                'title': cres.get('name', '').title()}
        #----------------------------------------#
        populate_schema_from_datastore(cres, dres)
        '''
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download one resource's file to resource['path'].

    Streams the URL in DL_CHUNK_SIZE chunks, authenticating with *apikey*.
    Returns the resource dict on success (and also after a connection error
    or generic failure — only an HTTP error status returns False), so
    callers cannot distinguish "downloaded" from "connection refused" by the
    return value alone.
    """
    # ---------- REPLACE URL --------- #
    # Rewrite URLs still pointing at www.igp.gob.pe so they go through the
    # host this client was configured with.
    if urlparse(host_url).netloc != 'www.igp.gob.pe' and urlparse(resource['url']).netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(urlparse(resource['url']).scheme + '://' + urlparse(resource['url']).netloc,
                                                  urlparse(host_url).scheme + '://' + urlparse(host_url).netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey}, stream=True, verify=not insecure)
        #---------------------------------------#
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError as e:
            # Non-2xx response: signal the caller to drop this resource.
            return False
        #---------------------------------------#
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:  # filter out keep-alive chunks
                    f.write(chunk)

    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        # NOTE(review): unlike the branch above, this write has no trailing
        # '\n' — possibly unintended.
        stderr.write('{0}'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource
176 174
def dataset_to_datapackage_change(dataset_dict):
    """Build a minimal datapackage descriptor from a CKAN dataset dict.

    Copies name/id/path, reformats 'metadata_modified' (ISO-8601 with
    microseconds) into a human-readable 'last_update', and converts each
    resource entry when any are present.
    """
    package = {
        'name': dataset_dict['name'],
        'id': dataset_dict['id'],
        'path': dataset_dict['path'],
        'last_update': datetime.strptime(dataset_dict['metadata_modified'], "%Y-%m-%dT%H:%M:%S.%f").strftime("%d-%b-%Y %I.%M %p"),
    }

    resource_list = dataset_dict.get('resources')
    if resource_list:
        package['resources'] = [convert_to_datapackage_resource_change(item)
                                for item in resource_list]
    return package
188 186
def convert_to_datapackage_resource_change(resource_dict):
    """Translate a CKAN resource dict into a datapackage resource entry.

    Copies 'id', 'name' and 'path' when present and truthy.  'schema' is
    parsed from JSON when it is a string (kept verbatim if parsing fails)
    and copied as-is when it is already a dict.
    """
    entry = {}

    for field in ('id', 'name', 'path'):
        if resource_dict.get(field):
            entry[field] = resource_dict[field]

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            entry['schema'] = json.loads(schema)
        except ValueError:
            # Not valid JSON: keep the raw string.
            entry['schema'] = schema
    elif isinstance(schema, dict):
        entry['schema'] = schema

    return entry
211 209
def name_no_repetition(name, dir, option=''):
    """Return a collision-free variant of *name* inside directory *dir*.

    The plain name is used if unused; otherwise '(1)name', '(2)name', ...
    are probed until a free one is found.  With option='resource' only the
    bare name is returned; otherwise the full path is.
    """
    suffix = 0
    while True:
        suffix += 1
        candidate = name
        if os.path.exists(os.path.join(dir, candidate)):
            candidate = '(' + str(suffix) + ')' + name
            if os.path.exists(os.path.join(dir, candidate)):
                # Both taken at this suffix: try the next one.
                continue
        if option == 'resource':
            return candidate
        return os.path.join(dir, candidate)
General Comments 0
You need to be logged in to leave comments. Login now