@@ -0,0 +1,228 | |||||
|
1 | #from ckanapi.datapackage import populate_schema_from_datastore | |||
|
2 | from ckanapi.cli import workers, dump | |||
|
3 | from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe | |||
|
4 | from datetime import datetime | |||
|
5 | import sys | |||
|
6 | import json | |||
|
7 | import os | |||
|
8 | import requests | |||
|
9 | import six | |||
|
10 | ||||
|
if sys.version_info.major == 3:
    from urllib.parse import urlparse
else:
    # Bug fix: 'import urlparse' bound the *module*, so urlparse(url) calls
    # below would raise TypeError on Python 2.  Bind the function instead.
    from urlparse import urlparse

# Streaming download chunk size: 100 KiB per read (see create_resource_change).
DL_CHUNK_SIZE = 100 * 1024
|
19 | ||||
|
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump CKAN objects in parallel worker processes, optionally building a
    datapackage directory (with downloaded resources) per dataset.

    Customised variant of ckanapi's CLI dump: before spawning workers it calls
    the remote 'url_resources' action for every requested name, and when
    '--datapackages' is set each finished record is handed to
    create_datapackage_change() together with those filtered URLs.

    :param ckan: RemoteCKAN-compatible client instance
    :param thing: kind of object being dumped (e.g. 'datasets')
    :param arguments: docopt-style option dict (ckanapi dump CLI options)
    :param worker_pool: override for workers.worker_pool (useful in tests)
    :param stdout: binary stream for JSON-lines output; defaults to real stdout
    :param stderr: binary stream for progress reports; defaults to stderr
    :returns: None on success, 1 on worker/pipe failure, 2 on interrupt, or
              the exception raised while fetching resource URLs
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        # prefer the underlying binary buffer so bytes can be written on Py3
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        # child process mode: do the actual dumping and exit
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # when building datapackage folders the JSON-lines stream is discarded
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Fetch the downloadable-resource list for every dataset up front via the
    # custom 'url_resources' API action; used to filter resources later.
    filtered_urls = {}
    for name in names:
        try:
            filtered_urls[name] = getattr(ckan.action, 'url_resources')(id=name, **kwargs)
        except Exception as e:
            # was a bare 'except:' -- narrowed so SystemExit/KeyboardInterrupt
            # are not swallowed; the exception object is *returned* (not
            # raised) to preserve the existing caller contract
            return e

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # respect a server-imposed cap on concurrent requests
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # child terminated without output
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Dataset Name: {4}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    record.get('name', '') if record else '',
                ).encode('utf-8'))

            datapackages_path = arguments['--datapackages']
            if datapackages_path:
                create_datapackage_change(record, filtered_urls[record.get('name', '')],
                                          datapackages_path, stderr, arguments['--apikey'],
                                          arguments['--remote'], arguments['--insecure'])
            # emit completed records in their original request order
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
|
103 | ||||
|
def create_datapackage_change(record, filtered_url, base_path, stderr, apikey, host_url, insecure):
    """Build a datapackage directory for one dataset record.

    Creates '<base_path>/<dataset>/data/', downloads every eligible resource
    into it through create_resource_change(), and writes datapackage.json.
    A resource is skipped when its format is an API placeholder, when its
    name/url pair is not whitelisted in *filtered_url*, when its url is
    empty, or when the download call reported failure.

    Returns a (datapackage_dir, datapackage_dict, json_path) tuple.
    """
    IGNORED_FORMATS = ('API', 'api')

    datapackage_dir = name_no_repetition(record.get('name', ''), base_path)
    data_dir = os.path.join(datapackage_dir, 'data')
    os.makedirs(data_dir)
    record['path'] = datapackage_dir

    kept_resources = []
    for resource in record.get('resources', []):
        if resource['format'] in IGNORED_FORMATS:
            continue
        # keep only resources the 'url_resources' action listed for download
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue
        if not len(resource['url']):
            continue

        filename = name_no_repetition(resource['name'], data_dir, 'resource')
        resource['path'] = os.path.join(data_dir, filename)

        downloaded = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if downloaded:
            kept_resources.append(resource)

    dataset = dict(record, resources=kept_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
|
149 | ||||
|
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download one resource file to resource['path'].

    When the stored URL still points at www.igp.gob.pe but a different host is
    being queried, the URL's scheme+host are rewritten to *host_url* first.
    The file is streamed in DL_CHUNK_SIZE chunks with the API key sent in the
    Authorization header; download problems are reported on *stderr*.

    :returns: the (possibly mutated) resource dict, or False when the server
              answered with an HTTP error status.
    """
    # ---------- REPLACE URL --------- #
    if urlparse(host_url).netloc != 'www.igp.gob.pe' and urlparse(resource['url']).netloc == 'www.igp.gob.pe':
        stored = urlparse(resource['url'])
        target = urlparse(host_url)
        resource['url'] = resource['url'].replace(stored.scheme + '://' + stored.netloc,
                                                  target.scheme + '://' + target.netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey},
                         stream=True, verify=not insecure)
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            # server rejected the request (4xx/5xx): signal failure to caller
            return False
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        # bug fix: this message previously lacked the trailing newline that
        # every other handler in this function emits
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    # NOTE(review): connection/request failures fall through to here, so the
    # resource is still returned (and kept by the caller) even though no file
    # was written -- confirm this best-effort behavior is intended.
    return resource
|
176 | ||||
|
def _format_last_update(metadata_modified):
    # CKAN 'metadata_modified' timestamps may or may not carry microseconds;
    # the original code accepted only the '.%f' form and crashed otherwise.
    for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
        try:
            return datetime.strptime(metadata_modified, fmt).strftime("%d-%b-%Y %I.%M %p")
        except ValueError:
            continue
    raise ValueError('unrecognised metadata_modified timestamp: %r' % (metadata_modified,))

def dataset_to_datapackage_change(dataset_dict):
    """Convert a CKAN dataset dict into a minimal datapackage descriptor.

    Copies name/id/path, renders 'metadata_modified' as a human-readable
    'last_update' string, and converts each resource through
    convert_to_datapackage_resource_change().  The 'resources' key is only
    present when the dataset has resources.
    """
    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': _format_last_update(dataset_dict['metadata_modified'])}

    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp
|
188 | ||||
|
def convert_to_datapackage_resource_change(resource_dict):
    """Convert a CKAN resource dict into a datapackage resource descriptor.

    Copies 'id', 'name' and 'path' when present (and truthy), then normalises
    'schema': a JSON string is parsed into a dict (kept verbatim when parsing
    fails), a dict is used as-is, anything else is dropped.
    """
    resource = {}

    if resource_dict.get('id'):
        resource['id'] = resource_dict['id']

    if resource_dict.get('name'):
        resource['name'] = resource_dict['name']

    if resource_dict.get('path'):
        resource['path'] = resource_dict['path']

    schema = resource_dict.get('schema')
    # six.string_types replaced with plain str: the rest of this module is
    # Python-3-only (it imports urllib.parse), and on Py3 the two are equal,
    # so this drops an unnecessary third-party dependency without changing
    # behavior.
    if isinstance(schema, str):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            # not valid JSON: keep the raw string untouched
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema

    return resource
|
211 | ||||
|
def name_no_repetition(name, dir, option=''):
    """Return a name that does not collide with an existing entry in *dir*.

    Tries *name* first, then '(1)name', '(2)name', ... until a free one is
    found.  With option='resource' only the bare filename is returned;
    otherwise the full path (dir joined with the chosen name) is returned.
    """
    attempt = 0
    while True:
        attempt += 1
        # check the plain name first, then the numbered variant for this round
        for candidate in (name, '(%d)%s' % (attempt, name)):
            if not os.path.exists(os.path.join(dir, candidate)):
                if option == 'resource':
                    return candidate
                return os.path.join(dir, candidate)
1 | NO CONTENT: modified file, binary diff hidden |
|
NO CONTENT: modified file, binary diff hidden |
@@ -1,6 +1,7 | |||||
1 | from ckanapi import RemoteCKAN |
|
1 | from ckanapi import RemoteCKAN | |
2 | from datetime import datetime |
|
2 | from datetime import datetime | |
3 | from tqdm import tqdm |
|
3 | from tqdm import tqdm | |
|
4 | from CKAN_JRO import logic_download | |||
4 | #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError |
|
5 | #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError | |
5 | import sys |
|
6 | import sys | |
6 | import platform |
|
7 | import platform | |
@@ -932,4 +933,90 class JROAPI(): | |||||
932 | shutil.rmtree(dir_name[:-1]) |
|
933 | shutil.rmtree(dir_name[:-1]) | |
933 | return 'NO FILES WERE DOWNLOADED' |
|
934 | return 'NO FILES WERE DOWNLOADED' | |
934 | else: |
|
935 | else: | |
935 | return 'FILES NOT FOUND' No newline at end of file |
|
936 | return 'FILES NOT FOUND' | |
|
937 | ||||
|
938 | def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs): | |||
|
939 | ''' | |||
|
940 | FINALIDAD: | |||
|
941 | Funcion personalizada avanzada para la descarga de archivos existentes de un(os) dataset(s). | |||
|
942 | ||||
|
943 | PARAMETROS DISPONIBLES: | |||
|
944 | CONSULTAR: "GUIA DE SCRIPT.pdf" | |||
|
945 | ||||
|
946 | ESTRUCTURA: | |||
|
947 | <access_name>.download_files_advance(id_or_name= <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...) | |||
|
948 | ''' | |||
|
949 | #------------------ PATH ----------------------# | |||
|
950 | if isinstance(path, str): | |||
|
951 | if os.path.isdir(path): | |||
|
952 | if not path.endswith(os.sep): | |||
|
953 | path = path + os.sep | |||
|
954 | test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt' | |||
|
955 | try: | |||
|
956 | file_txt = open(test_txt, 'w') | |||
|
957 | file_txt.close() | |||
|
958 | os.remove(test_txt) | |||
|
959 | except: | |||
|
960 | return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path) | |||
|
961 | else: | |||
|
962 | return 'ERROR:: "path" does not exist' | |||
|
963 | else: | |||
|
964 | return 'ERROR:: "path" must be: <class "str">' | |||
|
965 | ||||
|
966 | #------------------ PROCESSES -----------------# | |||
|
967 | if not isinstance(processes, int): | |||
|
968 | return 'ERROR:: "processes" must be: <class "int">' | |||
|
969 | ||||
|
970 | #------------------ ID OR NAME ----------------# | |||
|
971 | if isinstance(id_or_name, str): | |||
|
972 | id_or_name = [id_or_name] | |||
|
973 | elif isinstance(id_or_name, list): | |||
|
974 | id_or_name = list(map(str, id_or_name)) | |||
|
975 | else: | |||
|
976 | return 'ERROR:: dataset "id_or_name" must be: <class "str" or "list">' | |||
|
977 | #----------------------------------------------# | |||
|
978 | arguments = { | |||
|
979 | '--apikey': self.Authorization, | |||
|
980 | '--ckan-user': None, | |||
|
981 | '--config': None, | |||
|
982 | '--datapackages': path, | |||
|
983 | '--datastore-fields': False, | |||
|
984 | '--get-request': False, | |||
|
985 | '--insecure': not self.verify, | |||
|
986 | '--log': '/home/soporte/DUMP/download.txt', | |||
|
987 | '--processes': str(processes), | |||
|
988 | '--quiet': False, | |||
|
989 | '--remote': self.url, | |||
|
990 | '--worker': False, | |||
|
991 | #'--all': False, | |||
|
992 | #'--gzip': False, | |||
|
993 | #'--output': None, | |||
|
994 | #'--max-records': None, | |||
|
995 | #'--output-json': False, | |||
|
996 | #'--output-jsonl': False, | |||
|
997 | #'--create-only': False, | |||
|
998 | #'--help': False, | |||
|
999 | #'--input': None, | |||
|
1000 | #'--input-json': False, | |||
|
1001 | #'--start-record': '1', | |||
|
1002 | #'--update-only': False, | |||
|
1003 | #'--upload-logo': False, | |||
|
1004 | #'--upload-resources': False, | |||
|
1005 | #'--version': False, | |||
|
1006 | 'ID_OR_NAME': id_or_name, | |||
|
1007 | 'datasets': True, | |||
|
1008 | 'dump': True, | |||
|
1009 | #'ACTION_NAME': None, | |||
|
1010 | #'KEY:JSON': [], | |||
|
1011 | #'KEY=STRING': [], | |||
|
1012 | #'KEY@FILE': [], | |||
|
1013 | #'action': False, | |||
|
1014 | #'delete': False, | |||
|
1015 | #'groups': False, | |||
|
1016 | #'load': False, | |||
|
1017 | #'organizations': False, | |||
|
1018 | #'related': False, | |||
|
1019 | #'search': False, | |||
|
1020 | #'users': False | |||
|
1021 | } | |||
|
1022 | return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs) No newline at end of file |
@@ -7,6 +7,6 setup( | |||||
7 | description = "Data Repository - JRO", |
|
7 | description = "Data Repository - JRO", | |
8 | author = "Edson Ynilupu Mattos", |
|
8 | author = "Edson Ynilupu Mattos", | |
9 | author_email = "eynilupu@igp.gob.pe", |
|
9 | author_email = "eynilupu@igp.gob.pe", | |
10 | url = "", |
|
10 | url = "http://intranet.igp.gob.pe:8082/DATABASES/ckanext-jro/api-cliente", | |
11 | packages = ["CKAN_JRO"] |
|
11 | packages = ["CKAN_JRO"] | |
12 | ) No newline at end of file |
|
12 | ) |
General Comments 0
You need to be logged in to leave comments.
Login now