bowser/tests/test_tasks.py

235 lines
7.1 KiB
Python
Raw Normal View History

"""Tests for the async task queue system."""
import pytest
import time
import threading
from unittest.mock import Mock, patch
class TestTaskQueue:
"""Tests for the TaskQueue class."""
def test_task_queue_singleton(self):
"""Test that TaskQueue is a singleton."""
from src.network.tasks import TaskQueue
# Reset singleton for clean test
TaskQueue.reset_instance()
q1 = TaskQueue()
q2 = TaskQueue()
assert q1 is q2
# Clean up
TaskQueue.reset_instance()
def test_submit_task_returns_id(self):
"""Test that submit returns a task ID."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
# Mock GLib.idle_add to avoid GTK dependency
with patch('src.network.tasks.GLib') as mock_glib:
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
task_id = queue.submit(lambda: 42)
# Task ID should be non-negative (or -1 for cached)
assert isinstance(task_id, int)
# Wait for task to complete
time.sleep(0.1)
TaskQueue.reset_instance()
def test_task_executes_function(self):
"""Test that submitted tasks are executed."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
result = []
event = threading.Event()
def task():
result.append("executed")
return "done"
with patch('src.network.tasks.GLib') as mock_glib:
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
queue.submit(task)
# Wait for task to complete
time.sleep(0.2)
assert "executed" in result
TaskQueue.reset_instance()
def test_on_complete_callback(self):
"""Test that on_complete callback is called with result."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
results = []
def task():
return 42
def on_complete(result):
results.append(result)
with patch('src.network.tasks.GLib') as mock_glib:
# Make idle_add execute immediately
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
queue.submit(task, on_complete=on_complete)
# Wait for task to complete (may need more time under load)
for _ in range(10):
if 42 in results:
break
time.sleep(0.05)
assert 42 in results
TaskQueue.reset_instance()
def test_on_error_callback(self):
"""Test that on_error callback is called on exception."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
errors = []
def failing_task():
raise ValueError("Test error")
def on_error(e):
errors.append(str(e))
with patch('src.network.tasks.GLib') as mock_glib:
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
queue.submit(failing_task, on_error=on_error)
# Wait for task to complete (may need more time under load)
for _ in range(10):
if len(errors) == 1:
break
time.sleep(0.05)
assert len(errors) == 1
assert "Test error" in errors[0]
TaskQueue.reset_instance()
def test_cancel_task(self):
"""Test task cancellation."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
result = []
def slow_task():
time.sleep(1)
result.append("completed")
return True
with patch('src.network.tasks.GLib') as mock_glib:
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
task_id = queue.submit(slow_task)
# Cancel immediately
cancelled = queue.cancel(task_id)
# May or may not be cancellable depending on timing
assert isinstance(cancelled, bool)
# Wait briefly
time.sleep(0.1)
TaskQueue.reset_instance()
def test_pending_count(self):
"""Test pending task count."""
from src.network.tasks import TaskQueue
TaskQueue.reset_instance()
queue = TaskQueue()
initial_count = queue.pending_count
assert initial_count >= 0
TaskQueue.reset_instance()
class TestAsyncImageLoading:
"""Tests for async image loading."""
def test_load_image_async_cached(self):
"""Test that cached images return -1 (no task needed)."""
from src.network.images import load_image_async, load_image, ImageCache
# Clear cache
ImageCache().clear()
# Load an image synchronously first (to cache it)
data_url = (
"data:image/png;base64,"
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=="
)
image = load_image(data_url)
assert image is not None
# Now load async - should hit cache and return -1 (no task)
# We don't need a callback for this test - just checking return value
task_id = load_image_async(data_url, on_complete=None)
# Cached loads return -1 (no task created)
assert task_id == -1
def test_load_image_async_uncached(self):
"""Test that uncached images create tasks."""
from src.network.images import load_image_async, ImageCache
from src.network.tasks import TaskQueue
# Clear cache
ImageCache().clear()
TaskQueue.reset_instance()
# Use a data URL that's not cached
data_url = (
"data:image/png;base64,"
"iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAYAAABytg0kAAAADklEQVR42mP8z8DwHwYAAQYBA/5h2aw4AAAAAElFTkSuQmCC"
)
# Patch GLib.idle_add to call callbacks immediately (no GTK main loop in tests)
with patch('src.network.tasks.GLib') as mock_glib:
mock_glib.idle_add = lambda cb, *args: cb(*args) if args else cb()
# Without a callback, it just submits the task
task_id = load_image_async(data_url, on_complete=None)
# Should create a task (non-negative ID)
assert task_id >= 0
# Wait for task to complete
time.sleep(0.3)
# Image should now be cached
assert ImageCache().has(data_url)
TaskQueue.reset_instance()