DEV Community

Alain Airom
Alain Airom

Posted on

Introducing the Voxtral Test: Breaking the Speed Barrier in Real-Time Speech AI

A test on implementing voxtral using Bob!

Introduction — What is voxtral?

Voxtral is a next-generation family of open-source speech-to-text models developed by Mistral AI. It is designed to bridge the gap between high-latency offline transcription and fast but often less accurate real-time systems.

The family features two primary models:

  • Voxtral Mini 4B Realtime: A lightweight, 4-billion parameter model purpose-built for live, streaming transcription with ultra-low latency (<500ms).
  • Voxtral Mini Transcribe V2: A batch-optimized version that delivers state-of-the-art accuracy, outperforming models like Whisper large-v3 and GPT-4o mini Transcribe in word error rate (WER) and cost efficiency.

Unlike traditional models that process audio in fixed chunks, Voxtral uses a novel streaming architecture and a custom causal audio encoder. This allows it to transcribe audio as it arrives, making it one of the first open-source solutions to match offline accuracy in a real-time environment.

What Can it Be Used For?

The “Voxtral Test” explores the model’s versatility across several high-impact domains:

  • Real-Time Voice Agents: Powering conversational AI and virtual assistants with sub-200ms latency, enabling natural-feeling voice interfaces.
  • Live Subtitling & Broadcasting: Generating near-instant multilingual subtitles for media, broadcasts, and conferences.
  • Meeting Intelligence: Transcribing long-form recordings (up to 3 hours) with speaker diarization to clearly identify who is speaking and when.
  • Contact Center Automation: Analyzing customer calls in real-time to suggest responses, track sentiment, and automate CRM documentation.
  • Edge & Private Deployments: Because it is released under the Apache 2.0 license and runs efficiently on single GPUs (requiring ~16GB VRAM), it is ideal for privacy-first applications where data cannot leave the local environment.
  • Domain-Specific Accuracy: Using context biasing, developers can provide the model with technical terms or proper nouns (up to 100 phrases) to ensure precise spelling in specialized industries like legal or medical fields.

As usual, my go-to partner Bob helped me craft a user-friendly application to showcase exactly what Voxtral can do.

The Vision: Real-Time Transcription for Everyone

The primary goal of this implementation is to provide a high-performance, user-friendly interface for testing Mistral AI’s Voxtral-Mini-4B-Realtime-2602. By leveraging a modern web-based UI, users can capture live audio directly from their microphones and witness state-of-the-art speech-to-text transcription with ultra-low latency — typically under 500ms. The project bridges the gap between complex AI model hosting and the end-user, offering a dashboard that monitors everything from audio waveforms to real-time latency statistics.

Getting Started: From Clone to Capture


The “Quick Start” workflow is designed for rapid deployment, guiding developers through a simple five-step process. After setting up the environment and installing dependencies, the core of the GPU-enabled experience relies on running a vLLM server. This server hosts the Voxtral model using a specialized container image optimized for NVIDIA GPUs. Once the backend is active, a Python-based proxy server establishes the bridge to the web browser, allowing users to simply open an HTML file and start transcribing immediately.

# server.py
#!/usr/bin/env python3
"""
Voxtral Realtime Transcription Server
Connects to vLLM server running Voxtral-Mini-4B-Realtime-2602
"""

import argparse
import asyncio
import base64
import json
import logging
import time
from typing import Optional

import numpy as np
import websockets
from websockets.server import WebSocketServerProtocol

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class VoxtralServer:
    def __init__(self, vllm_host: str = "127.0.0.1", vllm_port: int = 8000):
        self.vllm_host = vllm_host
        self.vllm_port = vllm_port
        self.vllm_url = f"ws://{vllm_host}:{vllm_port}/v1/realtime"
        self.sessions = {}

    async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
        """Handle incoming client WebSocket connection"""
        client_id = id(websocket)
        logger.info(f"Client {client_id} connected from {websocket.remote_address}")

        vllm_ws = None

        try:
            # Connect to vLLM server
            logger.info(f"Connecting to vLLM server at {self.vllm_url}")
            vllm_ws = await websockets.connect(self.vllm_url)
            logger.info(f"Connected to vLLM server for client {client_id}")

            # Wait for session.created from vLLM
            session_response = await vllm_ws.recv()
            session_data = json.loads(session_response)

            if session_data.get('type') == 'session.created':
                logger.info(f"Session created: {session_data.get('id')}")
                # Forward session.created to client
                await websocket.send(session_response)

            # Create tasks for bidirectional communication
            client_to_vllm = asyncio.create_task(
                self.forward_client_to_vllm(websocket, vllm_ws, client_id)
            )
            vllm_to_client = asyncio.create_task(
                self.forward_vllm_to_client(vllm_ws, websocket, client_id)
            )

            # Wait for either task to complete
            done, pending = await asyncio.wait(
                [client_to_vllm, vllm_to_client],
                return_when=asyncio.FIRST_COMPLETED
            )

            # Cancel pending tasks
            for task in pending:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error for client {client_id}: {e}")
            try:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'error': f'Connection error: {str(e)}'
                }))
            except:
                pass
        except Exception as e:
            logger.error(f"Error handling client {client_id}: {e}", exc_info=True)
            try:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'error': f'Server error: {str(e)}'
                }))
            except:
                pass
        finally:
            if vllm_ws:
                await vllm_ws.close()
            logger.info(f"Client {client_id} disconnected")

    async def forward_client_to_vllm(
        self, 
        client_ws: WebSocketServerProtocol, 
        vllm_ws, 
        client_id: int
    ):
        """Forward messages from client to vLLM server"""
        try:
            async for message in client_ws:
                try:
                    data = json.loads(message)
                    msg_type = data.get('type', 'unknown')

                    logger.debug(f"Client {client_id} -> vLLM: {msg_type}")

                    # Forward to vLLM
                    await vllm_ws.send(message)

                    # Log audio chunks
                    if msg_type == 'input_audio_buffer.append':
                        audio_len = len(data.get('audio', ''))
                        logger.debug(f"Forwarded audio chunk ({audio_len} bytes)")

                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON from client {client_id}: {e}")
                except Exception as e:
                    logger.error(f"Error forwarding client message: {e}")

        except websockets.exceptions.ConnectionClosed:
            logger.info(f"Client {client_id} connection closed")
        except Exception as e:
            logger.error(f"Error in client->vLLM forwarding: {e}")

    async def forward_vllm_to_client(
        self, 
        vllm_ws, 
        client_ws: WebSocketServerProtocol, 
        client_id: int
    ):
        """Forward messages from vLLM server to client"""
        try:
            async for message in vllm_ws:
                try:
                    data = json.loads(message)
                    msg_type = data.get('type', 'unknown')

                    logger.debug(f"vLLM -> Client {client_id}: {msg_type}")

                    # Forward to client
                    await client_ws.send(message)

                    # Log transcription results
                    if msg_type == 'transcription.delta':
                        delta = data.get('delta', '')
                        logger.info(f"Transcription delta: {delta}")
                    elif msg_type == 'transcription.done':
                        text = data.get('text', '')
                        logger.info(f"Transcription complete: {text}")

                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON from vLLM: {e}")
                except Exception as e:
                    logger.error(f"Error forwarding vLLM message: {e}")

        except websockets.exceptions.ConnectionClosed:
            logger.info(f"vLLM connection closed for client {client_id}")
        except Exception as e:
            logger.error(f"Error in vLLM->client forwarding: {e}")

    async def start(self, host: str = "0.0.0.0", port: int = 8080):
        """Start the WebSocket server"""
        logger.info(f"Starting Voxtral proxy server on {host}:{port}")
        logger.info(f"Proxying to vLLM server at {self.vllm_url}")

        async with websockets.serve(self.handle_client, host, port):
            logger.info(f"Server ready and listening on ws://{host}:{port}")
            await asyncio.Future()  # Run forever


def main():
    parser = argparse.ArgumentParser(
        description="Voxtral Realtime Transcription Proxy Server"
    )
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind the proxy server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port to bind the proxy server (default: 8080)"
    )
    parser.add_argument(
        "--vllm-host",
        type=str,
        default="127.0.0.1",
        help="vLLM server host (default: 127.0.0.1)"
    )
    parser.add_argument(
        "--vllm-port",
        type=int,
        default=8000,
        help="vLLM server port (default: 8000)"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging"
    )

    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    server = VoxtralServer(vllm_host=args.vllm_host, vllm_port=args.vllm_port)

    try:
        asyncio.run(server.start(host=args.host, port=args.port))
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {e}", exc_info=True)


if __name__ == "__main__":
    main()

# Made with Bob
Enter fullscreen mode Exit fullscreen mode

Inclusivity: The CPU-Only Alternative


Recognizing that not every tester has access to an NVIDIA GPU with 16GB of VRAM, the project includes a dedicated CPU-Only setup. This path utilizes Vosk, an offline-compatible speech recognition engine that runs efficiently on standard consumer hardware. While Voxtral on a GPU offers the highest accuracy and lowest latency, the Vosk-based CPU alternative still provides impressive real-time performance with 85–95% accuracy. This ensures that the user interface and workflow can be evaluated on virtually any modern machine, from laptops to basic servers.

# sever_vosk.py
#!/usr/bin/env python3
"""
CPU-Compatible Realtime Transcription Server using Vosk
Works on CPU without GPU requirements
"""

import argparse
import asyncio
import base64
import json
import logging
import os
import struct
from typing import Optional

import websockets
from websockets.server import WebSocketServerProtocol

try:
    from vosk import Model, KaldiRecognizer
    VOSK_AVAILABLE = True
except ImportError:
    VOSK_AVAILABLE = False
    print("⚠️  Vosk not installed. Install with: pip install vosk")
    print("⚠️  Download a model from: https://alphacephei.com/vosk/models")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class VoskTranscriptionServer:
    def __init__(self, model_path: str):
        if not VOSK_AVAILABLE:
            raise ImportError("Vosk is not installed. Run: pip install vosk")

        if not os.path.exists(model_path):
            raise FileNotFoundError(
                f"Model not found at {model_path}\n"
                f"Download from: https://alphacephei.com/vosk/models\n"
                f"Example: vosk-model-small-en-us-0.15"
            )

        logger.info(f"Loading Vosk model from {model_path}")
        self.model = Model(model_path)
        logger.info("Model loaded successfully")

    async def handle_client(self, websocket: WebSocketServerProtocol, path: str):
        """Handle incoming client WebSocket connection"""
        client_id = id(websocket)
        logger.info(f"Client {client_id} connected from {websocket.remote_address}")

        # Create recognizer for this session
        recognizer = KaldiRecognizer(self.model, 16000)
        recognizer.SetWords(True)

        session_id = f"vosk-session-{client_id}"

        try:
            # Send session.created
            await websocket.send(json.dumps({
                'type': 'session.created',
                'id': session_id,
                'model': 'vosk-cpu',
                'object': 'realtime.session'
            }))

            logger.info(f"Session created: {session_id}")

            audio_buffer = bytearray()
            is_recording = False

            async for message in websocket:
                try:
                    data = json.loads(message)
                    msg_type = data.get('type', 'unknown')

                    logger.debug(f"Client {client_id}: {msg_type}")

                    if msg_type == 'session.update':
                        # Acknowledge session update
                        await websocket.send(json.dumps({
                            'type': 'session.updated',
                            'session': {
                                'id': session_id,
                                'model': 'vosk-cpu'
                            }
                        }))

                    elif msg_type == 'input_audio_buffer.commit':
                        is_recording = True
                        if data.get('final', False):
                            # Process final audio
                            if recognizer.AcceptWaveform(bytes(audio_buffer)):
                                result = json.loads(recognizer.Result())
                                text = result.get('text', '')

                                if text:
                                    await websocket.send(json.dumps({
                                        'type': 'transcription.done',
                                        'text': text,
                                        'object': 'realtime.transcription'
                                    }))
                                    logger.info(f"Final transcription: {text}")

                            # Get any remaining text
                            final_result = json.loads(recognizer.FinalResult())
                            final_text = final_result.get('text', '')
                            if final_text:
                                await websocket.send(json.dumps({
                                    'type': 'transcription.done',
                                    'text': final_text,
                                    'object': 'realtime.transcription'
                                }))

                            # Reset for next session
                            audio_buffer.clear()
                            recognizer = KaldiRecognizer(self.model, 16000)
                            recognizer.SetWords(True)
                            is_recording = False

                    elif msg_type == 'input_audio_buffer.append':
                        if is_recording:
                            # Decode base64 audio
                            audio_b64 = data.get('audio', '')
                            audio_bytes = base64.b64decode(audio_b64)

                            # Add to buffer
                            audio_buffer.extend(audio_bytes)

                            # Process in chunks
                            if len(audio_buffer) >= 8192:  # Process every 8KB
                                chunk = bytes(audio_buffer[:8192])
                                audio_buffer = audio_buffer[8192:]

                                if recognizer.AcceptWaveform(chunk):
                                    result = json.loads(recognizer.Result())
                                    text = result.get('text', '')

                                    if text:
                                        # Send partial result
                                        await websocket.send(json.dumps({
                                            'type': 'transcription.delta',
                                            'delta': text + ' ',
                                            'object': 'realtime.transcription.delta'
                                        }))
                                        logger.info(f"Partial transcription: {text}")
                                else:
                                    # Send partial result
                                    partial = json.loads(recognizer.PartialResult())
                                    partial_text = partial.get('partial', '')

                                    if partial_text:
                                        await websocket.send(json.dumps({
                                            'type': 'transcription.delta',
                                            'delta': partial_text,
                                            'object': 'realtime.transcription.delta'
                                        }))

                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON from client {client_id}: {e}")
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'error': f'Invalid JSON: {str(e)}'
                    }))
                except Exception as e:
                    logger.error(f"Error processing message: {e}", exc_info=True)
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'error': f'Processing error: {str(e)}'
                    }))

        except websockets.exceptions.ConnectionClosed:
            logger.info(f"Client {client_id} connection closed")
        except Exception as e:
            logger.error(f"Error handling client {client_id}: {e}", exc_info=True)
        finally:
            logger.info(f"Client {client_id} disconnected")

    async def start(self, host: str = "0.0.0.0", port: int = 8080):
        """Start the WebSocket server"""
        logger.info(f"Starting Vosk transcription server on {host}:{port}")
        logger.info("This server runs on CPU - no GPU required!")

        async with websockets.serve(self.handle_client, host, port):
            logger.info(f"Server ready and listening on ws://{host}:{port}")
            await asyncio.Future()  # Run forever


def main():
    parser = argparse.ArgumentParser(
        description="CPU-Compatible Realtime Transcription Server using Vosk"
    )
    parser.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind the server (default: 0.0.0.0)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="Port to bind the server (default: 8080)"
    )
    parser.add_argument(
        "--model",
        type=str,
        required=True,
        help="Path to Vosk model directory (e.g., vosk-model-small-en-us-0.15)"
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging"
    )

    args = parser.parse_args()

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    if not VOSK_AVAILABLE:
        print("\n❌ Vosk is not installed!")
        print("\nInstall with:")
        print("  pip install vosk")
        print("\nDownload a model from:")
        print("  https://alphacephei.com/vosk/models")
        print("\nExample models:")
        print("  - vosk-model-small-en-us-0.15 (40MB, fast)")
        print("  - vosk-model-en-us-0.22 (1.8GB, accurate)")
        return 1

    try:
        server = VoskTranscriptionServer(model_path=args.model)
        asyncio.run(server.start(host=args.host, port=args.port))
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {e}", exc_info=True)
        return 1

    return 0


if __name__ == "__main__":
    exit(main())

# Made with Bob
Enter fullscreen mode Exit fullscreen mode

Under the Hood: A Multi-Layered Architecture

The system follows a robust, multi-layered architecture designed for efficiency and scalability.

  • The Client Layer: A browser-based UI that captures raw audio via the Web Audio API and visualizes it in real-time.
  • The Application Layer: A Python proxy server that manages WebSocket connections and routes data.
  • The Server Layer: Where the “brain” resides — either a vLLM server running the 4-billion parameter Voxtral model on a GPU, or a Vosk server processing audio on a CPU. This decoupled design allows for horizontal scaling, where a single load balancer can distribute traffic across multiple GPU or CPU instances to support many concurrent users.

Conclusion

In conclusion, this project provides a functional and accessible entry point for exploring the cutting edge of real-time speech-to-text technology. By implementing a dual-mode architecture, the application successfully bridges the gap between high-end hardware requirements and common consumer setups, allowing users to experience Mistral AI’s Voxtral on GPUs or the efficient Vosk engine on CPUs. From the live audio visualization in the browser to the multi-layered backend that handles high-speed WebSocket streaming, the core infrastructure for sub-500ms transcription is now firmly in place.


Disclaimer & Status: Please note that this application is currently in Alpha development. It is intended solely as a rudimentary testing tool and a proof-of-concept for real-time transcription workflows. Users should expect potential bugs and stability issues, as it is not yet optimized for production environments. We have a roadmap of future enhancements planned — including multi-user support, speaker diarization, and automated punctuation — to transform this testbed into a more robust solution. Your feedback is invaluable during this early stage; comments, bug reports, and contributions are highly welcome as we continue to refine the experience.

>>> Thanks for reading <<<

Links

Top comments (0)