opncrafter

Deploying Open Source STT Models in Production

Deploying an open-source STT model to production is dramatically different from running it in a Jupyter notebook. Production means handling concurrent requests, managing memory limits, monitoring accuracy degradation, scaling under variable load, and dealing with the full diversity of audio quality your users will send. This guide covers the production deployment architecture I recommend for teams choosing to self-host their speech recognition stack.


Capacity Planning

Before designing your deployment, you need to understand your throughput requirements. STT throughput is measured in "audio hours per hour" — how many hours of audio your service can process per hour of wall-clock time.

# Capacity estimation for faster-whisper
# Hardware: GPU instance (A10G, 24GB VRAM)
# Model: large-v3-turbo (float16)

# Measured benchmarks:
AUDIO_PER_SECOND_GPU = 35.0   # seconds of audio processed per second of wall-clock (35x real-time)
AUDIO_PER_SECOND_CPU = 3.5    # int8 on 8-core CPU (3.5x real-time)

# Your requirements:
PEAK_CONCURRENT_REQUESTS = 50         # simultaneous users
AVERAGE_AUDIO_LENGTH_SECONDS = 120    # 2-minute average call

# GPU capacity (single A10G):
gpu_throughput = AUDIO_PER_SECOND_GPU  # 35 audio-seconds per wall-clock second
# Can handle: 35 / 120 = ~0.29 requests/sec = ~17 requests/min
# For 50 concurrent users: need 50 / (35/120) = ~171x GPUs? NO --
# Whisper adds to a queue; users wait. Queue depth = bottleneck.

# Practical rule of thumb:
# 1 A10G GPU handles ~5-10 concurrent short-audio requests (< 30s)
# 1 A10G GPU handles ~2-3 concurrent long-audio requests (2-5 min)
# Scale horizontally: one GPU instance per 5 concurrent requests

INSTANCES_NEEDED = max(1, PEAK_CONCURRENT_REQUESTS // 5)
print(f"GPU instances needed at peak: {INSTANCES_NEEDED}")  # 10

Production Architecture

# docker-compose.yml for production STT stack

version: '3.8'
services:
  # API Gateway (handles auth, rate limiting, routing)
  nginx:
    image: nginx:alpine
    ports: ["443:443", "80:80"]
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on: [api]
  
  # API Service (stateless, scales horizontally)
  api:
    build: ./api
    environment:
      - REDIS_URL=redis://redis:6379
      - WORKER_URL=amqp://rabbitmq:5672
    deploy:
      replicas: 3    # Scale API layer independently
  
  # Task Queue (for async long-audio processing)
  rabbitmq:
    image: rabbitmq:3-management
    ports: ["15672:15672"]
  
  # Result Cache
  redis:
    image: redis:7-alpine
    volumes: ["redis_data:/data"]
  
  # STT Worker (GPU-accelerated, scales with GPU capacity)
  worker:
    build: ./worker
    environment:
      - MODEL_SIZE=large-v3-turbo
      - DEVICE=cuda
    deploy:
      replicas: 2
      resources:
        reservations:
          devices:
            - capabilities: [gpu]  # GPU required
  
volumes:
  redis_data:

Celery Worker Implementation

# worker/tasks.py
from celery import Celery
from faster_whisper import WhisperModel
import os, json, redis

# Initialize once at worker startup, not per-task
MODEL_SIZE = os.getenv("MODEL_SIZE", "base")
DEVICE = os.getenv("DEVICE", "cpu")
COMPUTE_TYPE = "float16" if DEVICE == "cuda" else "int8"

print(f"Loading model: {MODEL_SIZE} on {DEVICE}")
model = WhisperModel(MODEL_SIZE, device=DEVICE, compute_type=COMPUTE_TYPE)

app = Celery("stt", broker=os.getenv("WORKER_URL"))
cache = redis.from_url(os.getenv("REDIS_URL"))

CACHE_TTL = 3600  # Cache results for 1 hour

@app.task(bind=True, max_retries=3)
def transcribe_audio(self, job_id: str, audio_path: str, options: dict):
    try:
        cache.set(f"job:{job_id}:status", "processing")
        
        segments, info = model.transcribe(
            audio_path,
            language=options.get("language"),
            beam_size=options.get("beam_size", 5),
            vad_filter=options.get("vad_filter", True),
            word_timestamps=options.get("word_timestamps", False),
        )
        
        result = {
            "text": " ".join(seg.text.strip() for seg in segments),
            "language": info.language,
            "language_probability": info.language_probability,
            "segments": [
                {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
                for seg in segments
            ],
        }
        
        # Cache result
        cache.setex(f"job:{job_id}:result", CACHE_TTL, json.dumps(result))
        cache.set(f"job:{job_id}:status", "completed")
        
        # Cleanup audio file
        os.unlink(audio_path)
        
        return result
    
    except Exception as exc:
        cache.set(f"job:{job_id}:status", f"failed: {str(exc)}")
        raise self.retry(exc=exc, countdown=5)

FastAPI with Async Job Submission

# api/main.py
import uuid, tempfile, os, json
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
import redis
from tasks import transcribe_audio

app = FastAPI(title="Production STT API")
cache = redis.from_url(os.getenv("REDIS_URL"))

MAX_FILE_SIZE_MB = 100

@app.post("/transcribe/async")
async def submit_transcription(
    audio: UploadFile = File(...),
    language: str = Form(None),
    word_timestamps: bool = Form(False),
):
    # Validate file size
    content = await audio.read()
    if len(content) > MAX_FILE_SIZE_MB * 1024 * 1024:
        raise HTTPException(status_code=413, detail=f"File exceeds {MAX_FILE_SIZE_MB}MB limit")
    
    job_id = str(uuid.uuid4())
    
    # Save to temp file (worker needs filesystem access)
    tmp_path = f"/tmp/stt_{job_id}.wav"
    with open(tmp_path, "wb") as f:
        f.write(content)
    
    cache.set(f"job:{job_id}:status", "queued")
    
    # Submit to Celery queue
    transcribe_audio.delay(job_id, tmp_path, {
        "language": language,
        "word_timestamps": word_timestamps,
    })
    
    return {"job_id": job_id, "status": "queued"}

@app.get("/transcribe/{job_id}")
async def get_result(job_id: str):
    status = cache.get(f"job:{job_id}:status")
    if not status:
        raise HTTPException(status_code=404, detail="Job not found")
    
    status = status.decode()
    
    if status == "completed":
        result_raw = cache.get(f"job:{job_id}:result")
        return {"status": "completed", "result": json.loads(result_raw)}
    
    return {"status": status}

Monitoring and Observability

  • Track WER over time: Maintain a golden test set of reference audio/transcript pairs and run them weekly. Audio quality drift (new microphone types, new codecs) can silently degrade accuracy
  • Monitor queue depth: Rising queue depth is the first signal of capacity problems — alert before users experience latency
  • Log language distribution: If the model is seeing languages it wasn't tested on, accuracy may be lower than expected
  • Track hallucination rate: Sample transcripts from silent audio submissions — if the model is generating text on empty audio, VAD filtering may have broken

Conclusion

Production STT deployment is primarily an infrastructure engineering challenge, not an ML challenge. The model (faster-whisper) is well-documented and stable. The work is in building a robust async job queue, managing GPU memory across concurrent requests, monitoring accuracy over time, and scaling capacity ahead of demand. Teams that treat this as straightforward web service deployment and skip the monitoring infrastructure are the ones who discover production accuracy problems six months later when a customer complains.

Continue Reading

👨‍💻
Written by

Vivek

AI Engineer

Full-stack AI engineer with 4+ years building LLM-powered products, autonomous agents, and RAG pipelines. I've shipped AI features to production for startups and worked hands-on with GPT-4o, LangChain, LlamaIndex, and the Vercel AI SDK. I started OpnCrafter to share everything I wish I had when learning — no fluff, just working code and real-world context.

GPT-4oLangChainNext.jsVector DBsRAGVercel AI SDK