##// END OF EJS Templates
v2.9.2 :: Add 'tqdm' - bar progress download
eynilupu -
r21:5cce8357a695
parent child
Show More
@@ -1,234 +1,236
1 1 #from ckanapi.datapackage import populate_schema_from_datastore
2 2 from ckanapi.cli import workers, dump
3 3 from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
4 4 from datetime import datetime
5 from tqdm import tqdm
5 6 import sys
6 7 import json
7 8 import os
8 9 import requests
9 10 import six
10 11
11 12 if sys.version_info.major == 3:
12 13 from urllib.parse import urlparse
13 14 else:
14 15 import urlparse
15 16
16 17 DL_CHUNK_SIZE = 100 * 1024
17 18
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump CKAN things (datasets/groups/...) and optionally export datapackages.

    Mirrors ckanapi's dump command, but additionally calls the remote
    ``url_resources`` action for each requested name and, when
    ``--datapackages`` is set, writes one datapackage per record via
    :func:`create_datapackage_change`.

    Returns ``None`` on success, ``1`` on worker failure or broken pipe,
    ``2`` on interrupt, or the exception raised while fetching resource URLs.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # JSON-lines output is not wanted when exporting datapackages
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Fetch the server-side filtered resource URLs for every dataset up front.
    filtered_urls = {}
    for val in names:
        try:
            filtered_urls[val] = getattr(ckan.action, 'url_resources')(id=val, **kwargs)
        except Exception:  # narrowed from bare except; error is handed back to caller
            _, exc_value, _ = sys.exc_info()
            return exc_value

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # child exited with traceback
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            datapackages_path = arguments['--datapackages']
            datapackage_dir = name_no_repetition(record.get('name', ''), datapackages_path)
            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Path: {4} | Dataset Name: {5}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    datapackage_dir,
                    record.get('name', '') if record else '',
                ).encode('utf-8'))

            if datapackages_path:
                # filtered_urls may be keyed by dataset name or by id
                try:
                    filter_url = filtered_urls[record.get('name', '')]
                except KeyError:
                    filter_url = filtered_urls[record.get('id', '')]
                create_datapackage_change(record, filter_url, datapackage_dir, stderr, arguments['--apikey'], arguments['--remote'], arguments['--insecure'])

            # Emit records to stdout in submission order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
109 110
def create_datapackage_change(record, filtered_url, datapackage_dir, stderr, apikey, host_url, insecure):
    """Download a dataset's resources and write its ``datapackage.json``.

    Only resources present in ``filtered_url`` (matched by name+url) are
    kept; resources in 'API' format or with an empty URL are skipped.

    Returns ``(datapackage_dir, datapackage_dict, json_path)``.
    """
    resource_formats_to_ignore = ['API', 'api']

    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    # tqdm renders a progress bar while iterating the dataset's resources
    for resource in tqdm(record.get('resources', []), unit_scale=True):
        if resource['format'] in resource_formats_to_ignore:
            continue

        # keep only resources the server-side filter selected
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue

        if not resource['url']:
            continue

        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)

        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
153 155
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download a single resource to ``resource['path']``.

    Rewrites ``www.igp.gob.pe`` URLs to point at ``host_url`` first, then
    streams the file in ``DL_CHUNK_SIZE`` chunks.

    Returns the (possibly modified) resource dict, or ``False`` when the
    server answered with an HTTP error status.  Connection and request
    failures are reported to ``stderr`` and the resource is still returned
    (best-effort download).
    """
    # ---------- REPLACE URL --------- #
    resource_netloc = urlparse(resource['url']).netloc
    if urlparse(host_url).netloc != 'www.igp.gob.pe' and resource_netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            urlparse(resource['url']).scheme + '://' + resource_netloc,
            urlparse(host_url).scheme + '://' + urlparse(host_url).netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey}, stream=True, verify=not insecure)
        try:
            try:
                r.raise_for_status()
            except requests.exceptions.HTTPError:
                return False
            with open(resource['path'], 'wb') as f:
                for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                    if chunk:
                        f.write(chunk)
        finally:
            # streamed responses must be closed to release the connection
            r.close()
    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        # last-resort guard so one bad resource does not abort the whole dump;
        # newline added for consistency with the handlers above
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource
180 182
def dataset_to_datapackage_change(dataset_dict):
    """Build a minimal datapackage dict from a CKAN dataset dict.

    ``metadata_modified`` (ISO format with microseconds) is reformatted to a
    human-readable ``last_update`` stamp; resources, when present, are each
    converted via :func:`convert_to_datapackage_resource_change`.
    """
    last_update = datetime.strptime(
        dataset_dict['metadata_modified'], "%Y-%m-%dT%H:%M:%S.%f"
    ).strftime("%d-%b-%Y %I.%M %p")

    package = {
        'name': dataset_dict['name'],
        'id': dataset_dict['id'],
        'path': dataset_dict['path'],
        'last_update': last_update,
    }

    resources = dataset_dict.get('resources')
    if resources:
        package['resources'] = [
            convert_to_datapackage_resource_change(res) for res in resources
        ]
    return package
192 194
def convert_to_datapackage_resource_change(resource_dict):
    """Translate a CKAN resource dict into a datapackage resource dict.

    Copies ``id`` and ``name`` when truthy; uses the local ``path`` when the
    file exists on disk, falling back to the remote ``url`` otherwise; and
    normalizes ``schema`` (JSON string -> dict where parseable).
    """
    resource = {}

    for key in ('id', 'name'):
        if resource_dict.get(key):
            resource[key] = resource_dict[key]

    if resource_dict.get('path'):
        # prefer the local copy when it was actually downloaded
        if os.path.isfile(resource_dict['path']):
            resource['path'] = resource_dict['path']
        else:
            resource['url'] = resource_dict['url']

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            # not valid JSON: keep the raw string as-is
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema
    return resource
217 219
def name_no_repetition(name, dir, option=''):
    """Return a file/dir name (or full path) that does not collide in ``dir``.

    Tries ``name`` first, then ``(1)name``, ``(2)name``, ... until a
    non-existing candidate is found.  With ``option='resource'`` only the
    bare name is returned; otherwise the joined path is returned.

    (Simplified from a ``while True`` loop that re-probed the bare name on
    every iteration and carried a dead ``else: pass`` branch.)
    """
    candidate = name
    count = 0
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name
    if option == 'resource':
        return candidate
    return os.path.join(dir, candidate)
@@ -1,16 +1,17
1 1 # encoding: utf-8
2 2 from setuptools import setup
3 3
# Distribution metadata for the jrodb CKAN client package.
setup(
    name="jrodb",
    version="2.9.2.0",
    description="Data Repository - JRO",
    author="Edson Ynilupu Mattos",
    author_email="eynilupu@igp.gob.pe",
    url="http://intranet.igp.gob.pe:8082/DATABASES/ckanext-jro/api-cliente",
    packages=["jrodb"],
    install_requires=[
        "ckanapi==4.7",
        "requests",
        "tqdm",
    ],
)
General Comments 0
You need to be logged in to leave comments. Login now