from __future__ import annotations
import html as _html
import logging
from os.path import splitext
from urllib.parse import urlparse
from xml.etree.ElementTree import Element
import markdown
import nh3
from markdown.inlinepatterns import InlineProcessor
from mdx_oembed.oembed import OEmbedConsumer, OEmbedNoEndpoint
LOG = logging.getLogger(__name__)
# Image extensions to exclude from oEmbed processing
_IMAGE_EXTENSIONS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".avif", ".webp",
".svg", ".bmp", ".tiff", ".ico",
})
# Matches Markdown image syntax with an absolute URL: 
OEMBED_LINK_RE = r"!\[([^\]]*)\]\(((?:https?:)?//[^\)]+)\)"
# Allowed HTML tags and attributes for sanitizing oEmbed responses
_SANITIZE_TAGS = {
"iframe", "video", "audio", "source", "img",
"blockquote", "div", "p", "a", "span", "figure",
}
_SANITIZE_ATTRS = {
"*": {"class", "style", "title"},
"iframe": {
"src", "width", "height", "frameborder",
"allowfullscreen", "allow", "referrerpolicy", "sandbox",
},
"video": {
"src", "width", "height", "controls",
"autoplay", "loop", "muted", "poster", "preload",
},
"audio": {
"src", "controls", "autoplay", "loop", "muted", "preload",
},
"source": {"src", "type"},
"img": {"src", "alt", "width", "height", "loading"},
"a": {"href", "target"},
}
def _is_image_url(url: str) -> bool:
"""Check if a URL points to an image based on its path extension."""
try:
path = urlparse(url).path
_, ext = splitext(path)
return ext.lower() in _IMAGE_EXTENSIONS
except Exception:
return False
def _sanitize_html(html: str) -> str:
"""Sanitize oEmbed HTML to prevent XSS."""
return nh3.clean(html, tags=_SANITIZE_TAGS, attributes=_SANITIZE_ATTRS)
class OEmbedLinkPattern(InlineProcessor):
"""Inline processor that replaces Markdown image links with oEmbed content."""
def __init__(
self,
pattern: str,
md: markdown.Markdown | None = None,
oembed_consumer: OEmbedConsumer | None = None,
wrapper_class: str = "oembed",
) -> None:
super().__init__(pattern, md)
self.consumer = oembed_consumer
self.wrapper_class = wrapper_class
def handleMatch(self, m, data): # noqa: N802
url = m.group(2).strip()
alt = m.group(1)
# Skip image URLs — let Markdown's default image handler process them
if _is_image_url(url):
return None, None, None
html = self._get_oembed_html(url, alt)
if html is None:
return None, None, None
html = _sanitize_html(html)
if self.wrapper_class:
html = f'{html}'
# Stash raw HTML so it survives Markdown's escaping; place the
# placeholder inside an inline element that the tree-processor will
# later replace with the real HTML.
placeholder = self.md.htmlStash.store(html)
el = Element("span")
el.text = placeholder
return el, m.start(0), m.end(0)
def _get_oembed_html(self, url: str, alt: str = "") -> str | None:
"""Fetch oEmbed HTML for a URL, handling different response types."""
if self.consumer is None:
LOG.warning("No oEmbed consumer configured")
return None
try:
response = self.consumer.embed(url)
except OEmbedNoEndpoint:
LOG.warning("No oEmbed endpoint for URL: %s", url)
return None
except Exception:
LOG.exception("Error fetching oEmbed for URL: %s", url)
return None
# oEmbed 'video' and 'rich' types include an 'html' field
html = response.get("html")
if html:
return html
# oEmbed 'photo' type — construct an
tag
photo_url = response.get("url")
if photo_url:
width = response.get("width", "")
height = response.get("height", "")
return (
f'
'
)
LOG.warning("oEmbed response for %s has no 'html' or 'url' field", url)
return None