from ckanapi import RemoteCKAN
from datetime import datetime
from tqdm import tqdm
from CKAN_JRO import logic_download
#from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
import sys
import platform
import os
import tempfile
import shutil
import zipfile
import concurrent.futures
import requests
import json
#import pathlib
import uuid

# Python 2/3 compatibility shim for urlparse (only `urlparse(...)` is used below).
if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    import urlparse


class JROAPI():
    """
    PURPOSE: Script to administer and retrieve data from the repository through its APIs.

    PREREQUISITES:
        - Step 1: Have "pip [Python 2]" or "pip3 [Python 3]" installed.
        - Step 2: Install the following as administrator:
            Python 2:
                - pip install ckanapi==4.5
                - pip install requests
                - pip install futures
                - pip install tqdm
            Python >= 3:
                - pip3 install ckanapi==4.5
                - pip3 install requests
                - pip3 install tqdm

    AVAILABLE FUNCTIONS:
        - action
        - upload_file
        - upload_multiple_files
        - upload_multiple_files_advance
        - show
        - search
        - create
        - patch
        - delete
        - download_files

    EXAMPLES:
        #1:
        with JROAPI('http://demo.example.com', Authorization='#########') as api:
            ... some operation(s) ...
        #2:
        api = JROAPI('http://example.com', Authorization='#########')
        ... some operation(s) ...
        api.ckan.close()

    REPORTING A PROBLEM:
        Send an e-mail to eynilupu@igp.gob.pe detailing:
        1) An e-mail address to contact you
        2) Description of the problem
        3) In which step or section did you find the problem?
        4) What result were you expecting?
    """

    def __init__(self, url, Authorization=None, secure=True):
        """Create a RemoteCKAN session against *url*.

        :param url: base URL of the CKAN instance.
        :param Authorization: CKAN API key (also sent on raw downloads).
        :param secure: when False (bool), TLS certificate verification is disabled.
        """
        #-------- Check Secure -------#
        # self.verify is reused later by download_by_step for requests.get(verify=...).
        self.verify = secure
        if not secure and isinstance(secure, bool):
            session = requests.Session()
            session.verify = False
        else:
            session = None
        #------------------------------#
        self.url = url
        ua = 'CKAN_JRO/2.9.2 (+'+str(self.url)+')'
        #ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'
        self.ckan = RemoteCKAN(self.url, apikey=Authorization, user_agent=ua, session=session)
        #self.ckan = RemoteCKAN(self.url, apikey=Authorization)
        self.Authorization = Authorization
        # Change for --> self.separator = os.sep
        if platform.system() == 'Windows':
            self.separator = '\\'
        else:
            self.separator = '/'

        self.chunk_size = 1024
        # NOTE(review): self.list / self.dict / self.str / self.check / self.cont are
        # shared scratch state reused by many methods below and are never reset between
        # calls, so keys accumulated in one call leak into the next — confirm intended.
        self.list = []
        self.dict = {}
        self.str = ''
        self.check = 1
        self.cont = 0

    def __enter__(self):
        # Context-manager support: `with JROAPI(...) as api:`
        return self

    def __exit__(self, *args):
        # Close the underlying RemoteCKAN session on context exit.
        self.ckan.close()

    def action(self, action, **kwargs):
        """
        PURPOSE: Call any available CKAN API action by name.

        AVAILABLE APIs: see "GUIA DE SCRIPT.pdf".

        EXAMPLE:
            .action(<action name>, param_1=<value>, ...)
        """
        #--------------- CASE: PACKAGE SEARCH ---------------#
        # NOTE(review): kwargs is always a dict (never None), so this guard is a no-op.
        if kwargs is not None:
            if action == 'package_search':
                # Solr facet parameters use dots; rewrite facet_x -> facet.x.
                self.list = ['facet_mincount', 'facet_limit', 'facet_field']
                for facet in self.list:
                    if facet in kwargs:
                        kwargs[facet.replace('_', '.')] = kwargs[facet]
                        kwargs.pop(facet)
        #----------------------------------------------------#
        try:
            return getattr(self.ckan.action, action)(**kwargs)
        except:
            # NOTE(review): bare except — the exception instance is RETURNED, not
            # raised, so callers must check the result type. Pattern repeats below.
            _, exc_value, _ = sys.exc_info()
            return exc_value

    def upload_file(self, dataset_id, file_path, file_date, file_type, **kwargs):
        # TODO: handle the case where the upload is interrupted from the keyboard.
        '''
        PURPOSE: Upload a single file to the ROJ repository.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .upload_file(dataset_id=<id>, file_date=<date>, file_path=<path>,
                         file_type=<type>, param_1=<value>, ...)
        '''
        # Keys reserved for resource_create itself; everything else is passed through.
        self.list = ['package_id', 'upload', 'voc_file_type', 'name'] #file_date
        for key1, value1 in kwargs.items():
            if not key1 in self.list:
                self.dict[key1] = value1

        #---------------------------#
        if not 'others' in kwargs:
            self.dict['others'] = ''
        else:
            if isinstance(kwargs['others'], list):
                self.dict['others'] = json.dumps(kwargs['others'])
        #---------------------------#

        if not os.path.isfile(file_path):
            return 'File "%s" not exist' % (file_path)

        #if not 'format' in self.dict:
        #    self.str = ''.join(pathlib.Path(file_path).suffixes)
        #    if len(self.str) > 0:
        #        self.dict['format'] = self.str.upper()[1:]

        #-------------------------PACKAGE SHOW-----------------------#
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value

        # Reject the upload if a resource with the same (case-insensitive) name exists.
        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())

        if os.path.basename(file_path).lower() in resources_name:
            return 'ERROR:: "%s" file already exist in this dataset' % (os.path.basename(file_path))
        #------------------------------------------------------------#
        try:
            # NOTE(review): the file handle from open() is never explicitly closed.
            return getattr(self.ckan.action, 'resource_create')(package_id=dataset_id, file_date=file_date, upload=open(file_path, 'rb'), voc_file_type=file_type, name=os.path.basename(file_path), **self.dict)
        except:
            _, exc_value, _ = sys.exc_info()
            return exc_value

    def upload_multiple_files_advance(self, dataset_id, path_files, file_date, file_type, max_size=100, max_count=500, ignore_repetition=False, **kwargs):
        # TODO: handle the case where the upload is interrupted from the keyboard.
        '''
        PURPOSE: Upload multiple files to the ROJ repository, batched into blocks
        limited by total size (max_size, in MB) and file count (max_count), and sent
        with a single "package_revise" call per block.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .upload_multiple_files_advance(dataset_id=<id>, path_files=<path or list>,
                                           file_date=<date>, file_type=<type>,
                                           param_1=<value>, ...)
        '''
        #-------------------------PACKAGE SHOW-----------------------#
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value
        #------------------------------------------------------------#
        # Existing resource names (lower-cased) used for duplicate detection.
        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())
        #------------------------------------------------------------#
        self.list = ['package_id', 'upload', 'voc_file_type', 'name']
        for key1, value1 in kwargs.items():
            if not key1 in self.list:
                self.dict[key1] = value1
        #------------------------------------------------------------#
        if not 'others' in kwargs:
            self.dict['others'] = ''
        else:
            if isinstance(kwargs['others'], list):
                self.dict['others'] = json.dumps(kwargs['others'])
        #------------------------------------------------------------#
        total_list = []
        #---------------CASE : "path" or "path_list"-----------------#
        if type(path_files) is list:
            if len(path_files) != 0:
                path_files.sort()
                for u in path_files:
                    if os.path.isfile(u):
                        if os.path.basename(u).lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (os.path.basename(u))
                            print('WARRING:: "'+ str(os.path.basename(u)) +'" file was ignored because already exist in this dataset')
                        else:
                            # NOTE(review): handles are opened here and only consumed on
                            # a successful upload; early returns below leak them.
                            total_list.append({'name':os.path.basename(u), 'size': os.stat(u).st_size, 'upload':open(u, 'rb')})
                    else:
                        return 'File "%s" does not exist' % (u)
            else:
                return 'ERROR:: "path_list is empty"'
        elif type(path_files) is str:
            if os.path.isdir(path_files):
                path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
                path_order.sort()
                if path_order:
                    for name in path_order:
                        if name.lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (name)
                            print('WARRING:: "'+ name +'" file was ignored because already exist in this dataset')
                        else:
                            total_list.append({'name':name, 'size': os.stat(os.path.join(path_files, name)).st_size, 'upload':open(os.path.join(path_files, name), 'rb')})
                else:
                    return "ERROR:: There aren't files in this directory"
            else:
                return 'ERROR:: Directory "%s" does not exist' % (path_files)
        else:
            return 'ERROR:: "path_files" must be a str or list'
        #------------------------------------------------------------#
        # Decide whether dataset_id is a UUID (match by "id") or a name (match by "name").
        try:
            uuid.UUID(str(dataset_id), version=4)
            package_id_or_name = '"id": "' + str(dataset_id) + '"'
        except ValueError:
            package_id_or_name = '"name": "' + str(dataset_id) + '"'
        #------------------------------------------------------------#
        # Greedy partition of total_list into blocks bounded by max_size MB / max_count files.
        blocks = [[]]
        size_file = 0
        count_file = 0
        inter_num = 0
        for value in total_list:
            if value['size'] > 1024 * 1024 * float(max_size):
                return 'ERROR:: The size of the "%s" file is %sMB aprox, please change "max_size" value' % (value['name'], str(round(value['size']/(1024 * 1024), 2)))
            if not 1 <= int(max_count) <= 999:
                return 'ERROR:: The count of the number of files must be between 1 and 999, please change "max_count" value'

            size_file = size_file + value['size']
            count_file = count_file + 1
            if size_file <= 1024 * 1024 * float(max_size) and count_file <= int(max_count):
                del value['size']
                blocks[inter_num].append(value)
            else:
                # Current block is full: start a new block with this file.
                inter_num = inter_num + 1
                size_file = value['size']
                count_file = 1
                blocks.append([])
                del value['size']
                blocks[inter_num].append(value)
        #------------------------------------------------------------#
        if len(blocks[0]) > 0:
            print('BLOCK(S) IN TOTAL:: {}'.format(len(blocks)))
            for count1, block in enumerate(blocks):
                print('---- BLOCK N°{} ----'.format(count1 + 1))
                resource_extend = []
                files_dict = {}
                for count2, value2 in enumerate(block):
                    value2['file_date'] = file_date
                    value2['voc_file_type'] = file_type
                    value2.update(self.dict)

                    #if not 'format' in value2:
                    #    format = ''.join(pathlib.Path(value2['name']).suffixes)
                    #    if len(format) > 0:
                    #        value2['format'] = format.upper()[1:]

                    # package_revise multipart key: negative index counts from the end
                    # of the extended resources list.
                    files_dict['update__resources__-'+ str(len(block)-count2) +'__upload'] = (value2['name'], value2['upload'])
                    del value2['upload']
                    resource_extend.append(value2)

                print('BLOCK N°{} :: "{}" file(s) found >> uploading'.format(count1 + 1, len(block)))
                try:
                    result = self.ckan.call_action(
                        'package_revise',
                        {'match': '{'+ str(package_id_or_name) +'}', 'update__resources__extend': json.dumps(resource_extend)},
                        files=files_dict
                    )
                    print('BLOCK N°{} :: Uploaded file(s) successfully'.format(count1 + 1))
                    # Only the result of the LAST block is returned to the caller.
                    if len(blocks) == count1 + 1:
                        return result
                except:
                    print('ERROR :: Use the "print" for more information')
                    _, exc_value, _ = sys.exc_info()
                    return exc_value
        else:
            return "ERROR:: No file(s) found to upload"

    def upload_multiple_files(self, dataset_id, path_files, date_files, type_files, ignore_repetition=False, **kwargs):
        # TODO: handle the case where the upload is interrupted from the keyboard.
        '''
        PURPOSE: Upload multiple files to the ROJ repository, one resource_create
        call per file. Scalar parameters are shared by all files; list parameters
        must be parallel lists (same length as the file list).

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .upload_multiple_files(dataset_id=<id>, path_files=<path or list>,
                                   date_files=<date or list>, type_files=<type or list>,
                                   param_1=<value>, ...)
        '''
        #-------------------------PACKAGE SHOW-----------------------#
        try:
            dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
        except:
            _, exc_value, _ = sys.exc_info()
            print('ERROR obtaining metadata dataset:: Use the "print" for more information')
            return exc_value
        #------------------------------------------------------------#
        resources_name = []
        for u in dataset_show:
            resources_name.append(u['name'].lower())
        #------------------------------------------------------------#
        # params_dict: per-file (parallel lists); params_no_dict: shared scalars.
        params_dict = {'upload':[], 'name':[]}
        #if not 'format' in kwargs:
        #    params_dict.update({'format':[]})

        #---------------CASE : "path" or "path_list"-----------------#
        if type(path_files) is list:
            if len(path_files) != 0:
                path_files.sort()
                for u in path_files:
                    if os.path.isfile(u):
                        if os.path.basename(u).lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (os.path.basename(u))
                            print('WARRING:: "'+ str(os.path.basename(u)) +'" file was ignored because already exist in this dataset')
                        else:
                            params_dict['upload'].append(open(u, 'rb'))
                            params_dict['name'].append(os.path.basename(u))
                            #if not 'format' in kwargs:
                            #    format = ''.join(pathlib.Path(u).suffixes)
                            #    if len(format) > 0:
                            #        params_dict['format'].append(format.upper()[1:])
                            #    else:
                            #        params_dict['format'].append('')
                    else:
                        return 'File "%s" does not exist' % (u)
            else:
                return 'ERROR:: "path_list is empty"'
        elif type(path_files) is str:
            if os.path.isdir(path_files):
                path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
                path_order.sort()
                if path_order:
                    for name in path_order:
                        if name.lower() in resources_name:
                            if not ignore_repetition:
                                return 'ERROR:: "%s" file already exist in this dataset' % (name)
                            print('WARRING:: "'+ str(name) +'" file was ignored because already exist in this dataset')
                        else:
                            params_dict['upload'].append(open(os.path.join(path_files, name), 'rb'))
                            params_dict['name'].append(name)
                            #if not 'format' in kwargs:
                            #    format = ''.join(pathlib.Path(name).suffixes)
                            #    if len(format) > 0:
                            #        params_dict['format'].append(format.upper()[1:])
                            #    else:
                            #        params_dict['format'].append('')
                else:
                    return "ERROR:: There aren't files in this directory"
            else:
                return 'ERROR:: Directory "%s" does not exist' % (path_files)
        else:
            return 'ERROR:: "path_files" must be a str or list'
        #------------------------------------------------------------#
        params_no_dict = {'package_id': dataset_id}
        if type(date_files) is list:
            params_dict['file_date'] = date_files
        else:
            params_no_dict['file_date'] = date_files

        if type(type_files) is list:
            params_dict['voc_file_type'] = type_files
        else:
            params_no_dict['voc_file_type'] = type_files

        for key1, value1 in kwargs.items():
            if not key1 in params_dict and not key1 in params_no_dict and key1 != 'others':
                if type(value1) is list:
                    params_dict[key1] = value1
                else:
                    params_no_dict[key1] = value1
        #------------------------------------------#
        if not 'others' in kwargs:
            params_no_dict['others'] = ''
        else:
            if isinstance(kwargs['others'], tuple):
                # tuple => per-file "others" values (each JSON-encoded).
                params_dict['others'] = [json.dumps(w) for w in kwargs['others']]
            elif isinstance(kwargs['others'], list):
                params_no_dict['others'] = json.dumps(kwargs['others'])
            elif isinstance(kwargs['others'], str):
                params_no_dict['others'] = kwargs['others']
            else:
                return 'ERROR:: "others" must be a tuple, list or str'
        #------------------------------------------#
        # All per-file lists must have the same length.
        len_params_dict = []
        for value2 in params_dict.values():
            len_params_dict.append(len(value2))

        if len(list(set(len_params_dict))) > 1:
            return 'ERROR:: All lists must be the same length: %s' % (len(params_dict['name']))
        #------------------------------------------------------------#
        print('"{}" file(s) found >> uploading'.format(len(params_dict['name'])))
        for v in range(len(params_dict['name'])):
            try:
                send = {}
                for key_dict, value_dict in params_dict.items():
                    send[key_dict] = value_dict[v]
                for key_no_dict, value_no_dict in params_no_dict.items():
                    send[key_no_dict] = value_no_dict

                # Collect per-file results (or the caught exception) in self.list.
                self.list.append(getattr(self.ckan.action, 'resource_create')(**send))
                print('File #{} :: "{}" was uploaded successfully'.format(v+1, params_dict['name'][v]))
            except:
                _, exc_value, _ = sys.exc_info()
                self.list.append(exc_value)
                print('File #{} :: Error uploading "{}" file'.format(v+1, params_dict['name'][v]))
        return self.list
        #------------------------------------------------------------#

    def show(self, type_option, id, **kwargs):
        '''
        PURPOSE: Convenience wrapper around the CKAN *_show actions.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .show(type_option=<option>, id=<id>, param_1=<value>, ...)
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    return getattr(self.ckan.action, 'package_show')(id=id, **kwargs)
                elif type_option == 'resource':
                    return getattr(self.ckan.action, 'resource_show')(id=id, **kwargs)
                elif type_option == 'project':
                    return getattr(self.ckan.action, 'organization_show')(id=id, **kwargs)
                elif type_option == 'collaborator':
                    return getattr(self.ckan.action, 'package_collaborator_list_for_user')(id=id, **kwargs)
                elif type_option == 'member':
                    return getattr(self.ckan.action, 'organization_list_for_user')(id=id, **kwargs)
                elif type_option == 'vocabulary':
                    return getattr(self.ckan.action, 'vocabulary_show')(id=id, **kwargs)
                elif type_option == 'tag':
                    if not 'vocabulary_id' in kwargs:
                        print('Missing "vocabulary_id" value: assume it is a free tag')
                    return getattr(self.ckan.action, 'tag_show')(id=id, **kwargs)
                elif type_option == 'user':
                    return getattr(self.ckan.action, 'user_show')(id=id, **kwargs)
                elif type_option == 'job':
                    return getattr(self.ckan.action, 'job_show')(id=id, **kwargs)
                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be a str'

    def search(self, type_option, query=None, **kwargs):
        '''
        PURPOSE: Convenience wrapper around the CKAN search actions
        (package_search / resources_search / tag_search), with date-range
        validation for dataset and resource queries.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .search(type_option=<option>, query=<dict>, param_1=<value>, ...)

        NOTE(review): several user-facing messages below end abruptly
        ('must be: ', 'must be ') — the expected-type text appears to have been
        lost (likely stripped angle brackets, e.g. <dict>, <yyyy-mm-dd>); confirm
        against the original script guide before changing them.
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    key_replace = ['fq', 'fq_list', 'include_private']
                    key_point = ['facet_mincount', 'facet_limit', 'facet_field']
                    for key1, value1 in kwargs.items():
                        if not key1 in key_replace:
                            if key1 in key_point:
                                self.dict[key1.replace('_', '.')] = value1
                            else:
                                self.dict[key1] = value1

                    if query is not None:
                        if type(query) is dict:
                            self.dict['fq_list'] = []
                            #NUM_RESOURCES_MIN / NUM_RESOURCES_MAX
                            #----------------------------------------------------#
                            if 'dataset_start_date' in query:
                                if type(query['dataset_start_date']) is str:
                                    try:
                                        # Validate strict YYYY-MM-DD format and length.
                                        datetime.strptime(query['dataset_start_date'], '%Y-%m-%d')
                                        if len(query['dataset_start_date']) != 10:
                                            return '"dataset_start_date", must be: '
                                        self.dict['fq_list'].append('dataset_start_date:"'+query['dataset_start_date']+'"')
                                        self.list.append('dataset_start_date')
                                    except:
                                        return '"dataset_start_date" incorrect: "%s"' % (query['dataset_start_date'])
                                else:
                                    return '"dataset_start_date" must be '
                            #----------------------------------------------------#
                            if 'dataset_end_date' in query:
                                if type(query['dataset_end_date']) is str:
                                    try:
                                        datetime.strptime(query['dataset_end_date'], '%Y-%m-%d')
                                        if len(query['dataset_end_date']) != 10:
                                            return '"dataset_end_date", must be: '
                                        if 'dataset_start_date' in query:
                                            if query['dataset_start_date'] > query['dataset_end_date']:
                                                return '"dataset_end_date" must be greater than "dataset_start_date"'
                                        self.dict['fq_list'].append('dataset_end_date:"'+query['dataset_end_date']+'"')
                                        self.list.append('dataset_end_date')
                                    except:
                                        return '"dataset_end_date" incorrect: "%s"' % (query['dataset_end_date'])
                                else:
                                    return '"dataset_end_date" must be '
                            #----------------------------------------------------#
                            # Remaining query keys become fq_list filters key:"value".
                            for key, value in query.items():
                                if value is not None and not key in self.list:
                                    self.dict['fq_list'].append(str(key)+':"'+str(value)+'"')
                        else:
                            return '"query" must be '

                    return getattr(self.ckan.action, 'package_search')(include_private=True, **self.dict)

                elif type_option == 'resource':
                    for key1, value1 in kwargs.items():
                        if key1 != 'fields':
                            self.dict[key1] = value1

                    if query is not None:
                        if type(query) is dict:
                            #----------------------------------------------------#
                            if 'file_date_min' in query:
                                if type(query['file_date_min']) is str:
                                    try:
                                        datetime.strptime(query['file_date_min'], '%Y-%m-%d')
                                        if len(query['file_date_min']) != 10:
                                            return '"file_date_min", must be: '
                                    except:
                                        return '"file_date_min" incorrect: "%s"' % (query['file_date_min'])
                                else:
                                    return '"file_date_min" must be '
                            #----------------------------------------------------#
                            if 'file_date_max' in query:
                                if type(query['file_date_max']) is str:
                                    try:
                                        datetime.strptime(query['file_date_max'], '%Y-%m-%d')
                                        if len(query['file_date_max']) != 10:
                                            return '"file_date_max", must be: '
                                        if 'file_date_min' in query:
                                            if query['file_date_min'] > query['file_date_max']:
                                                return '"file_date_max" must be greater than "file_date_min"'
                                    except:
                                        return '"file_date_max" incorrect: "%s"' % (query['file_date_max'])
                                else:
                                    return '"file_date_max" must be '
                            #----------------------------------------------------#
                            self.dict['query'] = query
                        else:
                            return '"query" must be '
                    return getattr(self.ckan.action, 'resources_search')(**self.dict)

                elif type_option == 'tag':
                    for key1, value1 in kwargs.items():
                        if key1 != 'fields':
                            self.dict[key1] = value1

                    if not 'vocabulary_id' in kwargs:
                        print('Missing "vocabulary_id" value: tags that don’t belong to any vocabulary')
                    else:
                        print('Only tags that belong to "{}" vocabulary'.format(kwargs['vocabulary_id']))

                    if query is not None:
                        if type(query) is dict:
                            if 'search' in query:
                                if type(query['search']) is list or type(query['search']) is str:
                                    self.dict['query'] = query['search']
                                else:
                                    return '"search" must be or '
                        else:
                            return '"query" must be '
                    return getattr(self.ckan.action, 'tag_search')(**self.dict)

                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be '

    def create(self, type_option, select=None, **kwargs):
        '''
        PURPOSE: Convenience wrapper around the CKAN *_create actions.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .create(type_option=<option>, param_1=<value>, ...)
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    return getattr(self.ckan.action, 'package_create')(**kwargs)
                elif type_option == 'project':
                    return getattr(self.ckan.action, 'organization_create')(**kwargs)
                elif type_option == 'member':
                    return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
                elif type_option == 'collaborator':
                    return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
                elif type_option == 'vocabulary':
                    return getattr(self.ckan.action, 'vocabulary_create')(**kwargs)
                elif type_option == 'tag':
                    return getattr(self.ckan.action, 'tag_create')(**kwargs)
                elif type_option == 'user':
                    return getattr(self.ckan.action, 'user_create')(**kwargs)
                elif type_option == 'views':
                    if 'resource' == select:
                        self.list = ['package']
                        for key1, value1 in kwargs.items():
                            if not key1 in self.list:
                                self.dict[key1] = value1
                        return getattr(self.ckan.action, 'resource_create_default_resource_views')(**self.dict)
                    elif 'dataset' == select:
                        return getattr(self.ckan.action, 'package_create_default_resource_views')(**kwargs)
                    else:
                        return 'ERROR:: "select = %s" is not accepted' % (select)
                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be '

    def patch(self, type_option, **kwargs):
        '''
        PURPOSE: Convenience wrapper around the CKAN *_patch actions.

        NOTE(review): 'member' and 'collaborator' call *_create actions here,
        not *_patch — in CKAN, re-creating a membership/collaboration updates
        its capacity, but confirm this is the intended behavior.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .patch(type_option=<option>, param_1=<value>, ...)
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    return getattr(self.ckan.action, 'package_patch')(**kwargs)
                elif type_option == 'project':
                    return getattr(self.ckan.action, 'organization_patch')(**kwargs)
                elif type_option == 'resource':
                    return getattr(self.ckan.action, 'resource_patch')(**kwargs)
                elif type_option == 'member':
                    return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
                elif type_option == 'collaborator':
                    return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be '

    def delete(self, type_option, select=None, **kwargs):
        '''
        PURPOSE: Convenience wrapper around the CKAN delete/purge actions.
        For datasets and projects, *select* chooses 'delete' (soft) or 'purge' (hard).

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .delete(type_option=<option>, param_1=<value>, ...)
        '''
        if type(type_option) is str:
            try:
                if type_option == 'dataset':
                    if select is None:
                        return 'ERROR:: "select" must not be "None"'
                    else:
                        if 'delete' == select:
                            return getattr(self.ckan.action, 'package_delete')(**kwargs)
                        elif 'purge' == select:
                            return getattr(self.ckan.action, 'dataset_purge')(**kwargs)
                        else:
                            return 'ERROR:: "select = %s" is not accepted' % (select)
                elif type_option == 'project':
                    if select is None:
                        return 'ERROR:: "select" must not be "None"'
                    else:
                        if 'delete' == select:
                            return getattr(self.ckan.action, 'organization_delete')(**kwargs)
                        elif 'purge' == select:
                            return getattr(self.ckan.action, 'organization_purge')(**kwargs)
                        else:
                            return 'ERROR:: "select = %s" is not accepted' % (select)
                elif type_option == 'resource':
                    return getattr(self.ckan.action, 'resource_delete')(**kwargs)
                elif type_option == 'vocabulary':
                    return getattr(self.ckan.action, 'vocabulary_delete')(**kwargs)
                elif type_option == 'tag':
                    return getattr(self.ckan.action, 'tag_delete')(**kwargs)
                elif type_option == 'user':
                    return getattr(self.ckan.action, 'user_delete')(**kwargs)
                else:
                    return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            except:
                _, exc_value, _ = sys.exc_info()
                return exc_value
        else:
            return 'ERROR:: "type_option" must be '

    def f_status_note(self, total, result, path):
        # Write a status_note.txt at *path* listing downloaded vs. failed files.
        # *total* and *result* are {'name': [...]} dicts of expected vs. actual files.
        # NOTE(review): the file is opened twice ('w' then 'a'); the first handle is
        # leaked and neither handle is ever closed — confirm and fix upstream.
        file_txt = open(path+'status_note.txt', 'w')
        file_txt = open(path+'status_note.txt', 'a')

        file_txt.write('DOWNLOADED FILE(S): "%s"' % (len(result['name'])))
        file_txt.write(''+ os.linesep)
        for u in result['name']:
            file_txt.write('  - '+ u + os.linesep)
        file_txt.write(''+ os.linesep)

        file_txt.write('FAILED FILE(S): "%s"' % (len(total['name'])-len(result['name'])))
        file_txt.write(''+ os.linesep)
        if len(total['name'])-len(result['name']) != 0:
            for u in total['name']:
                if not u in result['name']:
                    file_txt.write('  - '+ u + os.linesep)
        else:
            file_txt.write('     "None"'+ os.linesep)

    def f_name(self, name_dataset, ext, tempdir):
        # Return a file/dir name that does not yet exist in *tempdir*:
        # "name" + ext, then "name(1)" + ext, "name(2)" + ext, ...
        # Uses the shared self.check / self.cont / self.str counters (set in __init__);
        # NOTE(review): they are not reset here, so a second call on the same
        # instance starts from the previous state — confirm single-use intent.
        while self.check:
            self.str = ''
            if self.cont == 0:
                if os.path.exists(tempdir + name_dataset + ext):
                    self.str = name_dataset+'('+str(self.cont+1)+')'+ext
                else:
                    self.check = self.check * 0
                    self.str = name_dataset + ext
            else:
                if not os.path.exists(tempdir + name_dataset+'('+str(self.cont)+')'+ext):
                    self.check = self.check * 0
                    self.str = name_dataset+'('+str(self.cont)+')'+ ext
            self.cont = self.cont+1
        return self.str

    def f_zipdir(self, path, ziph, zip_name):
        # Recursively add every file under *path* to the open ZipFile *ziph*,
        # storing paths relative to the parent of *path*.
        for root, _, files in os.walk(path):
            print('.....')
            print('Creating: "{}" >>'.format(zip_name))
            for __file in tqdm(iterable=files, total=len(files)):
                new_dir = os.path.relpath(os.path.join(root, __file), os.path.join(path, '..'))
                ziph.write(os.path.join(root, __file), new_dir)
        print('Created >>')

    def download_by_step(self, response, tempdir_name):
        # Stream-download one resource dict ({'url':..., 'name':...}) into
        # tempdir_name. Any requests error is silently ignored (best-effort;
        # download_files later reconciles which files actually arrived).
        try:
            # ---------- REPLACE URL --------- #
            # If the repo base URL differs from the public host recorded in the
            # resource URL, rewrite scheme://netloc to match self.url.
            if urlparse(self.url).netloc != 'www.igp.gob.pe' and urlparse(response['url']).netloc == 'www.igp.gob.pe':
                response['url'] = response['url'].replace(urlparse(response['url']).scheme + '://' + urlparse(response['url']).netloc, urlparse(self.url).scheme + '://' + urlparse(self.url).netloc)
            #----------------------------------#
            with requests.get(response['url'], stream=True, headers={'Authorization': self.Authorization}, verify=self.verify) as resp:
                if resp.status_code == 200:
                    with open(tempdir_name+response['name'], 'wb') as file:
                        for chunk in resp.iter_content(chunk_size = self.chunk_size):
                            if chunk:
                                file.write(chunk)
        except requests.exceptions.RequestException:
            pass

    def download_files(self, **kwargs):
        '''
        PURPOSE: Download the existing files of a dataset (concurrently, via a
        thread pool), optionally zipping the result and writing a status note.

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .download_files(id=<id>, param_1=<value>, ...)
        '''
        dict_local = {}
        #----------------------------------------------#
        if 'zip' in kwargs:
            if type(kwargs['zip']) is not bool:
                return 'ERROR:: "zip" must be: '
            else:
                dict_local['zip'] = kwargs['zip']
        else:
            dict_local['zip'] = False
        #----------------------------------------------#
        if 'status_note' in kwargs:
            if type(kwargs['status_note']) is not bool:
                return 'ERROR:: "status_note" must be: '
            else:
                dict_local['status_note'] = kwargs['status_note']
        else:
            dict_local['status_note'] = False
        #----------------------------------------------#
        if 'path' in kwargs:
            if type(kwargs['path']) is str:
                if os.path.isdir(kwargs['path']) == False:
                    return 'ERROR:: "path" does not exist'
                else:
                    if kwargs['path'][-1:] != self.separator:
                        dict_local['path'] = kwargs['path']+self.separator
                    else:
                        dict_local['path'] = kwargs['path']

                    # Probe write permission by creating and removing a throwaway file.
                    txt = dict_local['path']+datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
                    if int(platform.python_version()[0]) == 3:
                        try:
                            file_txt = open(txt, 'w')
                            file_txt.close()
                            os.remove(txt)
                        except PermissionError:
                            return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
                    else:
                        try:
                            file_txt = open(txt, 'w')
                            file_txt.close()
                            os.remove(txt)
                        except:
                            return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
            else:
                return 'ERROR:: "path" must be: '
        else:
            dict_local['path'] = ''
        #----------------------------------------------#
        for key, value in kwargs.items():
            if not key in dict_local:
                self.dict[key] = value

        try:
            # 'url_resources' is a custom CKAN action returning the dataset's
            # resource list ({'name':..., 'url':...} dicts) — see server plugin.
            response = getattr(self.ckan.action, 'url_resources')(**self.dict)
        except:
            _, exc_value, _ = sys.exc_info()
            return exc_value

        if len(response) != 0:
            #--------------TEMP PATH---------------#
            if dict_local['zip']:
                tempdir = tempfile.mkdtemp(prefix=kwargs['id']+'-')+self.separator
                os.mkdir(tempdir+kwargs['id'])
                dir_name = tempdir + kwargs['id'] + self.separator
            else:
                # NOTE(review): `dir` shadows the builtin of the same name.
                dir = self.f_name(kwargs['id'], '', dict_local['path'])
                os.mkdir(dict_local['path'] + dir)
                dir_name = dict_local['path'] + dir + self.separator
            #-----------DOWNLOAD FILES-------------#
            print('.....')
            print('Downloading "{}" file(s) >>'.format(len(response)))
            name_total = {'name': []}
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for u in tqdm(iterable=response, total=len(response)):
                    name_total['name'].append(u['name'])
                    executor.submit(self.download_by_step, u, dir_name)
            # After the executor exits, reconcile expected vs. actually written files.
            name_check = {}
            name_check['name'] = [f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))]
            print('"{}" downloaded file(s) successfully >>'.format(len(name_check['name'])))
            #--------------------------------------#
            if len(name_check['name']) != 0:
                #----------Status Note---------#
                if dict_local['status_note']:
                    print('.....')
                    print('Creating: "status_note.txt" >>')
                    self.f_status_note(name_total, name_check, dir_name)
                    print('Created>>')
                #----------ZIP CREATE----------#
                if dict_local['zip']:
                    zip_name = self.f_name(kwargs['id'], '.zip', dict_local['path'])
                    ziph = zipfile.ZipFile(dict_local['path'] + zip_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
                    self.f_zipdir(dir_name, ziph, zip_name)
                    ziph.close()
                    #Delete temporary path
                    if os.path.exists(tempdir[:-1]):
                        shutil.rmtree(tempdir[:-1])
                #------------------------------#
                print('.....')
                return 'DOWNLOAD FINISHED'
            else:
                #Delete temporary path
                if dict_local['zip']:
                    if os.path.exists(tempdir[:-1]):
                        shutil.rmtree(tempdir[:-1])
                else:
                    if os.path.exists(dir_name[:-1]):
                        shutil.rmtree(dir_name[:-1])
                return 'NO FILES WERE DOWNLOADED'
        else:
            return 'FILES NOT FOUND'

    def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs):
        '''
        PURPOSE: Advanced download of the existing files of one or several
        datasets, delegated to logic_download.dump_things_change (ckanapi
        dump-style CLI arguments, with *processes* parallel workers).

        AVAILABLE PARAMETERS: see "GUIA DE SCRIPT.pdf".

        STRUCTURE:
            .download_files_advance(id_or_name=<id or [ids]>, param_1=<value>, ...)
        '''
        #------------------ PATH ----------------------#
        if isinstance(path, str):
            if os.path.isdir(path):
                if not path.endswith(os.sep):
                    path = path + os.sep
                # Probe write permission with a throwaway file.
                test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
                try:
                    file_txt = open(test_txt, 'w')
                    file_txt.close()
                    os.remove(test_txt)
                except:
                    return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path)
            else:
                return 'ERROR:: "path" does not exist'
        else:
            return 'ERROR:: "path" must be: '
        #------------------ PROCESSES -----------------#
        if not isinstance(processes, int):
            return 'ERROR:: "processes" must be: '
        #------------------ ID OR NAME ----------------#
        if isinstance(id_or_name, str):
            id_or_name = [id_or_name]
        elif isinstance(id_or_name, list):
            id_or_name = list(map(str, id_or_name))
        else:
            return 'ERROR:: dataset "id_or_name" must be: '
        #----------------------------------------------#
        # ckanapi CLI-style argument dict consumed by logic_download.
        # NOTE(review): '--log' is a hard-coded absolute path on the deploy host —
        # confirm it should not be derived from *path*.
        arguments = {
            '--apikey': self.Authorization,
            '--ckan-user': None,
            '--config': None,
            '--datapackages': path,
            '--datastore-fields': False,
            '--get-request': False,
            '--insecure': not self.verify,
            '--log': '/home/soporte/DUMP/download.txt',
            '--processes': str(processes),
            '--quiet': False,
            '--remote': self.url,
            '--worker': False,
            #'--all': False,
            #'--gzip': False,
            #'--output': None,
            #'--max-records': None,
            #'--output-json': False,
            #'--output-jsonl': False,
            #'--create-only': False,
            #'--help': False,
            #'--input': None,
            #'--input-json': False,
            #'--start-record': '1',
            #'--update-only': False,
            #'--upload-logo': False,
            #'--upload-resources': False,
            #'--version': False,
            'ID_OR_NAME': id_or_name,
            'datasets': True,
            'dump': True,
            #'ACTION_NAME': None,
            #'KEY:JSON': [],
            #'KEY=STRING': [],
            #'KEY@FILE': [],
            #'action': False,
            #'delete': False,
            #'groups': False,
            #'load': False,
            #'organizations': False,
            #'related': False,
            #'search': False,
            #'users': False
        }
        return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs)