"""Dump CKAN datasets and optionally mirror them as datapackage directories.

Derived from ckanapi's ``dump`` CLI machinery: datasets are fetched by
parallel worker processes; when ``--datapackages PATH`` is given, each
dataset is written to disk as ``PATH/<name>/datapackage.json`` plus its
downloaded resource files under ``data/``.
"""
from ckanapi.cli import workers, dump
from ckanapi.cli.utils import (
    pretty_json,
    completion_stats,
    compact_json,
    quiet_int_pipe,
)
from datetime import datetime
import sys
import json
import os
import requests
import six

if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    # BUGFIX: the original did ``import urlparse``, binding the *module*;
    # the code below calls ``urlparse(...)`` as a function, which would
    # raise "TypeError: 'module' object is not callable" on Python 2.
    from urlparse import urlparse

# Resources are streamed to disk in 100 KiB chunks to bound memory use.
DL_CHUNK_SIZE = 100 * 1024


def dump_things_change(ckan, thing, arguments,
                       worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump ``thing`` (datasets/groups/...) from ``ckan`` in parallel.

    :param ckan: a ckanapi RemoteCKAN/LocalCKAN instance
    :param thing: the kind of object being dumped (e.g. ``'datasets'``)
    :param arguments: docopt-style CLI argument mapping
    :param worker_pool: override for :func:`workers.worker_pool` (tests)
    :param stdout: binary stream for JSONL output (defaults to real stdout)
    :param stderr: binary stream for progress reporting
    :param kwargs: extra parameters forwarded to the ``url_resources`` action
    :returns: ``None`` on success, ``1``/``2`` on pipe/interrupt errors, or
        the exception raised while resolving filtered resource URLs.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)
    if arguments['--worker']:
        # Child-process mode: delegate entirely to ckanapi's worker loop.
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # Records are materialized as datapackages; discard the JSONL stream.
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Resolve the server-side filtered resource list for every dataset up
    # front, so workers' results can be matched against it below.
    filtered_urls = {}
    for name in names:
        try:
            filtered_urls[name] = ckan.action.url_resources(id=name, **kwargs)
        except Exception as exc:  # narrowed from a bare ``except:``
            # Preserve original contract: the exception object is returned,
            # not raised, for the caller to report.
            return exc

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # Respect a server-imposed cap on concurrent requests.
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes,
                       enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # Child exited without producing output: hard failure.
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            datapackages_path = arguments['--datapackages']
            if datapackages_path:
                datapackage_dir = name_no_repetition(
                    record.get('name', ''), datapackages_path)
            else:
                # BUGFIX: the original computed this unconditionally, so a
                # missing --datapackages made os.path.join(None, ...) raise.
                datapackage_dir = ''

            if not arguments['--quiet']:
                stderr.write(
                    '** Finished: {0} | Job IDs: {1} | Next Report: {2} | '
                    'Error: {3} | Path: {4} | Dataset Name: {5}\n'.format(
                        finished,
                        job_ids,
                        next(stats),
                        error,
                        datapackage_dir,
                        record.get('name', '') if record else '',
                    ).encode('utf-8'))

            if datapackages_path:
                try:
                    filter_url = filtered_urls[record.get('name', '')]
                except KeyError:  # narrowed from a bare ``except:``
                    filter_url = filtered_urls[record.get('id', '')]
                create_datapackage_change(
                    record, filter_url, datapackage_dir, stderr,
                    arguments['--apikey'], arguments['--remote'],
                    arguments['--insecure'])

            # Emit completed records to the JSONL stream in submission order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(
                        compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1

    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2


def create_datapackage_change(record, filtered_url, datapackage_dir, stderr,
                              apikey, host_url, insecure):
    """Write one dataset to ``datapackage_dir`` as a datapackage.

    Downloads every non-API resource whose name/url pair appears in
    ``filtered_url``, then serializes ``datapackage.json``.

    :returns: ``(datapackage_dir, datapackage_dict, json_path)``
    """
    resource_formats_to_ignore = ['API', 'api']
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        if resource['format'] in resource_formats_to_ignore:
            continue
        # Only keep resources the server-side filter selected.
        if {'name': resource['name'],
                'url': resource['url']} not in filtered_url:
            continue
        if not resource['url']:
            continue
        filename = name_no_repetition(
            resource['name'], os.path.join(datapackage_dir, 'data'),
            'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)
        cres = create_resource_change(
            resource, stderr, apikey, host_url, insecure)
        if not cres:
            # Download rejected (HTTP error status): drop the resource.
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)
    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))
    return datapackage_dir, datapackage, json_path


def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download one resource to ``resource['path']`` (best effort).

    Rewrites ``www.igp.gob.pe`` URLs to point at ``host_url`` first.
    Network failures are reported on ``stderr`` but never raised.

    :returns: the (possibly URL-rewritten) ``resource`` dict, or ``False``
        when the server answered with an HTTP error status.
    """
    # ---------- REPLACE URL ---------- #
    # Mirror igp.gob.pe-hosted resources through the requested remote host.
    if (urlparse(host_url).netloc != 'www.igp.gob.pe'
            and urlparse(resource['url']).netloc == 'www.igp.gob.pe'):
        old = urlparse(resource['url'])
        new = urlparse(host_url)
        resource['url'] = resource['url'].replace(
            old.scheme + '://' + old.netloc,
            new.scheme + '://' + new.netloc)

    try:
        r = requests.get(resource['url'],
                         headers={'Authorization': apikey},
                         stream=True,
                         verify=not insecure)
        try:
            try:
                r.raise_for_status()
            except requests.exceptions.HTTPError:
                return False
            with open(resource['path'], 'wb') as f:
                for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        finally:
            # BUGFIX: streamed responses must be closed or the connection
            # is leaked back to the pool holding its socket open.
            r.close()
    except requests.ConnectionError:
        stderr.write(
            'URL {0} refused connection. The resource will not be '
            'downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(
            str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        # Deliberate best-effort: any other failure (e.g. disk error) is
        # reported but does not abort the dump.
        stderr.write('{0}'.format(
            str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource


def dataset_to_datapackage_change(dataset_dict):
    """Convert a CKAN dataset dict to a minimal datapackage descriptor."""
    modified = dataset_dict['metadata_modified']
    try:
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        # BUGFIX: CKAN omits the microsecond part when it is zero; fall
        # back to the second-resolution format instead of crashing.
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S")
    dp = {
        'name': dataset_dict['name'],
        'id': dataset_dict['id'],
        'path': dataset_dict['path'],
        'last_update': last_update.strftime("%d-%b-%Y %I.%M %p"),
    }
    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp


def convert_to_datapackage_resource_change(resource_dict):
    """Convert one CKAN resource dict to a datapackage resource entry.

    Uses the local ``path`` when the file was actually downloaded,
    otherwise falls back to the remote ``url``.
    """
    resource = {}
    if resource_dict.get('id'):
        resource['id'] = resource_dict['id']
    if resource_dict.get('name'):
        resource['name'] = resource_dict['name']
    if resource_dict.get('path'):
        if os.path.isfile(resource_dict['path']):
            resource['path'] = resource_dict['path']
        else:
            resource['url'] = resource_dict['url']
    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            # Not valid JSON: keep the raw string schema as-is.
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema
    return resource


def name_no_repetition(name, dir, option=''):
    """Return a collision-free name inside ``dir``.

    Tries ``name`` first, then ``(1)name``, ``(2)name``, ... until a
    non-existing entry is found.

    :param name: desired base name
    :param dir: directory to probe for collisions
    :param option: ``'resource'`` to return the bare name; anything else
        returns the full ``os.path.join(dir, name)`` path.
    """
    candidate = name
    count = 0
    # Simplified from the original loop, which redundantly re-tested the
    # base name on every iteration; the probe sequence is identical.
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name
    if option == 'resource':
        return candidate
    return os.path.join(dir, candidate)