# Copyright 2026 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""End-to-end audio publish/subscribe tests."""

import asyncio
import ctypes
import math
import os
import uuid
import wave
from pathlib import Path

import numpy as np
import pytest

from livekit import api, rtc
from livekit.rtc.audio_frame import AudioFrame


SAMPLE_RATE = 48000
NUM_CHANNELS = 1
TONE_DURATION_SEC = 1.0
FREQUENCIES_HZ = [100, 300, 500, 700, 1000]
AMPLITUDE = 0.5


def skip_if_no_credentials():
    """Return a skipif mark that disables the test when LiveKit credentials are absent.

    Checks LIVEKIT_URL / LIVEKIT_API_KEY / LIVEKIT_API_SECRET at decoration time.
    """
    required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"]
    missing = [var for var in required_vars if not os.getenv(var)]
    return pytest.mark.skipif(
        bool(missing), reason=f"Missing environment variables: {', '.join(missing)}"
    )


def create_token(identity: str, room_name: str) -> str:
    """Mint a room-join JWT for `identity` scoped to `room_name`."""
    return (
        api.AccessToken()
        .with_identity(identity)
        .with_name(identity)
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room=room_name,
            )
        )
        .to_jwt()
    )


def unique_room_name(base: str) -> str:
    """Append a random 8-hex-char suffix so concurrent test runs never share a room."""
    return f"{base}-{uuid.uuid4().hex[:8]}"


def _generate_sine_wave(
    frequency: int,
    sample_rate: int,
    num_channels: int,
    duration_sec: float,
    amplitude: float = 0.5,
) -> AudioFrame:
    """Generate an int16 AudioFrame containing a sine wave at the given frequency."""
    samples_per_channel = int(sample_rate * duration_sec)
    t = np.arange(samples_per_channel, dtype=np.float64) / sample_rate
    wave_signal = np.sin(2.0 * math.pi * frequency * t) * amplitude
    pcm = (wave_signal * np.iinfo(np.int16).max).astype(np.int16)

    if num_channels > 1:
        # Duplicate the mono signal into interleaved channels.
        pcm = np.repeat(pcm[:, np.newaxis], num_channels, axis=1).reshape(-1)

    return AudioFrame(
        data=pcm.tobytes(),
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=samples_per_channel,
    )


def _frame_to_mono_float(frame: AudioFrame) -> np.ndarray:
    """Decode an int16 AudioFrame into a normalized float64 mono signal."""
    samples = np.frombuffer(bytes(frame.data.cast("B")), dtype=np.int16).astype(np.float64)
    if frame.num_channels > 1:
        # Downmix interleaved channels by averaging them.
        samples = samples.reshape(-1, frame.num_channels).mean(axis=1)
    return samples / float(np.iinfo(np.int16).max)


def _fft_spectrum(frame: AudioFrame) -> tuple[np.ndarray, np.ndarray]:
    """Return (freqs, magnitudes) from a Hann-windowed rfft of `frame`."""
    signal = _frame_to_mono_float(frame)
    window = np.hanning(len(signal))
    # Compensate for the Hann window's coherent gain so magnitudes stay comparable.
    spectrum = np.fft.rfft(signal * window) / (np.sum(window) / 2.0)
    magnitudes = np.abs(spectrum)
    freqs = np.fft.rfftfreq(len(signal), d=1.0 / frame.sample_rate)
    return freqs, magnitudes


def _detect_peak_frequency(frame: AudioFrame) -> float:
    """Return the frequency bin with the largest magnitude in `frame`."""
    freqs, magnitudes = _fft_spectrum(frame)
    return float(freqs[int(np.argmax(magnitudes))])


def _band_energies(
    frame: AudioFrame,
    centers: list[int],
    bandwidth_hz: float = 20.0,
) -> dict[int, float]:
    """Sum squared-magnitude (energy) in narrow bands centered at each frequency."""
    freqs, magnitudes = _fft_spectrum(frame)
    power = magnitudes**2
    return {
        center: float(
            np.sum(power[(freqs >= center - bandwidth_hz) & (freqs <= center + bandwidth_hz)])
        )
        for center in centers
    }


@skip_if_no_credentials()
class TestAudioStreamPublishSubscribe:
    """End-to-end: publish a sine sweep into a room and verify spectrum on the subscriber."""

    async def test_audio_stream_publish_subscribe(self):
        """Publish 5 seconds of 100/300/500/700/1000 Hz tones and FFT-verify received audio."""
        url = os.environ["LIVEKIT_URL"]
        room_name = unique_room_name("test-audio-sweep")

        publisher_room = rtc.Room()
        subscriber_room = rtc.Room()

        publisher_token = create_token("audio-sweep-publisher", room_name)
        subscriber_token = create_token("audio-sweep-subscriber", room_name)

        track_subscribed_event = asyncio.Event()
        subscribed_track: rtc.Track | None = None

        @subscriber_room.on("track_subscribed")
        def on_track_subscribed(
            track: rtc.Track,
            publication: rtc.RemoteTrackPublication,
            participant: rtc.RemoteParticipant,
        ):
            nonlocal subscribed_track
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                subscribed_track = track
                track_subscribed_event.set()

        try:
            # Subscriber connects first so it cannot miss the publication event.
            await subscriber_room.connect(url, subscriber_token)
            await publisher_room.connect(url, publisher_token)

            source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
            track = rtc.LocalAudioTrack.create_audio_track("sine-sweep", source)
            options = rtc.TrackPublishOptions()
            options.source = rtc.TrackSource.SOURCE_MICROPHONE
            await publisher_room.local_participant.publish_track(track, options)

            await asyncio.wait_for(track_subscribed_event.wait(), timeout=10.0)
            assert subscribed_track is not None

            audio_stream = rtc.AudioStream(
                subscribed_track,
                sample_rate=SAMPLE_RATE,
                num_channels=NUM_CHANNELS,
            )

            total_duration = TONE_DURATION_SEC * len(FREQUENCIES_HZ)
            target_samples = int(SAMPLE_RATE * total_duration)
            # Collect a little extra to tolerate codec startup latency.
            collect_samples_target = target_samples + int(SAMPLE_RATE * 1.0)

            async def publish_tones() -> None:
                # Pace tones one at a time so each plays out fully before the next begins.
                for freq in FREQUENCIES_HZ:
                    frame = _generate_sine_wave(
                        freq,
                        SAMPLE_RATE,
                        NUM_CHANNELS,
                        TONE_DURATION_SEC,
                        AMPLITUDE,
                    )
                    await source.capture_frame(frame)
                    await source.wait_for_playout()

            async def collect_samples() -> np.ndarray:
                buffers: list[np.ndarray] = []
                total = 0
                async for event in audio_stream:
                    chunk = np.frombuffer(bytes(event.frame.data.cast("B")), dtype=np.int16)
                    buffers.append(chunk)
                    total += len(chunk)
                    if total >= collect_samples_target:
                        break
                return np.concatenate(buffers) if buffers else np.array([], dtype=np.int16)

            publish_task = asyncio.create_task(publish_tones())
            try:
                received = await asyncio.wait_for(collect_samples(), timeout=20.0)
                await publish_task
            finally:
                # Previously the publish task leaked (and the stream/source stayed
                # open) whenever collection timed out or an assertion fired below.
                if not publish_task.done():
                    publish_task.cancel()
                    try:
                        await publish_task
                    except asyncio.CancelledError:
                        pass
                await audio_stream.aclose()
                await source.aclose()

            assert len(received) >= target_samples, (
                f"Expected at least {target_samples} samples, got {len(received)}"
            )

            # Dump the raw capture next to the test for offline debugging.
            recv_wav_path = Path(__file__).parent / "subscriber_recv_freqs.wav"
            with wave.open(str(recv_wav_path), "wb") as wav_out:
                wav_out.setnchannels(NUM_CHANNELS)
                wav_out.setsampwidth(ctypes.sizeof(ctypes.c_int16))
                wav_out.setframerate(SAMPLE_RATE)
                wav_out.writeframes(received.tobytes())

            # Find signal onset to skip codec startup silence.
            envelope = np.abs(received.astype(np.float32))
            threshold = float(envelope.max()) * 0.2
            onset_candidates = np.where(envelope > threshold)[0]
            assert onset_candidates.size > 0, "Received audio contains only silence"
            onset = int(onset_candidates[0])

            samples_per_tone = int(SAMPLE_RATE * TONE_DURATION_SEC)
            # Analyze the middle slice of each tone window to avoid boundary transitions.
            analysis_margin = int(SAMPLE_RATE * 0.2)
            analysis_length = samples_per_tone - 2 * analysis_margin

            per_tone_peaks: list[tuple[int, float]] = []
            for idx, expected_freq in enumerate(FREQUENCIES_HZ):
                start = onset + idx * samples_per_tone + analysis_margin
                end = start + analysis_length
                assert end <= len(received), (
                    f"Not enough samples for tone {idx} (expected {expected_freq} Hz): "
                    f"need {end}, have {len(received)}"
                )
                segment = received[start:end]
                segment_frame = AudioFrame(
                    data=segment.tobytes(),
                    sample_rate=SAMPLE_RATE,
                    num_channels=NUM_CHANNELS,
                    samples_per_channel=len(segment),
                )
                peak_hz = _detect_peak_frequency(segment_frame)
                per_tone_peaks.append((expected_freq, peak_hz))

                # Opus transcoding adds spectral jitter; allow a 15 Hz tolerance.
                assert peak_hz == pytest.approx(expected_freq, abs=15.0), (
                    f"Tone {idx}: expected {expected_freq} Hz, got peak at {peak_hz:.1f} Hz. "
                    f"All peaks: {per_tone_peaks}"
                )

                # The target band should also dominate the other sweep bands.
                energies = _band_energies(segment_frame, FREQUENCIES_HZ, bandwidth_hz=30.0)
                target_energy = energies[expected_freq]
                other_energy = sum(v for k, v in energies.items() if k != expected_freq)
                assert target_energy > 5.0 * max(other_energy, 1e-12), (
                    f"Tone {idx} ({expected_freq} Hz) did not dominate other bands: "
                    f"target={target_energy:.3e}, other={other_energy:.3e}"
                )
        finally:
            await publisher_room.disconnect()
            await subscriber_room.disconnect()


# ===========================================================================
# livekit-rtc/tests/test_video.py
# ===========================================================================
# Copyright 2026 LiveKit, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""End-to-end video publish/subscribe tests."""

import struct
import zlib


VIDEO_WIDTH = 640
VIDEO_HEIGHT = 480
VIDEO_FPS = 15
VIDEO_COLOR_DURATION_SEC = 1.0
# (name, RGB tuple) — order matters; the subscriber must see them in this sequence.
+VIDEO_COLOR_SEQUENCE: list[tuple[str, tuple[int, int, int]]] = [ + ("red", (255, 0, 0)), + ("green", (0, 255, 0)), + ("blue", (0, 0, 255)), + ("white", (255, 255, 255)), + ("black", (0, 0, 0)), +] + + +def skip_if_no_credentials(): + required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"] + missing = [var for var in required_vars if not os.getenv(var)] + return pytest.mark.skipif( + bool(missing), reason=f"Missing environment variables: {', '.join(missing)}" + ) + + +def create_token(identity: str, room_name: str) -> str: + return ( + api.AccessToken() + .with_identity(identity) + .with_name(identity) + .with_grants( + api.VideoGrants( + room_join=True, + room=room_name, + ) + ) + .to_jwt() + ) + + +def unique_room_name(base: str) -> str: + return f"{base}-{uuid.uuid4().hex[:8]}" + + +def _solid_color_rgba_frame(width: int, height: int, rgb: tuple[int, int, int]) -> rtc.VideoFrame: + """Build a solid-color 640x480 RGBA `VideoFrame` for the given RGB triple.""" + pixels = np.empty((height, width, 4), dtype=np.uint8) + pixels[:, :, 0] = rgb[0] + pixels[:, :, 1] = rgb[1] + pixels[:, :, 2] = rgb[2] + pixels[:, :, 3] = 255 + return rtc.VideoFrame( + width=width, + height=height, + type=rtc.VideoBufferType.RGBA, + data=pixels.tobytes(), + ) + + +def _classify_frame_color( + frame_rgb: np.ndarray, + palette: list[tuple[str, tuple[int, int, int]]], +) -> tuple[str, float]: + """Return (nearest palette color name, euclidean distance) for the mean RGB of `frame_rgb`.""" + mean_rgb = frame_rgb[:, :, :3].reshape(-1, 3).mean(axis=0) + best_name, best_dist = palette[0][0], float("inf") + for name, rgb in palette: + dist = float(np.linalg.norm(mean_rgb - np.asarray(rgb, dtype=np.float64))) + if dist < best_dist: + best_name, best_dist = name, dist + return best_name, best_dist + + +def _save_rgba_frame_as_png(frame_rgba: np.ndarray, path: Path) -> None: + """Encode an (H, W, 4) uint8 RGBA array to a PNG file using only the stdlib.""" + height, width, _ = 
frame_rgba.shape + rgb = np.ascontiguousarray(frame_rgba[:, :, :3], dtype=np.uint8) + + def _chunk(tag: bytes, data: bytes) -> bytes: + return ( + struct.pack(">I", len(data)) + + tag + + data + + struct.pack(">I", zlib.crc32(tag + data) & 0xFFFFFFFF) + ) + + signature = b"\x89PNG\r\n\x1a\n" + # IHDR: width, height, bit depth=8, color type=2 (RGB), compression=0, filter=0, interlace=0 + ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) + # Each scanline must be prefixed with a filter byte (0 = None). + filter_col = np.zeros((height, 1), dtype=np.uint8) + scanlines = np.concatenate([filter_col, rgb.reshape(height, width * 3)], axis=1) + idat = zlib.compress(scanlines.tobytes(), level=6) + + with open(path, "wb") as f: + f.write(signature) + f.write(_chunk(b"IHDR", ihdr)) + f.write(_chunk(b"IDAT", idat)) + f.write(_chunk(b"IEND", b"")) + + +@skip_if_no_credentials() +class TestVideoStreamPublishSubscribe: + """End-to-end: publish a 640x480 color-cycle video and verify colors on the subscriber.""" + + async def test_video_stream_publish_subscribe(self): + """Publish red/green/blue/white/black (1s each, 15fps) and verify color sequence.""" + url = os.environ["LIVEKIT_URL"] + room_name = unique_room_name("test-video-colors") + + publisher_room = rtc.Room() + subscriber_room = rtc.Room() + + publisher_token = create_token("video-publisher", room_name) + subscriber_token = create_token("video-subscriber", room_name) + + track_subscribed_event = asyncio.Event() + subscribed_track: rtc.Track | None = None + + @subscriber_room.on("track_subscribed") + def on_track_subscribed( + track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant, + ): + nonlocal subscribed_track + if track.kind == rtc.TrackKind.KIND_VIDEO: + subscribed_track = track + track_subscribed_event.set() + + try: + await subscriber_room.connect(url, subscriber_token) + await publisher_room.connect(url, publisher_token) + + source = rtc.VideoSource(VIDEO_WIDTH, 
VIDEO_HEIGHT) + track = rtc.LocalVideoTrack.create_video_track("color-cycle", source) + options = rtc.TrackPublishOptions() + options.source = rtc.TrackSource.SOURCE_CAMERA + await publisher_room.local_participant.publish_track(track, options) + + await asyncio.wait_for(track_subscribed_event.wait(), timeout=10.0) + assert subscribed_track is not None + + # Request RGBA frames from the SFU so we don't have to convert per frame. + video_stream = rtc.VideoStream(subscribed_track, format=rtc.VideoBufferType.RGBA) + + received_frames: list[np.ndarray] = [] + stop_collecting = asyncio.Event() + + async def publish_colors() -> None: + frame_interval = 1.0 / VIDEO_FPS + frames_per_color = int(VIDEO_FPS * VIDEO_COLOR_DURATION_SEC) + loop = asyncio.get_event_loop() + start = loop.time() + global_idx = 0 + for _color_name, rgb in VIDEO_COLOR_SEQUENCE: + frame = _solid_color_rgba_frame(VIDEO_WIDTH, VIDEO_HEIGHT, rgb) + for _ in range(frames_per_color): + source.capture_frame(frame) + global_idx += 1 + target = start + global_idx * frame_interval + sleep_for = target - loop.time() + if sleep_for > 0: + await asyncio.sleep(sleep_for) + + async def collect_frames() -> None: + async for event in video_stream: + vf = event.frame + if vf.type != rtc.VideoBufferType.RGBA: + vf = vf.convert(rtc.VideoBufferType.RGBA) + arr = ( + np.frombuffer(bytes(vf.data.cast("B")), dtype=np.uint8) + .reshape(vf.height, vf.width, 4) + .copy() + ) + received_frames.append(arr) + if stop_collecting.is_set(): + break + + publish_task = asyncio.create_task(publish_colors()) + collect_task = asyncio.create_task(collect_frames()) + + await publish_task + # Allow trailing frames to drain before signaling stop. 
+ await asyncio.sleep(0.5) + stop_collecting.set() + + try: + await asyncio.wait_for(collect_task, timeout=3.0) + except asyncio.TimeoutError: + collect_task.cancel() + try: + await collect_task + except (asyncio.CancelledError, BaseException): + pass + + await video_stream.aclose() + await source.aclose() + + assert len(received_frames) > 0, "No video frames received" + + # Classify each received frame against the 5-color palette. + classified = [ + _classify_frame_color(f, VIDEO_COLOR_SEQUENCE)[0] for f in received_frames + ] + + # Reduce to stable runs: ignore single-frame transitions at color boundaries. + runs: list[tuple[str, int]] = [] + for color in classified: + if runs and runs[-1][0] == color: + runs[-1] = (color, runs[-1][1] + 1) + else: + runs.append((color, 1)) + # A stable run has several frames of the same classified color. + stable_run_min = max(3, VIDEO_FPS // 3) + dominant = [c for c, n in runs if n >= stable_run_min] + + expected_order = [name for name, _ in VIDEO_COLOR_SEQUENCE] + # Find expected_order as a subsequence of dominant. + seq_idx = 0 + for color in dominant: + if seq_idx < len(expected_order) and color == expected_order[seq_idx]: + seq_idx += 1 + assert seq_idx == len(expected_order), ( + f"Expected color sequence {expected_order} not found in dominant runs " + f"{dominant}. Full classified stream: {classified}" + ) + + # Snapshot the first received frame of each expected color to JPEG. 
+ output_dir = Path(__file__).parent + saved: dict[str, Path] = {} + for idx, color_name in enumerate(classified): + if color_name in saved: + continue + if color_name not in (name for name, _ in VIDEO_COLOR_SEQUENCE): + continue + out_path = output_dir / f"subscriber_recv_frame_color_{color_name}.png" + _save_rgba_frame_as_png(received_frames[idx], out_path) + saved[color_name] = out_path + if len(saved) == len(VIDEO_COLOR_SEQUENCE): + break + + expected_names = {name for name, _ in VIDEO_COLOR_SEQUENCE} + missing = expected_names - saved.keys() + assert not missing, ( + f"Did not capture a frame for colors: {missing}. Classified stream: {classified}" + ) + finally: + await publisher_room.disconnect() + await subscriber_room.disconnect()