@@ -0,0 +1,228 | |||
|
1 | #from ckanapi.datapackage import populate_schema_from_datastore | |
|
2 | from ckanapi.cli import workers, dump | |
|
3 | from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe | |
|
4 | from datetime import datetime | |
|
5 | import sys | |
|
6 | import json | |
|
7 | import os | |
|
8 | import requests | |
|
9 | import six | |
|
10 | ||
|
# Compatibility shim: expose the same `urlparse` callable on both major
# versions.  Python 3 keeps it in urllib.parse; Python 2 keeps it as a
# function inside the top-level `urlparse` module.
if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    # Fix: a bare `import urlparse` bound the MODULE to the name, so later
    # calls like `urlparse(url)` raised TypeError on Python 2.
    from urlparse import urlparse

# Streaming download chunk size: 100 KiB per read.
DL_CHUNK_SIZE = 100 * 1024
|
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump CKAN objects (e.g. datasets) using parallel worker processes.

    Modified copy of ckanapi's CLI dump loop: before spawning workers it asks
    the remote ``url_resources`` action for the downloadable resource URLs of
    every requested dataset, and — when ``--datapackages`` is set — writes each
    finished record to disk via ``create_datapackage_change()`` instead of
    emitting JSONL.

    :param ckan: RemoteCKAN-like client; must expose ``.action.url_resources``
    :param thing: object type being dumped, e.g. ``'datasets'``
    :param arguments: docopt-style CLI argument mapping
    :param worker_pool: override for ``workers.worker_pool`` (used in tests)
    :param stdout: binary output stream; defaults to the real process stdout
    :param stderr: binary error stream; defaults to the process stderr
    :param kwargs: extra keyword arguments forwarded to ``url_resources``
    :returns: ``None`` on success, ``1`` on worker/pipe failure, ``2`` on
        interrupt, or the caught exception when ``url_resources`` fails
        (original contract preserved).
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # Records become datapackage directories; discard the JSONL stream.
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']

    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Resolve the downloadable resource URLs for every dataset up front so
    # each finished record can be filtered against them later.
    filtered_urls = {}
    for name in names:
        try:
            response = getattr(ckan.action, 'url_resources')(id=name, **kwargs)
        except Exception:  # narrowed from bare except; same return contract
            _, exc_value, _ = sys.exc_info()
            return exc_value
        filtered_urls[name] = response

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # Respect the server-advertised parallelism limit.
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # child exited with traceback
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Dataset Name: {4}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    record.get('name', '') if record else '',
                    ).encode('utf-8'))

            datapackages_path = arguments['--datapackages']
            if datapackages_path and record:
                # Guard on `record`: a worker error yields record=None and the
                # original call would crash on record.get().
                create_datapackage_change(record, filtered_urls[record.get('name', '')], datapackages_path, stderr, arguments['--apikey'], arguments['--remote'], arguments['--insecure'])
            # Emit completed records in submission order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
|
103 | ||
|
def create_datapackage_change(record, filtered_url, base_path, stderr, apikey, host_url, insecure):
    """Materialize one dataset record as a datapackage directory on disk.

    Creates ``<base_path>/<unique-name>/data/``, downloads every approved
    resource into it, and writes ``datapackage.json`` describing the result.

    :param record: dataset dict produced by the dump worker
    :param filtered_url: collection of ``{'name', 'url'}`` dicts that are
        allowed for download (from the remote ``url_resources`` action)
    :param base_path: directory under which the datapackage dir is created
    :param stderr: binary stream for download error messages
    :param apikey: Authorization header value for resource downloads
    :param host_url: remote CKAN base URL (used for URL rewriting)
    :param insecure: when True, TLS verification is disabled for downloads
    :returns: ``(datapackage_dir, datapackage_dict, path_to_datapackage_json)``
    """
    resource_formats_to_ignore = ['API', 'api']

    # Pick a directory name that does not collide with an existing one.
    datapackage_dir = name_no_repetition(record.get('name', ''), base_path)
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        if resource['format'] in resource_formats_to_ignore:
            continue

        # Only download resources the server-side filter approved.
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue

        if not resource['url']:
            continue

        # Unique file name inside the data/ subdirectory.
        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)

        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
|
149 | ||
|
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download a single resource file to ``resource['path']``.

    When the resource URL points at ``www.igp.gob.pe`` but the target host
    does not, the URL's scheme://netloc is first rewritten to the target
    host.  Connection/request errors are reported on ``stderr`` but never
    raised to the caller.

    :param resource: resource dict; must carry ``'url'`` and ``'path'``
    :param stderr: binary stream for error messages
    :param apikey: value sent as the ``Authorization`` header
    :param host_url: remote CKAN base URL used for the rewrite
    :param insecure: when True, disable TLS certificate verification
    :returns: the (possibly modified) resource dict, or ``False`` when the
        server answered with an HTTP error status.
    """
    # ---------- REPLACE URL --------- #
    # Parse both URLs once instead of re-parsing them for every component.
    host_parts = urlparse(host_url)
    url_parts = urlparse(resource['url'])
    if host_parts.netloc != 'www.igp.gob.pe' and url_parts.netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            url_parts.scheme + '://' + url_parts.netloc,
            host_parts.scheme + '://' + host_parts.netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey}, stream=True, verify=not insecure)
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            return False
        # Stream the payload to disk in DL_CHUNK_SIZE pieces.
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        stderr.write('{0}'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource
|
176 | ||
|
def dataset_to_datapackage_change(dataset_dict):
    """Convert a CKAN dataset dict into a minimal datapackage dict.

    :param dataset_dict: must carry ``'name'``, ``'id'``, ``'path'`` and
        ``'metadata_modified'`` (ISO 8601, with or without microseconds)
    :returns: datapackage mapping with a human-readable ``'last_update'``
        and, when present, converted ``'resources'``.
    :raises ValueError: if ``metadata_modified`` matches neither format.
    """
    modified = dataset_dict['metadata_modified']
    try:
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        # CKAN omits the microsecond part when it is zero; the original
        # hard-coded ".%f" format crashed on such timestamps.
        last_update = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S")

    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': last_update.strftime("%d-%b-%Y %I.%M %p")}

    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp
|
188 | ||
|
def convert_to_datapackage_resource_change(resource_dict):
    """Map a CKAN resource dict onto a datapackage resource dict.

    Copies ``id``/``name``/``path`` when they hold truthy values and
    normalizes ``schema``: a JSON string is parsed, an invalid string is
    kept verbatim, a dict is kept as-is, anything else is dropped.
    """
    converted = {}

    # Same precedence as before: id, then name, then path.
    for key in ('id', 'name', 'path'):
        value = resource_dict.get(key)
        if value:
            converted[key] = value

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            converted['schema'] = json.loads(schema)
        except ValueError:
            # Not valid JSON: keep the raw string.
            converted['schema'] = schema
    elif isinstance(schema, dict):
        converted['schema'] = schema

    return converted
|
211 | ||
|
def name_no_repetition(name, dir, option=''):
    """Return a collision-free file/directory name inside ``dir``.

    Tries ``name`` first, then ``(1)name``, ``(2)name``, ... until a
    candidate does not already exist in ``dir``.

    :param name: desired base name
    :param dir: directory to probe (parameter name kept for caller
        compatibility even though it shadows the builtin)
    :param option: ``'resource'`` returns just the candidate name; anything
        else returns the candidate joined with ``dir``.
    :returns: unique name or full path, depending on ``option``.
    """
    candidate = name
    count = 0
    # The original loop re-probed the plain name on every pass; probing each
    # candidate exactly once yields the same sequence with fewer stat calls.
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name

    if option == 'resource':
        return candidate
    return os.path.join(dir, candidate)
|
1 | NO CONTENT: modified file, binary diff hidden |
@@ -1,6 +1,7 | |||
|
1 | 1 | from ckanapi import RemoteCKAN |
|
2 | 2 | from datetime import datetime |
|
3 | 3 | from tqdm import tqdm |
|
4 | from CKAN_JRO import logic_download | |
|
4 | 5 | #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError |
|
5 | 6 | import sys |
|
6 | 7 | import platform |
@@ -932,4 +933,90 class JROAPI(): | |||
|
932 | 933 | shutil.rmtree(dir_name[:-1]) |
|
933 | 934 | return 'NO FILES WERE DOWNLOADED' |
|
934 | 935 | else: |
|
935 | return 'FILES NOT FOUND' No newline at end of file | |
|
936 | return 'FILES NOT FOUND' | |
|
937 | ||
|
938 | def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs): | |
|
939 | ''' | |
|
940 | FINALIDAD: | |
|
941 | Funcion personalizada avanzada para la descarga de archivos existentes de un(os) dataset(s). | |
|
942 | ||
|
943 | PARAMETROS DISPONIBLES: | |
|
944 | CONSULTAR: "GUIA DE SCRIPT.pdf" | |
|
945 | ||
|
946 | ESTRUCTURA: | |
|
947 | <access_name>.download_files_advance(id_or_name= <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...) | |
|
948 | ''' | |
|
949 | #------------------ PATH ----------------------# | |
|
950 | if isinstance(path, str): | |
|
951 | if os.path.isdir(path): | |
|
952 | if not path.endswith(os.sep): | |
|
953 | path = path + os.sep | |
|
954 | test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt' | |
|
955 | try: | |
|
956 | file_txt = open(test_txt, 'w') | |
|
957 | file_txt.close() | |
|
958 | os.remove(test_txt) | |
|
959 | except: | |
|
960 | return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path) | |
|
961 | else: | |
|
962 | return 'ERROR:: "path" does not exist' | |
|
963 | else: | |
|
964 | return 'ERROR:: "path" must be: <class "str">' | |
|
965 | ||
|
966 | #------------------ PROCESSES -----------------# | |
|
967 | if not isinstance(processes, int): | |
|
968 | return 'ERROR:: "processes" must be: <class "int">' | |
|
969 | ||
|
970 | #------------------ ID OR NAME ----------------# | |
|
971 | if isinstance(id_or_name, str): | |
|
972 | id_or_name = [id_or_name] | |
|
973 | elif isinstance(id_or_name, list): | |
|
974 | id_or_name = list(map(str, id_or_name)) | |
|
975 | else: | |
|
976 | return 'ERROR:: dataset "id_or_name" must be: <class "str" or "list">' | |
|
977 | #----------------------------------------------# | |
|
978 | arguments = { | |
|
979 | '--apikey': self.Authorization, | |
|
980 | '--ckan-user': None, | |
|
981 | '--config': None, | |
|
982 | '--datapackages': path, | |
|
983 | '--datastore-fields': False, | |
|
984 | '--get-request': False, | |
|
985 | '--insecure': not self.verify, | |
|
986 | '--log': '/home/soporte/DUMP/download.txt', | |
|
987 | '--processes': str(processes), | |
|
988 | '--quiet': False, | |
|
989 | '--remote': self.url, | |
|
990 | '--worker': False, | |
|
991 | #'--all': False, | |
|
992 | #'--gzip': False, | |
|
993 | #'--output': None, | |
|
994 | #'--max-records': None, | |
|
995 | #'--output-json': False, | |
|
996 | #'--output-jsonl': False, | |
|
997 | #'--create-only': False, | |
|
998 | #'--help': False, | |
|
999 | #'--input': None, | |
|
1000 | #'--input-json': False, | |
|
1001 | #'--start-record': '1', | |
|
1002 | #'--update-only': False, | |
|
1003 | #'--upload-logo': False, | |
|
1004 | #'--upload-resources': False, | |
|
1005 | #'--version': False, | |
|
1006 | 'ID_OR_NAME': id_or_name, | |
|
1007 | 'datasets': True, | |
|
1008 | 'dump': True, | |
|
1009 | #'ACTION_NAME': None, | |
|
1010 | #'KEY:JSON': [], | |
|
1011 | #'KEY=STRING': [], | |
|
1012 | #'KEY@FILE': [], | |
|
1013 | #'action': False, | |
|
1014 | #'delete': False, | |
|
1015 | #'groups': False, | |
|
1016 | #'load': False, | |
|
1017 | #'organizations': False, | |
|
1018 | #'related': False, | |
|
1019 | #'search': False, | |
|
1020 | #'users': False | |
|
1021 | } | |
|
1022 | return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs) No newline at end of file |
General Comments 0
You need to be logged in to leave comments.
Login now