##// END OF EJS Templates
v2.9.2 :: Add 'tqdm' - progress bar for downloads
eynilupu -
r21:5cce8357a695
parent child
Show More
@@ -1,234 +1,236
1 #from ckanapi.datapackage import populate_schema_from_datastore
1 #from ckanapi.datapackage import populate_schema_from_datastore
2 from ckanapi.cli import workers, dump
2 from ckanapi.cli import workers, dump
3 from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
3 from ckanapi.cli.utils import pretty_json, completion_stats, compact_json, quiet_int_pipe
4 from datetime import datetime
4 from datetime import datetime
5 from tqdm import tqdm
5 import sys
6 import sys
6 import json
7 import json
7 import os
8 import os
8 import requests
9 import requests
9 import six
10 import six
10
11
11 if sys.version_info.major == 3:
12 if sys.version_info.major == 3:
12 from urllib.parse import urlparse
13 from urllib.parse import urlparse
13 else:
14 else:
14 import urlparse
15 import urlparse
15
16
16 DL_CHUNK_SIZE = 100 * 1024
17 DL_CHUNK_SIZE = 100 * 1024
17
18
def dump_things_change(ckan, thing, arguments, worker_pool=None, stdout=None, stderr=None, **kwargs):
    """Dump CKAN records using a pool of worker subprocesses, optionally
    writing a datapackage directory per record.

    Mirrors the ckanapi CLI dump interface. Returns an int exit code
    (1 on worker failure or broken pipe, 2 on interrupt) or, when the
    initial URL lookup fails, the exception instance itself.
    NOTE(review): callers appear to treat any truthy return as failure --
    confirm against the CLI entry point.
    """
    if worker_pool is None:
        worker_pool = workers.worker_pool
    if stdout is None:
        # Prefer the binary buffer when present (Python 3).
        stdout = getattr(sys.__stdout__, 'buffer', sys.__stdout__)
    if stderr is None:
        stderr = getattr(sys.stderr, 'buffer', sys.stderr)

    if arguments['--worker']:
        # Child mode: this process is one of the pool workers.
        return dump.dump_things_worker(ckan, thing, arguments)

    jsonl_output = stdout
    if arguments['--datapackages']:
        # Records go to datapackage directories, not to stdout.
        jsonl_output = open(os.devnull, 'wb')

    names = arguments['ID_OR_NAME']
    if names and isinstance(names[0], dict):
        names = [rec.get('name', rec.get('id')) for rec in names]

    # Resolve the resource URLs for every requested dataset up front.
    filtered_urls = {}
    for val in names:
        try:
            filtered_urls[val] = getattr(ckan.action, 'url_resources')(id=val, **kwargs)
        except Exception:
            # Was a bare except; keep the "return the error" contract but
            # stop swallowing KeyboardInterrupt/SystemExit.
            _, exc_value, _ = sys.exc_info()
            return exc_value

    cmd = dump._worker_command_line(thing, arguments)
    processes = int(arguments['--processes'])
    if hasattr(ckan, 'parallel_limit'):
        # Respect a server-imposed cap on parallel requests.
        processes = min(processes, ckan.parallel_limit)
    stats = completion_stats(processes)
    pool = worker_pool(cmd, processes, enumerate(compact_json(n) + b'\n' for n in names))

    results = {}
    expecting_number = 0
    with quiet_int_pipe() as errors:
        for job_ids, finished, result in pool:
            if not result:
                # A worker crashed; its output pipe closed early.
                return 1
            timestamp, error, record = json.loads(result.decode('utf-8'))
            results[finished] = record

            datapackages_path = arguments['--datapackages']
            # NOTE(review): computed even when --datapackages is unset;
            # assumes that value is then a usable path-like -- confirm.
            datapackage_dir = name_no_repetition(record.get('name', ''), datapackages_path)
            if not arguments['--quiet']:
                stderr.write('** Finished: {0} | Job IDs: {1} | Next Report: {2} | Error: {3} | Path: {4} | Dataset Name: {5}\n'.format(
                    finished,
                    job_ids,
                    next(stats),
                    error,
                    datapackage_dir,
                    record.get('name', '') if record else '',
                    ).encode('utf-8'))
            if datapackages_path:
                try:
                    filter_url = filtered_urls[record.get('name', '')]
                except KeyError:
                    # Lookup table may be keyed by id instead of name.
                    filter_url = filtered_urls[record.get('id', '')]
                create_datapackage_change(record, filter_url, datapackage_dir, stderr, arguments['--apikey'], arguments['--remote'], arguments['--insecure'])

            # Emit completed records to stdout in submission order.
            while expecting_number in results:
                record = results.pop(expecting_number)
                if record:
                    jsonl_output.write(compact_json(record, sort_keys=True) + b'\n')
                expecting_number += 1
    if 'pipe' in errors:
        return 1
    if 'interrupt' in errors:
        return 2
def create_datapackage_change(record, filtered_url, datapackage_dir, stderr, apikey, host_url, insecure):
    """Write a datapackage (datapackage.json plus downloaded files) for one
    dataset *record* under *datapackage_dir*.

    Only resources present in *filtered_url* are kept; 'API'-format
    resources and resources with an empty URL are skipped.
    Returns (datapackage_dir, datapackage_dict, json_path).
    """
    resource_formats_to_ignore = ['API', 'api']

    # Raises OSError if the directory already exists; name_no_repetition
    # is expected to have produced a fresh path.
    os.makedirs(os.path.join(datapackage_dir, 'data'))
    record['path'] = datapackage_dir

    ckan_resources = []
    # tqdm renders a progress bar while resources are downloaded.
    for resource in tqdm(record.get('resources', []), unit_scale=True):
        if resource['format'] in resource_formats_to_ignore:
            continue
        if {'name': resource['name'], 'url': resource['url']} not in filtered_url:
            continue
        if len(resource['url']) == 0:
            continue

        filename = name_no_repetition(resource['name'], os.path.join(datapackage_dir, 'data'), 'resource')
        resource['path'] = os.path.join(datapackage_dir, 'data', filename)

        cres = create_resource_change(resource, stderr, apikey, host_url, insecure)
        if not cres:
            # Download was refused with an HTTP error; drop the resource.
            continue
        ckan_resources.append(resource)

    dataset = dict(record, resources=ckan_resources)
    datapackage = dataset_to_datapackage_change(dataset)

    json_path = os.path.join(datapackage_dir, 'datapackage.json')
    with open(json_path, 'wb') as out:
        out.write(pretty_json(datapackage))

    return datapackage_dir, datapackage, json_path
def create_resource_change(resource, stderr, apikey, host_url, insecure):
    """Download one resource file to resource['path'].

    Rewrites www.igp.gob.pe URLs to point at *host_url* when dumping via a
    different host. Returns the (possibly modified) resource dict, or
    False when the server answered with an HTTP error status.
    Connection/request errors are reported to *stderr* and the resource is
    returned undownloaded (best effort, matching the original contract).
    """
    # ---------- REPLACE URL --------- #
    # Parse once instead of re-parsing the same URLs for every attribute.
    src = urlparse(resource['url'])
    dst = urlparse(host_url)
    if dst.netloc != 'www.igp.gob.pe' and src.netloc == 'www.igp.gob.pe':
        resource['url'] = resource['url'].replace(
            src.scheme + '://' + src.netloc,
            dst.scheme + '://' + dst.netloc)
    #----------------------------------#
    try:
        r = requests.get(resource['url'], headers={'Authorization': apikey},
                         stream=True, verify=not insecure)
        try:
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            # 4xx/5xx: signal the caller to skip this resource.
            return False
        with open(resource['path'], 'wb') as f:
            for chunk in r.iter_content(chunk_size=DL_CHUNK_SIZE):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)

    except requests.ConnectionError:
        stderr.write('URL {0} refused connection. The resource will not be downloaded\n'.format(resource['url']).encode('utf-8'))
    except requests.exceptions.RequestException as e:
        stderr.write('{0}\n'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    except Exception as e:
        stderr.write('{0}'.format(str(e.args[0]) if len(e.args) > 0 else '').encode('utf-8'))
    return resource
def dataset_to_datapackage_change(dataset_dict):
    """Build a minimal datapackage descriptor from a CKAN dataset dict.

    Copies name/id/path, reformats metadata_modified into a human-readable
    timestamp, and converts each resource when any are present.
    """
    modified = datetime.strptime(dataset_dict['metadata_modified'],
                                 "%Y-%m-%dT%H:%M:%S.%f")
    dp = {
        'name': dataset_dict['name'],
        'id': dataset_dict['id'],
        'path': dataset_dict['path'],
        'last_update': modified.strftime("%d-%b-%Y %I.%M %p"),
    }

    resources = dataset_dict.get('resources')
    if resources:
        dp['resources'] = [convert_to_datapackage_resource_change(res)
                           for res in resources]
    return dp
def convert_to_datapackage_resource_change(resource_dict):
    """Convert a CKAN resource dict into a datapackage resource descriptor.

    Copies id/name; uses the local 'path' when the file exists on disk,
    otherwise falls back to the remote 'url'; parses a JSON 'schema'
    string when possible, keeping the raw value when it is not JSON.
    """
    resource = {}

    for key in ('id', 'name'):
        if resource_dict.get(key):
            resource[key] = resource_dict[key]

    if resource_dict.get('path'):
        if os.path.isfile(resource_dict['path']):
            resource['path'] = resource_dict['path']
        else:
            # Downloaded file is missing: reference the remote URL instead.
            resource['url'] = resource_dict['url']

    schema = resource_dict.get('schema')
    if isinstance(schema, six.string_types):
        try:
            resource['schema'] = json.loads(schema)
        except ValueError:
            # Not valid JSON; keep the raw string.
            resource['schema'] = schema
    elif isinstance(schema, dict):
        resource['schema'] = schema
    return resource
def name_no_repetition(name, dir, option=''):
    """Return a collision-free variant of *name* inside directory *dir*.

    If *name* is taken, tries '(1)name', '(2)name', ... until a free one
    is found. With option='resource' only the bare filename is returned;
    otherwise the full path (dir joined with the name) is returned.
    Note: *dir* shadows the builtin but is kept for interface stability.
    """
    candidate = name
    count = 0
    while os.path.exists(os.path.join(dir, candidate)):
        count += 1
        candidate = '(' + str(count) + ')' + name

    if option == 'resource':
        return candidate
    return os.path.join(dir, candidate)
@@ -1,16 +1,17
# encoding: utf-8
"""Packaging script for the jrodb CKAN API client."""
from setuptools import setup

setup(
    name="jrodb",
    version="2.9.2.0",
    description="Data Repository - JRO",
    author="Edson Ynilupu Mattos",
    author_email="eynilupu@igp.gob.pe",
    url="http://intranet.igp.gob.pe:8082/DATABASES/ckanext-jro/api-cliente",
    packages=["jrodb"],
    install_requires=[
        "ckanapi==4.7",
        "requests",
        "tqdm",
    ],
)
General Comments 0
You need to be logged in to leave comments. Login now