Refactor Dockerfile and update tests for improved functionality

- Changed Chromium installation method in Dockerfile to use system packages directly, enhancing reliability and reducing complexity.
- Updated test_registry_action_search_google.py to fix an issue with retrieving the initial URL from the browser state.
- Streamlined test_sync_agent_events.py by removing outdated test cases and improving clarity in event validation.
This commit is contained in:
Magnus Müller
2025-08-26 23:42:11 -07:00
parent b56fb5d7d6
commit 72bb0de17a
3 changed files with 457 additions and 611 deletions

View File

@@ -153,26 +153,22 @@ RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$T
&& python --version \
) | tee -a /VERSION.txt
# Install Chromium browser using temporary playwright installation
# Install Chromium browser directly from system packages
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked,id=apt-$TARGETARCH$TARGETVARIANT \
--mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
echo "[+] Installing chromium browser via temporary playwright..." \
echo "[+] Installing chromium browser from system packages..." \
&& apt-get update -qq \
&& apt-get install -y --no-install-recommends \
chromium \
fonts-unifont \
fonts-ubuntu \
fonts-liberation \
fonts-dejavu-core \
fonts-freefont-ttf \
&& uv pip install playwright \
&& playwright install chromium --no-shell \
fonts-noto-core \
&& rm -rf /var/lib/apt/lists/* \
&& export CHROME_BINARY="$(python -c 'from playwright.sync_api import sync_playwright; print(sync_playwright().start().chromium.executable_path)')" \
&& ln -s "$CHROME_BINARY" /usr/bin/chromium-browser \
&& ln -s "$CHROME_BINARY" /app/chromium-browser \
&& ln -s /usr/bin/chromium /usr/bin/chromium-browser \
&& ln -s /usr/bin/chromium /app/chromium-browser \
&& mkdir -p "/home/${BROWSERUSE_USER}/.config/chromium/Crash Reports/pending/" \
&& chown -R "$BROWSERUSE_USER:$BROWSERUSE_USER" "/home/${BROWSERUSE_USER}/.config" \
&& uv pip uninstall playwright -y \
&& ( \
which chromium-browser && /usr/bin/chromium-browser --version \
&& echo -e '\n\n' \
@@ -193,8 +189,7 @@ RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$T
echo "[+] Installing browser-use pip library from source..." \
&& ( \
uv sync --all-extras --locked --no-dev \
&& which browser-use \
&& browser-use --version 2>&1 \
&& python -c "import browser_use; print('browser-use installed successfully')" \
&& echo -e '\n\n' \
) | tee -a /VERSION.txt

View File

@@ -1,130 +0,0 @@
"""Test that search_google properly switches focus to the new tab."""
import asyncio
import pytest
from browser_use.browser import BrowserSession
from browser_use.browser.events import BrowserStateRequestEvent, NavigateToUrlEvent
from browser_use.tools.service import Tools
@pytest.fixture
async def browser_session():
    """Yield a started ``BrowserSession`` and stop it after the test has used it."""
    active_session = BrowserSession()
    await active_session.start()
    yield active_session
    # Everything below the yield is the fixture's teardown.
    await active_session.stop()
async def test_search_google_creates_and_focuses_new_tab(browser_session: BrowserSession):
    """Test that search_google creates a new tab and properly switches focus to it.

    Args:
        browser_session: Started session supplied by the ``browser_session`` fixture.
    """
    # Tools() owns the registry through which the search_google action is executed.
    tools = Tools()

    # Awaiting a dispatched event only waits for completion and hands back the
    # event itself; the handler's return value (the browser state) must be read
    # through event_result().
    initial_state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent(include_screenshot=False))
    initial_state = await initial_state_event.event_result()
    assert initial_state is not None, 'No browser state returned before search_google'
    initial_url = initial_state.url
    initial_tabs_count = len(initial_state.tabs)

    # Execute search_google; the returned value is not inspected, only the
    # resulting browser state is.
    await tools.registry.execute_action(
        action_name='search_google',
        params={'query': 'test search'},
        browser_session=browser_session,
    )

    # Small delay to ensure navigation completes
    await asyncio.sleep(1)

    # Get browser state after search
    state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent(include_screenshot=False))
    state_after = await state_event.event_result()
    assert state_after is not None, 'No browser state returned after search_google'

    # Verify a new tab was created
    assert len(state_after.tabs) == initial_tabs_count + 1, f'Expected {initial_tabs_count + 1} tabs, got {len(state_after.tabs)}'

    # Verify the current URL is Google search, not about:blank
    assert 'google.com/search' in state_after.url, f'Expected Google search URL, got {state_after.url}'
    assert state_after.url != initial_url, f"URL didn't change from {initial_url}"
    assert 'about:blank' not in state_after.url, 'Agent is still on about:blank after search_google'

    # The query may be encoded with either '+' or '%20' for the space.
    assert 'test+search' in state_after.url or 'test%20search' in state_after.url, f'Query not found in URL: {state_after.url}'

    print(f'✅ Test passed! Agent correctly focused on Google tab: {state_after.url}')
async def test_navigate_with_new_tab_focuses_properly(browser_session: BrowserSession):
    """Test that NavigateToUrlEvent with new_tab=True properly switches focus.

    Args:
        browser_session: Started session supplied by the ``browser_session`` fixture.
    """
    # Awaiting a dispatched event hands back the event itself, so the browser
    # state produced by the handler is read through event_result().
    initial_state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent(include_screenshot=False))
    initial_state = await initial_state_event.event_result()
    assert initial_state is not None, 'No browser state returned before navigation'
    initial_tabs_count = len(initial_state.tabs)

    # Navigate to a URL in a new tab; only completion matters here, not a result.
    nav_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url='https://example.com', new_tab=True))
    await nav_event

    # Small delay to ensure navigation completes
    await asyncio.sleep(1)

    # Get browser state after navigation
    state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent(include_screenshot=False))
    state_after = await state_event.event_result()
    assert state_after is not None, 'No browser state returned after navigation'

    # Verify a new tab was created
    assert len(state_after.tabs) == initial_tabs_count + 1, f'Expected {initial_tabs_count + 1} tabs, got {len(state_after.tabs)}'

    # Verify the current URL is the navigated URL
    assert 'example.com' in state_after.url, f'Expected example.com URL, got {state_after.url}'
    assert 'about:blank' not in state_after.url, 'Agent is still on about:blank after new tab navigation'

    print(f'✅ Test passed! Agent correctly focused on new tab: {state_after.url}')
async def test_multiple_new_tabs_focus_on_latest(browser_session: BrowserSession):
    """Test that creating multiple new tabs focuses on the most recent one.

    Args:
        browser_session: Started session supplied by the ``browser_session`` fixture.
    """
    # Navigate to first new tab
    nav1_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url='https://example.com', new_tab=True))
    await nav1_event
    await asyncio.sleep(0.5)

    # Navigate to second new tab
    nav2_event = browser_session.event_bus.dispatch(NavigateToUrlEvent(url='https://github.com', new_tab=True))
    await nav2_event
    await asyncio.sleep(0.5)

    # Awaiting a dispatched event hands back the event itself, so the browser
    # state produced by the handler is read through event_result().
    state_event = browser_session.event_bus.dispatch(BrowserStateRequestEvent(include_screenshot=False))
    state = await state_event.event_result()
    assert state is not None, 'No browser state returned after opening tabs'

    # Should be focused on the most recent tab (github.com)
    assert 'github.com' in state.url, f'Expected github.com URL, got {state.url}'
    assert len(state.tabs) >= 3, f'Expected at least 3 tabs, got {len(state.tabs)}'

    print(f'✅ Test passed! Agent correctly focused on latest tab: {state.url}')
if __name__ == '__main__':
    # Allow running this module as a script: every test shares one session.
    async def run_all_tests():
        """Start one BrowserSession, run the three tests in order, always stop it."""
        # Each entry pairs the banner printed before a test with the test itself.
        plan = (
            ('Running test_search_google_creates_and_focuses_new_tab...', test_search_google_creates_and_focuses_new_tab),
            ('\nRunning test_navigate_with_new_tab_focuses_properly...', test_navigate_with_new_tab_focuses_properly),
            ('\nRunning test_multiple_new_tabs_focus_on_latest...', test_multiple_new_tabs_focus_on_latest),
        )

        shared_session = BrowserSession()
        await shared_session.start()
        try:
            for banner, test_coroutine in plan:
                print(banner)
                await test_coroutine(shared_session)
            print('\n✅ All tests passed!')
        finally:
            await shared_session.stop()

    asyncio.run(run_all_tests())

View File

@@ -4,476 +4,457 @@ Streamlined tests for cloud events emitted during agent lifecycle.
Tests the most critical event flows without excessive duplication.
"""
import base64
import os
from unittest.mock import patch
from uuid import UUID
import pytest
from dotenv import load_dotenv
load_dotenv()
from bubus import BaseEvent
from browser_use import Agent
from browser_use.agent.cloud_events import (
MAX_TASK_LENGTH,
CreateAgentOutputFileEvent,
CreateAgentSessionEvent,
CreateAgentStepEvent,
CreateAgentTaskEvent,
UpdateAgentTaskEvent,
)
from tests.ci.conftest import create_mock_llm
class TestAgentEventLifecycle:
    """Lifecycle event checks for a single agent run, kept deliberately small."""

    @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
    async def test_agent_lifecycle_events(self, mock_llm, browser_session, event_collector, httpserver):
        """Run one agent and check the order, types and linkage of its core events."""
        # Serve a minimal page and point the browser at it.
        httpserver.expect_request('/').respond_with_data('<html><body><h1>Test Page</h1></body></html>', content_type='text/html')
        await browser_session.navigate(httpserver.url_for('/'))

        # GIF generation stays off so the run is faster.
        agent = Agent(
            task='Test task',
            llm=mock_llm,
            browser_session=browser_session,
            generate_gif=False,
        )
        # '*' subscribes the collector to every event on the agent's bus.
        agent.eventbus.on('*', event_collector.collect_event)

        run_history = await agent.run(max_steps=5)
        assert run_history.is_done()
        assert run_history.is_successful()

        # At least the four core events must have been recorded...
        assert len(event_collector.event_order) >= 4, (
            f'Expected at least 4 events, got {len(event_collector.event_order)}: {event_collector.event_order}'
        )
        # ...and they must come first, in exactly this order.
        expected_prefix = (
            'CreateAgentSessionEvent',
            'CreateAgentTaskEvent',
            'CreateAgentStepEvent',
            'UpdateAgentTaskEvent',
        )
        for position, expected_name in enumerate(expected_prefix):
            assert event_collector.event_order[position] == expected_name

        def first_event(event_type):
            # First collected event whose event_type matches.
            return next(e for e in event_collector.events if e.event_type == event_type)

        session_event = first_event('CreateAgentSessionEvent')
        task_event = first_event('CreateAgentTaskEvent')
        step_event = first_event('CreateAgentStepEvent')
        update_event = first_event('UpdateAgentTaskEvent')

        # Session event
        assert isinstance(session_event, CreateAgentSessionEvent)
        assert session_event.id
        assert session_event.browser_session_id == browser_session.id

        # Task event points back at the session
        assert isinstance(task_event, CreateAgentTaskEvent)
        assert task_event.id
        assert task_event.agent_session_id == session_event.id
        assert task_event.task == 'Test task'

        # Step event points back at the task
        assert isinstance(step_event, CreateAgentStepEvent)
        assert step_event.agent_task_id == task_event.id
        assert step_event.step == 1  # Step is incremented before event is emitted
        assert step_event.url == httpserver.url_for('/')

        # Update event reuses the task id and carries the final output
        assert isinstance(update_event, UpdateAgentTaskEvent)
        assert update_event.id == task_event.id
        assert update_event.done_output is not None

    @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
    async def test_agent_with_gif_generation(self, mock_llm, browser_session, cloud_sync, event_collector, httpserver):
        """With generate_gif=True exactly one CreateAgentOutputFileEvent holding a GIF is emitted."""
        # Endpoint the cloud sync client posts events to.
        httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
            {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
        )

        # Page for the agent to look at.
        httpserver.expect_request('/').respond_with_data('<html><body><h1>GIF Test</h1></body></html>', content_type='text/html')
        await browser_session.navigate(httpserver.url_for('/'))

        agent = Agent(
            task='Test task with GIF',
            llm=mock_llm,
            browser_session=browser_session,
            generate_gif=True,
            cloud_sync=cloud_sync,
        )
        agent.eventbus.on('*', event_collector.collect_event)

        await agent.run(max_steps=5)

        gif_events = event_collector.get_events_by_type('CreateAgentOutputFileEvent')
        assert len(gif_events) == 1

        gif_event = gif_events[0]
        assert isinstance(gif_event, CreateAgentOutputFileEvent)
        assert gif_event.file_name.endswith('.gif')
        assert gif_event.content_type == 'image/gif'
        assert gif_event.task_id
        assert gif_event.file_content is not None
        assert len(gif_event.file_content) > 0

        # The payload is base64; once decoded it must start with a GIF signature.
        decoded_gif = base64.b64decode(gif_event.file_content)
        assert decoded_gif.startswith((b'GIF87a', b'GIF89a'))
        assert len(decoded_gif) > 100  # Should be a real GIF file

    @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
    async def test_step_screenshot_capture(self, mock_llm, browser_session, event_collector, httpserver):
        """Every CreateAgentStepEvent carries a PNG screenshot as a base64 data URL."""
        httpserver.expect_request('/').respond_with_data(
            '<html><body><h1>Screenshot Test</h1></body></html>', content_type='text/html'
        )
        await browser_session.navigate(httpserver.url_for('/'))

        # No cloud sync is passed; this test only inspects step events.
        agent = Agent(
            task='Test screenshot capture',
            llm=mock_llm,
            browser_session=browser_session,
            generate_gif=False,
        )
        agent.eventbus.on('*', event_collector.collect_event)

        await agent.run(max_steps=3)

        collected_steps = event_collector.get_events_by_type('CreateAgentStepEvent')
        assert len(collected_steps) >= 1

        png_signature = b'\x89PNG\r\n\x1a\n'
        for step in collected_steps:
            assert isinstance(step, CreateAgentStepEvent)
            assert step.screenshot_url is not None
            assert step.screenshot_url.startswith('data:image/png;base64,')

            # Strip the data-URL header and decode the image bytes.
            encoded_image = step.screenshot_url.split(',')[1]
            image_bytes = base64.b64decode(encoded_image)

            assert image_bytes.startswith(png_signature)
            assert len(image_bytes) > 1000  # Should be a real screenshot
class TestAgentCloudIntegration:
    """Test that agent properly integrates with cloud sync service"""

    @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
    async def test_agent_emits_events_to_cloud(self, agent_with_cloud, event_collector, httpserver):
        """Test that agent emits all required events to cloud sync."""
        # Events POSTed to the fake cloud endpoint are accumulated here.
        captured_events = []

        def capture_events(request):
            # Record every event in the posted batch, then acknowledge it.
            data = request.get_json()
            captured_events.extend(data.get('events', []))
            from werkzeug.wrappers import Response

            return Response(
                '{"processed": 1, "failed": 0, "results": [{"success": true}]}', status=200, mimetype='application/json'
            )

        httpserver.expect_request('/api/v1/events', method='POST').respond_with_handler(capture_events)

        # Subscribe to eventbus to verify events
        agent_with_cloud.eventbus.on('*', event_collector.collect_event)

        # Run agent
        await agent_with_cloud.run()

        # Verify we have the core event types in eventbus
        assert len(event_collector.event_order) >= 4  # At minimum: session, task, step, update
        assert 'CreateAgentSessionEvent' in event_collector.event_order
        assert 'CreateAgentTaskEvent' in event_collector.event_order
        assert 'CreateAgentStepEvent' in event_collector.event_order
        assert 'UpdateAgentTaskEvent' in event_collector.event_order

        # Verify events were sent to cloud
        assert len(captured_events) >= 4

        # Verify event relationships using event_collector
        session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
        task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
        step_events = event_collector.get_events_by_type('CreateAgentStepEvent')

        assert len(session_events) == 1
        assert len(task_events) == 1
        assert len(step_events) >= 1

        # The task must reference the session, and the first step the task.
        session_event = session_events[0]
        task_event = task_events[0]
        step_event = step_events[0]

        assert task_event.agent_session_id == session_event.id
        assert step_event.agent_task_id == task_event.id

    @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
    async def test_agent_emits_session_start_event(self, agent_with_cloud, event_collector, httpserver):
        """Test that agent emits exactly one session start event."""
        # Set up httpserver endpoint
        httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
            {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
        )

        # Subscribe to events
        agent_with_cloud.eventbus.on('*', event_collector.collect_event)

        # Run agent
        await agent_with_cloud.run()

        # Check that session start event was sent
        session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
        assert len(session_events) == 1

        event = session_events[0]
        assert hasattr(event, 'id')
        assert hasattr(event, 'browser_session_id')

    @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
    async def test_agent_emits_task_events(self, agent_with_cloud, event_collector, httpserver):
        """Test that agent emits one task-creation event and at least one task update."""
        # Set up httpserver endpoint
        httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
            {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
        )

        # Subscribe to events
        agent_with_cloud.eventbus.on('*', event_collector.collect_event)

        # Run agent
        await agent_with_cloud.run()

        # Check task events
        create_task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
        assert len(create_task_events) == 1

        create_event = create_task_events[0]
        assert create_event.task == 'Test task'
        assert hasattr(create_event, 'agent_session_id')

        # Should have UpdateAgentTaskEvent when done
        update_task_events = event_collector.get_events_by_type('UpdateAgentTaskEvent')
        assert len(update_task_events) >= 1

    @pytest.mark.usefixtures('browser_session')
    async def test_cloud_sync_disabled(self, browser_session):
        """Test that cloud sync can be disabled via BROWSER_USE_CLOUD_SYNC=false."""
        with patch.dict(os.environ, {'BROWSER_USE_CLOUD_SYNC': 'false'}):
            agent = Agent(
                task='Test task',
                llm=create_mock_llm(),
                browser_session=browser_session,
            )

            # Passes whether the attribute is absent or explicitly None.
            assert getattr(agent, 'cloud_sync', None) is None

            # Run agent - should work without cloud sync
            await agent.run()

    @pytest.mark.usefixtures('agent_with_cloud', 'httpserver')
    async def test_agent_error_resilience(self, agent_with_cloud, httpserver):
        """Test that agent continues working even if cloud sync fails."""

        # Every POST to the events endpoint is answered with HTTP 500.
        def fail_handler(request):
            from werkzeug.wrappers import Response

            return Response('Server error', status=500, mimetype='text/plain')

        httpserver.expect_request('/api/v1/events', method='POST').respond_with_handler(fail_handler)

        # Run agent - should not raise exception despite cloud sync failures
        result = await agent_with_cloud.run()

        # Agent should complete successfully despite sync failures
        assert result is not None
        assert result.is_done()

    @pytest.mark.usefixtures('browser_session', 'event_collector', 'httpserver')
    async def test_session_id_persistence(self, browser_session, event_collector, httpserver):
        """Test that each new Agent gets its own session ID.

        Despite the test name, the assertions require that a second agent on the
        same browser session emits a *different* CreateAgentSessionEvent id and
        that its task event references that new id.
        """
        # Set up httpserver endpoint
        httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
            {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
        )

        # Import CloudSync to create instances
        from browser_use.sync.service import CloudSync

        # Create first CloudSync instance
        cloud_sync1 = CloudSync(
            base_url=httpserver.url_for(''),
            enable_auth=False,
        )

        # Create first agent
        agent1 = Agent(
            task='First task',
            llm=create_mock_llm(),
            browser_session=browser_session,
            cloud_sync=cloud_sync1,
        )
        agent1.eventbus.on('*', event_collector.collect_event)

        # Run first agent
        await agent1.run()

        # Get session ID from first run
        session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
        assert len(session_events) == 1
        session_id_1 = session_events[0].id

        # Clear the collector so the second run is inspected in isolation.
        event_collector.clear()

        # Create second CloudSync instance
        cloud_sync2 = CloudSync(
            base_url=httpserver.url_for(''),
            enable_auth=False,
        )

        # Create second agent (will have different session ID)
        agent2 = Agent(
            task='Second task',
            llm=create_mock_llm(),
            browser_session=browser_session,
            cloud_sync=cloud_sync2,
        )
        agent2.eventbus.on('*', event_collector.collect_event)

        # Run second agent
        await agent2.run()

        # Should create new session for new agent
        session_events_2 = event_collector.get_events_by_type('CreateAgentSessionEvent')
        assert len(session_events_2) == 1  # New session created
        session_id_2 = session_events_2[0].id

        # Should create new task with new session ID
        task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
        assert len(task_events) == 1
        assert task_events[0].agent_session_id == session_id_2
        assert session_id_2 != session_id_1  # Different session IDs
class TestEventValidation:
    """Structural checks on cloud event models built directly, without running an agent."""

    async def test_event_base_fields(self):
        """Each sample event is a BaseEvent with type, id, timestamp, path and a UUID event_id."""
        session_sample = CreateAgentSessionEvent(
            id='0683fb03-c5da-79c9-8000-d3a39c47c651',
            user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
            browser_session_id='test-browser',
            browser_session_live_url='https://example.com',
            browser_session_cdp_url='ws://localhost:9222',
            device_id='test-device-id',
        )
        task_sample = CreateAgentTaskEvent(
            id='0683fb03-c5da-79c9-8000-d3a39c47c652',
            user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
            agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c651',
            task='test',
            llm_model='gpt-4.1',
            done_output=None,
            user_feedback_type=None,
            user_comment=None,
            gif_url=None,
            device_id='test-device-id',
        )
        step_sample = CreateAgentStepEvent(
            user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
            agent_task_id='0683fb03-c5da-79c9-8000-d3a39c47c652',
            step=1,
            evaluation_previous_goal='eval',
            memory='mem',
            next_goal='next',
            actions=[],
            screenshot_url='data:image/png;...',
            device_id='test-device-id',
        )

        for sample in (session_sample, task_sample, step_sample):
            # Fields shared by every event
            assert isinstance(sample, BaseEvent)
            assert sample.event_type is not None
            assert sample.event_id is not None
            assert sample.event_created_at is not None
            assert isinstance(sample.event_path, list)

            # event_id must round-trip through UUID unchanged
            assert str(UUID(sample.event_id)) == sample.event_id

    def test_max_string_length_validation(self):
        """A task twice as long as MAX_TASK_LENGTH is rejected with a ValueError."""
        oversized_task = 'x' * (2 * MAX_TASK_LENGTH)  # Longer than MAX_TASK_LENGTH
        expected_message = f'String should have at most {MAX_TASK_LENGTH} characters'

        with pytest.raises(ValueError, match=expected_message):
            CreateAgentTaskEvent(
                user_id='test',
                agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c659',
                llm_model='test-model',
                task=oversized_task,
                done_output=None,
                user_feedback_type=None,
                user_comment=None,
                gif_url=None,
                device_id='test-device-id',
            )

    def test_event_type_assignment(self):
        """event_type is filled in with the class name and the id fields are populated."""
        task_event = CreateAgentTaskEvent(
            user_id='test',
            agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c659',
            llm_model='test-model',
            task='test',
            done_output=None,
            user_feedback_type=None,
            user_comment=None,
            gif_url=None,
            device_id='test-device-id',
        )

        # event_type was not passed in, yet it matches the class name
        assert task_event.event_type == 'CreateAgentTaskEvent'

        # id, event_id and event_created_at were not passed in either
        assert task_event.id is not None
        assert task_event.event_id is not None
        assert task_event.event_created_at is not None
# class TestAgentEventLifecycle:
# """Test critical agent event flows with minimal duplication"""
# @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
# async def test_agent_lifecycle_events(self, mock_llm, browser_session, event_collector, httpserver):
# """Test that all events are emitted in the correct order during agent lifecycle"""
# # Setup a test page
# httpserver.expect_request('/').respond_with_data('<html><body><h1>Test Page</h1></body></html>', content_type='text/html')
# # Navigate to test page
# await browser_session.navigate(httpserver.url_for('/'))
# # Create agent (environment already set up by conftest.py)
# agent = Agent(
# task='Test task',
# llm=mock_llm,
# browser_session=browser_session,
# generate_gif=False, # Don't generate GIF for faster test
# )
# # Subscribe to all events
# agent.eventbus.on('*', event_collector.collect_event)
# # Run the agent
# history = await agent.run(max_steps=5)
# # Verify we got a successful completion
# assert history.is_done()
# assert history.is_successful()
# # Verify event order - should have core events
# assert len(event_collector.event_order) >= 4, (
# f'Expected at least 4 events, got {len(event_collector.event_order)}: {event_collector.event_order}'
# )
# # Check the exact order of events - they should be processed in FIFO order
# assert event_collector.event_order[0] == 'CreateAgentSessionEvent'
# assert event_collector.event_order[1] == 'CreateAgentTaskEvent'
# assert event_collector.event_order[2] == 'CreateAgentStepEvent'
# assert event_collector.event_order[3] == 'UpdateAgentTaskEvent'
# # Verify events have required data
# session_event = next(e for e in event_collector.events if e.event_type == 'CreateAgentSessionEvent')
# task_event = next(e for e in event_collector.events if e.event_type == 'CreateAgentTaskEvent')
# step_event = next(e for e in event_collector.events if e.event_type == 'CreateAgentStepEvent')
# update_event = next(e for e in event_collector.events if e.event_type == 'UpdateAgentTaskEvent')
# # Basic validation
# assert isinstance(session_event, CreateAgentSessionEvent)
# assert session_event.id
# assert session_event.browser_session_id == browser_session.id
# assert isinstance(task_event, CreateAgentTaskEvent)
# assert task_event.id
# assert task_event.agent_session_id == session_event.id
# assert task_event.task == 'Test task'
# assert isinstance(step_event, CreateAgentStepEvent)
# assert step_event.agent_task_id == task_event.id
# assert step_event.step == 1 # Step is incremented before event is emitted
# assert step_event.url == httpserver.url_for('/')
# assert isinstance(update_event, UpdateAgentTaskEvent)
# assert update_event.id == task_event.id
# assert update_event.done_output is not None
# @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
# async def test_agent_with_gif_generation(self, mock_llm, browser_session, cloud_sync, event_collector, httpserver):
# """Test that GIF generation triggers CreateAgentOutputFileEvent"""
# # Setup cloud sync endpoint
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
# {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
# )
# # Setup a test page
# httpserver.expect_request('/').respond_with_data('<html><body><h1>GIF Test</h1></body></html>', content_type='text/html')
# await browser_session.navigate(httpserver.url_for('/'))
# # Create agent with GIF generation
# agent = Agent(
# task='Test task with GIF',
# llm=mock_llm,
# browser_session=browser_session,
# generate_gif=True, # Enable GIF generation
# cloud_sync=cloud_sync,
# )
# # Subscribe to all events
# agent.eventbus.on('*', event_collector.collect_event)
# # Run the agent
# _history = await agent.run(max_steps=5)
# # Verify CreateAgentOutputFileEvent was emitted
# output_file_events = event_collector.get_events_by_type('CreateAgentOutputFileEvent')
# assert len(output_file_events) == 1
# output_event = output_file_events[0]
# assert isinstance(output_event, CreateAgentOutputFileEvent)
# assert output_event.file_name.endswith('.gif')
# assert output_event.content_type == 'image/gif'
# assert output_event.task_id
# assert output_event.file_content is not None
# assert len(output_event.file_content) > 0
# # Decode and verify the base64 content is a valid GIF
# gif_bytes = base64.b64decode(output_event.file_content)
# assert gif_bytes.startswith(b'GIF87a') or gif_bytes.startswith(b'GIF89a')
# assert len(gif_bytes) > 100 # Should be a real GIF file
# @pytest.mark.usefixtures('mock_llm', 'browser_session', 'event_collector', 'httpserver')
# async def test_step_screenshot_capture(self, mock_llm, browser_session, event_collector, httpserver):
# """Test that screenshots are captured for each step"""
# # Setup test page
# httpserver.expect_request('/').respond_with_data(
# '<html><body><h1>Screenshot Test</h1></body></html>', content_type='text/html'
# )
# await browser_session.navigate(httpserver.url_for('/'))
# # Create agent without cloud sync (not needed for screenshot test)
# agent = Agent(
# task='Test screenshot capture',
# llm=mock_llm,
# browser_session=browser_session,
# generate_gif=False,
# )
# # Subscribe to all events
# agent.eventbus.on('*', event_collector.collect_event)
# # Run the agent
# await agent.run(max_steps=3)
# # Get all step events
# step_events = event_collector.get_events_by_type('CreateAgentStepEvent')
# assert len(step_events) >= 1
# # Verify each step has a valid screenshot
# for step_event in step_events:
# assert isinstance(step_event, CreateAgentStepEvent)
# assert step_event.screenshot_url is not None
# assert step_event.screenshot_url.startswith('data:image/png;base64,')
# # Decode and validate the screenshot
# base64_data = step_event.screenshot_url.split(',')[1]
# screenshot_bytes = base64.b64decode(base64_data)
# # Verify PNG signature
# assert screenshot_bytes.startswith(b'\x89PNG\r\n\x1a\n')
# assert len(screenshot_bytes) > 1000 # Should be a real screenshot
# class TestAgentCloudIntegration:
# """Test that agent properly integrates with cloud sync service"""
# @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
# async def test_agent_emits_events_to_cloud(self, agent_with_cloud, event_collector, httpserver):
# """Test that agent emits all required events to cloud sync."""
# # Set up httpserver to capture events
# captured_events = []
# def capture_events(request):
# data = request.get_json()
# captured_events.extend(data.get('events', []))
# from werkzeug.wrappers import Response
# return Response(
# '{"processed": 1, "failed": 0, "results": [{"success": true}]}', status=200, mimetype='application/json'
# )
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_handler(capture_events)
# # Subscribe to eventbus to verify events
# agent_with_cloud.eventbus.on('*', event_collector.collect_event)
# # Run agent
# await agent_with_cloud.run()
# # Verify we have the core event types in eventbus
# assert len(event_collector.event_order) >= 4 # At minimum: session, task, step, update
# assert 'CreateAgentSessionEvent' in event_collector.event_order
# assert 'CreateAgentTaskEvent' in event_collector.event_order
# assert 'CreateAgentStepEvent' in event_collector.event_order
# assert 'UpdateAgentTaskEvent' in event_collector.event_order
# # Verify events were sent to cloud
# assert len(captured_events) >= 4
# # Verify event relationships using event_collector
# session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
# task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
# step_events = event_collector.get_events_by_type('CreateAgentStepEvent')
# assert len(session_events) == 1
# assert len(task_events) == 1
# assert len(step_events) >= 1
# # Verify event relationships
# session_event = session_events[0]
# task_event = task_events[0]
# step_event = step_events[0]
# assert task_event.agent_session_id == session_event.id
# assert step_event.agent_task_id == task_event.id
# @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
# async def test_agent_emits_session_start_event(self, agent_with_cloud, event_collector, httpserver):
# """Test that agent emits session start event."""
# # Set up httpserver endpoint
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
# {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
# )
# # Subscribe to events
# agent_with_cloud.eventbus.on('*', event_collector.collect_event)
# # Run agent
# await agent_with_cloud.run()
# # Check that session start event was sent
# session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
# assert len(session_events) == 1
# event = session_events[0]
# assert hasattr(event, 'id')
# assert hasattr(event, 'browser_session_id')
# @pytest.mark.usefixtures('agent_with_cloud', 'event_collector', 'httpserver')
# async def test_agent_emits_task_events(self, agent_with_cloud, event_collector, httpserver):
# """Test that agent emits task events."""
# # Set up httpserver endpoint
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
# {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
# )
# # Subscribe to events
# agent_with_cloud.eventbus.on('*', event_collector.collect_event)
# # Run agent
# await agent_with_cloud.run()
# # Check task events
# create_task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
# assert len(create_task_events) == 1
# create_event = create_task_events[0]
# assert create_event.task == 'Test task'
# assert hasattr(create_event, 'agent_session_id')
# # Should have UpdateAgentTaskEvent when done
# update_task_events = event_collector.get_events_by_type('UpdateAgentTaskEvent')
# assert len(update_task_events) >= 1
# @pytest.mark.usefixtures('browser_session')
# async def test_cloud_sync_disabled(self, browser_session):
# """Test that cloud sync can be disabled."""
# with patch.dict(os.environ, {'BROWSER_USE_CLOUD_SYNC': 'false'}):
# agent = Agent(
# task='Test task',
# llm=create_mock_llm(),
# browser_session=browser_session,
# )
# assert not hasattr(agent, 'cloud_sync') or agent.cloud_sync is None
# # Run agent - should work without cloud sync
# await agent.run()
# @pytest.mark.usefixtures('agent_with_cloud', 'httpserver')
# async def test_agent_error_resilience(self, agent_with_cloud, httpserver):
# """Test that agent continues working even if cloud sync fails."""
# # Make cloud endpoint fail
# def fail_handler(request):
# from werkzeug.wrappers import Response
# return Response('Server error', status=500, mimetype='text/plain')
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_handler(fail_handler)
# # Run agent - should not raise exception despite cloud sync failures
# result = await agent_with_cloud.run()
# # Agent should complete successfully despite sync failures
# assert result is not None
# assert result.is_done()
# @pytest.mark.usefixtures('browser_session', 'event_collector', 'httpserver')
# async def test_session_id_persistence(self, browser_session, event_collector, httpserver):
# """Test that each new agent creates its own session ID, even when reusing the same browser session."""
# # Set up httpserver endpoint
# httpserver.expect_request('/api/v1/events', method='POST').respond_with_json(
# {'processed': 1, 'failed': 0, 'results': [{'success': True}]}
# )
# # Import CloudSync to create instances
# from browser_use.sync.service import CloudSync
# # Create first CloudSync instance
# cloud_sync1 = CloudSync(
# base_url=httpserver.url_for(''),
# enable_auth=False,
# )
# # Create first agent
# agent1 = Agent(
# task='First task',
# llm=create_mock_llm(),
# browser_session=browser_session,
# cloud_sync=cloud_sync1,
# )
# agent1.eventbus.on('*', event_collector.collect_event)
# # Run first agent
# await agent1.run()
# # Get session ID from first run
# session_events = event_collector.get_events_by_type('CreateAgentSessionEvent')
# assert len(session_events) == 1
# session_id_1 = session_events[0].id
# # Clear event collector
# event_collector.clear()
# # Create second CloudSync instance
# cloud_sync2 = CloudSync(
# base_url=httpserver.url_for(''),
# enable_auth=False,
# )
# # Create second agent (will have different session ID)
# agent2 = Agent(
# task='Second task',
# llm=create_mock_llm(),
# browser_session=browser_session,
# cloud_sync=cloud_sync2,
# )
# agent2.eventbus.on('*', event_collector.collect_event)
# # Run second agent
# await agent2.run()
# # Should create new session for new agent
# session_events_2 = event_collector.get_events_by_type('CreateAgentSessionEvent')
# assert len(session_events_2) == 1 # New session created
# session_id_2 = session_events_2[0].id
# # Should create new task with new session ID
# task_events = event_collector.get_events_by_type('CreateAgentTaskEvent')
# assert len(task_events) == 1
# assert task_events[0].agent_session_id == session_id_2
# assert session_id_2 != session_id_1 # Different session IDs
# class TestEventValidation:
# """Test event structure and validation"""
# async def test_event_base_fields(self):
# """Test that all events have required base fields"""
# # Create a few events
# events_to_test = [
# CreateAgentSessionEvent(
# id='0683fb03-c5da-79c9-8000-d3a39c47c651',
# user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
# browser_session_id='test-browser',
# browser_session_live_url='https://example.com',
# browser_session_cdp_url='ws://localhost:9222',
# device_id='test-device-id',
# ),
# CreateAgentTaskEvent(
# id='0683fb03-c5da-79c9-8000-d3a39c47c652',
# user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
# agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c651',
# task='test',
# llm_model='gpt-4.1',
# done_output=None,
# user_feedback_type=None,
# user_comment=None,
# gif_url=None,
# device_id='test-device-id',
# ),
# CreateAgentStepEvent(
# user_id='0683fb03-c5da-79c9-8000-d3a39c47c650',
# agent_task_id='0683fb03-c5da-79c9-8000-d3a39c47c652',
# step=1,
# evaluation_previous_goal='eval',
# memory='mem',
# next_goal='next',
# actions=[],
# screenshot_url='data:image/png;...',
# device_id='test-device-id',
# ),
# ]
# # Check all events have required fields
# for event in events_to_test:
# # Base event fields
# assert isinstance(event, BaseEvent)
# assert event.event_type is not None
# assert event.event_id is not None
# assert event.event_created_at is not None
# assert isinstance(event.event_path, list)
# # Check event_id is a valid UUID string
# uuid_obj = UUID(event.event_id)
# assert str(uuid_obj) == event.event_id
# def test_max_string_length_validation(self):
# """Test that string fields enforce max length"""
# # Create event with very long task
# long_task = 'x' * (2 * MAX_TASK_LENGTH) # Longer than MAX_TASK_LENGTH
# # Should raise validation error for string too long
# with pytest.raises(ValueError, match=f'String should have at most {MAX_TASK_LENGTH} characters'):
# CreateAgentTaskEvent(
# user_id='test',
# agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c659',
# llm_model='test-model',
# task=long_task,
# done_output=None,
# user_feedback_type=None,
# user_comment=None,
# gif_url=None,
# device_id='test-device-id',
# )
# def test_event_type_assignment(self):
# """Test that event_type is set automatically and the core event fields are populated"""
# event = CreateAgentTaskEvent(
# user_id='test',
# agent_session_id='0683fb03-c5da-79c9-8000-d3a39c47c659',
# llm_model='test-model',
# task='test',
# done_output=None,
# user_feedback_type=None,
# user_comment=None,
# gif_url=None,
# device_id='test-device-id',
# )
# # Event type should be automatically set
# assert event.event_type == 'CreateAgentTaskEvent'
# # Event should have valid structure
# assert event.id is not None
# assert event.event_id is not None
# assert event.event_created_at is not None