mirror of
https://git.sr.ht/~rjarry/aerc
synced 2025-07-13 04:30:22 +02:00

Currently, the bot only announces non-RFC patches. Let's also consider RFC patches. As long as they are applied, it does not matter. Signed-off-by: Robin Jarry <robin@jarry.cc> Reviewed-by: Moritz Poldrack <moritz@poldrack.dev>
141 lines
4.6 KiB
Python
141 lines
4.6 KiB
Python
import email.header
|
|
import email.utils
|
|
import json
|
|
import mailbox
|
|
from urllib.parse import quote
|
|
from urllib.request import urlopen
|
|
import re
|
|
import traceback
|
|
|
|
from supybot import callbacks, httpserver, ircmsgs, world
|
|
from supybot.ircutils import bold, italic, mircColor, underline
|
|
|
|
|
|
class Sourcehut(callbacks.Plugin):
    """
    Supybot plugin to receive Sourcehut webhooks

    On construction a ``SourcehutServerCallback`` bound to this plugin is
    hooked into the Supybot HTTP server under the name ``sourcehut``; the
    hook is removed again in ``die()``.
    """

    def __init__(self, irc):
        super().__init__(irc)
        callback = SourcehutServerCallback(self)
        httpserver.hook("sourcehut", callback)

    def die(self):
        # Remove the HTTP hook before running the base class teardown.
        httpserver.unhook("sourcehut")
        super().die()

    def announce(self, channel, message):
        """
        Send *message* as a notice to *channel* on the "libera" network.

        Nothing is sent (an error line is printed instead) when the
        "libera" IRC object does not exist or when *channel* is not among
        the channels listed in its state.
        """
        network = world.getIrc("libera")
        if network is None:
            print("error: no irc libera")
        elif channel not in network.state.channels:
            print(f"error: not in {channel} channel")
        else:
            network.sendMsg(ircmsgs.notice(channel, message))
|
|
|
|
|
|
def decode_header(header: str) -> str:
    """
    Decode a (possibly RFC 2047 encoded) mail header into plain text.

    The header is split into chunks with ``email.header.decode_header``;
    byte chunks are decoded with their declared charset (us-ascii when none
    is declared) and all chunks are concatenated.

    Decoding is lenient: bytes that are invalid for the charset become
    U+FFFD, and a charset label Python does not know is treated as
    us-ascii. A malformed header therefore yields a degraded string
    instead of raising.

    Returns an empty string when *header* is empty or ``None``.
    """
    if not header:
        return ""
    pieces = []
    for chunk, encoding in email.header.decode_header(header):
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(encoding or "us-ascii", errors="replace")
            except LookupError:
                # Unknown charset label (e.g. "unknown-8bit"): keep the
                # ASCII part readable rather than failing outright.
                chunk = chunk.decode("us-ascii", errors="replace")
        pieces.append(chunk)
    return "".join(pieces)
|
|
|
|
|
|
class SourcehutServerCallback(httpserver.SupyHTTPServerCallback):
    """
    HTTP callback that turns Sourcehut webhook deliveries into IRC notices.

    ``doPost`` parses the JSON payload and dispatches on the webhook event:
    ``PATCHSET_RECEIVED`` announces a newly received patchset, and
    ``EMAIL_RECEIVED`` with a ``patchset_update`` of ``["APPLIED"]``
    announces an applied patch. Notices are sent through the owning
    plugin's ``announce()``.
    """

    name = "Sourcehut"
    defaultResponse = "Bad request\n"

    def __init__(self, plugin: Sourcehut):
        super().__init__()
        self.plugin = plugin

    SUBJECT = "[PATCH {prefix} v{version}] {subject}"
    URL = "https://lists.sr.ht/{list[owner][canonicalName]}/{list[name]}"
    # Webhooks are only honoured for lists whose URL starts with this prefix.
    ALLOWED_URL_PREFIX = "https://lists.sr.ht/~rjarry/"
    # Seconds to wait on lists.sr.ht before giving up, so that a stalled
    # connection cannot block the webhook handler forever.
    FETCH_TIMEOUT = 30
    # Mailing list name (as "#name") -> IRC channel, when they differ.
    CHANS = {
        "#public-inbox": "##rjarry",
        "#aerc-devel": "#aerc",
    }

    def _list_url(self, payload):
        """
        Build the list URL for *payload* and check that it is allowed.

        Raises ValueError("unknown list") when the URL does not start with
        ALLOWED_URL_PREFIX, and KeyError when *payload* lacks the list
        fields used by the URL template.
        """
        url = self.URL.format(**payload)
        if not url.startswith(self.ALLOWED_URL_PREFIX):
            raise ValueError("unknown list")
        return url

    def _channel(self, payload):
        """Return the IRC channel associated with the list of *payload*."""
        channel = f"#{payload['list']['name']}"
        return self.CHANS.get(channel, channel)

    @staticmethod
    def _respond(handler, status, payload):
        """Write a header-less HTTP response with *status* and *payload*."""
        handler.send_response(status)
        handler.end_headers()
        handler.wfile.write(payload)

    def announce_patch(self, patchset):
        """
        Announce a received patchset on the channel of its mailing list.

        The submitter is taken from the first available of the
        ``canonicalName``, ``name`` and ``address`` fields.

        Raises ValueError for a list outside ALLOWED_URL_PREFIX and
        KeyError when an expected field is missing from *patchset*.
        """
        subject = self.SUBJECT.format(**patchset)
        url = self._list_url(patchset)
        url += "/patches/{id}".format(**patchset)
        channel = self._channel(patchset)
        try:
            submitter = patchset["submitter"]["canonicalName"]
        except KeyError:
            try:
                submitter = patchset["submitter"]["name"]
            except KeyError:
                submitter = patchset["submitter"]["address"]
        notice = f"{mircColor('received', 'light gray')} {bold(subject)}"
        notice += f" from {italic(submitter)}: {underline(url)}"
        self.plugin.announce(channel, notice)

    def announce_apply(self, mail):
        """
        Announce that a patch referenced by *mail* has been applied.

        Every message id found in ``mail['references']`` is fetched in raw
        form from the list archive; the first one whose subject starts with
        ``[PATCH`` or ``[RFC PATCH`` is announced and the search stops.
        Nothing is announced when no referenced message matches.

        Raises ValueError for a list outside ALLOWED_URL_PREFIX (the same
        check announce_patch performs, applied here before any URL derived
        from the payload is fetched). Network errors and timeouts from
        ``urlopen`` propagate to the caller.
        """
        base_url = self._list_url(mail)
        channel = self._channel(mail)
        refs = []
        for header in mail['references']:
            refs += header.split()
        for ref in refs:
            url = base_url + quote(f"/{ref}")
            print(f"GET {url}/raw")
            with urlopen(f"{url}/raw", timeout=self.FETCH_TIMEOUT) as u:
                patch = mailbox.Message(u.read())
            subject = re.sub(r"\s+", " ", decode_header(patch["subject"]))
            if not re.match(r"^\[(RFC )?PATCH", subject):
                continue
            # getaddresses() may yield no entry at all for a missing or
            # unparsable From header; start from an empty submitter so the
            # announcement below never hits an unbound name.
            submitter = ""
            sender = decode_header(patch["from"])
            for name, addr in email.utils.getaddresses([sender]):
                submitter = name or addr
            notice = f"{bold(mircColor('applied', 'green'))} {bold(subject)}"
            notice += f" from {italic(submitter)}: {underline(url)}"
            self.plugin.announce(channel, notice)
            return

    def doPost(self, handler, path, form=None):
        """
        Handle a webhook POST and reply 200 on success, 400 otherwise.

        Any exception raised while decoding, parsing or processing the
        payload (including an unsupported event) is printed with its
        traceback and answered with "Bad request".
        """
        try:
            # Decoding happens inside the try block so that a body which is
            # not valid UTF-8 is answered with 400 like any other bad input.
            if hasattr(form, "decode"):
                form = form.decode("utf-8")
            print(f"POST {path} {form}")
            body = json.loads(form)
            hook = body["data"]["webhook"]
            event = hook["event"]
            if event == "PATCHSET_RECEIVED":
                self.announce_patch(hook["patchset"])
            elif (
                event == "EMAIL_RECEIVED"
                and hook["email"]["patchset_update"] == ["APPLIED"]
            ):
                self.announce_apply(hook["email"])
            else:
                # NOTE(review): EMAIL_RECEIVED deliveries that are not
                # "APPLIED" updates also end up here and get a 400.
                raise ValueError(f"unsupported webhook: {hook}")
            self._respond(handler, 200, b"")
        except Exception as e:
            traceback.print_exception(e)
            self._respond(handler, 400, b"Bad request\n")

    def log_message(self, format, *args):
        """Do nothing; presumably silences HTTP request logging (confirm)."""
        pass
|
|
|
|
|
|
# Module-level alias for the plugin class; presumably the name the Supybot
# plugin loader looks up -- TODO confirm against the Supybot plugin docs.
Class = Sourcehut
|