How to Design a Full Streaming Voice Agent with End-to-End Latency Budgeting, Incremental ASR, LLM Streaming, and Real-Time TTS


In this tutorial, we build an end-to-end streaming voice agent that demonstrates how modern low-latency conversational systems work in real time. We simulate the entire pipeline, from segmented audio input and streaming speech recognition to incremental language-model logic and streamed text-to-speech output, while explicitly tracking latency at every step. By working within a strict latency budget and observing metrics such as time to first token and time to first audio, we focus on the practical engineering trade-offs that shape responsive voice user experiences. Check out the full code here.
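
Before looking at the code, a quick back-of-the-envelope calculation shows how a per-turn budget decomposes. The sketch below is illustrative only; its stage values mirror the defaults of the LatencyBudgets class defined later, and it simply confirms that the stages sitting between the end of user speech and the first agent audio fit inside the 1-second time-to-first-audio target.

# Back-of-the-envelope latency budgeting: the stages between the end of user
# speech and the first agent audio must fit inside the time-to-first-audio target.
# Values mirror the LatencyBudgets defaults used later in this tutorial.
stage_budgets_s = {
    "asr_finalization": 0.3,   # end of speech -> final transcript
    "llm_first_token": 0.5,    # final transcript -> first LLM token
    "tts_first_chunk": 0.2,    # first token -> first synthesized audio chunk
}

time_to_first_audio = sum(stage_budgets_s.values())
print(f"Budgeted time to first audio: {time_to_first_audio:.1f}s")
assert time_to_first_audio <= 1.0, "stages exceed the 1-second responsiveness target"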

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
   audio_chunk_received: float = 0.0
   asr_started: float = 0.0
   asr_partial: float = 0.0
   asr_complete: float = 0.0
   llm_started: float = 0.0
   llm_first_token: float = 0.0
   llm_complete: float = 0.0
   tts_started: float = 0.0
   tts_first_chunk: float = 0.0
   tts_complete: float = 0.0


   def get_time_to_first_audio(self) -> float:
       return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0


   def get_total_latency(self) -> float:
       return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
   asr_processing: float = 0.1
   asr_finalization: float = 0.3
   llm_first_token: float = 0.5
   llm_token_generation: float = 0.02
   tts_first_chunk: float = 0.2
   tts_chunk_generation: float = 0.05
   time_to_first_audio: float = 1.0


class AgentState(Enum):
   LISTENING = "listening"
   PROCESSING_SPEECH = "processing_speech"
   THINKING = "thinking"
   SPEAKING = "speaking"
   INTERRUPTED = "interrupted"

We define the key data structures and state representations that let us track latency across the entire voice pipeline. We formalize the timing signals for ASR, LLM, and TTS so that measurements stay consistent at every stage. We also establish an explicit agent state machine that governs how the agent transitions between phases of an interaction. Check out the full code here.
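
As a quick sanity check on these structures, the short sketch below fills a LatencyMetrics instance with invented timestamps (assuming the dataclasses and enum above are in scope) and reads back the derived figures against the default budgets.

# Hypothetical usage of the structures above: the timestamp values are invented
# purely to illustrate how the derived latency figures are computed.
metrics = LatencyMetrics(
    audio_chunk_received=0.00,
    asr_complete=1.80,
    llm_first_token=2.10,
    tts_first_chunk=2.45,
    tts_complete=4.20,
)
budgets = LatencyBudgets()

print(f"Time to first audio: {metrics.get_time_to_first_audio():.2f}s "
      f"(budget {budgets.time_to_first_audio:.2f}s)")
print(f"Total turn latency:  {metrics.get_total_latency():.2f}s")

state = AgentState.LISTENING
print(f"Agent starts in state: {state.value}")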

class AudioInputStream:
   def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
       self.sample_rate = sample_rate
       self.chunk_duration_ms = chunk_duration_ms
       self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)


   async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
       chars_per_second = (150 * 5) / 60
       duration_seconds = len(text) / chars_per_second
       num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)


       for _ in range(num_chunks):
           chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
           await asyncio.sleep(self.chunk_duration_ms / 1000)
           yield chunk

We simulate real-time audio input by breaking speech into fixed-duration chunks that arrive asynchronously. We model a realistic speaking rate and streaming behavior to mimic live microphone input, and we use this stream as the basis for testing the downstream latency-sensitive components. Check out the full code here.
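
To see the simulated microphone in action, a small probe like the one below (assuming AudioInputStream from the snippet above is in scope) counts the chunks and the total amount of simulated audio delivered for a sample utterance.

# Smoke test for the simulated microphone: count chunks and total audio duration.
async def probe_audio_stream():
    stream = AudioInputStream(sample_rate=16000, chunk_duration_ms=100)
    n_chunks, n_samples = 0, 0
    async for chunk in stream.stream_audio("Hello, how are you today?"):
        n_chunks += 1
        n_samples += len(chunk)
    print(f"Received {n_chunks} chunks ({n_samples / stream.sample_rate:.2f}s of audio)")

# asyncio.run(probe_audio_stream())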

class StreamingASR:
   def __init__(self, latency_budget: float = 0.1):
       self.latency_budget = latency_budget
       self.silence_threshold = 0.5


   async def transcribe_stream(
       self,
       audio_stream: AsyncIterator[np.ndarray],
       ground_truth: str
   ) -> AsyncIterator[tuple[str, bool]]:
       words = ground_truth.split()
       words_transcribed = 0
       silence_duration = 0.0
       chunk_count = 0


       async for chunk in audio_stream:
           chunk_count += 1
           await asyncio.sleep(self.latency_budget)


           if chunk_count % 3 == 0 and words_transcribed < len(words):
               words_transcribed += 1
               yield " ".join(words[:words_transcribed]), False


           audio_power = np.mean(np.abs(chunk))
           silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0


           if silence_duration >= self.silence_threshold:
               await asyncio.sleep(0.2)
               yield ground_truth, True
               return


       yield ground_truth, True

We implement a streaming ASR module that produces partial transcriptions before releasing the final result. We progressively reveal words to mimic how modern ASR systems emit hypotheses in real time, and we add silence-based finalization to approximate end-of-utterance detection. Check out the full code here.
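
A quick way to observe the partial-versus-final behavior is to print each hypothesis with a wall-clock timestamp. The sketch below assumes AudioInputStream and StreamingASR from the snippets above are in scope.

# Observe partial vs. final transcripts with wall-clock timestamps.
async def probe_asr():
    utterance = "What's the weather like?"
    audio = AudioInputStream().stream_audio(utterance)
    asr = StreamingASR(latency_budget=0.1)
    start = time.time()
    async for text, final in asr.transcribe_stream(audio, utterance):
        tag = "FINAL  " if final else "partial"
        print(f"[{time.time() - start:5.2f}s] {tag}: {text}")

# asyncio.run(probe_asr())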

class StreamingLLM:
   def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
       self.time_to_first_token = time_to_first_token
       self.tokens_per_second = tokens_per_second


   async def generate_response(self, prompt: str) -> AsyncIterator[str]:
       responses = {
           "hello": "Hello! How can I help you today?",
           "weather": "The weather is sunny with a temperature of 72°F.",
           "time": "The current time is 2:30 PM.",
           "default": "I understand. Let me help you with that."
       }


       response = responses["default"]
       for key in responses:
           if key in prompt.lower():
               response = responses[key]
               break


       await asyncio.sleep(self.time_to_first_token)


       for word in response.split():
           yield word + " "
           await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
   def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
       self.time_to_first_chunk = time_to_first_chunk
       self.chars_per_second = chars_per_second


   async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
       first_chunk = True
       buffer = ""


       async for text in text_stream:
           buffer += text
           if len(buffer) >= 20 or first_chunk:
               if first_chunk:
                   await asyncio.sleep(self.time_to_first_chunk)
                   first_chunk = False


               duration = len(buffer) / self.chars_per_second
               yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
               buffer = ""
               await asyncio.sleep(duration * 0.5)

In this snippet, we model a streaming language model and a streaming text-to-speech engine working together. We generate token-by-token responses to capture time-to-first-token behavior, then incrementally convert the text into audio segments to simulate both initial and continuous speech synthesis. Check out the full code here.
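
The two components can also be chained directly, which makes the two "first" events that drive perceived latency easy to measure. The sketch below assumes StreamingLLM and StreamingTTS from the snippets above are in scope.

# Chain the streaming LLM into the streaming TTS and time the two "first" events.
async def probe_llm_tts():
    llm = StreamingLLM(time_to_first_token=0.3, tokens_per_second=50)
    tts = StreamingTTS(time_to_first_chunk=0.2, chars_per_second=15)
    start = time.time()
    first_token_at = None

    async def token_stream():
        # Forward LLM tokens to the TTS while noting when the first one arrives.
        nonlocal first_token_at
        async for token in llm.generate_response("hello"):
            if first_token_at is None:
                first_token_at = time.time() - start
            yield token

    first_audio_at = None
    async for _ in tts.synthesize_stream(token_stream()):
        if first_audio_at is None:
            first_audio_at = time.time() - start

    print(f"First LLM token after {first_token_at:.2f}s, first audio after {first_audio_at:.2f}s")

# asyncio.run(probe_llm_tts())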

class StreamingVoiceAgent:
   def __init__(self, latency_budgets: LatencyBudgets):
       self.budgets = latency_budgets
       self.audio_stream = AudioInputStream()
       self.asr = StreamingASR(latency_budgets.asr_processing)
       self.llm = StreamingLLM(
           latency_budgets.llm_first_token,
           1.0 / latency_budgets.llm_token_generation
       )
       self.tts = StreamingTTS(
           latency_budgets.tts_first_chunk,
           1.0 / latency_budgets.tts_chunk_generation
       )
       self.state = AgentState.LISTENING
       self.metrics_history: List[LatencyMetrics] = []


   async def process_turn(self, user_input: str) -> LatencyMetrics:
       metrics = LatencyMetrics()
       start_time = time.time()


       metrics.audio_chunk_received = time.time() - start_time
       audio_gen = self.audio_stream.stream_audio(user_input)


       metrics.asr_started = time.time() - start_time
       async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
           if final:
               metrics.asr_complete = time.time() - start_time
               transcription = text


       metrics.llm_started = time.time() - start_time
       response = ""
       async for token in self.llm.generate_response(transcription):
           if not metrics.llm_first_token:
               metrics.llm_first_token = time.time() - start_time
           response += token


       metrics.llm_complete = time.time() - start_time
       metrics.tts_started = time.time() - start_time


       async def text_stream():
           for word in response.split():
               yield word + " "


       async for _ in self.tts.synthesize_stream(text_stream()):
           if not metrics.tts_first_chunk:
               metrics.tts_first_chunk = time.time() - start_time


       metrics.tts_complete = time.time() - start_time
       self.metrics_history.append(metrics)
       return metrics

We orchestrate the complete voice agent by wiring the audio input, ASR, LLM, and TTS into a single asynchronous flow. We record precise timestamps at each transition to compute the key latency metrics, and we treat each user turn as a separate experiment to enable systematic performance analysis. Check out the full code here.
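
With the agent assembled, a single turn can be checked directly against its budget. The sketch below assumes StreamingVoiceAgent and LatencyBudgets from above are in scope and simply compares the measured time to first audio with the target.

# Run one turn and compare the measured figures against the budgets.
async def check_one_turn():
    budgets = LatencyBudgets()
    agent = StreamingVoiceAgent(budgets)
    metrics = await agent.process_turn("Hello, how are you today?")

    ttfa = metrics.get_time_to_first_audio()
    status = "OK" if ttfa <= budgets.time_to_first_audio else "OVER BUDGET"
    print(f"Time to first audio: {ttfa:.2f}s / {budgets.time_to_first_audio:.2f}s -> {status}")
    print(f"Total turn latency:  {metrics.get_total_latency():.2f}s")

# asyncio.run(check_one_turn())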

async def run_demo():
   budgets = LatencyBudgets(
       asr_processing=0.08,
       llm_first_token=0.3,
       llm_token_generation=0.02,
       tts_first_chunk=0.15,
       time_to_first_audio=0.8
   )


   agent = StreamingVoiceAgent(budgets)


   inputs = [
       "Hello, how are you today?",
       "What's the weather like?",
       "Can you tell me the time?"
   ]


   for text in inputs:
       await agent.process_turn(text)
       await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(run_demo())

We run the entire system over multiple conversation turns to observe latency stability and variation. We apply an aggressive latency budget to keep the pipeline under realistic pressure, and we use these runs to verify that the system meets its response targets during interaction.
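
To turn those runs into a summary, the agent's metrics_history can be aggregated and plotted with the matplotlib import from the top of the script. This is only a sketch: run_demo() would need to expose the agent (for example, by returning it) so its metrics_history list can be passed in.

# Summarize recorded turns; pass in agent.metrics_history after run_demo() completes.
def summarize(metrics_history):
    ttfa = [m.get_time_to_first_audio() for m in metrics_history]
    total = [m.get_total_latency() for m in metrics_history]
    print(f"Mean time to first audio: {np.mean(ttfa):.2f}s")
    print(f"Mean total turn latency:  {np.mean(total):.2f}s")

    turns = range(1, len(metrics_history) + 1)
    plt.plot(turns, ttfa, marker="o", label="time to first audio")
    plt.plot(turns, total, marker="o", label="total turn latency")
    plt.xlabel("turn")
    plt.ylabel("seconds")
    plt.legend()
    plt.show()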

Finally, we demonstrate how a fully streaming voice agent can be organized as a single asynchronous pipeline with explicit stage boundaries and measurable performance targets. We show that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency even when total computation time remains non-trivial. This approach lets us reason systematically about turn-taking, responsiveness, and the levers available for tuning, and it provides a solid foundation for extending the system toward real-world deployment with production ASR, LLM, and TTS models.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of Marktechpost, an Artificial Intelligence media platform known for its in-depth coverage of Machine Learning and Deep Learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, reflecting its popularity among readers.
