# -*- coding: utf-8 -*-
import ast
import concurrent.futures
import json
import os
import platform
import shutil
import sys
import tempfile
import zipfile
from datetime import datetime

import requests
from ckanapi import RemoteCKAN
#from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
from six import string_types
from tqdm import tqdm


class JROAPI(object):
    """
    PURPOSE:
        Client used to manage and retrieve data from the repository through
        its CKAN action API.

    PREREQUISITES:
        - Step 1: Have "pip [Python 2]" or "pip3 [Python 3]" installed.
        - Step 2: Install the following as administrator:
            Python 2
                - pip install ckanapi==4.5
                - pip install requests
                - pip install tqdm
            Python 3
                - pip3 install ckanapi==4.5
                - pip3 install requests
                - pip3 install tqdm

    AVAILABLE FUNCTIONS:
        - action
        - upload_file
        - upload_multiple_files
        - show
        - search
        - create
        - patch
        - delete
        - download_files

    EXAMPLES:
        #1:
            with JROAPI('http://demo.example.com', Authorization='#########') as api:
                ... some operation(s) ...
        #2:
            api = JROAPI('http://example.com', Authorization='#########')
            ... some operation(s) ...
            api.ckan.close()

    ERROR CONVENTION:
        Public methods do not raise on API or validation failures. They
        return either an error string or the exception instance raised by
        the API call, so callers must inspect the returned value.

    REPORTING A PROBLEM:
        Send an e-mail to eynilupu@igp.gob.pe detailing the following:
            1) Identify yourself
            2) Describe the problem
            3) Which function has the problem?
            4) What did you expect the function to do?
    """

    # package_search expects dotted facet names ("facet.limit"), which cannot
    # be written as Python keyword arguments; callers use underscores instead.
    _FACET_KEYS = ('facet_mincount', 'facet_limit', 'facet_field')

    # type_option -> API action name, one table per public helper.
    _SHOW_ACTIONS = {
        'dataset': 'package_show',
        'resource': 'resource_show',
        'project': 'organization_show',
        'collaborator': 'package_collaborator_list_for_user',
        'member': 'organization_list_for_user',
        'vocabulary': 'vocabulary_show',
        'tag': 'tag_show',
        'user': 'user_show',
        'job': 'job_show',
    }
    _CREATE_ACTIONS = {
        'dataset': 'package_create',
        'project': 'organization_create',
        'member': 'organization_member_create',
        'collaborator': 'package_collaborator_create',
        'vocabulary': 'vocabulary_create',
        'tag': 'tag_create',
        'user': 'user_create',
    }
    _PATCH_ACTIONS = {
        'dataset': 'package_patch',
        'project': 'organization_patch',
        'resource': 'resource_patch',
        'member': 'organization_member_create',
        'collaborator': 'package_collaborator_create',
    }
    _DELETE_ACTIONS = {
        'resource': 'resource_delete',
        'vocabulary': 'vocabulary_delete',
        'tag': 'tag_delete',
        'user': 'user_delete',
    }
    # type_option -> (action for select='delete', action for select='purge')
    _DELETE_OR_PURGE_ACTIONS = {
        'dataset': ('package_delete', 'dataset_purge'),
        'project': ('organization_delete', 'organization_purge'),
    }

    def __init__(self, url, Authorization=None):
        """
        Open a connection to the CKAN instance at ``url``.

        ``Authorization`` is the API key; it is handed to ``RemoteCKAN`` and
        also sent as the ``Authorization`` header when downloading files.
        """
        ua = 'CKAN_JRO/1.1 (+' + str(url) + ')'
        self.ckan = RemoteCKAN(url, apikey=Authorization, user_agent=ua)
        self.Authorization = Authorization
        if platform.system() == 'Windows':
            self.separator = '\\'
        else:
            self.separator = '/'
        self.chunk_size = 1024
        # Legacy attributes. They used to be scratch space shared by every
        # method, which leaked state from one call into the next. The methods
        # now use local variables; the attributes are only kept so external
        # code that reads them does not break.
        self.list = []
        self.dict = {}
        self.str = ''
        self.check = 1
        self.cont = 0

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.ckan.close()

    def _call(self, action_name, params):
        """
        Invoke the API action ``action_name`` with the ``params`` dict.

        ``params`` is taken as a dict (not ``**params``) so that API
        parameters named like this method's own arguments cannot collide.
        Returns the API result, or the raised exception instance.
        """
        try:
            return getattr(self.ckan.action, action_name)(**params)
        except Exception as exc:
            return exc

    def action(self, action, **kwargs):
        """
        PURPOSE:
            Call any of the available API actions by name.

        AVAILABLE APIs:
            SEE: "GUIA DE SCRIPT.pdf"

        EXAMPLE:
            api.action('package_search', param_1=value_1, ...)

        RETURNS:
            The API result, or the exception instance raised by the call.
        """
        if action == 'package_search':
            for facet in self._FACET_KEYS:
                if facet in kwargs:
                    kwargs[facet.replace('_', '.')] = kwargs.pop(facet)
        return self._call(action, kwargs)

    def upload_file(self, dataset_id, file_path, file_date, file_type, **kwargs):
        """
        PURPOSE:
            Upload a single file to the repository as a new resource.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.upload_file(dataset_id=..., file_date=..., file_path=...,
                            file_type=..., param_1=..., ...)

        NOTES:
            - Extra keyword arguments are forwarded to "resource_create",
              except the ones this method sets itself (package_id, upload,
              voc_file_type, name).
            - "others" defaults to ''; a list is sent JSON-encoded.

        RETURNS:
            The API result, an error string when ``file_path`` is not a
            file, or the exception instance raised by the call.
        """
        reserved = ('package_id', 'upload', 'voc_file_type', 'name')
        params = {}
        for key, value in kwargs.items():
            if key not in reserved:
                params[key] = value

        if 'others' not in kwargs:
            params['others'] = ''
        elif isinstance(kwargs['others'], list):
            params['others'] = json.dumps(kwargs['others'])

        if not os.path.isfile(file_path):
            return 'File "%s" not exist' % (file_path)

        params.update(
            package_id=dataset_id,
            file_date=file_date,
            voc_file_type=file_type,
            # basename also copes with '/' separators on Windows
            name=os.path.basename(file_path),
        )
        try:
            # The handle is closed as soon as the request has finished.
            with open(file_path, 'rb') as handle:
                params['upload'] = handle
                return self.ckan.action.resource_create(**params)
        except Exception as exc:
            return exc

    def upload_multiple_files(self, dataset_id, path_files, date_files, type_files, **kwargs):
        """
        PURPOSE:
            Upload several files to the repository, one resource per file.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.upload_multiple_files(dataset_id=..., path_files=<str or list>,
                                      date_files=<value or list>,
                                      type_files=<value or list>, param_1=..., ...)

        NOTES:
            - ``path_files`` is either a directory (all of its regular files
              are uploaded) or a list of file paths. Files are uploaded in
              sorted order; the caller's list is not modified.
            - For ``date_files``, ``type_files`` and extra keyword arguments,
              a list supplies one value per file (in sorted-file order) and
              must have as many items as there are files; any other value is
              applied to every file.
            - "others": a tuple supplies one JSON-encoded value per file, a
              list is JSON-encoded and applied to every file, a str is
              applied to every file as is.

        RETURNS:
            An error string when the arguments are invalid; otherwise a list
            with one entry per file, each being the API result or the
            exception raised for that file.
        """
        # ---------------- CASE: "path" or "path_list" ----------------
        if isinstance(path_files, list):
            if not path_files:
                return 'ERROR:: "path_list is empty"'
            paths = sorted(path_files)
            for path in paths:
                if not os.path.isfile(path):
                    return 'File "%s" not exist' % (path)
            names = [os.path.basename(path) for path in paths]
        elif isinstance(path_files, string_types):
            if not os.path.isdir(path_files):
                return 'ERROR:: Directory "%s" not exist' % (path_files)
            names = sorted(
                entry for entry in os.listdir(path_files)
                if os.path.isfile(os.path.join(path_files, entry))
            )
            if not names:
                return 'ERROR:: Directory is empty'
            paths = [os.path.join(path_files, name) for name in names]
        else:
            return 'ERROR:: "path_files" must be a str or list'

        # per_file: parameter -> list holding one value per file
        # shared:   parameter -> value applied to every file
        per_file = {}
        shared = {'package_id': dataset_id}

        if isinstance(date_files, list):
            per_file['file_date'] = date_files
        else:
            shared['file_date'] = date_files

        if isinstance(type_files, list):
            per_file['voc_file_type'] = type_files
        else:
            shared['voc_file_type'] = type_files

        # Parameters controlled by this method are never taken from kwargs.
        reserved = ('upload', 'name', 'package_id', 'file_date',
                    'voc_file_type', 'others')
        for key, value in kwargs.items():
            if key in reserved:
                continue
            if isinstance(value, list):
                per_file[key] = value
            else:
                shared[key] = value

        if 'others' not in kwargs:
            shared['others'] = ''
        else:
            others = kwargs['others']
            if isinstance(others, tuple):
                per_file['others'] = [json.dumps(item) for item in others]
            elif isinstance(others, list):
                shared['others'] = json.dumps(others)
            elif isinstance(others, string_types):
                shared['others'] = others
            else:
                return 'ERROR:: "others" must be a tuple, list or str'

        for values in per_file.values():
            if len(values) != len(names):
                return 'ERROR:: All lists must be the same length: %s' % (len(names))

        # -------------------------- UPLOAD ---------------------------
        results = []
        for index, path in enumerate(paths):
            name = names[index]
            send = dict(shared)
            for key, values in per_file.items():
                send[key] = values[index]
            send['name'] = name
            try:
                # Only one file is open at a time and it is always closed.
                with open(path, 'rb') as handle:
                    send['upload'] = handle
                    results.append(self.ckan.action.resource_create(**send))
                print('File "{}" was uploaded successfully'.format(name))
            except Exception as exc:
                results.append(exc)
                print('Error uploading "{}" file'.format(name))
        return results

    def show(self, type_option, id, **kwargs):
        """
        PURPOSE:
            Fetch one specific object.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.show(type_option=<str>, id=<str>, param_1=..., ...)

        ``type_option`` is one of: dataset, resource, project, collaborator,
        member, vocabulary, tag, user, job.

        RETURNS:
            The API result, an error string for an invalid ``type_option``,
            or the exception instance raised by the call.
        """
        if not isinstance(type_option, string_types):
            return 'ERROR:: "type_option" must be a str'
        action_name = self._SHOW_ACTIONS.get(type_option)
        if action_name is None:
            return 'ERROR:: "%s" is not accepted' % (type_option)
        if type_option == 'tag' and 'vocabulary_id' not in kwargs:
            print('Missing "vocabulary_id" value: assume it is a free tag')
        return self._call(action_name, dict(kwargs, id=id))

    @staticmethod
    def _date_error(field, value):
        """Return an error string if ``value`` is not a 10-character YYYY-MM-DD date string, else None."""
        if not isinstance(value, string_types):
            return '"%s" must be ' % (field)
        try:
            datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            return '"%s" incorrect: "%s"' % (field, value)
        # strptime also accepts non zero-padded values such as "2020-1-5"
        if len(value) != 10:
            return '"%s", must be: ' % (field)
        return None

    def _date_range_error(self, query, lower, upper):
        """Validate the optional ``lower``/``upper`` date keys of ``query``; return an error string or None."""
        for field in (lower, upper):
            if field in query:
                error = self._date_error(field, query[field])
                if error is not None:
                    return error
        # Both values are validated YYYY-MM-DD strings here, so comparing
        # them as strings compares them chronologically.
        if lower in query and upper in query and query[lower] > query[upper]:
            return '"%s" must be greater than "%s"' % (upper, lower)
        return None

    def _search_dataset(self, query, kwargs):
        """Build and run the "package_search" request for search(type_option='dataset')."""
        params = {}
        for key, value in kwargs.items():
            # These three are always set by this method.
            if key in ('fq', 'fq_list', 'include_private'):
                continue
            if key in self._FACET_KEYS:
                key = key.replace('_', '.')
            params[key] = value

        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be '
            date_fields = ('dataset_start_date', 'dataset_end_date')
            error = self._date_range_error(query, *date_fields)
            if error is not None:
                return error
            filters = []
            for field in date_fields:
                if field in query:
                    filters.append(field + ':"' + query[field] + '"')
            for key, value in query.items():
                if value is not None and key not in date_fields:
                    filters.append(str(key) + ':"' + str(value) + '"')
            params['fq_list'] = filters

        params['include_private'] = True
        return self.ckan.action.package_search(**params)

    def _search_resource(self, query, kwargs):
        """Build and run the "resources_search" request for search(type_option='resource')."""
        params = {}
        for key, value in kwargs.items():
            if key != 'fields':
                params[key] = value

        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be '
            error = self._date_range_error(query, 'file_date_min', 'file_date_max')
            if error is not None:
                return error
            params['query'] = query
        return self.ckan.action.resources_search(**params)

    def _search_tag(self, query, kwargs):
        """Build and run the "tag_search" request for search(type_option='tag')."""
        params = {}
        for key, value in kwargs.items():
            if key != 'fields':
                params[key] = value

        if 'vocabulary_id' not in kwargs:
            print('Missing "vocabulary_id" value: tags that don’t belong to any vocabulary')
        else:
            print('Only tags that belong to "{}" vocabulary'.format(kwargs['vocabulary_id']))

        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be '
            if 'search' in query:
                if not isinstance(query['search'], (list, string_types)):
                    return '"search" must be or '
                params['query'] = query['search']
        return self.ckan.action.tag_search(**params)

    def search(self, type_option, query=None, **kwargs):
        """
        PURPOSE:
            Search for objects that satisfy some criteria.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.search(type_option=<str>, query=<dict>, param_1=..., ...)

        ``type_option`` is one of: dataset, resource, tag.
            - dataset: every non-None item of ``query`` becomes an "fq_list"
              filter; "dataset_start_date" / "dataset_end_date" must be
              YYYY-MM-DD strings. Private datasets are always included.
            - resource: ``query`` is forwarded as is; "file_date_min" /
              "file_date_max" must be YYYY-MM-DD strings.
            - tag: ``query['search']`` (str or list) is the search term.

        RETURNS:
            The API result, an error string when validation fails, or the
            exception instance raised while building or sending the request.
        """
        if not isinstance(type_option, string_types):
            return 'ERROR:: "type_option" must be '
        try:
            if type_option == 'dataset':
                return self._search_dataset(query, kwargs)
            if type_option == 'resource':
                return self._search_resource(query, kwargs)
            if type_option == 'tag':
                return self._search_tag(query, kwargs)
            return 'ERROR:: "%s" is not accepted' % (type_option)
        except Exception as exc:
            return exc

    def create(self, type_option, **kwargs):
        """
        PURPOSE:
            Create an object.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.create(type_option=<str>, param_1=..., ...)

        ``type_option`` is one of: dataset, project, member, collaborator,
        vocabulary, tag, user.

        RETURNS:
            The API result, an error string for an invalid ``type_option``,
            or the exception instance raised by the call.
        """
        if not isinstance(type_option, string_types):
            return 'ERROR:: "type_option" must be '
        action_name = self._CREATE_ACTIONS.get(type_option)
        if action_name is None:
            return 'ERROR:: "%s" is not accepted' % (type_option)
        return self._call(action_name, kwargs)

    def patch(self, type_option, **kwargs):
        """
        PURPOSE:
            Update an existing object.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.patch(type_option=<str>, param_1=..., ...)

        ``type_option`` is one of: dataset, project, resource, member,
        collaborator. "member" and "collaborator" are sent to the
        corresponding "*_create" actions.

        RETURNS:
            The API result, an error string for an invalid ``type_option``,
            or the exception instance raised by the call.
        """
        if not isinstance(type_option, string_types):
            return 'ERROR:: "type_option" must be '
        action_name = self._PATCH_ACTIONS.get(type_option)
        if action_name is None:
            return 'ERROR:: "%s" is not accepted' % (type_option)
        return self._call(action_name, kwargs)

    def delete(self, type_option, select=None, **kwargs):
        """
        PURPOSE:
            Delete and/or purge an object.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.delete(type_option=<str>, select=<str>, param_1=..., ...)

        ``type_option`` is one of: dataset, project, resource, vocabulary,
        tag, user. For "dataset" and "project", ``select`` is mandatory and
        must be 'delete' or 'purge'; it is ignored for the other types.

        RETURNS:
            The API result, an error string for invalid arguments, or the
            exception instance raised by the call.
        """
        if not isinstance(type_option, string_types):
            return 'ERROR:: "type_option" must be '

        if type_option in self._DELETE_OR_PURGE_ACTIONS:
            delete_name, purge_name = self._DELETE_OR_PURGE_ACTIONS[type_option]
            if select is None:
                return 'ERROR:: "select" must not be "None"'
            if select == 'delete':
                return self._call(delete_name, kwargs)
            if select == 'purge':
                return self._call(purge_name, kwargs)
            return 'ERROR:: "%s" is not accepted' % (select)

        action_name = self._DELETE_ACTIONS.get(type_option)
        if action_name is None:
            return 'ERROR:: "%s" is not accepted' % (type_option)
        return self._call(action_name, kwargs)

    def f_status_note(self, total, result, path):
        """
        Write "status_note.txt" inside ``path`` (which must end with a
        path separator) listing downloaded and failed files.

        ``total['name']`` holds the names that were requested and
        ``result['name']`` the names actually found on disk.
        """
        downloaded = result['name']
        failed_count = len(total['name']) - len(downloaded)
        # '\n' is used instead of os.linesep: the file is opened in text
        # mode, which already translates '\n' to the platform line ending
        # (writing os.linesep yields '\r\r\n' on Windows).
        with open(path + 'status_note.txt', 'w') as note:
            note.write('DOWNLOADED FILE(S): "%s"' % (len(downloaded)))
            note.write('\n')
            for name in downloaded:
                note.write(' - ' + name + '\n')
            note.write('\n')
            note.write('FAILED FILE(S): "%s"' % (failed_count))
            note.write('\n')
            if failed_count != 0:
                for name in total['name']:
                    if name not in downloaded:
                        note.write(' - ' + name + '\n')
            else:
                note.write(' "None"' + '\n')

    def f_name(self, name_dataset, ext, tempdir):
        """
        Return a file/directory name that does not exist yet in ``tempdir``.

        The result is ``name_dataset + ext`` when that is free, otherwise
        ``name_dataset(N) + ext`` for the smallest N >= 1 that is free.
        ``tempdir`` must be '' or end with a path separator.
        """
        candidate = name_dataset + ext
        counter = 0
        while os.path.exists(tempdir + candidate):
            counter += 1
            candidate = name_dataset + '(' + str(counter) + ')' + ext
        return candidate

    def f_zipdir(self, path, ziph, zip_name):
        """
        Add every file under ``path`` to the open ZipFile ``ziph``.

        Archive member names are relative to the parent of ``path``, so the
        directory itself shows up as the top-level folder of the archive.
        ``zip_name`` is only used in progress messages.
        """
        base = os.path.join(path, '..')
        for root, _, files in os.walk(path):
            print('.....')
            print('Creating: "{}" >>'.format(zip_name))
            for filename in tqdm(iterable=files, total=len(files)):
                full_path = os.path.join(root, filename)
                ziph.write(full_path, os.path.relpath(full_path, base))
            print('Created >>')

    def download_by_step(self, response, tempdir_name):
        """
        Stream ``response['url']`` into ``tempdir_name + response['name']``.

        Best effort: a non-200 answer or a network error does not raise. The
        caller detects failures by checking which files exist afterwards, so
        a partially written file is removed when the transfer breaks.
        """
        target = tempdir_name + response['name']
        started = False
        try:
            with requests.get(response['url'], stream=True,
                              headers={'Authorization': self.Authorization}) as resp:
                if resp.status_code == 200:
                    with open(target, 'wb') as out:
                        started = True
                        for chunk in resp.iter_content(chunk_size=self.chunk_size):
                            if chunk:
                                out.write(chunk)
        except requests.exceptions.RequestException:
            # Do not let a truncated file be counted as a successful download.
            if started and os.path.isfile(target):
                os.remove(target)

    def download_files(self, **kwargs):
        """
        PURPOSE:
            Download the files of a dataset.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            api.download_files(id=<str>, param_1=..., ...)

        LOCAL OPTIONS (not sent to the API):
            - zip (bool, default False): deliver the files as "<id>.zip"
              instead of a "<id>" directory.
            - status_note (bool, default False): add a "status_note.txt"
              listing downloaded and failed files.
            - path (str, default ''): existing, writable output directory;
              '' means the current working directory.
            Every other keyword argument is forwarded to "url_resources".

        RETURNS:
            'DOWNLOAD FINISHED', 'NO FILES WERE DOWNLOADED',
            'FILES NOT FOUND', an error string for invalid arguments, or the
            exception instance raised by the API call.
        """
        local_keys = ('zip', 'status_note', 'path')

        make_zip = kwargs.get('zip', False)
        if not isinstance(make_zip, bool):
            return 'ERROR:: "zip" must be: '

        make_note = kwargs.get('status_note', False)
        if not isinstance(make_note, bool):
            return 'ERROR:: "status_note" must be: '

        out_path = ''
        if 'path' in kwargs:
            out_path = kwargs['path']
            if not isinstance(out_path, string_types):
                return 'ERROR:: "path" must be: '
            if not os.path.isdir(out_path):
                return 'ERROR:: "path" does not exist'
            if not out_path.endswith(self.separator):
                out_path = out_path + self.separator
            # Probe write permission by creating and removing a scratch file.
            probe = out_path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f") + '.txt'
            try:
                open(probe, 'w').close()
                os.remove(probe)
            except (IOError, OSError):
                return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (out_path)

        params = {}
        for key, value in kwargs.items():
            if key not in local_keys:
                params[key] = value
        try:
            response = self.ckan.action.url_resources(**params)
        except Exception as exc:
            return exc

        if len(response) == 0:
            return 'FILES NOT FOUND'
        if 'id' not in kwargs:
            # The id names the output directory / archive.
            return 'ERROR:: "id" is required'
        dataset_id = kwargs['id']

        # ------------------------ TARGET PATH ------------------------
        tempdir = None
        if make_zip:
            # Files are staged in a temporary directory and then zipped.
            tempdir = tempfile.mkdtemp(prefix=dataset_id + '-') + self.separator
            os.mkdir(tempdir + dataset_id)
            dir_name = tempdir + dataset_id + self.separator
        else:
            folder = self.f_name(dataset_id, '', out_path)
            os.mkdir(out_path + folder)
            dir_name = out_path + folder + self.separator

        try:
            # ---------------------- DOWNLOAD FILES -----------------------
            print('.....')
            print('Downloading "{}" file(s) >>'.format(len(response)))
            name_total = {'name': []}
            # Leaving the "with" block waits for every download to finish.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                for item in tqdm(iterable=response, total=len(response)):
                    name_total['name'].append(item['name'])
                    executor.submit(self.download_by_step, item, dir_name)
            name_check = {
                'name': [entry for entry in os.listdir(dir_name)
                         if os.path.isfile(os.path.join(dir_name, entry))]
            }
            print('"{}" downloaded file(s) successfully >>'.format(len(name_check['name'])))

            if not name_check['name']:
                if not make_zip and os.path.exists(dir_name[:-1]):
                    shutil.rmtree(dir_name[:-1])
                return 'NO FILES WERE DOWNLOADED'

            # ------------------------ STATUS NOTE ------------------------
            if make_note:
                print('.....')
                print('Creating: "status_note.txt" >>')
                self.f_status_note(name_total, name_check, dir_name)
                print('Created>>')

            # ------------------------ ZIP CREATE -------------------------
            if make_zip:
                zip_name = self.f_name(dataset_id, '.zip', out_path)
                with zipfile.ZipFile(out_path + zip_name, 'w',
                                     zipfile.ZIP_DEFLATED, allowZip64=True) as ziph:
                    self.f_zipdir(dir_name, ziph, zip_name)

            print('.....')
            return 'DOWNLOAD FINISHED'
        finally:
            # The staging directory is removed even if something failed.
            if tempdir is not None and os.path.exists(tempdir[:-1]):
                shutil.rmtree(tempdir[:-1])