##// END OF EJS Templates
# v2.9.2 :: Update files restructuring
#
# File last commit: r20:dce75cd83ead
#
# download.py
# 233 lines | 8.7 KiB | text/x-python | PythonLexer
#from ckanapi.datapackage import populate_schema_from_datastore
from ckanapi.cli import workers, dump
from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
from datetime import datetime
import sys
import json
import os
import requests
import six
if sys.version_info.major == 3:
from urllib.parse import urlparse
else:
import urlparse
DL_CHUNK_SIZE = 100 * 1024
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump records of *thing* (e.g. datasets) from a CKAN instance.

    Record IDs are fanned out to worker subprocesses; each finished record is
    written as one compact-JSON line to *stdout*, in input order.  When
    ``--datapackages`` is given, the JSON stream is discarded and a
    datapackage directory is created per dataset instead.

    Returns an exit code (1 on worker/pipe failure, 2 on interrupt), an
    exception instance if fetching the filtered resource URLs fails, or
    None on success.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        # Child-process mode: fetch records and exit.
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # Records are materialised as datapackages; discard the JSON stream.
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Pre-fetch, per dataset, the subset of resource URLs selected for
    # download (custom `url_resources` action on the remote CKAN).
    filtered_urls = {}
    for name in names:
        try:
            filtered_urls[name] = getattr(ckan.action, 'url_resources')(id=name, **kwargs)
        except Exception as e:
            # Was a bare `except:`; keep the return-the-error contract but
            # let KeyboardInterrupt/SystemExit propagate.
            return e

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # Respect the server-advertised cap on parallel requests.
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes,
                       enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # Child exited with a traceback.
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            datapackages_path = arguments['--datapackages']
            datapackage_dir = name_no_repetition(record.get('name', ''), datapackages_path)

            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Path: {4} | Dataset Name: {5}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    datapackage_dir,
                    record.get('name', '') if record else '',
                ).encode('utf-8'))

            if datapackages_path:
                try:
                    filter_url = filtered_urls[record.get('name', '')]
                except KeyError:
                    # Fall back to lookup by id (was a bare except).
                    filter_url = filtered_urls[record.get('id', '')]
                create_datapackage_change(record, filter_url, datapackage_dir, stderr,
                                          arguments['--apikey'], arguments['--remote'],
                                          arguments['--insecure'])

            # Flush completed records to the JSON-lines stream in input order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
def create_datapackage_change(record, filtered_url, datapackage_dir, stderr, apikey, host_url, insecure):
    """Materialise a CKAN dataset *record* as a datapackage directory.

    Downloads each non-ignored, filter-selected resource into
    ``<datapackage_dir>/data/`` and writes ``datapackage.json`` describing
    the dataset.  Returns ``(datapackage_dir, datapackage, json_path)``.
    """
    resource_formats_to_ignore = ['API', 'api']
    # datapackage_dir comes from name_no_repetition(), so it does not exist yet.
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        if resource['format'] in resource_formats_to_ignore:
            continue
        # Only download resources that the server-side filter selected.
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue
        if not resource['url']:
            continue
        # Avoid filename collisions inside the data/ directory.
        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)
        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            # Download rejected by the server (HTTP error); drop the resource.
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)
    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))
    return datapackage_dir, datapackage, json_path
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download ``resource['url']`` to ``resource['path']``.

    URLs pointing at www.igp.gob.pe are first rewritten to point at
    *host_url* (unless *host_url* is itself www.igp.gob.pe).  Returns the
    resource dict on success — and, deliberately, on connection or other
    errors, so the caller keeps the metadata — but returns False when the
    server answers with an HTTP error status.
    """
    # ---------- REPLACE URL --------- #
    host = urlparse(host_url)
    res_url = urlparse(resource['url'])
    if host.netloc != 'www.igp.gob.pe' and res_url.netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            res_url.scheme + '://' + res_url.netloc,
            host.scheme + '://' + host.netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey},
                         stream=True, verify=not insecure)
        try:
            try:
                r.raise_for_status()
            except requests.exceptions.HTTPError:
                return False
            with open(resource['path'], 'wb') as f:
                for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        finally:
            # Streamed responses hold the connection until released.
            r.close()
    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        # Best-effort: log anything else and keep the metadata.
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource
def dataset_to_datapackage_change(dataset_dict):
    """Convert a CKAN dataset dict into a minimal datapackage descriptor.

    Requires 'name', 'id', 'path' and 'metadata_modified' keys; converts
    each entry of the optional 'resources' list as well.
    """
    modified = dataset_dict['metadata_modified']
    # CKAN serialises timestamps with or without microseconds; accept both.
    try:
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S")
    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': last_update.strftime("%d-%b-%Y %I.%M %p")}
    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp
def convert_to_datapackage_resource_change(resource_dict):
    """Build a datapackage resource entry from a CKAN resource dict.

    Copies id/name when present, prefers a local 'path' over the remote
    'url', and parses a string 'schema' as JSON when possible.
    """
    entry = {}
    for key in ('id', 'name'):
        if resource_dict.get(key):
            entry[key] = resource_dict[key]

    if resource_dict.get('path'):
        # Point at the downloaded copy if it exists, else the remote URL.
        if os.path.isfile(resource_dict['path']):
            entry['path'] = resource_dict['path']
        else:
            entry['url'] = resource_dict['url']

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            entry['schema'] = json.loads(schema)
        except ValueError:
            # Not valid JSON; keep the raw string.
            entry['schema'] = schema
    elif isinstance(schema, dict):
        entry['schema'] = schema
    return entry
def name_no_repetition(name, dir, option=''):
    """Return a name (or full path) for *name* that does not collide in *dir*.

    Tries *name* first, then '(1)name', '(2)name', ... until a non-existing
    candidate is found.  When option == 'resource' only the bare name is
    returned; otherwise the candidate joined onto *dir*.
    """
    count = 0
    candidate = name
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name
    return candidate if option == 'resource' else os.path.join(dir, candidate)