from ckanapi import RemoteCKAN
from datetime import datetime
from tqdm import tqdm
from CKAN_JRO import logic_download
#from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
import sys
import platform
import os
import tempfile
import shutil
import zipfile
import concurrent.futures
import requests
import json
#import pathlib
import uuid
if sys.version_info.major == 3:
from urllib.parse import urlparse
else:
import urlparse
class JROAPI():
"""
    PURPOSE:
        Script to manage and retrieve data from the repository through its APIs.
    PREREQUISITES:
        - Step 1: Have "pip [Python 2]" or "pip3 [Python 3]" installed.
        - Step 2: Install the following as administrator:
            Python 2
                - pip install ckanapi==4.5
                - pip install requests
                - pip install futures
                - pip install tqdm
            Python 3
                - pip3 install ckanapi==4.5
                - pip3 install requests
                - pip3 install tqdm
    AVAILABLE FUNCTIONS:
- action
- upload_file
- upload_multiple_files
- upload_multiple_files_advance
- show
- search
- create
- patch
- delete
- download_files
    EXAMPLES:
#1:
with JROAPI('http://demo.example.com', Authorization='#########') as <access_name>:
... some operation(s) ...
#2:
<access_name> = JROAPI('http://example.com', Authorization='#########')
... some operation(s) ...
<access_name>.ckan.close()
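        #3: (illustrative sketch; the URL, token and dataset id are placeholders)
            with JROAPI('http://demo.example.com', Authorization='#########') as <access_name>:
                <access_name>.show('dataset', id = '<dataset id or name>')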
    REPORTING A PROBLEM:
        Send an e-mail to eynilupu@igp.gob.pe detailing the following:
        1) An e-mail address where you can be contacted
        2) A description of the problem
        3) In which step or section did you find the problem?
        4) What result were you expecting?
"""
def __init__(self, url, Authorization=None, secure=True):
#-------- Check Secure -------#
self.verify = secure
if not secure and isinstance(secure, bool):
session = requests.Session()
session.verify = False
else:
session = None
#------------------------------#
self.url = url
ua = 'CKAN_JRO/2.9.2 (+'+str(self.url)+')'
#ua = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'
self.ckan = RemoteCKAN(self.url, apikey=Authorization, user_agent=ua, session=session)
#self.ckan = RemoteCKAN(self.url, apikey=Authorization)
self.Authorization = Authorization
# Change for --> self.separator = os.sep
if platform.system() == 'Windows':
self.separator = '\\'
else:
self.separator = '/'
self.chunk_size = 1024
self.list = []
self.dict = {}
self.str = ''
self.check = 1
self.cont = 0
def __enter__(self):
return self
def __exit__(self, *args):
self.ckan.close()
def action(self, action, **kwargs):
"""
        PURPOSE:
            Function to call any of the available API actions.
        AVAILABLE APIs:
            SEE: "GUIA DE SCRIPT.pdf"
        EXAMPLE:
            <access_name>.action(<API action name>, param_1 = <class 'param_1'>, ...)
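        ILLUSTRATIVE EXAMPLE (the dataset id is a placeholder):
            <access_name>.action('package_show', id = 'my-dataset')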
"""
#--------------- CASE: PACKAGE SEARCH ---------------#
if kwargs is not None:
if action == 'package_search':
self.list = ['facet_mincount', 'facet_limit', 'facet_field']
for facet in self.list:
if facet in kwargs:
kwargs[facet.replace('_', '.')] = kwargs[facet]
kwargs.pop(facet)
#----------------------------------------------------#
try:
return getattr(self.ckan.action, action)(**kwargs)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
def upload_file(self, dataset_id, file_date, file_type, file_path=False, url_or_path=False, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption
'''
        PURPOSE:
            Function to create a single resource (optionally with an attached file) in the ROJ repository.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.upload_file(dataset_id = <class 'str'>, file_date = <class 'str'>, file_type = <class 'str'>, file_path = <class 'str'>, url_or_path = <class 'str'>, param_1 = <class 'param_1'>, ...)
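        ILLUSTRATIVE EXAMPLE (dataset id, date, file type and path are placeholders; the
        file type must come from the repository's "voc_file_type" vocabulary):
            <access_name>.upload_file(dataset_id = 'my-dataset', file_date = '2023-01-01', file_type = '<file type>', file_path = '/tmp/data.txt')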
'''
#self.list = ['package_id', 'upload', 'voc_file_type', 'name'] #file_date
self.list = ['package_id', 'upload', 'voc_file_type'] #file_date
for key1, value1 in kwargs.items():
if not key1 in self.list:
self.dict[key1] = value1
#---------------------------#
if not 'others' in kwargs:
self.dict['others'] = ''
else:
if isinstance(kwargs['others'], list):
self.dict['others'] = json.dumps(kwargs['others'])
#---------------------------#
if isinstance(file_path, str) and isinstance(url_or_path, str):
return 'ERROR:: Choose one: "file_path" or "url_or_path" parameters'
if isinstance(file_path, str):
if not os.path.isfile(file_path):
                return 'File "%s" does not exist' % (file_path)
self.dict['upload'] = open(file_path, 'rb')
self.dict['name'] = os.path.basename(file_path)
elif isinstance(url_or_path, str):
self.dict['url'] = url_or_path
if not 'name' in self.dict:
self.dict['name'] = os.path.basename(url_or_path)
else:
            return 'ERROR:: Provide exactly one of "file_path" or "url_or_path" as <class "str">'
#if not 'format' in self.dict:
# self.str = ''.join(pathlib.Path(file_path).suffixes)
# if len(self.str) > 0:
# self.dict['format'] = self.str.upper()[1:]
#-------------------------PACKAGE SHOW-----------------------#
try:
dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
except:
_, exc_value, _ = sys.exc_info()
            print('ERROR obtaining dataset metadata:: print the returned value for more information')
return exc_value
resources_name = []
for u in dataset_show:
resources_name.append(u['name'].lower())
if self.dict['name'].lower() in resources_name:
if not ignore_repetition:
                return 'ERROR:: "%s" resource already exists in this dataset' % (self.dict['name'])
            print('WARNING:: "'+ str(self.dict['name']) +'" resource already exists in this dataset')
#------------------------------------------------------------#
try:
return getattr(self.ckan.action, 'resource_create')(package_id=dataset_id, file_date=file_date, voc_file_type=file_type, **self.dict)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
def upload_multiple_files_advance(self, dataset_id, path_files, file_date, file_type, max_size=100, max_count=500, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption
'''
        PURPOSE:
            Function to upload multiple files to the ROJ repository in blocks, using "package_revise".
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.upload_multiple_files_advance(dataset_id = <class 'str'>, path_files = <class 'str'> or <class 'list of strings'>, file_date = <class 'str'>, file_type = <class 'str'>, param_1 = <class 'param_1'>, ...)
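        ILLUSTRATIVE EXAMPLE (dataset id, directory, date and file type are placeholders):
            <access_name>.upload_multiple_files_advance(dataset_id = 'my-dataset', path_files = '/tmp/data/', file_date = '2023-01-01', file_type = '<file type>', max_size = 100, max_count = 500)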
'''
#-------------------------PACKAGE SHOW-----------------------#
try:
dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
except:
_, exc_value, _ = sys.exc_info()
            print('ERROR obtaining dataset metadata:: print the returned value for more information')
return exc_value
#------------------------------------------------------------#
resources_name = []
for u in dataset_show:
resources_name.append(u['name'].lower())
#------------------------------------------------------------#
self.list = ['package_id', 'upload', 'voc_file_type', 'name']
for key1, value1 in kwargs.items():
if not key1 in self.list:
self.dict[key1] = value1
#------------------------------------------------------------#
if not 'others' in kwargs:
self.dict['others'] = ''
else:
if isinstance(kwargs['others'], list):
self.dict['others'] = json.dumps(kwargs['others'])
#------------------------------------------------------------#
total_list = []
#---------------CASO : "path" or "path_list"-----------------#
if type(path_files) is list:
if len(path_files) != 0:
path_files.sort()
for u in path_files:
if os.path.isfile(u):
if os.path.basename(u).lower() in resources_name:
if not ignore_repetition:
                                return 'ERROR:: "%s" file already exists in this dataset' % (os.path.basename(u))
                            print('WARNING:: "'+ str(os.path.basename(u)) +'" file was ignored because it already exists in this dataset')
else:
total_list.append({'name':os.path.basename(u), 'size': os.stat(u).st_size, 'upload':open(u, 'rb')})
else:
return 'File "%s" does not exist' % (u)
else:
                return 'ERROR:: "path_files" list is empty'
elif type(path_files) is str:
if os.path.isdir(path_files):
path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
path_order.sort()
if path_order:
for name in path_order:
if name.lower() in resources_name:
if not ignore_repetition:
                                return 'ERROR:: "%s" file already exists in this dataset' % (name)
                            print('WARNING:: "'+ name +'" file was ignored because it already exists in this dataset')
else:
total_list.append({'name':name, 'size': os.stat(os.path.join(path_files, name)).st_size, 'upload':open(os.path.join(path_files, name), 'rb')})
else:
                    return 'ERROR:: There are no files in this directory'
else:
return 'ERROR:: Directory "%s" does not exist' % (path_files)
else:
return 'ERROR:: "path_files" must be a str or list'
#------------------------------------------------------------#
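        # Decide whether "dataset_id" is a UUID (match by "id") or a slug (match by "name")
        # when building the match filter for "package_revise" below.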
try:
uuid.UUID(str(dataset_id), version=4)
package_id_or_name = '"id": "' + str(dataset_id) + '"'
except ValueError:
package_id_or_name = '"name": "' + str(dataset_id) + '"'
#------------------------------------------------------------#
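        # Group the files into consecutive blocks so that each block stays within
        # "max_size" MB in total and contains at most "max_count" files; each block
        # is uploaded with a single "package_revise" call.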
blocks = [[]]
size_file = 0
count_file = 0
inter_num = 0
for value in total_list:
if value['size'] > 1024 * 1024 * float(max_size):
                return 'ERROR:: The "%s" file is approximately %s MB, which exceeds "max_size"; please increase the "max_size" value' % (value['name'], str(round(value['size']/(1024 * 1024), 2)))
if not 1 <= int(max_count) <= 999:
                return 'ERROR:: "max_count" must be between 1 and 999, please change the "max_count" value'
size_file = size_file + value['size']
count_file = count_file + 1
if size_file <= 1024 * 1024 * float(max_size) and count_file <= int(max_count):
del value['size']
blocks[inter_num].append(value)
else:
inter_num = inter_num + 1
size_file = value['size']
count_file = 1
blocks.append([])
del value['size']
blocks[inter_num].append(value)
#------------------------------------------------------------#
if len(blocks[0]) > 0:
print('BLOCK(S) IN TOTAL:: {}'.format(len(blocks)))
for count1, block in enumerate(blocks):
print('---- BLOCK N°{} ----'.format(count1 + 1))
resource_extend = []
files_dict = {}
for count2, value2 in enumerate(block):
value2['file_date'] = file_date
value2['voc_file_type'] = file_type
value2.update(self.dict)
#if not 'format' in value2:
# format = ''.join(pathlib.Path(value2['name']).suffixes)
# if len(format) > 0:
# value2['format'] = format.upper()[1:]
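                    # Build the multipart key "package_revise" expects for file uploads:
                    # negative indexes count from the end of the resource list once it has
                    # been extended, so the first file of the block maps to -len(block)
                    # and the last one to -1.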
files_dict['update__resources__-'+ str(len(block)-count2) +'__upload'] = (value2['name'], value2['upload'])
del value2['upload']
resource_extend.append(value2)
print('BLOCK N°{} :: "{}" file(s) found >> uploading'.format(count1 + 1, len(block)))
try:
result = self.ckan.call_action(
'package_revise',
{'match': '{'+ str(package_id_or_name) +'}', 'update__resources__extend': json.dumps(resource_extend)},
files=files_dict
)
print('BLOCK N°{} :: Uploaded file(s) successfully'.format(count1 + 1))
if len(blocks) == count1 + 1:
return result
except:
                    print('ERROR:: print the returned value for more information')
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return "ERROR:: No file(s) found to upload"
def upload_multiple_files(self, dataset_id, path_files, date_files, type_files, ignore_repetition=False, **kwargs):
        # TODO: handle keyboard interruption
'''
        PURPOSE:
            Function to upload multiple files to the ROJ repository, one "resource_create" call per file.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.upload_multiple_files(dataset_id = <class 'str'>, path_files = <class 'str'> or <class 'list of strings'>, date_files = <class 'str'> or <class 'list of strings'>, type_files = <class 'str'> or <class 'list of strings'>, param_1 = <class 'param_1'>, ...)
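        ILLUSTRATIVE EXAMPLE (dataset id, directory, date and file type are placeholders):
            <access_name>.upload_multiple_files(dataset_id = 'my-dataset', path_files = '/tmp/data/', date_files = '2023-01-01', type_files = '<file type>')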
'''
#-------------------------PACKAGE SHOW-----------------------#
try:
dataset_show = getattr(self.ckan.action, 'package_show')(id=dataset_id)['resources']
except:
_, exc_value, _ = sys.exc_info()
            print('ERROR obtaining dataset metadata:: print the returned value for more information')
return exc_value
#------------------------------------------------------------#
resources_name = []
for u in dataset_show:
resources_name.append(u['name'].lower())
#------------------------------------------------------------#
params_dict = {'upload':[], 'name':[]}
#if not 'format' in kwargs:
# params_dict.update({'format':[]})
#---------------CASO : "path" or "path_list"-----------------#
if type(path_files) is list:
if len(path_files) != 0:
path_files.sort()
for u in path_files:
if os.path.isfile(u):
if os.path.basename(u).lower() in resources_name:
if not ignore_repetition:
                                return 'ERROR:: "%s" file already exists in this dataset' % (os.path.basename(u))
                            print('WARNING:: "'+ str(os.path.basename(u)) +'" file was ignored because it already exists in this dataset')
else:
params_dict['upload'].append(open(u, 'rb'))
params_dict['name'].append(os.path.basename(u))
#if not 'format' in kwargs:
# format = ''.join(pathlib.Path(u).suffixes)
# if len(format) > 0:
# params_dict['format'].append(format.upper()[1:])
# else:
# params_dict['format'].append('')
else:
return 'File "%s" does not exist' % (u)
else:
                return 'ERROR:: "path_files" list is empty'
elif type(path_files) is str:
if os.path.isdir(path_files):
path_order = [f for f in os.listdir(path_files) if os.path.isfile(os.path.join(path_files, f))]
path_order.sort()
if path_order:
for name in path_order:
if name.lower() in resources_name:
if not ignore_repetition:
                                return 'ERROR:: "%s" file already exists in this dataset' % (name)
                            print('WARNING:: "'+ str(name) +'" file was ignored because it already exists in this dataset')
else:
params_dict['upload'].append(open(os.path.join(path_files, name), 'rb'))
params_dict['name'].append(name)
#if not 'format' in kwargs:
# format = ''.join(pathlib.Path(name).suffixes)
# if len(format) > 0:
# params_dict['format'].append(format.upper()[1:])
# else:
# params_dict['format'].append('')
else:
                    return 'ERROR:: There are no files in this directory'
else:
return 'ERROR:: Directory "%s" does not exist' % (path_files)
else:
return 'ERROR:: "path_files" must be a str or list'
#------------------------------------------------------------#
params_no_dict = {'package_id': dataset_id}
if type(date_files) is list:
params_dict['file_date'] = date_files
else:
params_no_dict['file_date'] = date_files
if type(type_files) is list:
params_dict['voc_file_type'] = type_files
else:
params_no_dict['voc_file_type'] = type_files
for key1, value1 in kwargs.items():
if not key1 in params_dict and not key1 in params_no_dict and key1 != 'others':
if type(value1) is list:
params_dict[key1] = value1
else:
params_no_dict[key1] = value1
#------------------------------------------#
if not 'others' in kwargs:
params_no_dict['others'] = ''
else:
if isinstance(kwargs['others'], tuple):
params_dict['others'] = [json.dumps(w) for w in kwargs['others']]
elif isinstance(kwargs['others'], list):
params_no_dict['others'] = json.dumps(kwargs['others'])
elif isinstance(kwargs['others'], str):
params_no_dict['others'] = kwargs['others']
else:
return 'ERROR:: "others" must be a tuple, list or str'
#------------------------------------------#
len_params_dict = []
for value2 in params_dict.values():
len_params_dict.append(len(value2))
if len(list(set(len_params_dict))) > 1:
            return 'ERROR:: All list parameters must have the same length: %s' % (len(params_dict['name']))
#------------------------------------------------------------#
print('"{}" file(s) found >> uploading'.format(len(params_dict['name'])))
for v in range(len(params_dict['name'])):
try:
send = {}
for key_dict, value_dict in params_dict.items():
send[key_dict] = value_dict[v]
for key_no_dict, value_no_dict in params_no_dict.items():
send[key_no_dict] = value_no_dict
self.list.append(getattr(self.ckan.action, 'resource_create')(**send))
print('File #{} :: "{}" was uploaded successfully'.format(v+1, params_dict['name'][v]))
except:
_, exc_value, _ = sys.exc_info()
self.list.append(exc_value)
print('File #{} :: Error uploading "{}" file'.format(v+1, params_dict['name'][v]))
return self.list
#------------------------------------------------------------#
def show(self, type_option, id, **kwargs):
'''
        PURPOSE:
            Custom function to retrieve a specific record.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.show(type_option = <class 'str'>, id = <class 'str'>, param_1 = <class 'param_1'>, ...)
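        ILLUSTRATIVE EXAMPLE (the dataset id is a placeholder):
            <access_name>.show(type_option = 'dataset', id = 'my-dataset')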
'''
if type(type_option) is str:
try:
if type_option == 'dataset':
return getattr(self.ckan.action, 'package_show')(id=id, **kwargs)
elif type_option == 'resource':
return getattr(self.ckan.action, 'resource_show')(id=id, **kwargs)
elif type_option == 'project':
return getattr(self.ckan.action, 'organization_show')(id=id, **kwargs)
elif type_option == 'collaborator':
return getattr(self.ckan.action, 'package_collaborator_list_for_user')(id=id, **kwargs)
elif type_option == 'member':
return getattr(self.ckan.action, 'organization_list_for_user')(id=id, **kwargs)
elif type_option == 'vocabulary':
return getattr(self.ckan.action, 'vocabulary_show')(id=id, **kwargs)
elif type_option == 'tag':
if not 'vocabulary_id' in kwargs:
                        print('Missing "vocabulary_id" value: assuming it is a free tag')
return getattr(self.ckan.action, 'tag_show')(id=id, **kwargs)
elif type_option == 'user':
return getattr(self.ckan.action, 'user_show')(id=id, **kwargs)
elif type_option == 'job':
return getattr(self.ckan.action, 'job_show')(id=id, **kwargs)
else:
return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return 'ERROR:: "type_option" must be a str'
def search(self, type_option, query=None, **kwargs):
'''
        PURPOSE:
            Custom function for searches that satisfy a given criterion.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.search(type_option = <class 'str'>, query = <class 'dict'>, param_1 = <class 'param_1'>, ...)
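        ILLUSTRATIVE EXAMPLE (the date is a placeholder; "rows" is passed through to "package_search"):
            <access_name>.search(type_option = 'dataset', query = {'dataset_start_date': '2023-01-01'}, rows = 10)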
'''
if type(type_option) is str:
try:
if type_option == 'dataset':
key_replace = ['fq', 'fq_list', 'include_private']
key_point = ['facet_mincount', 'facet_limit', 'facet_field']
for key1, value1 in kwargs.items():
if not key1 in key_replace:
if key1 in key_point:
self.dict[key1.replace('_', '.')] = value1
else:
self.dict[key1] = value1
if query is not None:
if type(query) is dict:
self.dict['fq_list'] = []
#NUM_RESOURCES_MIN / NUM_RESOURCES_MAX
#----------------------------------------------------#
if 'dataset_start_date' in query:
if type(query['dataset_start_date']) is str:
try:
datetime.strptime(query['dataset_start_date'], '%Y-%m-%d')
if len(query['dataset_start_date']) != 10:
                                            return '"dataset_start_date" must be: <YYYY-MM-DD>'
self.dict['fq_list'].append('dataset_start_date:"'+query['dataset_start_date']+'"')
self.list.append('dataset_start_date')
except:
return '"dataset_start_date" incorrect: "%s"' % (query['dataset_start_date'])
else:
return '"dataset_start_date" must be <str>'
#----------------------------------------------------#
if 'dataset_end_date' in query:
if type(query['dataset_end_date']) is str:
try:
datetime.strptime(query['dataset_end_date'], '%Y-%m-%d')
if len(query['dataset_end_date']) != 10:
                                            return '"dataset_end_date" must be: <YYYY-MM-DD>'
                                        if 'dataset_start_date' in query:
                                            if query['dataset_start_date'] > query['dataset_end_date']:
                                                return '"dataset_end_date" must not be earlier than "dataset_start_date"'
self.dict['fq_list'].append('dataset_end_date:"'+query['dataset_end_date']+'"')
self.list.append('dataset_end_date')
except:
return '"dataset_end_date" incorrect: "%s"' % (query['dataset_end_date'])
else:
return '"dataset_end_date" must be <str>'
#----------------------------------------------------#
for key, value in query.items():
if value is not None and not key in self.list:
self.dict['fq_list'].append(str(key)+':"'+str(value)+'"')
else:
return '"query" must be <dict>'
return getattr(self.ckan.action, 'package_search')(include_private=True, **self.dict)
elif type_option == 'resource':
for key1, value1 in kwargs.items():
if key1 != 'fields':
self.dict[key1] = value1
if query is not None:
if type(query) is dict:
#----------------------------------------------------#
if 'file_date_min' in query:
if type(query['file_date_min']) is str:
try:
datetime.strptime(query['file_date_min'], '%Y-%m-%d')
if len(query['file_date_min']) != 10:
                                            return '"file_date_min" must be: <YYYY-MM-DD>'
except:
return '"file_date_min" incorrect: "%s"' % (query['file_date_min'])
else:
return '"file_date_min" must be <str>'
#----------------------------------------------------#
if 'file_date_max' in query:
if type(query['file_date_max']) is str:
try:
datetime.strptime(query['file_date_max'], '%Y-%m-%d')
if len(query['file_date_max']) != 10:
                                            return '"file_date_max" must be: <YYYY-MM-DD>'
                                        if 'file_date_min' in query:
                                            if query['file_date_min'] > query['file_date_max']:
                                                return '"file_date_max" must not be earlier than "file_date_min"'
except:
return '"file_date_max" incorrect: "%s"' % (query['file_date_max'])
else:
return '"file_date_max" must be <str>'
#----------------------------------------------------#
self.dict['query'] = query
else:
return '"query" must be <dict>'
return getattr(self.ckan.action, 'resources_search')(**self.dict)
elif type_option == 'tag':
for key1, value1 in kwargs.items():
if key1 != 'fields':
self.dict[key1] = value1
if not 'vocabulary_id' in kwargs:
                        print('Missing "vocabulary_id" value: searching only tags that do not belong to any vocabulary (free tags)')
else:
print('Only tags that belong to "{}" vocabulary'.format(kwargs['vocabulary_id']))
if query is not None:
if type(query) is dict:
if 'search' in query:
if type(query['search']) is list or type(query['search']) is str:
self.dict['query'] = query['search']
else:
return '"search" must be <list> or <str>'
else:
return '"query" must be <dict>'
return getattr(self.ckan.action, 'tag_search')(**self.dict)
else:
return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return 'ERROR:: "type_option" must be <str>'
def create(self, type_option, select=None, **kwargs):
'''
        PURPOSE:
            Custom function to create records.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.create(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
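        ILLUSTRATIVE EXAMPLE (name, title and project are placeholders; the repository schema may require additional mandatory fields):
            <access_name>.create(type_option = 'dataset', name = 'my-new-dataset', title = 'My new dataset', owner_org = '<project name>', private = False)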
'''
if type(type_option) is str:
try:
if type_option == 'dataset':
return getattr(self.ckan.action, 'package_create')(**kwargs)
elif type_option == 'project':
return getattr(self.ckan.action, 'organization_create')(**kwargs)
elif type_option == 'member':
return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
elif type_option == 'collaborator':
return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
elif type_option == 'vocabulary':
return getattr(self.ckan.action, 'vocabulary_create')(**kwargs)
elif type_option == 'tag':
return getattr(self.ckan.action, 'tag_create')(**kwargs)
elif type_option == 'user':
return getattr(self.ckan.action, 'user_create')(**kwargs)
elif type_option == 'views':
if 'resource' == select:
self.list = ['package']
for key1, value1 in kwargs.items():
if not key1 in self.list:
self.dict[key1] = value1
return getattr(self.ckan.action, 'resource_create_default_resource_views')(**self.dict)
elif 'dataset' == select:
return getattr(self.ckan.action, 'package_create_default_resource_views')(**kwargs)
else:
return 'ERROR:: "select = %s" is not accepted' % (select)
else:
return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return 'ERROR:: "type_option" must be <str>'
def patch(self, type_option, **kwargs):
'''
        PURPOSE:
            Custom function to update existing records.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.patch(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
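        ILLUSTRATIVE EXAMPLE (dataset id and title are placeholders):
            <access_name>.patch(type_option = 'dataset', id = 'my-dataset', title = 'New title')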
'''
if type(type_option) is str:
try:
if type_option == 'dataset':
return getattr(self.ckan.action, 'package_patch')(**kwargs)
elif type_option == 'project':
return getattr(self.ckan.action, 'organization_patch')(**kwargs)
elif type_option == 'resource':
return getattr(self.ckan.action, 'resource_patch')(**kwargs)
elif type_option == 'member':
return getattr(self.ckan.action, 'organization_member_create')(**kwargs)
elif type_option == 'collaborator':
return getattr(self.ckan.action, 'package_collaborator_create')(**kwargs)
else:
return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return 'ERROR:: "type_option" must be <str>'
def delete(self, type_option, select=None, **kwargs):
'''
        PURPOSE:
            Custom function to delete and/or purge records.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.delete(type_option = <class 'str'>, param_1 = <class 'param_1'>, ...)
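        ILLUSTRATIVE EXAMPLE (the dataset id is a placeholder):
            <access_name>.delete(type_option = 'dataset', select = 'delete', id = 'my-dataset')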
'''
if type(type_option) is str:
try:
if type_option == 'dataset':
if select is None:
return 'ERROR:: "select" must not be "None"'
else:
if 'delete' == select:
return getattr(self.ckan.action, 'package_delete')(**kwargs)
elif 'purge' == select:
return getattr(self.ckan.action, 'dataset_purge')(**kwargs)
else:
return 'ERROR:: "select = %s" is not accepted' % (select)
elif type_option == 'project':
if select is None:
return 'ERROR:: "select" must not be "None"'
else:
if 'delete' == select:
return getattr(self.ckan.action, 'organization_delete')(**kwargs)
elif 'purge' == select:
return getattr(self.ckan.action, 'organization_purge')(**kwargs)
else:
return 'ERROR:: "select = %s" is not accepted' % (select)
elif type_option == 'resource':
return getattr(self.ckan.action, 'resource_delete')(**kwargs)
elif type_option == 'vocabulary':
return getattr(self.ckan.action, 'vocabulary_delete')(**kwargs)
elif type_option == 'tag':
return getattr(self.ckan.action, 'tag_delete')(**kwargs)
elif type_option == 'user':
return getattr(self.ckan.action, 'user_delete')(**kwargs)
else:
return 'ERROR:: "type_option = %s" is not accepted' % (type_option)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
else:
return 'ERROR:: "type_option" must be <str>'
    def f_status_note(self, total, result, path):
        # Write a short report of downloaded vs. failed files next to the downloads.
        with open(path + 'status_note.txt', 'w') as file_txt:
            file_txt.write('DOWNLOADED FILE(S): "%s"' % (len(result['name'])))
            file_txt.write('' + os.linesep)
            for u in result['name']:
                file_txt.write(' - ' + u + os.linesep)
            file_txt.write('' + os.linesep)
            file_txt.write('FAILED FILE(S): "%s"' % (len(total['name']) - len(result['name'])))
            file_txt.write('' + os.linesep)
            if len(total['name']) - len(result['name']) != 0:
                for u in total['name']:
                    if not u in result['name']:
                        file_txt.write(' - ' + u + os.linesep)
            else:
                file_txt.write(' "None"' + os.linesep)
    def f_name(self, name_dataset, ext, tempdir):
        # Return a name that does not collide with an existing file or directory:
        # "<name><ext>", then "<name>(1)<ext>", "<name>(2)<ext>", and so on.
        candidate = name_dataset + ext
        cont = 1
        while os.path.exists(tempdir + candidate):
            candidate = name_dataset + '(' + str(cont) + ')' + ext
            cont += 1
        return candidate
def f_zipdir(self, path, ziph, zip_name):
for root, _, files in os.walk(path):
print('.....')
print('Creating: "{}" >>'.format(zip_name))
for __file in tqdm(iterable=files, total=len(files)):
new_dir = os.path.relpath(os.path.join(root, __file), os.path.join(path, '..'))
ziph.write(os.path.join(root, __file), new_dir)
print('Created >>')
def download_by_step(self, response, tempdir_name):
try:
# ---------- REPLACE URL --------- #
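            # If the resource URL points at www.igp.gob.pe but this client was created with a
            # different host, rewrite the scheme and netloc so the file is requested from the
            # host the client is actually talking to.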
if urlparse(self.url).netloc != 'www.igp.gob.pe' and urlparse(response['url']).netloc == 'www.igp.gob.pe':
response['url'] = response['url'].replace(urlparse(response['url']).scheme + '://' + urlparse(response['url']).netloc,
urlparse(self.url).scheme + '://' + urlparse(self.url).netloc)
#----------------------------------#
with requests.get(response['url'], stream=True, headers={'Authorization': self.Authorization}, verify=self.verify) as resp:
if resp.status_code == 200:
with open(tempdir_name+response['name'], 'wb') as file:
for chunk in resp.iter_content(chunk_size = self.chunk_size):
if chunk:
file.write(chunk)
except requests.exceptions.RequestException:
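            # Failed downloads are skipped silently here; they show up later as the
            # difference between requested and downloaded files (see f_status_note).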
pass
def download_files(self, **kwargs):
'''
        PURPOSE:
            Custom function to download the existing files of a dataset.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.download_files(id = <class 'str'>, param_1 = <class 'param_1'>, ...)
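        ILLUSTRATIVE EXAMPLE (dataset id and path are placeholders):
            <access_name>.download_files(id = 'my-dataset', zip = True, status_note = True, path = '/tmp/downloads')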
'''
dict_local = {}
#----------------------------------------------#
if 'zip' in kwargs:
if type(kwargs['zip']) is not bool:
return 'ERROR:: "zip" must be: <class "bool">'
else:
dict_local['zip'] = kwargs['zip']
else:
dict_local['zip'] = False
#----------------------------------------------#
if 'status_note' in kwargs:
if type(kwargs['status_note']) is not bool:
return 'ERROR:: "status_note" must be: <class "bool">'
else:
dict_local['status_note'] = kwargs['status_note']
else:
dict_local['status_note'] = False
#----------------------------------------------#
if 'path' in kwargs:
if type(kwargs['path']) is str:
if os.path.isdir(kwargs['path']) == False:
return 'ERROR:: "path" does not exist'
else:
if kwargs['path'][-1:] != self.separator:
dict_local['path'] = kwargs['path']+self.separator
else:
dict_local['path'] = kwargs['path']
txt = dict_local['path']+datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
if int(platform.python_version()[0]) == 3:
try:
file_txt = open(txt, 'w')
file_txt.close()
os.remove(txt)
except PermissionError:
return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
else:
try:
file_txt = open(txt, 'w')
file_txt.close()
os.remove(txt)
except:
return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (dict_local['path'])
else:
return 'ERROR:: "path" must be: <class "str">'
else:
dict_local['path'] = ''
#----------------------------------------------#
for key, value in kwargs.items():
if not key in dict_local:
self.dict[key] = value
try:
response = getattr(self.ckan.action, 'url_resources')(**self.dict)
except:
_, exc_value, _ = sys.exc_info()
return exc_value
if len(response) != 0:
#--------------TEMP PATH---------------#
if dict_local['zip']:
tempdir = tempfile.mkdtemp(prefix=kwargs['id']+'-')+self.separator
os.mkdir(tempdir+kwargs['id'])
dir_name = tempdir + kwargs['id'] + self.separator
else:
dir = self.f_name(kwargs['id'], '', dict_local['path'])
os.mkdir(dict_local['path'] + dir)
dir_name = dict_local['path'] + dir + self.separator
#-----------DOWNLOAD FILES-------------#
print('.....')
print('Downloading "{}" file(s) >>'.format(len(response)))
name_total = {'name': []}
with concurrent.futures.ThreadPoolExecutor() as executor:
for u in tqdm(iterable=response, total=len(response)):
name_total['name'].append(u['name'])
executor.submit(self.download_by_step, u, dir_name)
name_check = {}
name_check['name'] = [f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))]
print('"{}" downloaded file(s) successfully >>'.format(len(name_check['name'])))
#--------------------------------------#
if len(name_check['name']) != 0:
#----------Status Note---------#
if dict_local['status_note']:
print('.....')
print('Creating: "status_note.txt" >>')
self.f_status_note(name_total, name_check, dir_name)
                    print('Created >>')
#----------ZIP CREATE----------#
if dict_local['zip']:
zip_name = self.f_name(kwargs['id'], '.zip', dict_local['path'])
ziph = zipfile.ZipFile(dict_local['path'] + zip_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
self.f_zipdir(dir_name, ziph, zip_name)
ziph.close()
#Delete Temporal Path
if os.path.exists(tempdir[:-1]):
shutil.rmtree(tempdir[:-1])
#------------------------------#
print('.....')
return 'DOWNLOAD FINISHED'
else:
#Delete Temporal Path
if dict_local['zip']:
if os.path.exists(tempdir[:-1]):
shutil.rmtree(tempdir[:-1])
else:
if os.path.exists(dir_name[:-1]):
shutil.rmtree(dir_name[:-1])
return 'NO FILES WERE DOWNLOADED'
else:
return 'FILES NOT FOUND'
def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs):
'''
        PURPOSE:
            Advanced custom function to download the existing files of one or more datasets.
        AVAILABLE PARAMETERS:
            SEE: "GUIA DE SCRIPT.pdf"
        STRUCTURE:
            <access_name>.download_files_advance(id_or_name = <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...)
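        ILLUSTRATIVE EXAMPLE (dataset names and path are placeholders):
            <access_name>.download_files_advance(id_or_name = ['dataset-one', 'dataset-two'], processes = 2, path = '/tmp/downloads')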
'''
#------------------ PATH ----------------------#
if isinstance(path, str):
if os.path.isdir(path):
if not path.endswith(os.sep):
path = path + os.sep
test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
try:
file_txt = open(test_txt, 'w')
file_txt.close()
os.remove(test_txt)
except:
return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path)
else:
return 'ERROR:: "path" does not exist'
else:
return 'ERROR:: "path" must be: <class "str">'
#------------------ PROCESSES -----------------#
if not isinstance(processes, int):
return 'ERROR:: "processes" must be: <class "int">'
#------------------ ID OR NAME ----------------#
if isinstance(id_or_name, str):
id_or_name = [id_or_name]
elif isinstance(id_or_name, list):
id_or_name = list(map(str, id_or_name))
else:
return 'ERROR:: dataset "id_or_name" must be: <class "str" or "list">'
#----------------------------------------------#
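        # These arguments mirror the docopt-style options of the "ckanapi dump datasets"
        # command-line tool, which logic_download.dump_things_change appears to consume.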
arguments = {
'--apikey': self.Authorization,
'--ckan-user': None,
'--config': None,
'--datapackages': path,
'--datastore-fields': False,
'--get-request': False,
'--insecure': not self.verify,
'--log': '/home/soporte/DUMP/download.txt',
'--processes': str(processes),
'--quiet': False,
'--remote': self.url,
'--worker': False,
#'--all': False,
#'--gzip': False,
#'--output': None,
#'--max-records': None,
#'--output-json': False,
#'--output-jsonl': False,
#'--create-only': False,
#'--help': False,
#'--input': None,
#'--input-json': False,
#'--start-record': '1',
#'--update-only': False,
#'--upload-logo': False,
#'--upload-resources': False,
#'--version': False,
'ID_OR_NAME': id_or_name,
'datasets': True,
'dump': True,
#'ACTION_NAME': None,
#'KEY:JSON': [],
#'KEY=STRING': [],
#'KEY@FILE': [],
#'action': False,
#'delete': False,
#'groups': False,
#'load': False,
#'organizations': False,
#'related': False,
#'search': False,
#'users': False
}
return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs)