mirror of
https://github.com/Hopiu/bowser.git
synced 2026-03-16 19:10:24 +00:00
- **Link Detection and Navigation**: Added functionality to detect if a mouse click falls within a link area. If a link is clicked, the browser now navigates to the corresponding URL while logging the action. This also includes handling relative URLs based on the current page context. - **Line Layout Enhancements**: The `LayoutLine` class now includes optional attributes for color and href, allowing links to maintain their designated colors in the rendered output. - **Color Parsing**: Implemented a new `_parse_color` method in the `RenderPipeline` class to convert various color formats (hex and named colors) to Skia-compatible values. This ensures that default link colors are correctly applied and that extremely light colors are rendered as black for visibility. - **Rendering Links**: During the rendering process, links in the text layout are now rendered with their specified colors, and an underline is drawn under links to indicate interactivity. - **Document Layout Updates**: The document parsing system has been updated to extract link information correctly while preserving text hierarchy. - **Tests**: A comprehensive suite of tests has been added, including tests for link parsing, layout characteristics, styling application, and default color handling for links.
233 lines
6.5 KiB
Python
233 lines
6.5 KiB
Python
"""Tests for the async task queue system."""
|
|
|
|
import time
|
|
import threading
|
|
from unittest.mock import patch
|
|
|
|
|
|
class TestTaskQueue:
|
|
"""Tests for the TaskQueue class."""
|
|
|
|
def test_task_queue_singleton(self):
|
|
"""Test that TaskQueue is a singleton."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
# Reset singleton for clean test
|
|
TaskQueue.reset_instance()
|
|
|
|
q1 = TaskQueue()
|
|
q2 = TaskQueue()
|
|
|
|
assert q1 is q2
|
|
|
|
# Clean up
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_submit_task_returns_id(self):
|
|
"""Test that submit returns a task ID."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
# Mock GLib.idle_add to avoid GTK dependency
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
task_id = queue.submit(lambda: 42)
|
|
|
|
# Task ID should be non-negative (or -1 for cached)
|
|
assert isinstance(task_id, int)
|
|
|
|
# Wait for task to complete
|
|
time.sleep(0.1)
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_task_executes_function(self):
|
|
"""Test that submitted tasks are executed."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
result = []
|
|
threading.Event()
|
|
|
|
def task():
|
|
result.append("executed")
|
|
return "done"
|
|
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
queue.submit(task)
|
|
|
|
# Wait for task to complete
|
|
time.sleep(0.2)
|
|
|
|
assert "executed" in result
|
|
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_on_complete_callback(self):
|
|
"""Test that on_complete callback is called with result."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
results = []
|
|
|
|
def task():
|
|
return 42
|
|
|
|
def on_complete(result):
|
|
results.append(result)
|
|
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
# Make idle_add execute immediately
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
queue.submit(task, on_complete=on_complete)
|
|
|
|
# Wait for task to complete (may need more time under load)
|
|
for _ in range(10):
|
|
if 42 in results:
|
|
break
|
|
time.sleep(0.05)
|
|
|
|
assert 42 in results
|
|
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_on_error_callback(self):
|
|
"""Test that on_error callback is called on exception."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
errors = []
|
|
|
|
def failing_task():
|
|
raise ValueError("Test error")
|
|
|
|
def on_error(e):
|
|
errors.append(str(e))
|
|
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
queue.submit(failing_task, on_error=on_error)
|
|
|
|
# Wait for task to complete (may need more time under load)
|
|
for _ in range(10):
|
|
if len(errors) == 1:
|
|
break
|
|
time.sleep(0.05)
|
|
|
|
assert len(errors) == 1
|
|
assert "Test error" in errors[0]
|
|
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_cancel_task(self):
|
|
"""Test task cancellation."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
result = []
|
|
|
|
def slow_task():
|
|
time.sleep(1)
|
|
result.append("completed")
|
|
return True
|
|
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
task_id = queue.submit(slow_task)
|
|
|
|
# Cancel immediately
|
|
cancelled = queue.cancel(task_id)
|
|
|
|
# May or may not be cancellable depending on timing
|
|
assert isinstance(cancelled, bool)
|
|
|
|
# Wait briefly
|
|
time.sleep(0.1)
|
|
|
|
TaskQueue.reset_instance()
|
|
|
|
def test_pending_count(self):
|
|
"""Test pending task count."""
|
|
from src.network.tasks import TaskQueue
|
|
|
|
TaskQueue.reset_instance()
|
|
queue = TaskQueue()
|
|
|
|
initial_count = queue.pending_count
|
|
assert initial_count >= 0
|
|
|
|
TaskQueue.reset_instance()
|
|
|
|
|
|
class TestAsyncImageLoading:
|
|
"""Tests for async image loading."""
|
|
|
|
def test_load_image_async_cached(self):
|
|
"""Test that cached images return -1 (no task needed)."""
|
|
from src.network.images import load_image_async, load_image, ImageCache
|
|
|
|
# Clear cache
|
|
ImageCache().clear()
|
|
|
|
# Load an image synchronously first (to cache it)
|
|
data_url = (
|
|
"data:image/png;base64,"
|
|
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
|
|
)
|
|
image = load_image(data_url)
|
|
assert image is not None
|
|
|
|
# Now load async - should hit cache and return -1 (no task)
|
|
# We don't need a callback for this test - just checking return value
|
|
task_id = load_image_async(data_url, on_complete=None)
|
|
|
|
# Cached loads return -1 (no task created)
|
|
assert task_id == -1
|
|
|
|
def test_load_image_async_uncached(self):
|
|
"""Test that uncached images create tasks."""
|
|
from src.network.images import load_image_async, ImageCache
|
|
from src.network.tasks import TaskQueue
|
|
|
|
# Clear cache
|
|
ImageCache().clear()
|
|
TaskQueue.reset_instance()
|
|
|
|
# Use a data URL that's not cached
|
|
data_url = (
|
|
"data:image/png;base64,"
|
|
"iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAYAAABytg0kAAAADklEQVR42mP8z8DwHwYAAQYBA/5h2aw4AAAAAElFTkSuQmCC"
|
|
)
|
|
|
|
# Patch GLib.idle_add to call callbacks immediately (no GTK main loop in tests)
|
|
with patch('src.network.tasks.GLib') as mock_glib:
|
|
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
|
|
|
|
# Without a callback, it just submits the task
|
|
task_id = load_image_async(data_url, on_complete=None)
|
|
|
|
# Should create a task (non-negative ID)
|
|
assert task_id >= 0
|
|
|
|
# Wait for task to complete
|
|
time.sleep(0.3)
|
|
|
|
# Image should now be cached
|
|
assert ImageCache().has(data_url)
|
|
|
|
TaskQueue.reset_instance()
|