##// END OF EJS Templates
v2.9.2 :: Add 'download_files_advance' API function
eynilupu -
r12:76348ccca4d9
parent child
Show More
@@ -0,0 +1,228
1 #from ckanapi.datapackage import populate_schema_from_datastore
2 from ckanapi.cli import workers, dump
3 from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
4 from datetime import datetime
5 import sys
6 import json
7 import os
8 import requests
9 import six
10
# Bind ``urlparse`` to the URL-parsing *function* on both major versions.
# The Python 2 branch previously did ``import urlparse``, which binds the
# module; the ``urlparse(...)`` calls further down would then fail with
# "'module' object is not callable".
if sys.version_info[0] >= 3:
    from urllib.parse import urlparse
else:
    from urlparse import urlparse

# Number of bytes requested per chunk when streaming a resource to disk.
DL_CHUNK_SIZE = 100 * 1024

# NOTE(review): this writes a blank line to stdout at import time and looks
# like a debugging leftover. Kept so import-time behaviour is unchanged;
# confirm it can be removed.
print()
19
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump records through a worker pool and optionally download their files.

    For every identifier in ``arguments['ID_OR_NAME']`` the ``url_resources``
    action is called first (extra ``kwargs`` are forwarded to it) and its
    response is kept as the download filter for that dataset. The records
    themselves are then fetched by worker processes. When
    ``arguments['--datapackages']`` is set, a datapackage directory is
    created for each record via ``create_datapackage_change`` and the JSON
    lines output is discarded instead of being written to ``stdout``.

    Parameters:
        ckan: client object exposing ``action`` (and optionally
            ``parallel_limit``).
        thing: kind of object to dump, forwarded to the worker command line.
        arguments: docopt-style dict; the keys read here are ``--worker``,
            ``--datapackages``, ``ID_OR_NAME``, ``--processes``, ``--quiet``,
            ``--apikey``, ``--remote`` and ``--insecure``.
        worker_pool: pool factory, defaults to ``workers.worker_pool``.
        stdout / stderr: binary streams, default to the process streams.

    Returns:
        The result of ``dump.dump_things_worker`` in worker mode; the raised
        exception instance if a ``url_resources`` call fails; ``1`` when a
        worker returns an empty result or a pipe error was recorded; ``2``
        on interrupt; otherwise ``None``.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Ask the server which resources of each dataset may be downloaded.
    filtered_urls = {}
    for name in names:
        try:
            filtered_urls[name] = ckan.action.url_resources(id=name, **kwargs)
        except Exception as exc:
            # Callers receive the failure as a return value, not a raise.
            return exc

    datapackages_path = arguments['--datapackages']
    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    # In datapackage mode the JSON lines are thrown away; the sink is opened
    # here and closed in ``finally`` so the handle never leaks.
    devnull = open(os.devnull, 'wb') if datapackages_path else None
    jsonl_output = stdout if devnull is None else devnull

    results = {}
    expecting_number = 0
    try:
        with quiet_int_pipe() as errors:
            for job_ids, finished, result in pool:
                if not result:
                    return 1
                timestamp, error, record = json.loads(result.decode('utf-8'))
                results[finished] = record

                if not arguments['--quiet']:
                    stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Dataset Name: {4}\n'.format(
                        finished,
                        job_ids,
                        next(stats),
                        error,
                        record.get('name', '') if record else '',
                    ).encode('utf-8'))

                # A failed job carries no record, so there is nothing to
                # download for it.
                if datapackages_path and record:
                    # The filter is keyed by whatever identifier the caller
                    # supplied, which may be the dataset name or its id.
                    allowed = filtered_urls.get(record.get('name'))
                    if allowed is None:
                        allowed = filtered_urls.get(record.get('id'), [])
                    create_datapackage_change(
                        record, allowed, datapackages_path, stderr,
                        arguments['--apikey'], arguments['--remote'],
                        arguments['--insecure'])

                # Emit finished records in the same order as ``names``.
                while expecting_number in results:
                    record = results.pop(expecting_number)
                    if record:
                        # Keys are sorted so outputs can be diffed.
                        jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                    expecting_number += 1
        if 'pipe' in errors:
            return 1
        if 'interrupt' in errors:
            return 2
    finally:
        if devnull is not None:
            devnull.close()
103
def create_datapackage_change(record, filtered_url, base_path, stderr, apikey, host_url, insecure):
    """Create a datapackage directory for ``record`` and download its files.

    A fresh directory (never an existing one, see ``name_no_repetition``) is
    created under ``base_path`` with a ``data`` sub-directory. Each resource
    of the record is downloaded into ``data`` unless its format is API, its
    URL is empty, or its ``{'name', 'url'}`` pair is not listed in
    ``filtered_url``. A ``datapackage.json`` describing the downloaded
    resources is then written.

    Side effects: ``record['path']`` and each processed ``resource['path']``
    are set on the dicts passed in.

    Parameters:
        record: dataset dict (``name``, ``resources``, ... ).
        filtered_url: container of ``{'name': ..., 'url': ...}`` dicts that
            are allowed to be downloaded.
        base_path: directory in which the datapackage directory is created.
        stderr: binary stream for download error messages.
        apikey, host_url, insecure: forwarded to ``create_resource_change``.

    Returns:
        Tuple ``(datapackage_dir, datapackage, json_path)``.
    """
    # Compared lower-cased so 'API', 'api', 'Api', ... are all ignored.
    resource_formats_to_ignore = ('api',)

    datapackage_dir = name_no_repetition(record.get('name', ''), base_path)
    data_dir = os.path.join(datapackage_dir, 'data')
    os.makedirs(data_dir)
    record['path'] = datapackage_dir

    ckan_resources = []
    for resource in record.get('resources', []):
        if (resource.get('format') or '').lower() in resource_formats_to_ignore:
            continue

        url = resource.get('url')
        if not url:
            continue

        name = resource.get('name')
        if {'name': name, 'url': url} not in filtered_url:
            continue

        # Resource names come from remote metadata: fall back to the id when
        # no name is set, and neutralise path separators so the file cannot
        # be written outside the data directory.
        if name is None:
            name = resource.get('id') or 'resource'
        safe_name = name.replace('/', '_').replace(os.sep, '_')

        filename = name_no_repetition(safe_name, data_dir, 'resource')
        resource['path'] = os.path.join(data_dir, filename)

        if not create_resource_change(resource, stderr, apikey, host_url, insecure):
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
149
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download ``resource['url']`` to ``resource['path']``.

    When the resource URL points at ``www.igp.gob.pe`` but ``host_url`` is a
    different host, the scheme and host of the resource URL are replaced by
    those of ``host_url`` (``resource['url']`` is updated in place).

    Parameters:
        resource: resource dict; ``url`` and ``path`` are read.
        stderr: binary stream receiving error messages.
        apikey: value sent in the ``Authorization`` header.
        host_url: URL of the server the client is talking to.
        insecure: when true, TLS certificate verification is disabled.

    Returns:
        ``resource`` when the file was fully written, ``False`` when the
        download failed for any reason (HTTP error status, connection
        problem, or an error while writing).
    """
    # ---------- REPLACE URL --------- #
    host = urlparse(host_url)
    target = urlparse(resource['url'])
    if host.netloc != 'www.igp.gob.pe' and target.netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            target.scheme + '://' + target.netloc,
            host.scheme + '://' + host.netloc)

    response = None
    try:
        response = requests.get(resource['url'], headers={'Authorization': apikey}, stream=True, verify=not insecure)
        response.raise_for_status()
        with open(resource['path'], 'wb') as f:
            for chunk in response.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
        return resource
    except requests.exceptions.HTTPError:
        # Error status codes are skipped silently; nothing was written yet.
        return False
    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    finally:
        # A streamed response holds its connection until it is closed.
        if response is not None:
            response.close()

    # Only failures reach this point: drop any partially written file so the
    # datapackage never references truncated data (best effort).
    try:
        if os.path.exists(resource['path']):
            os.remove(resource['path'])
    except OSError:
        pass
    return False
176
def dataset_to_datapackage_change(dataset_dict):
    """Build the reduced datapackage description of a dataset.

    Parameters:
        dataset_dict: dataset dict; ``name``, ``id``, ``path`` and
            ``metadata_modified`` are required, ``resources`` is optional.

    Returns:
        Dict with ``name``, ``id``, ``path``, ``last_update`` (the
        ``metadata_modified`` timestamp rendered as ``%d-%b-%Y %I.%M %p``)
        and, when the dataset has resources, ``resources`` converted with
        ``convert_to_datapackage_resource_change``.

    Raises:
        KeyError: a required key is missing.
        ValueError: ``metadata_modified`` is not an ISO date-time.
    """
    modified = dataset_dict['metadata_modified']
    try:
        stamp = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S.%f")
    except ValueError:
        # ISO timestamps drop the fractional part when it is exactly zero.
        stamp = datetime.strptime(modified, "%Y-%m-%dT%H:%M:%S")

    dp = {'name': dataset_dict['name'],
          'id': dataset_dict['id'],
          'path': dataset_dict['path'],
          'last_update': stamp.strftime("%d-%b-%Y %I.%M %p")}

    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(r)
                           for r in resources]
    return dp
188
def convert_to_datapackage_resource_change(resource_dict):
    """Reduce a resource dict to the fields stored in ``datapackage.json``.

    ``id``, ``name`` and ``path`` are copied when present and truthy. A
    ``schema`` given as a dict is copied as is; a ``schema`` given as a
    string is decoded as JSON, falling back to the raw string when it is not
    valid JSON. Any other ``schema`` value is left out.
    """
    converted = {}

    # Copy the plain fields, skipping missing or empty values.
    for field in ('id', 'name', 'path'):
        if resource_dict.get(field):
            converted[field] = resource_dict[field]

    schema = resource_dict.get('schema')
    if isinstance(schema, dict):
        converted['schema'] = schema
    elif isinstance(schema, six.string_types):
        try:
            decoded = json.loads(schema)
        except ValueError:
            decoded = schema
        converted['schema'] = decoded

    return converted
211
def name_no_repetition(name, dir, option=''):
    """Return a variant of ``name`` that does not yet exist inside ``dir``.

    ``name`` itself is used when free; otherwise ``(1)name``, ``(2)name``,
    ... are tried in order until an unused entry is found.

    Parameters:
        name: desired file or directory name.
        dir: directory in which the name must be unused.
        option: ``'resource'`` returns only the chosen name; any other
            value returns the chosen name joined onto ``dir``.
    """
    candidate = name
    count = 0
    # The unprefixed name is tested once; after that only numbered
    # variants are probed.
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name

    if option == 'resource':
        return candidate
    return os.path.join(dir, candidate)
1 NO CONTENT: modified file, binary diff hidden
NO CONTENT: modified file, binary diff hidden
@@ -1,6 +1,7
1 from ckanapi import RemoteCKAN
1 from ckanapi import RemoteCKAN
2 from datetime import datetime
2 from datetime import datetime
3 from tqdm import tqdm
3 from tqdm import tqdm
4 from CKAN_JRO import logic_download
4 #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
5 #from ckanapi.errors import NotAuthorized, NotFound, ValidationError, SearchQueryError, SearchError, CKANAPIError, ServerIncompatibleError
5 import sys
6 import sys
6 import platform
7 import platform
@@ -932,4 +933,90 class JROAPI():
932 shutil.rmtree(dir_name[:-1])
933 shutil.rmtree(dir_name[:-1])
933 return 'NO FILES WERE DOWNLOADED'
934 return 'NO FILES WERE DOWNLOADED'
934 else:
935 else:
935 return 'FILES NOT FOUND' No newline at end of file
936 return 'FILES NOT FOUND'
937
def download_files_advance(self, id_or_name, processes=1, path=os.path.expanduser("~"), **kwargs):
    '''
    PURPOSE:
        Advanced custom function for downloading the existing files of one
        or more datasets.

    AVAILABLE PARAMETERS:
        SEE: "GUIA DE SCRIPT.pdf"

        id_or_name: dataset id/name as <class 'str'>, or a <class 'list'>
            of them (items are converted with str()).
        processes: number of worker processes, a positive <class 'int'>.
        path: existing, writable directory that receives the downloads;
            defaults to the user's home directory.
        **kwargs: forwarded to logic_download.dump_things_change.

    RETURNS:
        An 'ERROR:: ...' string when an argument is invalid; otherwise the
        value returned by logic_download.dump_things_change.

    STRUCTURE:
        <access_name>.download_files_advance(id_or_name= <class 'str' or 'list'>, param_1 = <class 'param_1'>, ...)
    '''
    #------------------ PATH ----------------------#
    if not isinstance(path, str):
        return 'ERROR:: "path" must be: <class "str">'
    if not os.path.isdir(path):
        return 'ERROR:: "path" does not exist'
    if not path.endswith(os.sep):
        path = path + os.sep

    # Check write permission by creating and removing a uniquely named file.
    test_txt = path + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")+'.txt'
    try:
        with open(test_txt, 'w'):
            pass
        os.remove(test_txt)
    except (IOError, OSError):
        return 'ERROR:: Access denied, you are not authorized to write files: "%s"' % (path)

    #------------------ PROCESSES -----------------#
    # bool is a subclass of int, but str(True) cannot be parsed back into a
    # process count later on, so it is rejected explicitly.
    if isinstance(processes, bool) or not isinstance(processes, int):
        return 'ERROR:: "processes" must be: <class "int">'
    if processes < 1:
        return 'ERROR:: "processes" must be greater than or equal to 1'

    #------------------ ID OR NAME ----------------#
    if isinstance(id_or_name, str):
        id_or_name = [id_or_name]
    elif isinstance(id_or_name, list):
        id_or_name = list(map(str, id_or_name))
    else:
        return 'ERROR:: dataset "id_or_name" must be: <class "str" or "list">'

    #----------------------------------------------#
    # docopt-style argument dict expected by the dump logic.
    arguments = {
        '--apikey': self.Authorization,
        '--ckan-user': None,
        '--config': None,
        '--datapackages': path,
        '--datastore-fields': False,
        '--get-request': False,
        '--insecure': not self.verify,
        # NOTE(review): hard-coded, machine-specific path; confirm whether
        # anything still reads this option.
        '--log': '/home/soporte/DUMP/download.txt',
        '--processes': str(processes),
        '--quiet': False,
        '--remote': self.url,
        '--worker': False,
        'ID_OR_NAME': id_or_name,
        'datasets': True,
        'dump': True,
    }
    return logic_download.dump_things_change(self.ckan, 'datasets', arguments, **kwargs)
@@ -7,6 +7,6 setup(
7 description = "Data Repository - JRO",
7 description = "Data Repository - JRO",
8 author = "Edson Ynilupu Mattos",
8 author = "Edson Ynilupu Mattos",
9 author_email = "eynilupu@igp.gob.pe",
9 author_email = "eynilupu@igp.gob.pe",
10 url = "",
10 url = "http://intranet.igp.gob.pe:8082/DATABASES/ckanext-jro/api-cliente",
11 packages = ["CKAN_JRO"]
11 packages = ["CKAN_JRO"]
12 ) No newline at end of file
12 )
General Comments 0
You need to be logged in to leave comments. Login now