mirror of
https://git.sr.ht/~rjarry/aerc
synced 2025-07-09 07:00:21 +02:00

There is an issue with the carddav-query script that was causing it to fail when trying to query Fastmail. The script made a successful query, but did not return any results on STDOUT. Some of the entries returned by Fastmail are in the following format: email0.EMAIL;TYPE=HOME:name@example.com However, the regex for parsing the response is only set to handle `EMAIL` or `ITEMx.EMAIL` (where x is a number). I changed the regex to be more permissive and match `any_word.EMAIL`, which fixed the problem. This solution allows Fastmail CardDAV to be queries and parsed for address completion, while still excluding unrelated (non-email address) fields. Signed-off-by: Daniel Fichtinger <daniel@ficd.ca> Acked-by: Robin Jarry <robin@jarry.cc>
268 lines
8.4 KiB
Python
Executable file
268 lines
8.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: MIT
|
|
# Copyright (c) 2023 Robin Jarry
|
|
|
|
"""
|
|
Query a CardDAV server for contact names and emails.
|
|
"""
|
|
|
|
import argparse
|
|
import base64
|
|
import configparser
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import xml.etree.ElementTree as xml
|
|
from urllib import error, parse, request
|
|
|
|
|
|
def main():
|
|
try:
|
|
args = parse_args()
|
|
|
|
C = "urn:ietf:params:xml:ns:carddav"
|
|
D = "DAV:"
|
|
xml.register_namespace("C", C)
|
|
xml.register_namespace("D", D)
|
|
|
|
# perform the actual address book query
|
|
query = xml.Element(f"{{{C}}}addressbook-query")
|
|
prop = xml.SubElement(query, f"{{{D}}}prop")
|
|
xml.SubElement(prop, f"{{{D}}}getetag")
|
|
data = xml.SubElement(prop, f"{{{C}}}address-data")
|
|
xml.SubElement(data, f"{{{C}}}prop", name="FN")
|
|
xml.SubElement(data, f"{{{C}}}prop", name="EMAIL")
|
|
limit = xml.SubElement(query, f"{{{C}}}limit")
|
|
xml.SubElement(limit, f"{{{C}}}nresults").text = str(args.limit)
|
|
filtre = xml.SubElement(query, f"{{{C}}}filter", test="anyof")
|
|
for term in args.terms:
|
|
for attr in "FN", "EMAIL", "NICKNAME", "ORG", "TITLE":
|
|
prop = xml.SubElement(filtre, f"{{{C}}}prop-filter", name=attr)
|
|
match = xml.SubElement(
|
|
prop, f"{{{C}}}text-match", {"match-type": "contains"}
|
|
)
|
|
match.text = term
|
|
data = http_request_xml(
|
|
"REPORT",
|
|
args.server_url,
|
|
query,
|
|
username=args.username,
|
|
password=args.password,
|
|
debug=args.verbose,
|
|
Depth="1",
|
|
)
|
|
for vcard in data.iterfind(f".//{{{C}}}address-data"):
|
|
for name, email in parse_vcard(vcard.text.strip()):
|
|
print(f"{email}\t{name}")
|
|
|
|
except Exception as e:
|
|
if isinstance(e, error.HTTPError):
|
|
if args.verbose:
|
|
debug_response(e.fp)
|
|
e = e.fp.read().decode()
|
|
print(f"error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def http_request_xml(
|
|
method: str,
|
|
url: str,
|
|
data: xml.Element,
|
|
username: str = None,
|
|
password: str = None,
|
|
debug: bool = False,
|
|
**headers,
|
|
) -> xml.Element:
|
|
req = request.Request(
|
|
url=url,
|
|
method=method,
|
|
headers={
|
|
"Content-Type": 'text/xml; charset="utf-8"',
|
|
**headers,
|
|
},
|
|
data=xml.tostring(data, encoding="utf-8", xml_declaration=True),
|
|
)
|
|
if username is not None and password is not None:
|
|
auth = f"{username}:{password}"
|
|
auth = base64.standard_b64encode(auth.encode("utf-8")).decode("ascii")
|
|
req.add_header("Authorization", f"Basic {auth}")
|
|
|
|
if debug:
|
|
uri = parse.urlparse(req.full_url)
|
|
print(f"> {req.method} {uri.path} HTTP/1.1", file=sys.stderr)
|
|
print(f"> Host: {uri.hostname}", file=sys.stderr)
|
|
for name, value in req.headers.items():
|
|
print(f"> {name}: {value}", file=sys.stderr)
|
|
print(f"{req.data.decode('utf-8')}\n", file=sys.stderr)
|
|
|
|
with request.urlopen(req) as resp:
|
|
data = resp.read().decode("utf-8")
|
|
if debug:
|
|
debug_response(resp)
|
|
print(f"{data}", file=sys.stderr)
|
|
|
|
return xml.fromstring(data)
|
|
|
|
|
|
def debug_response(resp):
|
|
print(f"< HTTP/1.1 {resp.code}", file=sys.stderr)
|
|
for name, value in resp.headers.items():
|
|
print(f"< {name}: {value}", file=sys.stderr)
|
|
|
|
|
|
def parse_vcard(txt):
|
|
lines = txt.splitlines()
|
|
if len(lines) < 4 or lines[0] != "BEGIN:VCARD" or lines[-1] != "END:VCARD":
|
|
return
|
|
name = None
|
|
emails = []
|
|
for line in lines[1:-1]:
|
|
if line.startswith("FN:"):
|
|
name = line[len("FN:") :].replace("\\,", ",")
|
|
continue
|
|
match = re.match(r"^(?:\w+\.)?EMAIL(?:;[\w-]+=[^;:]+)*:(.+@.+)$", line, flags=re.I)
|
|
if match:
|
|
email = match.group(1).lower().replace("\\,", ",")
|
|
if email not in emails:
|
|
if "TYPE=pref" in line or "PREF=1" in line:
|
|
emails.insert(0, email)
|
|
else:
|
|
emails.append(email)
|
|
if name is not None:
|
|
for e in emails:
|
|
yield name, e
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument(
|
|
"-l",
|
|
"--limit",
|
|
default=10,
|
|
type=int,
|
|
help="""
|
|
Maximum number of results returned by the server (default: 10).
|
|
If the server does not support limiting, this will be disregarded.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-v",
|
|
"--verbose",
|
|
action="store_true",
|
|
help="""
|
|
Print debug info on stderr.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-c",
|
|
"--config-file",
|
|
metavar="FILE",
|
|
default=os.path.expanduser("~/.config/aerc/accounts.conf"),
|
|
help="""
|
|
INI configuration file from which to read the CardDAV URL endpoint
|
|
(default: ~/.config/aerc/accounts.conf).
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-S",
|
|
"--config-section",
|
|
metavar="SECTION",
|
|
help="""
|
|
INI configuration section where to find CONFIG_KEY. By default the
|
|
first section where CONFIG_KEY is found will be used.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-k",
|
|
"--config-key-source",
|
|
metavar="KEY_SOURCE",
|
|
default="carddav-source",
|
|
help="""
|
|
INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE.
|
|
The value must respect the following format:
|
|
https?://USERNAME[:PASSWORD]@HOSTNAME/PATH/TO/ADDRESSBOOK.
|
|
Both USERNAME and PASSWORD must be percent encoded.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-C",
|
|
"--config-key-cred-cmd",
|
|
metavar="KEY_CRED_CMD",
|
|
default="carddav-source-cred-cmd",
|
|
help="""
|
|
INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE. The
|
|
value is a command that will be used to determine PASSWORD if it is not
|
|
present in CONFIG_KEY_SOURCE.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-s",
|
|
"--server-url",
|
|
help="""
|
|
CardDAV server URL endpoint. Overrides configuration file.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-u",
|
|
"--username",
|
|
help="""
|
|
Username to authenticate on the server. Overrides configuration file.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--password",
|
|
help="""
|
|
Password for the specified user. Overrides configuration file.
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"terms",
|
|
nargs="+",
|
|
metavar="TERM",
|
|
help="""
|
|
Search term. Will be used to search contacts from their FN (formatted
|
|
name), EMAIL, NICKNAME, ORG (company) and TITLE fields.
|
|
""",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
cfg = configparser.RawConfigParser(strict=False)
|
|
cfg.read([args.config_file])
|
|
source = cred_cmd = None
|
|
if args.config_section:
|
|
source = cfg.get(args.config_section, args.config_key_source, fallback=None)
|
|
cred_cmd = cfg.get(args.config_section, args.config_key_cred_cmd, fallback=None)
|
|
else:
|
|
for sec in cfg.sections():
|
|
source = cfg.get(sec, args.config_key_source, fallback=None)
|
|
if source is not None:
|
|
cred_cmd = cfg.get(sec, args.config_key_cred_cmd, fallback=None)
|
|
break
|
|
if source is not None:
|
|
try:
|
|
u = parse.urlparse(source)
|
|
if args.username is None and u.username is not None:
|
|
args.username = parse.unquote(u.username)
|
|
if args.password is None and u.password is not None:
|
|
args.password = parse.unquote(u.password)
|
|
if not args.password and cred_cmd is not None:
|
|
args.password = subprocess.check_output(
|
|
cred_cmd, shell=True, text=True, encoding="utf-8"
|
|
).strip()
|
|
if args.server_url is None:
|
|
args.server_url = f"{u.scheme}://{u.hostname}"
|
|
if u.port is not None:
|
|
args.server_url += f":{u.port}"
|
|
args.server_url += u.path
|
|
except ValueError as e:
|
|
parser.error(f"{args.config_file}: {e}")
|
|
if args.server_url is None:
|
|
parser.error("SERVER_URL is required")
|
|
|
|
return args
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|