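"""Customized ``ckanapi dump`` helpers that download each dataset's
resources into a per-dataset datapackage directory, filtering and
rewriting resource URLs for the www.igp.gob.pe portal.

A minimal usage sketch (the keys follow the docopt dictionary that the
ckanapi CLI builds for its ``dump`` command; the values here are only
illustrative assumptions, not a tested invocation):

    from ckanapi import RemoteCKAN
    ckan = RemoteCKAN('https://www.igp.gob.pe', apikey=None)
    dump_things_change(ckan, 'datasets', {
        '--worker': False, '--log': None, '--datapackages': 'out',
        '--processes': '1', '--quiet': False, '--apikey': None,
        '--remote': 'https://www.igp.gob.pe', '--insecure': False,
        'ID_OR_NAME': ['some-dataset'],
    })
"""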
#from ckanapi.datapackage import populate_schema_from_datastore

from ckanapi.cli import workers, dump
from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe

from datetime import datetime
import sys
import json
import os
import requests
import six

if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    # Python 2: import the callable, not the module, so urlparse(url) works
    from urlparse import urlparse

# stream downloads to disk in 100 KiB chunks
DL_CHUNK_SIZE = 100 * 1024


def dump_things_change(ckan, thing, arguments,
                       worker_pool=None, stdout=None, stderr=None, **kwargs):
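    """Dump things from ``ckan`` like the stock ``ckanapi dump`` command
    and, when ``--datapackages`` is given, download each dataset's
    resources into its own datapackage directory.

    ``arguments`` is the docopt-style dictionary built by the ckanapi CLI.
    Resource URLs are first filtered through the portal's ``url_resources``
    API action, so only the listed resources are downloaded.
    """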
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)

    # log-file support from the stock dump command is disabled here:
    # log = None
    # if arguments['--log']:
    #     log = open(arguments['--log'], 'a')

    jsonl_output = stdout
    if arguments['--datapackages']:
        # the JSON-lines stream is discarded when building datapackages
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']

    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # datastore-field support is likewise disabled:
    # if arguments['--datapackages']:
    #     arguments['--datastore-fields'] = True

    #----------------------------#
    # ask the portal which resource URLs should be kept for each dataset
    filtered_urls = {}
    for val in names:
        try:
            filtered_urls[val] = ckan.action.url_resources(id=val, **kwargs)
        except Exception as e:
            return e
    #----------------------------#

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes,
                       enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # child exited with a traceback
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            #----------------------------------------#
            datapackages_path = arguments['--datapackages']
            datapackage_dir = ''
            if datapackages_path and record:
                datapackage_dir = name_no_repetition(
                    record.get('name', ''), datapackages_path)
            #----------------------------------------#

            if not arguments['--quiet']:
                stderr.write(
                    '** Finished: {0} | Job IDs: {1} | Next Report: {2} | '
                    'Error: {3} | Path: {4} | Dataset Name: {5}\n'.format(
                        finished,
                        job_ids,
                        next(stats),
                        error,
                        datapackage_dir,
                        record.get('name', '') if record else '',
                    ).encode('utf-8'))

            # log-file writing, also disabled:
            # if log:
            #     log.write(compact_json([
            #         timestamp,
            #         finished,
            #         error,
            #         record.get('name', '') if record else None,
            #     ]) + b'\n')

            if datapackages_path and record:
                try:
                    filter_url = filtered_urls[record.get('name', '')]
                except KeyError:
                    filter_url = filtered_urls[record.get('id', '')]
                create_datapackage_change(
                    record, filter_url, datapackage_dir, stderr,
                    arguments['--apikey'], arguments['--remote'],
                    arguments['--insecure'])

            # emit completed records in their original order
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record,
                                                    sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2


def create_datapackage_change(record, filtered_url, datapackage_dir, stderr,
                              apikey, host_url, insecure):
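    """Write ``datapackage.json`` for ``record`` under ``datapackage_dir``
    and download its resources into ``datapackage_dir/data``.

    Resources whose format is in ``resource_formats_to_ignore``, whose
    name/url pair is not in ``filtered_url``, or whose URL is empty are
    skipped. Returns ``(datapackage_dir, datapackage, json_path)``.
    """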
    resource_formats_to_ignore = ['API', 'api']

    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        # API-style resources have nothing to download
        if resource['format'] in resource_formats_to_ignore:
            continue

        # skip resources the portal's url_resources action did not list
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue

        # skip resources without a URL
        if not resource['url']:
            continue

        filename = name_no_repetition(
            resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)

        cres = create_resource_change(resource, stderr, apikey, host_url,
                                      insecure)
        if not cres:
            continue

        # datastore-schema population from the stock command is disabled:
        # dres = {'path': os.path.join('data', filename),
        #         'description': cres.get('description', ''),
        #         'format': cres.get('format', ''),
        #         'name': cres.get('name', ''),
        #         'title': cres.get('name', '').title()}
        # populate_schema_from_datastore(cres, dres)

        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path


def create_resource_change(resource, stderr, apikey, host_url, insecure):
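    """Stream one resource to ``resource['path']``.

    Returns the (possibly URL-rewritten) resource dict, or ``False`` when
    the server answered with an HTTP error. Connection problems are
    reported on ``stderr`` but do not abort the dump.
    """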
    # ---------- REPLACE URL --------- #
    # if the target remote is not www.igp.gob.pe but the resource URL still
    # points there, rewrite it so the download goes through the remote host
    host = urlparse(host_url)
    res_url = urlparse(resource['url'])
    if host.netloc != 'www.igp.gob.pe' and res_url.netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            res_url.scheme + '://' + res_url.netloc,
            host.scheme + '://' + host.netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'],
                         headers={'Authorization': apikey},
                         stream=True,
                         verify=not insecure)
        #---------------------------------------#
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            return False
        #---------------------------------------#
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be '
                     'downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(
            str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        stderr.write('{0}\n'.format(
            str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource


def dataset_to_datapackage_change(dataset_dict):
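    """Convert a CKAN dataset dict into a minimal datapackage dict, keeping
    the dataset id and adding a human-readable ``last_update`` derived from
    ``metadata_modified``.
    """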
    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': datetime.strptime(
              dataset_dict['metadata_modified'],
              '%Y-%m-%dT%H:%M:%S.%f').strftime('%d-%b-%Y %I.%M %p')}

    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp


def convert_to_datapackage_resource_change(resource_dict):
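    """Convert a CKAN resource dict into a datapackage resource dict,
    preferring the downloaded file's ``path`` over the original ``url``.
    """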
    resource = {}

    if resource_dict.get('id'):
        resource['id'] = resource_dict['id']

    if resource_dict.get('name'):
        resource['name'] = resource_dict['name']

    if resource_dict.get('path'):
        # prefer the downloaded file when it exists, else keep the URL
        if os.path.isfile(resource_dict['path']):
            resource['path'] = resource_dict['path']
        else:
            resource['url'] = resource_dict['url']

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema

    return resource


def name_no_repetition(name, dir, option=''):
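    """Return a name (or, unless ``option == 'resource'``, a full path)
    that does not collide with an existing file in ``dir``, prefixing
    ``(1)``, ``(2)``, ... until a free name is found.
    """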
    count = 0
    while True:
        count += 1
        if not os.path.exists(os.path.join(dir, name)):
            if option == 'resource':
                return name
            return os.path.join(dir, name)

        candidate = '(' + str(count) + ')' + name
        if not os.path.exists(os.path.join(dir, candidate)):
            if option == 'resource':
                return candidate
            return os.path.join(dir, candidate)