Skip to content

Commit 28eb926

Browse files
committed
test: add tests for camera module
1 parent c6d3375 commit 28eb926

File tree

5 files changed

+1423
-0
lines changed

5 files changed

+1423
-0
lines changed
Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
# SPDX-FileCopyrightText: Copyright (C) 2025 ARDUINO SA <http://www.arduino.cc>
2+
#
3+
# SPDX-License-Identifier: MPL-2.0
4+
5+
import pytest
6+
import time
7+
import numpy as np
8+
9+
from arduino.app_peripherals.camera import BaseCamera, CameraOpenError, CameraTransformError
10+
from arduino.app_utils.image.pipeable import PipeableFunction
11+
12+
13+
class ConcreteCamera(BaseCamera):
14+
"""Concrete implementation of BaseCamera for testing."""
15+
16+
def __init__(self, *args, **kwargs):
17+
# Extract test configuration
18+
self.should_fail_open = kwargs.pop("should_fail_open", False)
19+
self.should_fail_close = kwargs.pop("should_fail_close", False)
20+
self.should_fail_read = kwargs.pop("should_fail_read", False)
21+
self.open_error_message = kwargs.pop("open_error_message", "Camera open failed")
22+
self.close_error_message = kwargs.pop("close_error_message", "Camera close failed")
23+
self.read_error_message = kwargs.pop("read_error_message", "Frame read failed")
24+
25+
super().__init__(*args, **kwargs)
26+
27+
# Track method calls for verification
28+
self.open_call_count = 0
29+
self.close_call_count = 0
30+
self.read_call_count = 0
31+
32+
def _open_camera(self):
33+
"""Mock implementation of _open_camera."""
34+
self.open_call_count += 1
35+
if self.should_fail_open:
36+
raise RuntimeError(self.open_error_message)
37+
38+
def _close_camera(self):
39+
"""Mock implementation of _close_camera."""
40+
self.close_call_count += 1
41+
if self.should_fail_close:
42+
raise RuntimeError(self.close_error_message)
43+
44+
def _read_frame(self):
45+
"""Mock implementation that returns a dummy frame."""
46+
self.read_call_count += 1
47+
if self.should_fail_read:
48+
raise RuntimeError(self.read_error_message)
49+
if not self._is_started:
50+
return None
51+
return np.zeros((480, 640, 3), dtype=np.uint8)
52+
53+
54+
def test_base_camera_init_default():
55+
"""Test BaseCamera initialization with default parameters."""
56+
camera = ConcreteCamera()
57+
assert camera.resolution == (640, 480)
58+
assert camera.fps == 10
59+
assert camera.adjustments is None
60+
assert not camera.is_started()
61+
62+
63+
def test_base_camera_init_custom():
64+
"""Test BaseCamera initialization with custom parameters."""
65+
adj_func = lambda x: x
66+
camera = ConcreteCamera(resolution=(1920, 1080), fps=30, adjustments=adj_func)
67+
assert camera.resolution == (1920, 1080)
68+
assert camera.fps == 30
69+
assert camera.adjustments == adj_func
70+
71+
72+
def test_is_started_state_transitions():
73+
"""Test is_started return value through different state transitions."""
74+
camera = ConcreteCamera()
75+
76+
assert not camera.is_started()
77+
camera.start()
78+
assert camera.is_started()
79+
camera.stop()
80+
assert not camera.is_started()
81+
82+
camera.start()
83+
assert camera.is_started()
84+
camera.stop()
85+
assert not camera.is_started()
86+
87+
88+
def test_start_success():
89+
"""Test that start() calls _open_camera and updates state correctly."""
90+
camera = ConcreteCamera()
91+
92+
assert not camera.is_started()
93+
assert camera.open_call_count == 0
94+
95+
camera.start()
96+
97+
assert camera.is_started()
98+
assert camera.open_call_count == 1
99+
100+
101+
def test_start_already_started():
102+
"""Test that start() doesn't call _open_camera again when already started."""
103+
camera = ConcreteCamera()
104+
105+
camera.start()
106+
assert camera.open_call_count == 1
107+
assert camera.is_started()
108+
109+
camera.start()
110+
# Should not call _open_camera again
111+
assert camera.open_call_count == 1
112+
assert camera.is_started()
113+
114+
115+
def test_start_error_reporting():
116+
"""Test that errors from _open_camera are reported clearly."""
117+
camera = ConcreteCamera(should_fail_open=True, open_error_message="Mock camera failure")
118+
119+
# Verify error is properly wrapped and reported
120+
with pytest.raises(CameraOpenError, match="Failed to start camera: Mock camera failure"):
121+
camera.start()
122+
123+
# Verify camera state remains stopped on error
124+
assert not camera.is_started()
125+
assert camera.open_call_count == 1
126+
127+
128+
def test_stop_success():
129+
"""Test that stop() calls _close_camera and updates state correctly."""
130+
camera = ConcreteCamera()
131+
camera.start()
132+
assert camera.is_started()
133+
134+
camera.stop()
135+
136+
# Verify _close_camera was called and state updated
137+
assert not camera.is_started()
138+
assert camera.close_call_count == 1
139+
140+
141+
def test_stop_not_started():
142+
"""Test that stop() doesn't call _close_camera when not started."""
143+
camera = ConcreteCamera()
144+
assert not camera.is_started()
145+
146+
camera.stop()
147+
148+
# Should not call _close_camera
149+
assert camera.close_call_count == 0
150+
assert not camera.is_started()
151+
152+
153+
def test_stop_error_reporting():
154+
"""Test that errors from _close_camera are handled gracefully."""
155+
camera = ConcreteCamera(should_fail_close=True, close_error_message="Mock close failure")
156+
157+
camera.start()
158+
assert camera.is_started()
159+
160+
# stop() does not raise exceptions
161+
camera.stop()
162+
163+
assert camera.close_call_count == 1
164+
assert camera.is_started() # Should still be started due to close error
165+
166+
167+
def test_capture_when_started():
168+
"""Test that capture() calls _read_frame when started."""
169+
camera = ConcreteCamera()
170+
camera.start()
171+
172+
initial_read_count = camera.read_call_count
173+
frame = camera.capture()
174+
175+
assert camera.read_call_count == initial_read_count + 1
176+
assert frame is not None
177+
assert isinstance(frame, np.ndarray)
178+
assert frame.shape == (480, 640, 3)
179+
180+
181+
def test_capture_when_stopped():
182+
"""Test that capture() returns None when camera is not started."""
183+
camera = ConcreteCamera()
184+
185+
frame = camera.capture()
186+
187+
assert frame is None
188+
assert camera.read_call_count == 0
189+
190+
191+
def test_capture_read_frame_error_reporting():
192+
"""Test that errors from _read_frame are not caught by capture()."""
193+
camera = ConcreteCamera(should_fail_read=True, read_error_message="Mock read failure")
194+
camera.start()
195+
196+
# capture() should propagate the exception from _read_frame
197+
with pytest.raises(RuntimeError, match="Mock read failure"):
198+
camera.capture()
199+
200+
assert camera.read_call_count == 1
201+
202+
203+
def test_capture_with_adjustments():
204+
"""Test that adjustments are applied correctly to captured frames."""
205+
206+
def adjustment(frame):
207+
return frame + 10
208+
209+
camera = ConcreteCamera(adjustments=adjustment)
210+
camera.start()
211+
212+
frame = camera.capture()
213+
214+
assert camera.read_call_count == 1
215+
assert frame is not None
216+
assert np.all(frame == 10)
217+
218+
219+
def test_capture_adjustment_error_reporting():
220+
"""Test that adjustment errors are reported clearly."""
221+
222+
def bad_adjustment(frame):
223+
raise ValueError("Adjustment failed")
224+
225+
camera = ConcreteCamera(adjustments=bad_adjustment)
226+
camera.start()
227+
228+
with pytest.raises(CameraTransformError, match="Frame transformation failed"):
229+
camera.capture()
230+
231+
assert camera.read_call_count == 1
232+
233+
234+
def test_capture_rate_limiting():
235+
"""Test that FPS throttling/rate limiting is applied correctly."""
236+
camera = ConcreteCamera(fps=10) # 0.1 seconds between frames
237+
camera.start()
238+
239+
start_time = time.monotonic()
240+
frame1 = camera.capture()
241+
frame2 = camera.capture()
242+
elapsed = time.monotonic() - start_time
243+
244+
# Should take ~0.1 seconds due to throttling
245+
assert elapsed >= 0.09
246+
assert frame1 is not None
247+
assert frame2 is not None
248+
assert camera.read_call_count == 2
249+
250+
251+
def test_stream_can_be_stopped_by_user_code():
252+
"""Test that stream() can be stopped by user code breaking out of the loop."""
253+
camera = ConcreteCamera()
254+
camera.start()
255+
256+
n_frames = 0
257+
for i, frame in enumerate(camera.stream()):
258+
n_frames += 1
259+
assert isinstance(frame, np.ndarray)
260+
assert frame.shape == (480, 640, 3)
261+
if i >= 4: # Get 5 frames then break
262+
break
263+
264+
assert n_frames == 5
265+
assert camera.is_started() # Camera should still be running
266+
267+
268+
def test_stream_stops_when_camera_stopped():
269+
"""Test that stream() stops automatically when camera is stopped."""
270+
camera = ConcreteCamera()
271+
camera.start()
272+
273+
frames = []
274+
for i, frame in enumerate(camera.stream()):
275+
frames.append(frame)
276+
if i >= 4: # Get 5 frames then call stop()
277+
camera.stop()
278+
279+
assert len(frames) == 5
280+
assert not camera.is_started()
281+
282+
283+
def test_stream_exception_propagation():
284+
"""Test that exceptions from capture() are correctly propagated outside the consuming loop."""
285+
286+
def bad_adjustment(frame):
287+
raise ValueError("Stream adjustment failed")
288+
289+
camera = ConcreteCamera(adjustments=bad_adjustment)
290+
camera.start()
291+
292+
# Exception should propagate out of the stream loop
293+
with pytest.raises(CameraTransformError, match="Frame transformation failed"):
294+
for _ in camera.stream():
295+
pass
296+
297+
298+
def test_context_manager_calls_start_and_stop():
299+
"""Test that context manager calls start() and stop() when entering and exiting."""
300+
camera = ConcreteCamera()
301+
302+
assert not camera.is_started()
303+
assert camera.open_call_count == 0
304+
assert camera.close_call_count == 0
305+
306+
with camera as ctx_camera:
307+
assert camera.is_started()
308+
assert camera.open_call_count == 1
309+
assert camera.close_call_count == 0
310+
assert ctx_camera is camera # Should return self
311+
312+
frame = camera.capture()
313+
assert frame is not None
314+
315+
assert not camera.is_started()
316+
assert camera.open_call_count == 1
317+
assert camera.close_call_count == 1
318+
319+
320+
def test_context_manager_with_exception():
321+
"""Test that context manager calls stop() even when exception occurs."""
322+
camera = ConcreteCamera()
323+
324+
try:
325+
with camera:
326+
assert camera.is_started()
327+
assert camera.open_call_count == 1
328+
329+
raise RuntimeError("Test exception")
330+
except RuntimeError:
331+
pass
332+
333+
# Verify stop() was called despite exception
334+
assert not camera.is_started()
335+
assert camera.close_call_count == 1
336+
337+
338+
def test_capture_no_throttling():
339+
"""Test capture behavior with fps=0 (no throttling)."""
340+
camera = ConcreteCamera(fps=0)
341+
camera.start()
342+
343+
start_time = time.monotonic()
344+
frame = camera.capture()
345+
elapsed = time.monotonic() - start_time
346+
347+
assert elapsed < 0.01
348+
assert frame is not None
349+
assert camera.read_call_count == 1
350+
351+
352+
def test_capture_multiple_adjustments():
353+
"""Test that adjustment pipelines are applied correctly."""
354+
355+
def adj1(frame):
356+
return frame + 5
357+
358+
adjustment1 = PipeableFunction(adj1)
359+
360+
def adj2(frame):
361+
return frame * 2
362+
363+
adjustment2 = PipeableFunction(adj2)
364+
365+
camera = ConcreteCamera(adjustments=adjustment1 | adjustment2)
366+
camera.start()
367+
368+
frame = camera.capture()
369+
370+
# Verify _read_frame was called and adjustments applied: (0 + 5) * 2 = 10
371+
assert camera.read_call_count == 1
372+
assert np.all(frame == 10)

0 commit comments

Comments
 (0)