##// END OF EJS Templates
v2.9.2 :: Fixed 'download' bugs
v2.9.2 :: Fixed 'download' bugs

File last commit:

r19:1886da7a44e6
r19:1886da7a44e6
Show More
jro_api.py
499 lines | 22.8 KiB | text/x-python | PythonLexer
from ckanapi import RemoteCKAN
from datetime import datetime
from CKAN_JRO import logic_download
from CKAN_JRO import resource
#from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
import sys
import platform
import os
import requests
class JROAPI():
    """
    Manage and retrieve data from the repository through the CKAN APIs.

    PREREQUISITES:
        - Step 1: Have "pip [Python 2]" or "pip3 [Python 3]" installed.
        - Step 2: Install the following packages:
            ckanapi==4.7
            requests

    AVAILABLE FUNCTIONS:
        - action
        - show
        - search
        - create
        - patch
        - delete
        - download_files

    EXAMPLES:
        #1:
            with JROAPI('http://demo.example.com', Authorization='#########') as <access_name>:
                ... some operation(s) ...
        #2:
            <access_name> = JROAPI('http://example.com', Authorization='#########')
            ... some operation(s) ...
            <access_name>.ckan.close()

    REPORTING A PROBLEM:
        Send an e-mail to eynilupu@igp.gob.pe with the following details:
            1) An e-mail address where you can be contacted
            2) Description of the problem
            3) In which step or section did you find the problem?
            4) What result were you expecting?

    ERROR CONVENTION:
        The public methods do not raise for API or validation failures: they
        return either an error message (str) or the exception instance that
        was caught, so callers must inspect the returned value.
    """

    # Keyword arguments that must be sent with a dot instead of an underscore
    # (e.g. "facet_limit" -> "facet.limit"), because a dotted name cannot be
    # written as a Python keyword argument.
    _FACET_KEYS = ('facet_mincount', 'facet_limit', 'facet_field')

    # "type_option" -> CKAN action name, for the simple one-to-one cases.
    _SHOW_ACTIONS = {
        'dataset': 'package_show',
        'resource': 'resource_show',
        'project': 'organization_show',
        'collaborator': 'package_collaborator_list_for_user',
        'member': 'organization_list_for_user',
        'vocabulary': 'vocabulary_show',
        'tag': 'tag_show',
        'user': 'user_show',
        'job': 'job_show',
    }
    _CREATE_ACTIONS = {
        'dataset': 'package_create',
        'project': 'organization_create',
        'member': 'organization_member_create',
        'collaborator': 'package_collaborator_create',
        'vocabulary': 'vocabulary_create',
        'tag': 'tag_create',
        'user': 'user_create',
    }
    # "member" and "collaborator" are updated through their *_create actions.
    _PATCH_ACTIONS = {
        'dataset': 'package_patch',
        'project': 'organization_patch',
        'member': 'organization_member_create',
        'collaborator': 'package_collaborator_create',
    }
    _DELETE_ACTIONS = {
        'vocabulary': 'vocabulary_delete',
        'tag': 'tag_delete',
        'user': 'user_delete',
    }
    # Types whose removal needs an explicit select = 'delete' | 'purge'.
    _DELETE_SELECT_ACTIONS = {
        'dataset': (('delete', 'package_delete'), ('purge', 'dataset_purge')),
        'project': (('delete', 'organization_delete'), ('purge', 'organization_purge')),
    }

    def __init__(self, url, Authorization=None, secure=True):
        """
        Open a connection to the CKAN site at ``url``.

        PARAMETERS:
            url: base address of the CKAN site.
            Authorization: API key forwarded to ckanapi (``apikey``).
            secure: pass exactly ``False`` to disable TLS certificate
                verification; any other value keeps ckanapi's default session.
        """
        #-------- Check Secure -------#
        self.verify = secure
        session = None
        if isinstance(secure, bool) and not secure:
            session = requests.Session()
            session.verify = False
        #------------------------------#
        self.url = url
        ua = 'CKAN_JRO/2.9.2 (+' + str(self.url) + ')'
        self.ckan = RemoteCKAN(self.url, apikey=Authorization, user_agent=ua, session=session)
        self.Authorization = Authorization
        # '\\' on Windows and '/' elsewhere, same values as the former platform check.
        self.separator = os.sep
        self.chunk_size = 1024
        # The attributes below are no longer used as scratch space by this
        # class. They are still initialised because the instance is handed to
        # the "resource" helper module, which presumably reads them -- verify
        # against that module before removing.
        self.list = []
        self.dict = {}
        self.str = ''
        self.check = 1
        self.cont = 0

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Close the underlying ckanapi connection when leaving the ``with`` block."""
        self.ckan.close()

    def action(self, action, **kwargs):
        """
        PURPOSE:
            Call any of the available APIs by name.

        AVAILABLE APIs:
            SEE: "GUIA DE SCRIPT.pdf"

        EXAMPLE:
            <access_name>.action(<consuming API>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, or the exception instance if the call failed.
        """
        #--------------- CASE: PACKAGE SEARCH ---------------#
        if action == 'package_search':
            for facet in self._FACET_KEYS:
                if facet in kwargs:
                    kwargs[facet.replace('_', '.')] = kwargs.pop(facet)
        #----------------------------------------------------#
        try:
            return getattr(self.ckan.action, action)(**kwargs)
        except Exception as exc:
            return exc

    def show(self, type_option, id, **kwargs):
        """
        PURPOSE:
            Customised function to look up one specific element.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.show(type_option = <class 'str'>, id = <class 'str'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, an error message (str) when "type_option" is
            invalid, or the exception instance if the call failed.
        """
        if not isinstance(type_option, str):
            return 'ERROR:: "type_option" must be a str'
        action_name = self._SHOW_ACTIONS.get(type_option)
        if action_name is None:
            return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
        if type_option == 'tag' and 'vocabulary_id' not in kwargs:
            print('Missing "vocabulary_id" value: assume it is a free tag')
        try:
            return getattr(self.ckan.action, action_name)(id=id, **kwargs)
        except Exception as exc:
            return exc

    @staticmethod
    def _check_date(query, key, lower_key=None):
        """
        Validate that ``query[key]`` is a 'YYYY-MM-DD' string.

        When ``lower_key`` is given and present in ``query``, also check that
        ``query[lower_key]`` is not later than ``query[key]`` (``lower_key``
        must already have been validated by the caller).

        Returns an error message (str) or ``None`` when the value is valid.
        """
        value = query[key]
        if not isinstance(value, str):
            return '"%s" must be <str>' % key
        try:
            datetime.strptime(value, '%Y-%m-%d')
        except ValueError:
            return '"%s" incorrect: "%s"' % (key, value)
        # strptime also accepts non zero-padded dates such as '2020-1-1'.
        if len(value) != 10:
            return '"%s", must be: <YYYY-MM-DD>' % key
        # Zero-padded ISO dates order correctly as plain strings.
        if lower_key is not None and lower_key in query and query[lower_key] > value:
            return '"%s" must be greater than "%s"' % (key, lower_key)
        return None

    def _search_dataset(self, query, kwargs):
        """Build the ``package_search`` request from ``query``/``kwargs`` and run it."""
        params = {}
        for key, value in kwargs.items():
            # These three are controlled by this method, never by the caller.
            if key in ('fq', 'fq_list', 'include_private'):
                continue
            if key in self._FACET_KEYS:
                key = key.replace('_', '.')
            params[key] = value
        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be <dict>'
            filters = []
            date_keys = (('dataset_start_date', None),
                         ('dataset_end_date', 'dataset_start_date'))
            for key, lower_key in date_keys:
                if key in query:
                    error = self._check_date(query, key, lower_key)
                    if error is not None:
                        return error
                    filters.append(key + ':"' + query[key] + '"')
            validated = [key for key, _ in date_keys if key in query]
            for key, value in query.items():
                if value is not None and key not in validated:
                    filters.append(str(key) + ':"' + str(value) + '"')
            params['fq_list'] = filters
        return getattr(self.ckan.action, 'package_search')(include_private=True, **params)

    def _search_resource(self, query, kwargs):
        """Build the ``resources_search`` request from ``query``/``kwargs`` and run it."""
        params = dict((key, value) for key, value in kwargs.items() if key != 'fields')
        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be <dict>'
            for key, lower_key in (('file_date_min', None), ('file_date_max', 'file_date_min')):
                if key in query:
                    error = self._check_date(query, key, lower_key)
                    if error is not None:
                        return error
            params['query'] = query
        return getattr(self.ckan.action, 'resources_search')(**params)

    def _search_tag(self, query, kwargs):
        """Build the ``tag_search`` request from ``query``/``kwargs`` and run it."""
        params = dict((key, value) for key, value in kwargs.items() if key != 'fields')
        if 'vocabulary_id' not in kwargs:
            print('Missing "vocabulary_id" value: tags that don’t belong to any vocabulary')
        else:
            print('Only tags that belong to "{}" vocabulary'.format(kwargs['vocabulary_id']))
        if query is not None:
            if not isinstance(query, dict):
                return '"query" must be <dict>'
            if 'search' in query:
                if not isinstance(query['search'], (list, str)):
                    return '"search" must be <list> or <str>'
                params['query'] = query['search']
        return getattr(self.ckan.action, 'tag_search')(**params)

    def search(self, type_option, query=None, **kwargs):
        """
        PURPOSE:
            Customised function for searches that satisfy some criteria.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.search(type_option = <class 'str'>, query = <class 'dict'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, an error message (str) when an argument is
            invalid, or the exception instance if the call failed.

        NOTE:
            Every call builds its request from scratch; nothing from a
            previous search/create call is re-sent.
        """
        if not isinstance(type_option, str):
            return 'ERROR:: "type_option" must be <str>'
        try:
            if type_option == 'dataset':
                return self._search_dataset(query, kwargs)
            if type_option == 'resource':
                return self._search_resource(query, kwargs)
            if type_option == 'tag':
                return self._search_tag(query, kwargs)
            return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
        except Exception as exc:
            return exc

    def create(self, type_option, select=None, **kwargs):
        """
        PURPOSE:
            Customised function to create elements.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.create(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, an error message (str) when an argument is
            invalid, or the exception instance if the call failed.
            ``select`` ('resource' or 'dataset') is only read when
            ``type_option`` is 'views'.
        """
        if not isinstance(type_option, str):
            return 'ERROR:: "type_option" must be <str>'
        try:
            if type_option == 'resource':
                return resource.resource_create(self, **kwargs)
            if type_option == 'views':
                if select == 'resource':
                    # The "package" argument is not forwarded for resource views.
                    params = dict((key, value) for key, value in kwargs.items()
                                  if key != 'package')
                    return getattr(self.ckan.action, 'resource_create_default_resource_views')(**params)
                if select == 'dataset':
                    return getattr(self.ckan.action, 'package_create_default_resource_views')(**kwargs)
                return 'ERROR:: "select = %s" is not accepted' % (select,)
            action_name = self._CREATE_ACTIONS.get(type_option)
            if action_name is None:
                return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            return getattr(self.ckan.action, action_name)(**kwargs)
        except Exception as exc:
            return exc

    def patch(self, type_option, **kwargs):
        """
        PURPOSE:
            Customised functions to update elements.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.patch(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, an error message (str) when "type_option" is
            invalid, or the exception instance if the call failed.
        """
        if not isinstance(type_option, str):
            return 'ERROR:: "type_option" must be <str>'
        try:
            if type_option == 'resource':
                return resource.resource_patch(self, **kwargs)
            # TODO: for 'dataset', enforce that only dataset parameters are
            # modified and that resources are not included.
            action_name = self._PATCH_ACTIONS.get(type_option)
            if action_name is None:
                return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            return getattr(self.ckan.action, action_name)(**kwargs)
        except Exception as exc:
            return exc

    def delete(self, type_option, select=None, **kwargs):
        """
        PURPOSE:
            Customised function to delete and/or purge elements.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.delete(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            The API response, an error message (str) when an argument is
            invalid, or the exception instance if the call failed.
            ``select`` is mandatory for 'dataset', 'project' and 'resource'.
        """
        if not isinstance(type_option, str):
            return 'ERROR:: "type_option" must be <str>'
        try:
            if type_option in ('dataset', 'project', 'resource'):
                if select is None:
                    return 'ERROR:: "select" must not be "None"'
                if type_option == 'resource':
                    return resource.resource_delete(self, select, **kwargs)
                for option, action_name in self._DELETE_SELECT_ACTIONS[type_option]:
                    if option == select:
                        return getattr(self.ckan.action, action_name)(**kwargs)
                return 'ERROR:: "select = %s" is not accepted' % (select,)
            action_name = self._DELETE_ACTIONS.get(type_option)
            if action_name is None:
                return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
            return getattr(self.ckan.action, action_name)(**kwargs)
        except Exception as exc:
            return exc

    def download_files(self, id, processes=1, path=os.path.expanduser("~"), **kwargs):
        """
        PURPOSE:
            Advanced customised function to download the existing files of
            one or more datasets.

        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"

        STRUCTURE:
            <access_name>.download_files(id = <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...)

        RETURNS:
            Whatever ``logic_download.dump_things_change`` returns, or an
            error message (str) when an argument is invalid or ``path`` is
            not writable.
        """
        #------------------ PATH ----------------------#
        if not isinstance(path, str):
            return 'ERROR:: "path" must be: <class "str">'
        if not os.path.isdir(path):
            return 'ERROR:: "path" does not exist'
        if not path.endswith(os.sep):
            path = path + os.sep
        # Prove that the directory is writable by creating and removing a
        # uniquely named (timestamped) empty file.
        probe = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f") + '.txt'
        try:
            with open(probe, 'w'):
                pass
            os.remove(probe)
        except (IOError, OSError):
            return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path)
        #------------------ PROCESSES -----------------#
        if not isinstance(processes, int):
            return 'ERROR:: "processes" must be: <class "int">'
        #------------------ ID OR NAME ----------------#
        if isinstance(id, str):
            id = [id]
        elif isinstance(id, list):
            id = [str(item) for item in id]
        else:
            return 'ERROR:: dataset "id" must be: <class "str" or "list">'
        #----------------------------------------------#
        # Option mapping in the style of the ckanapi command line ("dump
        # datasets"). Options not listed here (--log, --all, --gzip, --output,
        # --max-records, load/delete/search modes, ...) are deliberately left unset.
        arguments = {
            '--apikey': self.Authorization,
            '--ckan-user': None,
            '--config': None,
            '--datapackages': path,
            '--datastore-fields': False,
            '--get-request': False,
            '--insecure': not self.verify,
            '--processes': str(processes),
            '--quiet': False,
            '--remote': self.url,
            '--worker': False,
            'ID_OR_NAME': id,
            'datasets': True,
            'dump': True,
        }
        return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs)