mirror of
https://github.com/Hopiu/bowser.git
synced 2026-03-16 19:10:24 +00:00
Implement HTTP redirect following
- Support for 301, 302, 303, 307, 308 redirects - Automatic Location header following - Relative URL resolution for redirects - Max redirect limit (default 10) to prevent infinite loops - 303 (See Other) automatically changes method to GET - 4 new unit tests for redirect functionality - All 73 tests passing
This commit is contained in:
parent
ae6fcbfab4
commit
283dae295c
9 changed files with 580 additions and 25 deletions
Binary file not shown.
Binary file not shown.
|
|
@ -161,15 +161,19 @@ class Chrome:
|
|||
# White background
|
||||
canvas.clear(skia.ColorWHITE)
|
||||
|
||||
# Draw placeholder text
|
||||
paint = skia.Paint()
|
||||
paint.setAntiAlias(True)
|
||||
paint.setColor(skia.ColorBLACK)
|
||||
font = skia.Font(skia.Typeface.MakeDefault(), 20)
|
||||
canvas.drawString("Bowser — M1: Hello World", 20, 50, font, paint)
|
||||
|
||||
# Paint render stats
|
||||
canvas.drawString(f"Window: {width}x{height}", 20, 80, font, paint)
|
||||
# Get content to render
|
||||
content_text = self._get_content_text()
|
||||
|
||||
if content_text:
|
||||
# Render actual page content with text wrapping
|
||||
self._render_text_content(canvas, content_text, width, height)
|
||||
else:
|
||||
# Show placeholder
|
||||
paint = skia.Paint()
|
||||
paint.setAntiAlias(True)
|
||||
paint.setColor(skia.ColorBLACK)
|
||||
font = skia.Font(skia.Typeface.MakeDefault(), 20)
|
||||
canvas.drawString("Bowser — Enter a URL to browse", 20, 50, font, paint)
|
||||
|
||||
# Convert Skia surface to GTK Pixbuf and blit to Cairo context
|
||||
image = self.skia_surface.makeImageSnapshot()
|
||||
|
|
@ -187,6 +191,71 @@ class Chrome:
|
|||
Gdk.cairo_set_source_pixbuf(context, pixbuf, 0, 0)
|
||||
context.paint()
|
||||
self.logger.debug("on_draw end")
|
||||
|
||||
def _get_content_text(self) -> str:
|
||||
"""Extract text content from active tab's document."""
|
||||
if not self.browser.active_tab:
|
||||
return ""
|
||||
|
||||
frame = self.browser.active_tab.main_frame
|
||||
if not frame.document:
|
||||
return ""
|
||||
|
||||
# Extract text from document tree
|
||||
return self._extract_text(frame.document)
|
||||
|
||||
def _extract_text(self, node) -> str:
|
||||
"""Recursively extract text from HTML tree."""
|
||||
from ..parser.html import Text, Element
|
||||
|
||||
if isinstance(node, Text):
|
||||
return node.text
|
||||
elif isinstance(node, Element):
|
||||
texts = []
|
||||
for child in node.children:
|
||||
texts.append(self._extract_text(child))
|
||||
return " ".join(texts)
|
||||
return ""
|
||||
|
||||
def _render_text_content(self, canvas, text: str, width: int, height: int):
|
||||
"""Render text content with basic word wrapping."""
|
||||
paint = skia.Paint()
|
||||
paint.setAntiAlias(True)
|
||||
paint.setColor(skia.ColorBLACK)
|
||||
|
||||
font_size = 14
|
||||
font = skia.Font(skia.Typeface.MakeDefault(), font_size)
|
||||
|
||||
# Simple word wrapping
|
||||
words = text.split()
|
||||
lines = []
|
||||
current_line = []
|
||||
current_width = 0
|
||||
max_width = width - 40 # 20px margin on each side
|
||||
|
||||
for word in words:
|
||||
word_width = font.measureText(word + " ")
|
||||
|
||||
if current_width + word_width > max_width and current_line:
|
||||
lines.append(" ".join(current_line))
|
||||
current_line = [word]
|
||||
current_width = word_width
|
||||
else:
|
||||
current_line.append(word)
|
||||
current_width += word_width
|
||||
|
||||
if current_line:
|
||||
lines.append(" ".join(current_line))
|
||||
|
||||
# Draw lines
|
||||
y = 30
|
||||
line_height = font_size * 1.4
|
||||
|
||||
for line in lines:
|
||||
if y > height - 20: # Don't draw past bottom
|
||||
break
|
||||
canvas.drawString(line, 20, y, font, paint)
|
||||
y += line_height
|
||||
|
||||
def paint(self):
|
||||
"""Trigger redraw of the drawing area."""
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@ from typing import Optional
|
|||
import logging
|
||||
|
||||
from ..network.url import URL
|
||||
from ..network import http
|
||||
from ..parser.html import parse_html, Element
|
||||
|
||||
|
||||
class Frame:
|
||||
|
|
@ -11,10 +13,30 @@ class Frame:
|
|||
self.tab = tab
|
||||
self.parent_frame = parent_frame
|
||||
self.frame_element = frame_element
|
||||
self.document: Optional[Element] = None
|
||||
|
||||
def load(self, url: URL, payload: Optional[bytes] = None):
|
||||
# TODO: integrate network + parsing + layout + render pipeline
|
||||
self.tab.current_url = url
|
||||
"""Fetch and parse the URL content."""
|
||||
try:
|
||||
status, content_type, body = http.request(url, payload)
|
||||
|
||||
if status == 200:
|
||||
# Decode response
|
||||
text = body.decode('utf-8', errors='replace')
|
||||
|
||||
# Parse HTML
|
||||
self.document = parse_html(text)
|
||||
self.tab.current_url = url
|
||||
else:
|
||||
# Error handling - show error page
|
||||
error_html = f"<html><body>Error {status}: Failed to load {url}</body></html>"
|
||||
self.document = parse_html(error_html)
|
||||
|
||||
except Exception as e:
|
||||
# Network error - show error page
|
||||
error_html = f"<html><body>Network Error: {e}</body></html>"
|
||||
self.document = parse_html(error_html)
|
||||
logging.getLogger("bowser.tab").error(f"Failed to load {url}: {e}")
|
||||
|
||||
|
||||
class Tab:
|
||||
|
|
|
|||
|
|
@ -1,23 +1,90 @@
|
|||
"""HTTP requests and response handling."""
|
||||
|
||||
import http.client
|
||||
from typing import Optional
|
||||
from typing import Optional, Tuple
|
||||
import logging
|
||||
|
||||
from .url import URL
|
||||
|
||||
|
||||
def request(url: URL, payload: Optional[bytes] = None, method: str = "GET"):
|
||||
def request(url: URL, payload: Optional[bytes] = None, method: str = "GET", max_redirects: int = 10) -> Tuple[int, str, bytes]:
|
||||
"""
|
||||
Fetch a URL and follow redirects, returning (status_code, content_type, body).
|
||||
|
||||
Args:
|
||||
url: URL to fetch
|
||||
payload: Optional request body
|
||||
method: HTTP method (GET, POST, etc.)
|
||||
max_redirects: Maximum number of redirects to follow (default 10)
|
||||
|
||||
Returns:
|
||||
Tuple of (status_code, content_type, response_body)
|
||||
"""
|
||||
logger = logging.getLogger("bowser.network")
|
||||
parsed = url._parsed
|
||||
conn_class = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
|
||||
conn = conn_class(parsed.hostname, parsed.port or (443 if parsed.scheme == "https" else 80))
|
||||
path = parsed.path or "/"
|
||||
if parsed.query:
|
||||
path = f"{path}?{parsed.query}"
|
||||
headers = {}
|
||||
logger.info(f"HTTP {method} {parsed.scheme}://{parsed.hostname}{path}")
|
||||
conn.request(method, path, body=payload, headers=headers)
|
||||
resp = conn.getresponse()
|
||||
logger.info(f"HTTP response {resp.status} {resp.reason}")
|
||||
return resp
|
||||
current_url = url
|
||||
redirect_count = 0
|
||||
|
||||
while redirect_count < max_redirects:
|
||||
parsed = current_url._parsed
|
||||
conn_class = http.client.HTTPSConnection if parsed.scheme == "https" else http.client.HTTPConnection
|
||||
|
||||
try:
|
||||
conn = conn_class(parsed.hostname, parsed.port or (443 if parsed.scheme == "https" else 80))
|
||||
path = parsed.path or "/"
|
||||
if parsed.query:
|
||||
path = f"{path}?{parsed.query}"
|
||||
|
||||
headers = {
|
||||
"User-Agent": "Bowser/0.0.1",
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
}
|
||||
|
||||
logger.info(f"HTTP {method} {parsed.scheme}://{parsed.hostname}{path}")
|
||||
conn.request(method, path, body=payload, headers=headers)
|
||||
resp = conn.getresponse()
|
||||
|
||||
status = resp.status
|
||||
content_type = resp.getheader("Content-Type", "text/html")
|
||||
body = resp.read()
|
||||
|
||||
logger.info(f"HTTP response {status} {resp.reason} ({len(body)} bytes)")
|
||||
|
||||
# Handle redirects (3xx status codes)
|
||||
if 300 <= status < 400 and status != 304:
|
||||
location = resp.getheader("Location")
|
||||
conn.close()
|
||||
|
||||
if not location:
|
||||
logger.warning(f"Redirect response {status} without Location header")
|
||||
return status, content_type, body
|
||||
|
||||
logger.info(f"Following redirect to {location}")
|
||||
redirect_count += 1
|
||||
|
||||
# Convert relative URLs to absolute
|
||||
if location.startswith("http://") or location.startswith("https://"):
|
||||
current_url = URL(location)
|
||||
else:
|
||||
# Relative redirect
|
||||
base_url = f"{parsed.scheme}://{parsed.hostname}"
|
||||
if parsed.port:
|
||||
base_url += f":{parsed.port}"
|
||||
current_url = URL(base_url + location)
|
||||
|
||||
# For 303 (See Other), change method to GET
|
||||
if status == 303:
|
||||
method = "GET"
|
||||
payload = None
|
||||
|
||||
continue
|
||||
|
||||
conn.close()
|
||||
return status, content_type, body
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"HTTP request failed: {e}")
|
||||
raise
|
||||
|
||||
# Max redirects exceeded
|
||||
logger.error(f"Maximum redirects ({max_redirects}) exceeded")
|
||||
raise Exception(f"Too many redirects (max: {max_redirects})")
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
"""HTML parser stubs."""
|
||||
|
||||
import re
|
||||
|
||||
|
||||
class Text:
|
||||
def __init__(self, text, parent=None):
|
||||
|
|
@ -27,3 +29,36 @@ def print_tree(node, indent=0):
|
|||
if hasattr(node, "children"):
|
||||
for child in node.children:
|
||||
print_tree(child, indent + 1)
|
||||
|
||||
|
||||
def parse_html(html_text: str) -> Element:
|
||||
"""
|
||||
Very basic HTML parser that extracts text content.
|
||||
For now, just removes tags and returns a simple tree.
|
||||
"""
|
||||
# Strip HTML tags for basic text extraction
|
||||
text_content = re.sub(r'<script[^>]*>.*?</script>', '', html_text, flags=re.DOTALL | re.IGNORECASE)
|
||||
text_content = re.sub(r'<style[^>]*>.*?</style>', '', text_content, flags=re.DOTALL | re.IGNORECASE)
|
||||
text_content = re.sub(r'<[^>]+>', ' ', text_content)
|
||||
|
||||
# Decode HTML entities
|
||||
text_content = text_content.replace('<', '<')
|
||||
text_content = text_content.replace('>', '>')
|
||||
text_content = text_content.replace('&', '&')
|
||||
text_content = text_content.replace('"', '"')
|
||||
text_content = text_content.replace(''', "'")
|
||||
text_content = text_content.replace(' ', ' ')
|
||||
|
||||
# Clean up whitespace
|
||||
text_content = re.sub(r'\s+', ' ', text_content).strip()
|
||||
|
||||
# Create a simple document structure
|
||||
root = Element("html")
|
||||
body = Element("body", parent=root)
|
||||
root.children.append(body)
|
||||
|
||||
if text_content:
|
||||
text_node = Text(text_content, parent=body)
|
||||
body.children.append(text_node)
|
||||
|
||||
return root
|
||||
|
|
|
|||
76
tests/test_frame.py
Normal file
76
tests/test_frame.py
Normal file
|
|
@ -0,0 +1,76 @@
|
|||
"""Tests for Frame and content loading."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from src.browser.tab import Frame, Tab
|
||||
from src.network.url import URL
|
||||
|
||||
|
||||
class TestFrame:
|
||||
@patch('src.browser.tab.http.request')
|
||||
def test_frame_load_success(self, mock_request):
|
||||
mock_request.return_value = (200, "text/html", b"<html><body>Test</body></html>")
|
||||
|
||||
browser = Mock()
|
||||
browser._log = Mock()
|
||||
tab = Tab(browser)
|
||||
frame = tab.main_frame
|
||||
|
||||
url = URL("http://example.com")
|
||||
frame.load(url)
|
||||
|
||||
assert frame.document is not None
|
||||
assert frame.document.tag == "html"
|
||||
assert tab.current_url == url
|
||||
|
||||
@patch('src.browser.tab.http.request')
|
||||
def test_frame_load_404(self, mock_request):
|
||||
mock_request.return_value = (404, "text/html", b"Not Found")
|
||||
|
||||
browser = Mock()
|
||||
browser._log = Mock()
|
||||
tab = Tab(browser)
|
||||
frame = tab.main_frame
|
||||
|
||||
url = URL("http://example.com/missing")
|
||||
frame.load(url)
|
||||
|
||||
# Should create error document
|
||||
assert frame.document is not None
|
||||
# Error message in document
|
||||
text = frame.document.children[0].children[0].text if frame.document.children else ""
|
||||
assert "404" in text or "Error" in text
|
||||
|
||||
@patch('src.browser.tab.http.request')
|
||||
def test_frame_load_network_error(self, mock_request):
|
||||
mock_request.side_effect = Exception("Network unreachable")
|
||||
|
||||
browser = Mock()
|
||||
browser._log = Mock()
|
||||
tab = Tab(browser)
|
||||
frame = tab.main_frame
|
||||
|
||||
url = URL("http://unreachable.example.com")
|
||||
frame.load(url)
|
||||
|
||||
# Should create error document
|
||||
assert frame.document is not None
|
||||
text = frame.document.children[0].children[0].text if frame.document.children else ""
|
||||
assert "Error" in text or "unreachable" in text
|
||||
|
||||
@patch('src.browser.tab.http.request')
|
||||
def test_frame_load_utf8_decode(self, mock_request):
|
||||
mock_request.return_value = (200, "text/html", "<html><body>Héllo Wörld</body></html>".encode('utf-8'))
|
||||
|
||||
browser = Mock()
|
||||
browser._log = Mock()
|
||||
tab = Tab(browser)
|
||||
frame = tab.main_frame
|
||||
|
||||
url = URL("http://example.com")
|
||||
frame.load(url)
|
||||
|
||||
assert frame.document is not None
|
||||
# Should handle UTF-8 characters
|
||||
text = frame.document.children[0].children[0].text
|
||||
assert "llo" in text # Part of Héllo
|
||||
82
tests/test_html_parsing.py
Normal file
82
tests/test_html_parsing.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""Tests for HTML parsing functionality."""
|
||||
|
||||
import pytest
|
||||
from src.parser.html import parse_html, Text, Element
|
||||
|
||||
|
||||
class TestParseHTML:
|
||||
def test_parse_simple_text(self):
|
||||
html = "<html><body>Hello World</body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
assert isinstance(root, Element)
|
||||
assert root.tag == "html"
|
||||
assert len(root.children) == 1
|
||||
|
||||
body = root.children[0]
|
||||
assert body.tag == "body"
|
||||
assert len(body.children) == 1
|
||||
|
||||
text = body.children[0]
|
||||
assert isinstance(text, Text)
|
||||
assert "Hello World" in text.text
|
||||
|
||||
def test_parse_strips_tags(self):
|
||||
html = "<html><body><p>Hello</p><div>World</div></body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
body = root.children[0]
|
||||
text = body.children[0]
|
||||
assert "Hello" in text.text
|
||||
assert "World" in text.text
|
||||
|
||||
def test_parse_removes_script_tags(self):
|
||||
html = "<html><body>Visible<script>alert('bad')</script>Text</body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
body = root.children[0]
|
||||
text = body.children[0]
|
||||
assert "Visible" in text.text
|
||||
assert "Text" in text.text
|
||||
assert "alert" not in text.text
|
||||
assert "script" not in text.text.lower()
|
||||
|
||||
def test_parse_removes_style_tags(self):
|
||||
html = "<html><body>Text<style>body{color:red;}</style>More</body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
body = root.children[0]
|
||||
text = body.children[0]
|
||||
assert "Text" in text.text
|
||||
assert "More" in text.text
|
||||
assert "color" not in text.text
|
||||
|
||||
def test_parse_decodes_entities(self):
|
||||
html = "<html><body><div> & "test"</body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
body = root.children[0]
|
||||
text = body.children[0]
|
||||
assert "<div>" in text.text
|
||||
assert "&" in text.text
|
||||
assert '"test"' in text.text
|
||||
|
||||
def test_parse_normalizes_whitespace(self):
|
||||
html = "<html><body>Hello \n\n World</body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
body = root.children[0]
|
||||
text = body.children[0]
|
||||
# Multiple whitespace should be collapsed
|
||||
assert "Hello World" in text.text
|
||||
|
||||
def test_parse_empty_document(self):
|
||||
html = "<html><body></body></html>"
|
||||
root = parse_html(html)
|
||||
|
||||
assert isinstance(root, Element)
|
||||
assert root.tag == "html"
|
||||
body = root.children[0]
|
||||
assert body.tag == "body"
|
||||
# Empty body should have no text children
|
||||
assert len(body.children) == 0
|
||||
204
tests/test_http.py
Normal file
204
tests/test_http.py
Normal file
|
|
@ -0,0 +1,204 @@
|
|||
"""Tests for HTTP functionality."""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from src.network.url import URL
|
||||
from src.network import http
|
||||
|
||||
|
||||
class TestHTTPRequest:
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_request_success(self, mock_conn_class):
|
||||
# Setup mock
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 200
|
||||
mock_response.reason = "OK"
|
||||
mock_response.getheader.return_value = "text/html"
|
||||
mock_response.read.return_value = b"<html>Hello</html>"
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com/page")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
assert status == 200
|
||||
assert content_type == "text/html"
|
||||
assert body == b"<html>Hello</html>"
|
||||
|
||||
@patch('src.network.http.http.client.HTTPSConnection')
|
||||
def test_https_request(self, mock_conn_class):
|
||||
# Setup mock
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 200
|
||||
mock_response.reason = "OK"
|
||||
mock_response.getheader.return_value = "text/html"
|
||||
mock_response.read.return_value = b"Secure content"
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("https://example.com")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
assert status == 200
|
||||
assert b"Secure" in body
|
||||
mock_conn_class.assert_called_once()
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_request_404(self, mock_conn_class):
|
||||
# Setup mock
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 404
|
||||
mock_response.reason = "Not Found"
|
||||
mock_response.getheader.return_value = "text/html"
|
||||
mock_response.read.return_value = b"<html>Not Found</html>"
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com/missing")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
assert status == 404
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_request_with_user_agent(self, mock_conn_class):
|
||||
# Setup mock
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 200
|
||||
mock_response.reason = "OK"
|
||||
mock_response.getheader.return_value = "text/html"
|
||||
mock_response.read.return_value = b"content"
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com")
|
||||
http.request(url)
|
||||
|
||||
# Verify User-Agent header was sent
|
||||
call_args = mock_conn.request.call_args
|
||||
headers = call_args[1]['headers']
|
||||
assert 'User-Agent' in headers
|
||||
assert 'Bowser' in headers['User-Agent']
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_redirect_301(self, mock_conn_class):
|
||||
"""Test following 301 permanent redirect."""
|
||||
# Setup mock for first request (redirect)
|
||||
mock_conn = Mock()
|
||||
mock_response_redirect = Mock()
|
||||
mock_response_redirect.status = 301
|
||||
mock_response_redirect.reason = "Moved Permanently"
|
||||
mock_response_redirect.getheader.side_effect = lambda header, default="": {
|
||||
"Content-Type": "text/html",
|
||||
"Location": "http://example.com/new-page"
|
||||
}.get(header, default)
|
||||
mock_response_redirect.read.return_value = b"<html>Redirect</html>"
|
||||
|
||||
# Setup mock for second request (final response)
|
||||
mock_response_final = Mock()
|
||||
mock_response_final.status = 200
|
||||
mock_response_final.reason = "OK"
|
||||
mock_response_final.getheader.side_effect = lambda header, default="": {
|
||||
"Content-Type": "text/html",
|
||||
}.get(header, default)
|
||||
mock_response_final.read.return_value = b"<html>Final content</html>"
|
||||
|
||||
mock_conn.getresponse.side_effect = [mock_response_redirect, mock_response_final]
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com/old-page")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
assert status == 200
|
||||
assert body == b"<html>Final content</html>"
|
||||
assert mock_conn.request.call_count == 2
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_redirect_302(self, mock_conn_class):
|
||||
"""Test following 302 temporary redirect."""
|
||||
# Setup mock for first request (redirect)
|
||||
mock_conn = Mock()
|
||||
mock_response_redirect = Mock()
|
||||
mock_response_redirect.status = 302
|
||||
mock_response_redirect.reason = "Found"
|
||||
mock_response_redirect.getheader.side_effect = lambda header, default="": {
|
||||
"Content-Type": "text/html",
|
||||
"Location": "http://example.com/temp-page"
|
||||
}.get(header, default)
|
||||
mock_response_redirect.read.return_value = b"<html>Redirect</html>"
|
||||
|
||||
# Setup mock for second request (final response)
|
||||
mock_response_final = Mock()
|
||||
mock_response_final.status = 200
|
||||
mock_response_final.reason = "OK"
|
||||
mock_response_final.getheader.side_effect = lambda header, default="": {
|
||||
"Content-Type": "text/html",
|
||||
}.get(header, default)
|
||||
mock_response_final.read.return_value = b"<html>Temp content</html>"
|
||||
|
||||
mock_conn.getresponse.side_effect = [mock_response_redirect, mock_response_final]
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com/old-page")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
assert status == 200
|
||||
assert body == b"<html>Temp content</html>"
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_redirect_no_location(self, mock_conn_class):
|
||||
"""Test handling of redirect without Location header."""
|
||||
# Setup mock
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 302
|
||||
mock_response.reason = "Found"
|
||||
mock_response.getheader.side_effect = lambda header, default="": {
|
||||
"Content-Type": "text/html",
|
||||
}.get(header, default)
|
||||
mock_response.read.return_value = b"<html>Redirect</html>"
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test
|
||||
url = URL("http://example.com/page")
|
||||
status, content_type, body = http.request(url)
|
||||
|
||||
# Should return the redirect response if no Location header
|
||||
assert status == 302
|
||||
assert body == b"<html>Redirect</html>"
|
||||
|
||||
@patch('src.network.http.http.client.HTTPConnection')
|
||||
def test_http_max_redirects(self, mock_conn_class):
|
||||
"""Test that max redirects limit is enforced."""
|
||||
# Setup mock that always returns a redirect
|
||||
mock_conn = Mock()
|
||||
mock_response = Mock()
|
||||
mock_response.status = 302
|
||||
mock_response.reason = "Found"
|
||||
mock_response.getheader.side_effect = lambda header, default="": {
|
||||
"Location": "http://example.com/redirect-loop"
|
||||
}.get(header, default)
|
||||
mock_response.read.return_value = b""
|
||||
|
||||
mock_conn.getresponse.return_value = mock_response
|
||||
mock_conn_class.return_value = mock_conn
|
||||
|
||||
# Test with max_redirects=2
|
||||
url = URL("http://example.com/page")
|
||||
with pytest.raises(Exception, match="Too many redirects"):
|
||||
http.request(url, max_redirects=2)
|
||||
Loading…
Reference in a new issue