#!/usr/bin/env python3
'''This script is designed to load quasi-static data into a PostGIS database
for rendering maps. It differs from the usual scripts to do this in that it is
designed to take its configuration from a file rather than be a series of shell
commands.

Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be running multiple times in parallel. This is not
  normally likely because the script is likely to be called daily or less,
  not minutely.
- Usage patterns will be similar to typical map rendering
'''
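# A minimal sketch of the YAML layout this script reads. The key names are
# taken from how the config is used below; the values are illustrative
# placeholders, not defaults:
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#     database: gis          # optional; -d overrides
#   sources:
#     example_table:
#       url: https://example.com/data.zip
#       file: data.shp
#       archive:
#         format: zip
#         files:
#           - data.shp
#       ogropts: ["-t_srs", "EPSG:3857"]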
import yaml
from urllib.parse import urlparse
import os
import re
import argparse
import shutil

# modules for getting data
import zipfile
import requests
import io

# modules for converting and postgres loading
import subprocess
import psycopg2

import logging

def database_setup(conn, temp_schema, schema, metadata_table):
    with conn.cursor() as cur:
        cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
                    .format(temp_schema=temp_schema))
        cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
                     ''' (name text primary key, last_modified text);''')
                    .format(schema=schema, metadata_table=metadata_table))
    conn.commit()

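# A Table wraps one destination table. Loading happens in temp_schema and the
# finished table is only swapped into the destination schema at the end (see
# replace()), so a failed load never clobbers the table currently in use.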
class Table:
    def __init__(self, name, conn, temp_schema, schema, metadata_table):
        self._name = name
        self._conn = conn
        self._temp_schema = temp_schema
        self._dst_schema = schema
        self._metadata_table = metadata_table
    # Clean up the temporary schema in preparation for loading
    def clean_temp(self):
        with self._conn.cursor() as cur:
            cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            self._conn.commit()

    # get the last modified date from the metadata table
    def last_modified(self):
        with self._conn.cursor() as cur:
            cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
            if results is not None:
                return results[0]
        self._conn.commit()
    def grant_access(self, user):
        with self._conn.cursor() as cur:
            cur.execute('''GRANT SELECT ON "{temp_schema}"."{name}" TO "{user}";'''
                        .format(name=self._name, temp_schema=self._temp_schema, user=user))
            self._conn.commit()
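    # index() orders the table spatially and builds the rendering index.
    # Rows are clustered on a temporary ST_Envelope index so that features
    # near each other on the map are near each other on disk, then the final
    # GiST index is created on the ordered table.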
    def index(self):
        with self._conn.cursor() as cur:
            # Disable autovacuum while manipulating the table, since it'll
            # get clustered towards the end.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # ogr creates an ogc_fid column we don't need
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
                        .format(name=self._name, temp_schema=self._temp_schema))

            # Null geometries are useless for rendering
            cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Sorting static tables helps performance and reclaims the space
            # freed by the column drop above
            cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
                         '''(ST_Envelope(way));'''
                         '''CLUSTER "{temp_schema}"."{name}" '''
                         '''USING "{name}_order";'''
                         '''DROP INDEX "{temp_schema}"."{name}_order";'''
                         '''CREATE INDEX ON "{temp_schema}"."{name}" '''
                         '''USING GIST (way) WITH (fillfactor=100);''')
                        .format(name=self._name, temp_schema=self._temp_schema))
            # Reset autovacuum. The table is static, so this doesn't really
            # matter since it'll never need a vacuum.
            cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
                        .format(name=self._name, temp_schema=self._temp_schema))
        self._conn.commit()

        # VACUUM can't be run in a transaction, so autocommit needs to be
        # turned on
        old_autocommit = self._conn.autocommit
        try:
            self._conn.autocommit = True
            with self._conn.cursor() as cur:
                cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
                            .format(name=self._name, temp_schema=self._temp_schema))
        finally:
            self._conn.autocommit = old_autocommit
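    # replace() moves the freshly built table from the temp schema into the
    # destination schema in one transaction, then records new_last_modified
    # in the metadata table so future runs can skip unchanged sources.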
    def replace(self, new_last_modified):
        with self._conn.cursor() as cur:
            cur.execute('''BEGIN;''')
            cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
                         '''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
                        .format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))

            # We checked if the metadata table had this table way up above
            cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
                        .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                        [self._name])
            if cur.rowcount == 0:
                cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
                             '''(name, last_modified) VALUES (%s, %s)''')
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [self._name, new_last_modified])
            else:
                cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
                            .format(schema=self._dst_schema, metadata_table=self._metadata_table),
                            [new_last_modified, self._name])
            self._conn.commit()

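# Downloader wraps a requests.Session (plus a file:// fallback) so sources can
# be fetched with conditional If-Modified-Since requests, avoiding repeated
# downloads of unchanged data.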
class Downloader:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.session.close()
    def _download(self, url, headers=None):
        if url.startswith('file://'):
            # Local files: the mtime stands in for the Last-Modified header
            filename = url[7:]
            if headers and 'If-Modified-Since' in headers:
                if str(os.path.getmtime(filename)) == headers['If-Modified-Since']:
                    return DownloadResult(status_code=requests.codes.not_modified)
            with open(filename, 'rb') as fp:
                return DownloadResult(status_code=200, content=fp.read(),
                                      last_modified=str(os.fstat(fp.fileno()).st_mtime))
        response = self.session.get(url, headers=headers)
        response.raise_for_status()
        return DownloadResult(status_code=response.status_code, content=response.content,
                              last_modified=response.headers.get('Last-Modified', None))
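    # download() optionally keeps an on-disk cache in data_dir: the raw
    # download alongside a '.lastmod' sidecar file holding its Last-Modified
    # value, which is replayed as If-Modified-Since on later runs.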
    def download(self, url, name, opts, data_dir, table_last_modified):
        filename = os.path.join(data_dir, os.path.basename(urlparse(url).path))
        filename_lastmod = filename + '.lastmod'
        if os.path.exists(filename) and os.path.exists(filename_lastmod):
            with open(filename_lastmod, 'r') as fp:
                lastmod_cache = fp.read()
            with open(filename, 'rb') as fp:
                cached_data = DownloadResult(status_code=200, content=fp.read(),
                                             last_modified=lastmod_cache)
        else:
            cached_data = None
            lastmod_cache = None

        result = None
        # Variable used to tell if we downloaded something
        download_happened = False

        if opts.no_update and (cached_data or table_last_modified):
            # It is OK if this is None: for it to be None, the table must
            # already have something in it, so no import is needed (since
            # --no-update is set)
            result = cached_data
        else:
            if opts.force:
                headers = {}
            else:
                # If neither value exists, this is None, which has the same
                # effect as not setting If-Modified-Since at all
                headers = {'If-Modified-Since': table_last_modified or lastmod_cache}

            response = self._download(url, headers)
            # Check status codes
            if response.status_code == requests.codes.ok:
                logging.info(" Download complete ({} bytes)".format(len(response.content)))
                download_happened = True
                if opts.cache:
                    # Write to cache
                    with open(filename, 'wb') as fp:
                        fp.write(response.content)
                    with open(filename_lastmod, 'w') as fp:
                        fp.write(response.last_modified)
                result = response
            elif response.status_code == requests.codes.not_modified:
                # Figure out whether the not-modified data came from the
                # table or the cache
                if os.path.exists(filename) and os.path.exists(filename_lastmod):
                    logging.info(" Cached file {} did not require updating".format(url))
                    result = cached_data
                else:
                    result = None
            else:
                logging.critical(" Unexpected response code ({})".format(response.status_code))
                logging.critical(" Content {} was not downloaded".format(name))
                return None

        if opts.delete_cache or (not opts.cache and download_happened):
            try:
                os.remove(filename)
                os.remove(filename_lastmod)
            except FileNotFoundError:
                pass

        return result

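# DownloadResult mirrors the small subset of requests.Response this script
# uses, so HTTP and file:// results can be handled uniformly.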
class DownloadResult:
    def __init__(self, status_code, content=None, last_modified=None):
        self.status_code = status_code
        self.content = content
        self.last_modified = last_modified

def main():
    # parse options
    parser = argparse.ArgumentParser(
        description="Load external data into a database")

    parser.add_argument("-f", "--force", action="store_true",
                        help="Download and import new data, even if not required.")
    parser.add_argument("-C", "--cache", action="store_true",
                        help="Cache downloaded data. Useful if you'll have your database volume deleted in the future")
    parser.add_argument("--no-update", action="store_true",
                        help="Don't download newer data than what is locally available (either in cache or table). Overridden by --force")

    parser.add_argument("--delete-cache", action="store_true",
                        help="Execute as usual, but delete cached data")
    parser.add_argument("--force-import", action="store_true",
                        help="Import data into table even if it may not be needed")

    parser.add_argument("-c", "--config", action="store", default="external-data.yml",
                        help="Name of configuration file (default external-data.yml)")
    parser.add_argument("-D", "--data", action="store",
                        help="Override data download directory")

    parser.add_argument("-d", "--database", action="store",
                        help="Override database name to connect to")
    parser.add_argument("-H", "--host", action="store",
                        help="Override database server host or socket directory")
    parser.add_argument("-p", "--port", action="store",
                        help="Override database server port")
    parser.add_argument("-U", "--username", action="store",
                        help="Override database user name")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Be more verbose. Overrides -q")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Only report serious problems")
    parser.add_argument("-w", "--password", action="store",
                        help="Override database password")

    parser.add_argument("-R", "--renderuser", action="store",
                        help="User to grant access for rendering")

    opts = parser.parse_args()
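    # Typical invocations (the database and render user names here are
    # illustrative, not defaults):
    #   ./get-external-data.py -c external-data.yml -d gis -R render
    #   ./get-external-data.py --force --cache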
    if opts.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.quiet:
        logging.basicConfig(level=logging.WARNING)
    else:
        logging.basicConfig(level=logging.INFO)

    if opts.force and opts.no_update:
        opts.no_update = False
        logging.warning("Force (-f) flag overrides --no-update flag")

    logging.info("Starting load of external data into database")
    with open(opts.config) as config_file:
        config = yaml.safe_load(config_file)
        data_dir = opts.data or config["settings"]["data_dir"]
        os.makedirs(data_dir, exist_ok=True)

        # If the DB options are unspecified both on the command line and in
        # the config file, libpq will pick what to use with the None
        database = opts.database or config["settings"].get("database")
        host = opts.host or config["settings"].get("host")
        port = opts.port or config["settings"].get("port")
        user = opts.username or config["settings"].get("username")
        password = opts.password or config["settings"].get("password")

        renderuser = opts.renderuser or config["settings"].get("renderuser")
        with Downloader() as d:
            conn = None
            conn = psycopg2.connect(database=database,
                                    host=host, port=port,
                                    user=user,
                                    password=password)

            # DB setup
            database_setup(conn, config["settings"]["temp_schema"],
                           config["settings"]["schema"],
                           config["settings"]["metadata_table"])
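            # Each source is handled independently: download (or reuse the
            # cache), load into the temp schema with ogr2ogr, index, then
            # swap into place.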
            for name, source in config["sources"].items():
                logging.info("Checking table {}".format(name))
                # Don't attempt to handle strange names
                # Even if there was code to escape them properly here, you
                # don't want them in a style with all the quoting headaches
                if not re.match('''^[a-zA-Z0-9_]+$''', name):
                    raise RuntimeError(
                        "Only ASCII alphanumeric table names are supported")

                this_table = Table(name, conn,
                                   config["settings"]["temp_schema"],
                                   config["settings"]["schema"],
                                   config["settings"]["metadata_table"])
                this_table.clean_temp()
                # This will fetch data needed for import
                download = d.download(source["url"], name, opts,
                                      data_dir, this_table.last_modified())

                # Check if there is need to import
                if download is None or (not opts.force and not opts.force_import
                                        and this_table.last_modified() == download.last_modified):
                    logging.info(" Table {} did not require updating".format(name))
                    continue
                workingdir = os.path.join(data_dir, name)
                shutil.rmtree(workingdir, ignore_errors=True)
                os.makedirs(workingdir, exist_ok=True)
                if "archive" in source and source["archive"]["format"] == "zip":
                    logging.info(" Decompressing file")
                    zip = zipfile.ZipFile(io.BytesIO(download.content))
                    for member in source["archive"]["files"]:
                        zip.extract(member, workingdir)
                ogrpg = "PG:dbname={}".format(database)

                if port is not None:
                    ogrpg = ogrpg + " port={}".format(port)
                if user is not None:
                    ogrpg = ogrpg + " user={}".format(user)
                if host is not None:
                    ogrpg = ogrpg + " host={}".format(host)
                if password is not None:
                    ogrpg = ogrpg + " password={}".format(password)
                ogrcommand = ["ogr2ogr",
                              '-f', 'PostgreSQL',
                              '-lco', 'GEOMETRY_NAME=way',
                              # No spatial index here; Table.index() builds
                              # its own GiST index after clustering
                              '-lco', 'SPATIAL_INDEX=FALSE',
                              '-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
                              '-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]

                if "ogropts" in source:
                    ogrcommand += source["ogropts"]

                ogrcommand += [ogrpg,
                               os.path.join(workingdir, source["file"])]
                logging.info(" Importing into database")
                logging.debug("running {}".format(
                    subprocess.list2cmdline(ogrcommand)))

                # ogr2ogr can raise errors here, so they need to be caught
                try:
                    subprocess.check_output(
                        ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
                except subprocess.CalledProcessError as e:
                    # Add more detail on stdout for the logs
                    logging.critical(
                        "ogr2ogr returned {} with layer {}".format(e.returncode, name))
                    logging.critical("Command line was {}".format(
                        subprocess.list2cmdline(e.cmd)))
                    logging.critical("Output was\n{}".format(e.output))
                    logging.critical("Error was\n{}".format(e.stderr))
                    raise RuntimeError(
                        "ogr2ogr error when loading table {}".format(name))

                logging.info(" Import complete")
                this_table.index()
                if renderuser is not None:
                    this_table.grant_access(renderuser)
                this_table.replace(download.last_modified)

                shutil.rmtree(workingdir, ignore_errors=True)

            if conn:
                conn.close()


if __name__ == '__main__':
    main()