# logic_download.py
#from ckanapi.datapackage import populate_schema_from_datastore
from ckanapi.cli import workers, dump
from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
from datetime import datetime
import sys
import json
import os
import requests
import six

if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    from urlparse import urlparse  # Python 2: import the function, not the module

# Size of each streamed download chunk (100 KiB).
DL_CHUNK_SIZE = 100 * 1024

def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Variant of ckanapi's dump_things: dump records as JSON lines and, when
    --datapackages is given, also write one datapackage directory per record
    with its resources downloaded locally."""
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)
    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)
    '''
    log = None
    if arguments['--log']:
        log = open(arguments['--log'], 'a')
    '''
    jsonl_output = stdout
    if arguments['--datapackages']:
        jsonl_output = open(os.devnull, 'wb')
    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]
    '''
    if arguments['--datapackages']:
        arguments['--datastore-fields'] = True
    '''
    #----------------------------#
    # Ask the remote instance which resource name/URL pairs to keep for each
    # record. url_resources appears to be a custom API action on the target
    # CKAN portal, not part of core ckanapi.
    filtered_urls = {}
    for name in names:
        try:
            response = getattr(ckan.action, 'url_resources')(id=name, **kwargs)
        except Exception as exc:
            return exc
        filtered_urls[name] = response
    #----------------------------#
    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))
    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record
            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Dataset Name: {4}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    record.get('name', '') if record else '',
                ).encode('utf-8'))
            '''
            if log:
                log.write(compact_json([
                    timestamp,
                    finished,
                    error,
                    record.get('name', '') if record else None,
                ]) + b'\n')
            '''
            datapackages_path = arguments['--datapackages']
            if datapackages_path:
                create_datapackage_change(
                    record, filtered_urls[record.get('name', '')], datapackages_path,
                    stderr, arguments['--apikey'], arguments['--remote'], arguments['--insecure'])
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2

def create_datapackage_change(record, filtered_url, base_path, stderr, apikey, host_url, insecure):
    """Write a datapackage directory for one record: download the resources
    listed in filtered_url into <dir>/data/ and emit datapackage.json."""
    resource_formats_to_ignore = ['API', 'api']
    #----------------------------------------#
    datapackage_dir = name_no_repetition(record.get('name', ''), base_path)
    #----------------------------------------#
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir
    ckan_resources = []
    for resource in record.get('resources', []):
        if resource['format'] in resource_formats_to_ignore:
            continue
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue
        if len(resource['url']) == 0:
            continue
        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)
        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            continue
        '''
        #----------------------------------------#
        dres = {'path': os.path.join('data', filename),
                'description': cres.get('description', ''),
                'format': cres.get('format', ''),
                'name': cres.get('name', ''),
                'title': cres.get('name', '').title()}
        #----------------------------------------#
        populate_schema_from_datastore(cres, dres)
        '''
        ckan_resources.append(resource)
    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)
    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))
    return datapackage_dir, datapackage, json_path
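
# Illustrative on-disk layout produced by create_datapackage_change for one
# record (actual names depend on the dataset and resources being dumped):
#
#   <base_path>/<dataset-name>/
#       datapackage.json
#       data/
#           <resource-name>
#           (1)<resource-name>   # '(N)' prefix added by name_no_repetition on clashes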

def create_resource_change(resource, stderr, apikey, host_url, insecure):
    # ---------- REPLACE URL --------- #
    # Resource URLs hard-coded to www.igp.gob.pe are rewritten to point at the
    # remote host actually being dumped.
    if urlparse(host_url).netloc != 'www.igp.gob.pe' and urlparse(resource['url']).netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            urlparse(resource['url']).scheme + '://' + urlparse(resource['url']).netloc,
            urlparse(host_url).scheme + '://' + urlparse(host_url).netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey}, stream=True, verify=not insecure)
        #---------------------------------------#
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            return False
        #---------------------------------------#
        # Stream the body to disk in DL_CHUNK_SIZE pieces.
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource

def dataset_to_datapackage_change(dataset_dict):
    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': datetime.strptime(dataset_dict['metadata_modified'],
                                           "%Y-%m-%dT%H:%M:%S.%f").strftime("%d-%b-%Y %I.%M %p")}
    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp

def convert_to_datapackage_resource_change(resource_dict):
    resource = {}
    if resource_dict.get('id'):
        resource['id'] = resource_dict['id']
    if resource_dict.get('name'):
        resource['name'] = resource_dict['name']
    if resource_dict.get('path'):
        resource['path'] = resource_dict['path']
    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema
    return resource
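
# Illustrative call of convert_to_datapackage_resource_change (values made up):
# a resource dict such as
#   {'id': 'r1', 'name': 'stations', 'path': '/tmp/pkg/data/stations.csv',
#    'schema': '{"fields": [{"name": "id", "type": "integer"}]}'}
# comes back with the same id/name/path keys and 'schema' decoded from the
# JSON string into a dict; a schema that is not valid JSON is kept verbatim.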

def name_no_repetition(name, dir, option=''):
    # Return a name (or, unless option == 'resource', a full path) that does
    # not collide with an existing entry in dir, trying '(N)' prefixes until a
    # free name is found.
    count = 0
    while True:
        count += 1
        if not os.path.exists(os.path.join(dir, name)):
            if option == 'resource':
                return name
            return os.path.join(dir, name)
        elif not os.path.exists(os.path.join(dir, '(' + str(count) + ')' + name)):
            if option == 'resource':
                return '(' + str(count) + ')' + name
            return os.path.join(dir, '(' + str(count) + ')' + name)
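
# Rough usage sketch (hypothetical values; in practice `arguments` is the
# docopt dictionary built by ckanapi's command line and carries more flags
# than the ones read directly in this module):
#
#   from ckanapi import RemoteCKAN
#   ckan = RemoteCKAN('https://demo.ckan.org', apikey='XXX')
#   arguments = {
#       'ID_OR_NAME': ['some-dataset'],
#       '--worker': False, '--quiet': False, '--processes': '1',
#       '--datapackages': '/tmp/dump', '--apikey': 'XXX',
#       '--remote': 'https://demo.ckan.org', '--insecure': False,
#   }
#   dump_things_change(ckan, 'datasets', arguments)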