openstreetmap-carto/scripts/get-common-values.py
dch0ph 82ae037b2e
Stop shop/office catch-all (revised) (#5169)
* Add common POI values scripting

* Tweak exclusions

* [fixup]

* Cleaning up

* Revisions in response to initial review

* Re-add shop=marketplace to blacklist

* Better input sanitisation

* Make script executable
2026-02-22 09:59:59 +01:00

130 lines
5 KiB
Python
Executable file

#!/usr/bin/env python3
# This script generates a list of popular values for a given key in the OpenStreetMap database according to taginfo.
# It is used to create/update a database table to determine which key/value pairs will be rendered.
import sys
import yaml
import argparse
from itertools import count
from datetime import date
from operator import itemgetter
import urllib.request
import re
import json
# Name of the YAML configuration file; opened with a relative path, so it is
# resolved against the current working directory.
configfilename = 'common-values.yml'
# Unqualified name of the generated table (main() prefixes the schema if one is configured).
tablename = 'carto_pois'
# Characters accepted in a tag value: lowercase a-z, digits, '_', '-' and ';'.
# Used with fullmatch(), so the whole value must consist of these characters.
valid_tag_chars = re.compile(r'[a-z0-9_\-;]+')
def get_common_values(key, min_count, settings, exclude, verbose):
    """Collect the popular values of one key from taginfo.

    Pages through the taginfo ``values`` listing for *key*, requested in
    descending order of usage count, and keeps every value whose count is at
    least *min_count* and which passes validation.

    Args:
        key: Tag key to query.
        min_count: Minimum usage count for a value to be considered.
        settings: Mapping with ``taginfo_url`` (required) and, optionally,
            ``common_exclusions`` (values excluded for every key) and
            ``max_page`` (results requested per page, default 100).
        exclude: Iterable of additional values to exclude for this key.
        verbose: Accepted for interface compatibility; currently unused.

    Returns:
        A tuple ``(candidates, rejected)``. *candidates* is a list of
        ``(value, count)`` tuples in the order taginfo returned them.
        *rejected* is the sorted list of values that met the threshold but
        were excluded or did not fully match ``valid_tag_chars``.

    Raises:
        SystemExit: If no value survives the filtering.
    """
    # Local import keeps the block self-contained without touching file-level imports.
    from urllib.parse import urlencode

    taginfo_url = settings["taginfo_url"]
    max_page = settings.get("max_page", 100)
    # Tolerate a missing or empty (null) exclusion list in the configuration.
    all_exclude = set(settings.get("common_exclusions") or ()).union(exclude)
    candidates = []
    rejected = []

    def check_include(x):
        """Check whether a taginfo object should be included as valid candidate."""
        if x["count"] < min_count:
            return False
        tag = x["value"]
        if not valid_tag_chars.fullmatch(tag) or tag in all_exclude:
            rejected.append(tag)
            return False
        return True

    for page in count(1):
        # urlencode() escapes the key so characters such as '&', '#' or spaces
        # cannot corrupt the query string.
        query = urlencode({
            "key": key,
            "sortname": "count",
            "sortorder": "desc",
            "rp": max_page,
            "page": page,
        })
        request = urllib.request.Request(
            url=f"{taginfo_url}/values?{query}",
            headers={'User-Agent': 'get-common-values.py/osm-carto'})
        with urllib.request.urlopen(request) as response:
            page_data = json.loads(response.read().decode())["data"]

        # Results are requested in descending count order, so once the first
        # entry of a page is below the threshold nothing further can qualify.
        if not page_data or page_data[0]["count"] < min_count:
            break
        candidates += [(x["value"], x["count"]) for x in page_data if check_include(x)]
        # Same ordering argument: if the last entry is already below the
        # threshold, the next page cannot contribute, so skip requesting it.
        if page_data[-1]["count"] < min_count:
            break

    if not candidates:
        sys.exit(f"No valid values found for key {key}")
    return (candidates, sorted(rejected))
def main():
    """Generate SQL that recreates the common-values POI table.

    Reads the YAML configuration named by ``configfilename``, calls
    ``get_common_values`` for every configured key, and prints to standard
    output an SQL script that drops and recreates the table, optionally
    grants SELECT on it to the render user, and inserts the accepted
    key/value pairs.

    Exits via ``sys.exit`` with a message when the configuration lists no
    keys or when a key contains a single quote (the SQL string delimiter).
    """
    # parse options
    parser = argparse.ArgumentParser(
        description="Get key frequency information from taginfo.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Be more verbose.")
    parser.add_argument("-R", "--renderuser", action="store",
                        help="User to grant access for rendering (overwrites configuration file)")
    opts = parser.parse_args()

    with open(configfilename) as config_file:
        config = yaml.safe_load(config_file)

    keys = config.get("keys")
    if not keys:
        sys.exit("No keys specified in configuration file")

    settings = config["settings"]
    # The command-line option takes precedence over the configuration file.
    renderuser = opts.renderuser or settings.get("renderuser")
    schema = settings.get("schema")

    results = {}
    for key, val in keys.items():
        # Keys are interpolated into SQL string literals below, so a quote
        # character would break out of the literal.
        if "'" in key:
            sys.exit(f"SQL string delimiter found in key: {key} !")
        # "or []" also covers an "exclusions:" entry left empty (null) in the YAML.
        specific_exclusions = set(val.get("exclusions") or [])
        results[key] = get_common_values(key, val["min_count"],
                                         settings=settings,
                                         exclude=specific_exclusions,
                                         verbose=opts.verbose)

    use_tablename = tablename if schema is None else f"{schema}.{tablename}"

    print("-- This is generated code; it is not recommended to change this file manually.")
    print(f"-- To update the contents, review settings in {configfilename} and run:")
    print("-- scripts/get-common-values.py > common-values.sql")
    print("-- Use psql to execute the generated SQL and recreate the POI table")
    print("-- You should check that the output is as expected before execution.\n")

    scriptname = sys.argv[0]
    print(f'-- Output generated by {scriptname} accessing {settings["taginfo_url"]} on {date.today()}')

    print(f'DROP TABLE IF EXISTS {use_tablename};')
    print(f'''CREATE TABLE {use_tablename} (\n'''
          ''' key text NOT NULL,\n'''
          ''' value text NOT NULL,\n'''
          ''' PRIMARY KEY (key, value));''')

    if renderuser is not None:
        print(f'GRANT SELECT ON {use_tablename} TO {renderuser};')
    else:
        print(f'''-- If there are permission problems reading the {tablename} table,\n'''
              '''-- uncomment line below, setting <render user> to relevant database user\n'''
              f'''-- GRANT SELECT ON {use_tablename} TO <render user>;''')

    sort_by_name = settings.get("sort_by_name", False)
    for key, (vals, rejected) in results.items():
        # Inner quotes differ from the outer ones: reusing the same quote type
        # inside an f-string is a syntax error before Python 3.12.
        print(f"-- Found {len(vals)} matches for key {key} using threshold of {keys[key]['min_count']}")
        print(f'INSERT INTO {use_tablename} (key, value) VALUES')
        if sort_by_name:
            vals = sorted(vals, key=itemgetter(0))
        last_index = len(vals) - 1
        for ind, (value, value_count) in enumerate(vals):
            # Rows are comma-separated; the final row terminates the statement.
            terminator = ';' if ind == last_index else ','
            print(f" ('{key}', '{value}'){terminator} -- count: {value_count}")
        if rejected:
            print(f"-- Rejected these invalid tags for {key}: {', '.join(rejected)}")
# Run the generator only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()