LLM Tools: Real-time LLM Responses in Production (Part 4)

Build streaming LLM responses with Server-Sent Events, request cancellation, and real-time status updates to create engaging, responsive AI applications.


Series Overview

This is part of a four-part series on building production-ready AI agents.

I hope you find the code in this series helpful! The complete implementation for this post (which is also the complete code for the entire project) can be found here. Feel free to fork it and adapt it for your own projects.

Note: This project includes comprehensive testing with a carefully configured Jest/SWC setup for TypeScript monorepos. Testing LLM applications with MCP can be quite tricky, so if you fork this project, don’t ignore the valuable testing configuration—it includes solutions for common issues like workspace package mocking, module resolution, and proper test isolation.

Architecture Note: This series implements MCP protocol manually rather than using the official MCP TypeScript SDK for educational purposes and maximum control. You’ll learn exactly how MCP works under the hood, making debugging and customization easier. The patterns shown here can easily be adapted to use the official SDK if preferred.

The repository includes an AI.md file with comprehensive guidance for developers and LLMs who want to modify and extend this code as a starting point for their own projects. It covers architecture patterns, extension points, testing configuration, and production considerations.


You’ve built a scalable, secure LLM agent system. Users can ask questions, the system discovers and executes tools across multiple MCP servers, and responses come back with the right information. Everything works, but there’s a problem: users sit through a silent, multi-second wait before any response appears.

When someone asks “Find me 3-bedroom homes in Portland under $800,000,” they wait 3-5 seconds before seeing any response. During that time, the system is busy discovering tools, calling the OpenRouter API, executing MCP tools, and generating a thoughtful response. But from the user’s perspective, nothing is happening—they’re left wondering if their request was received.

In production LLM applications, perceived performance matters as much as actual performance. Users expect to see immediate feedback and progressive responses, not silent delays followed by walls of text. This is where Server-Sent Events (SSE) transform the user experience.

The Streaming Opportunity

Modern LLM APIs support streaming responses. Instead of waiting for the complete response, they send tokens as they’re generated. OpenRouter, OpenAI, and Anthropic all support this pattern. Your MCP architecture is perfectly positioned to take advantage of this because the separation between your main app and tool execution creates natural streaming checkpoints.

Here’s what we’ll build: when a user sends a message, they’ll immediately see “Discovering available tools…” followed by “Executing findListings…” and then tokens streaming in real-time as the LLM generates its response. They can also cancel the request mid-stream if they realize they asked the wrong question.
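Before diving in, it helps to have the event contract in mind. Assembled from how the events are used throughout this post, the payloads look roughly like this. Treat it as a sketch rather than the repo’s exact types; the tool-execution fields in particular are assumptions based on usage:

// Sketch of the SSE payloads used in this post (assembled from usage, not copied from the repo)
type StreamEvent =
  | { type: 'status'; message: string }                        // progress updates like "Discovering available tools..."
  | { type: 'token'; content: string; accumulated?: string }   // one LLM token plus the running text
  | { type: 'tool-execution'; tool: string; status: 'starting' | 'completed' | 'failed'; result?: unknown; error?: string }
  | { type: 'heartbeat' }                                      // connection keep-alive
  | { type: 'complete' }                                       // stream finished normally
  | { type: 'cancelled'; message: string }                     // the user aborted the request
  | { type: 'error'; message: string };                        // something went wrong server-side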

Setting Up Server-Sent Events

First, let’s create the streaming endpoint in our main application. SSE works over standard HTTP connections, making it simpler than WebSockets while still providing real-time updates.

📁 View agents controller on GitHub

// src/agents/agents.controller.ts
import { Controller, Post, Body, Response, UseGuards } from '@nestjs/common';
import { FastifyReply } from 'fastify';
import { AgentsService } from './agents.service';
import { ChatRequestDto } from './dto/chat-request.dto'; // adjust to your DTO location
import { JwtAuthGuard } from '../auth/jwt-auth.guard';   // adjust to your auth module

@Controller('agents')
@UseGuards(JwtAuthGuard)
export class AgentsController {
  constructor(private readonly agentsService: AgentsService) {}

  @Post('chat/stream')
  async streamChat(
    @Body() body: ChatRequestDto,
    @Response() res: FastifyReply
  ): Promise<void> {
    // Set up SSE headers
    res.type('text/event-stream');
    res.header('Cache-Control', 'no-cache');
    res.header('Connection', 'keep-alive');
    res.header('Access-Control-Allow-Origin', '*');
    res.header('Access-Control-Allow-Headers', 'Cache-Control');

    try {
      // Send immediate feedback
      this.sendEvent(res, {
        type: 'status',
        message: 'Starting conversation...'
      });

      // Stream the actual chat response
      await this.agentsService.chatStream(
        body.userId, 
        body.userMessage, 
        res
      );

      // Don't call res.send() - the stream will handle closing
    } catch (error) {
      this.sendEvent(res, {
        type: 'error',
        message: error.message
      });
      // Don't call res.send() - the stream will handle closing
    }
  }

  private sendEvent(res: FastifyReply, data: any): void {
    res.raw.write(`data: ${JSON.stringify(data)}\n\n`);
  }
}

This approach manipulates the Fastify reply directly, which keeps the code simple and easy to follow. We set the SSE headers with Fastify’s methods and use res.raw.write() to push each event to the client.
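For reference, each sendEvent() call produces one SSE frame on the wire: a data: line containing the JSON payload, terminated by a blank line. The first events of a typical request would look like this to the browser (the token text is illustrative):

data: {"type":"status","message":"Starting conversation..."}

data: {"type":"token","content":"I","accumulated":"I"}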

Why Not Use NestJS @Sse() Decorator?

NestJS provides a built-in @Sse() decorator for Server-Sent Events, but it requires returning RxJS observables. While powerful, that approach would pull reactive streams into a codebase that otherwise doesn’t use them. The official NestJS SSE example looks like this:

// Requires: import { Sse, MessageEvent } from '@nestjs/common';
//           import { interval, map, Observable } from 'rxjs';
@Sse('events')
sse(): Observable<MessageEvent> {
  return interval(1000).pipe(
    map((_) => ({ data: { hello: 'world' } } as MessageEvent)),
  );
}

We chose the direct Fastify approach because it keeps our codebase simple and avoids spreading RxJS observables through the application (RxJS already ships with NestJS, so this is about complexity rather than extra dependencies). Our manual implementation gives us full control over the streaming behavior while remaining easy to understand and maintain.

Streaming from the Agent Service

Now we need to modify the AgentsService to support streaming. The existing chat method will become chatStream, and we’ll emit events at each major step.

📁 View agents service on GitHub

// src/agents/agents.service.ts
import { FastifyReply } from 'fastify';
import { FastifyStreamEventSender } from './streaming.service';

export class AgentsService {
  
  async chatStream(userId: string, userMessage: string, res: FastifyReply): Promise<void> {
    const eventSender = new FastifyStreamEventSender(res);
    
    try {
      // Update user context and tokens for MCP authentication
      this.currentUserId = userId;
      const userToken = this.generateServiceToken(userId);
      this.mcpClients.forEach(client => {
        if (client.setAuthToken) {
          client.setAuthToken(userToken);
        }
      });

      // Save user message to history
      await this.chatHistoryService.saveChatMessage(userId, {
        role: "user",
        content: userMessage
      });

      // Send initial status
      this.streamingService.sendStatusEvent(eventSender, 'Discovering available tools...');
      this.streamingService.sendHeartbeat(eventSender);

      // Get chat history and build messages
      const history = await this.chatHistoryService.getChatHistory(userId, 5);
      const messages: OpenRouterMessage[] = [
        {
          role: "system",
          content: TOOL_SELECTION_PROMPT
        },
        ...history,
        {
          role: "user",
          content: userMessage
        }
      ];

      // Check if tools are needed (non-streaming call to Kimi)
      const toolResponse = await this.callOpenRouter("moonshotai/kimi-k2", messages, this.tools);
      
      if (toolResponse?.choices?.[0]?.message?.tool_calls) {
        // Execute the tool once (emitting progress events) and keep its result for context
        const toolCall = toolResponse.choices[0].message.tool_calls[0];
        const toolResult = await this.executeToolWithEvents(toolCall, eventSender);
        
        // Stream the final response with tool context
        const finalContent = await this.streamFinalResponseWithTool(userId, userMessage, eventSender, toolCall, toolResult);
        
        // Save to history
        await this.chatHistoryService.saveChatMessage(userId, {
          role: "assistant",
          content: finalContent
        });
      } else {
        // No tools needed, stream direct response
        const finalContent = await this.streamFinalResponseDirect(userId, userMessage, eventSender);
        
        // Save to history
        await this.chatHistoryService.saveChatMessage(userId, {
          role: "assistant",
          content: finalContent
        });
      }
    } catch (error) {
      this.logger.error("Stream chat error:", error);
      eventSender.sendEvent({
        type: 'error',
        message: error.message || "An error occurred during streaming"
      });
      eventSender.end();
    }
  }
}
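chatStream wraps the Fastify reply in a FastifyStreamEventSender imported from streaming.service.ts. The class itself isn’t reproduced in this post, but from the way it’s used (sendEvent() and end()), a minimal version would look something like the sketch below; see the repository for the real implementation.

// Sketch of FastifyStreamEventSender based on how it's used above (the real class lives in streaming.service.ts)
import { FastifyReply } from 'fastify';

export class FastifyStreamEventSender {
  constructor(private readonly res: FastifyReply) {}

  // Write one SSE frame: a "data:" line with the JSON payload, followed by a blank line
  sendEvent(data: Record<string, unknown>): void {
    this.res.raw.write(`data: ${JSON.stringify(data)}\n\n`);
  }

  // Close the underlying HTTP response once the stream is finished
  end(): void {
    this.res.raw.end();
  }
}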

The critical change is setting stream: true in the OpenRouter request and handling the response as a stream instead of waiting for a complete JSON response. Each token gets forwarded to the client immediately using Fastify’s response writing.

📁 View streaming service on GitHub

// src/agents/streaming.service.ts
// streamResponse (method excerpt): axios, the config service, and the stream processor are set up elsewhere in this file
async streamResponse(messages: OpenRouterMessage[], eventSender: any): Promise<string> {
  let accumulatedContent = '';

  const response = await axios.post(
    this.openrouterUrl,
    {
      model: "google/gemini-2.0-flash-001",
      messages,
      stream: true  // Enable streaming responses
    },
    {
      headers: {
        "Authorization": `Bearer ${this.configService.get<string>("OPENROUTER_API_KEY")}`,
        "Content-Type": "application/json"
      },
      responseType: 'stream'
    }
  );

  return new Promise((resolve, reject) => {
    response.data.on('data', (chunk: Buffer) => {
      const { tokens, isComplete } = this.streamProcessor.processChunk(chunk.toString());
      
      // Send individual tokens as events
      for (const token of tokens) {
        accumulatedContent += token;
        eventSender.sendEvent({
          type: 'token',
          content: token,
          accumulated: accumulatedContent
        });
      }

      if (isComplete) {
        resolve(accumulatedContent);
      }
    });

    // Resolve with whatever has accumulated if the stream ends without an explicit completion marker
    response.data.on('end', () => {
      resolve(accumulatedContent);
    });

    response.data.on('error', (error: any) => {
      reject(error);
    });
  });
}

This method demonstrates the key streaming pattern: set stream: true in the request, use responseType: 'stream' to handle the response as a stream, and process each chunk as it arrives to extract tokens and forward them immediately to the client.
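The streamProcessor used above isn’t shown in this post. Assuming OpenRouter’s OpenAI-compatible streaming format (SSE lines prefixed with data:, a [DONE] sentinel, and token text in choices[0].delta.content), a minimal processChunk could look like this hypothetical sketch:

// Hypothetical stream processor (not from the repo): parses OpenAI/OpenRouter-style SSE chunks into tokens
export class StreamProcessor {
  private buffer = '';

  processChunk(chunk: string): { tokens: string[]; isComplete: boolean } {
    const tokens: string[] = [];
    let isComplete = false;

    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop() || ''; // keep any partial line for the next chunk

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed.startsWith('data: ')) continue;

      const payload = trimmed.slice(6);
      if (payload === '[DONE]') {   // OpenAI-style end-of-stream sentinel
        isComplete = true;
        continue;
      }

      try {
        const parsed = JSON.parse(payload);
        const token = parsed.choices?.[0]?.delta?.content;
        if (token) tokens.push(token);
      } catch {
        // Ignore keep-alive comments and lines that aren't valid JSON
      }
    }

    return { tokens, isComplete };
  }
}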

Handling Tool Calls During Streaming

Tool calls present an interesting challenge with streaming because they interrupt the text flow. When the LLM decides to use a tool, we need to pause text streaming, execute the tool, and then resume.

📁 View tool execution methods on GitHub

// src/agents/agents.service.ts
export class AgentsService {
  
  private async executeToolWithEvents(toolCall: ToolCall, eventSender: any): Promise<any> {
    this.streamingService.sendToolExecutionEvent(eventSender, toolCall.function.name, 'starting');

    try {
      const result = await this.executeTool(toolCall);
      this.streamingService.sendToolExecutionEvent(eventSender, toolCall.function.name, 'completed', result);
      return result;
    } catch (error) {
      this.streamingService.sendToolExecutionEvent(eventSender, toolCall.function.name, 'failed', undefined, error.message);
      throw error;
    }
  }
}

This approach keeps users informed about what’s happening behind the scenes. Instead of a mysterious pause in the text stream, they see “Executing findListings…” with progress updates.
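The sendStatusEvent, sendToolExecutionEvent, and sendHeartbeat helpers used throughout the service are thin wrappers over the event sender. Their exact signatures live in the repository; a sketch consistent with how the frontend consumes these events might be:

// Sketch of the StreamingService helpers (field names are assumptions; see the repo for the real versions).
// Lives alongside FastifyStreamEventSender in streaming.service.ts.
export class StreamingService {
  sendStatusEvent(sender: FastifyStreamEventSender, message: string): void {
    sender.sendEvent({ type: 'status', message });
  }

  sendHeartbeat(sender: FastifyStreamEventSender): void {
    sender.sendEvent({ type: 'heartbeat' });
  }

  sendToolExecutionEvent(
    sender: FastifyStreamEventSender,
    tool: string,
    status: 'starting' | 'completed' | 'failed',
    result?: unknown,
    error?: string
  ): void {
    sender.sendEvent({ type: 'tool-execution', tool, status, result, error });
  }
}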

Frontend Integration

On the frontend, we need to handle the SSE connection and update the UI as events arrive. The implementation involves three main parts: HTML structure, CSS styling, and JavaScript stream handling.

Here’s the core JavaScript class that manages streaming:

📁 View chat interface on GitHub

// Key methods from the ChatInterface class
class ChatInterface {
  async startStream(message, token) {
    try {
      // Create abort controller for cancellation
      this.currentAbortController = new AbortController();
      
      // Create POST request to streaming endpoint
      const response = await fetch('/agents/chat/stream', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Accept': 'text/event-stream',
          'Authorization': `Bearer ${token}`
        },
        body: JSON.stringify({
          userId: this.userId,
          userMessage: message
        }),
        signal: this.currentAbortController.signal
      });

      // Process the stream
      await this.processStream(response.body.getReader());
    } catch (error) {
      if (error.name === 'AbortError') {
        console.log('Request was cancelled');
        this.updateStatus('Request cancelled', 'status-message');
      } else {
        console.error('Stream error:', error);
        this.addAssistantMessage('Sorry, I encountered an error. Please try again.');
      }
    }
  }

  async processStream(reader) {
    const decoder = new TextDecoder();
    let buffer = '';
    let currentMessageDiv = null;
    
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      
      // Process SSE data chunks
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      
      // Keep the last incomplete line in the buffer
      buffer = lines.pop() || '';
      
      for (const line of lines) {
        if (line.startsWith('data: ')) {
          try {
            const data = JSON.parse(line.slice(6));
            
            // Create message div on first token
            if (data.type === 'token' && !currentMessageDiv) {
              currentMessageDiv = this.createStreamingMessage();
            }
            
            // Handle the event
            this.handleStreamEvent(data, currentMessageDiv);
          } catch (e) {
            console.error('Failed to parse SSE data:', e);
          }
        }
      }
    }
  }

  handleStreamEvent(data, currentMessageDiv) {
    switch (data.type) {
      case 'status':
        this.updateStatus(data.message, 'status-message');
        break;
      case 'token':
        if (currentMessageDiv) {
          this.updateStreamingMessage(currentMessageDiv, data.accumulated || data.content);
        }
        break;
      case 'tool-execution':
        this.updateToolStatus(data);
        break;
      case 'complete':
        this.clearStatus();
        if (currentMessageDiv) {
          currentMessageDiv.classList.remove('streaming');
        }
        break;
    }
  }
}

The frontend uses the Fetch API with ReadableStream processing to handle real-time updates. Each SSE event type triggers a different UI update: status messages show progress, tokens get appended to create the streaming text effect, and tool execution events provide transparency about what’s happening behind the scenes.

Complete Implementation: The full frontend code, including the HTML structure, CSS styling, and the complete JavaScript, is available in the repository.

The key insight is treating the UI as a live document that updates incrementally rather than a static page that renders once.
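As a concrete example, createStreamingMessage and updateStreamingMessage (called in the class above but not shown) can be as small as appending a div and rewriting its text on every token. A hypothetical version, with element ids and CSS class names assumed:

// Hypothetical versions of the DOM helpers used by ChatInterface above; the real code is in the repo.
function createStreamingMessage(): HTMLDivElement {
  const div = document.createElement('div');
  div.className = 'message assistant streaming';          // 'streaming' class is removed on 'complete'
  document.getElementById('messages')?.appendChild(div);  // assumed container id
  return div;
}

function updateStreamingMessage(div: HTMLDivElement, accumulated: string): void {
  // Re-render the accumulated text on each token for a simple typewriter effect
  div.textContent = accumulated;
  div.scrollIntoView({ behavior: 'smooth', block: 'end' });
}

In the real ChatInterface these are methods rather than standalone functions, but the idea is the same: the message element is created once and mutated as tokens arrive.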

Request Cancellation and Timeouts

Production systems need graceful handling of cancelled and timed-out requests. Users should be able to cancel expensive operations, and the system should automatically timeout requests that take too long.

📁 View timeout and cancellation handling on GitHub

// src/agents/agents.service.ts
// Simplified excerpt of chatStream showing only the timeout and cancellation handling
export class AgentsService {
  
  async chatStream(
    userId: string, 
    userMessage: string, 
    res: FastifyReply
  ): Promise<void> {
    const eventSender = new FastifyStreamEventSender(res);

    // Time out requests that run too long
    const timeoutId = setTimeout(() => {
      eventSender.sendEvent({
        type: 'error',
        message: 'Request timeout after 30 seconds'
      });
      eventSender.end();
    }, 30000);

    // Check if the client disconnected (for example, the user cancelled in the browser)
    res.raw.on('close', () => {
      clearTimeout(timeoutId);
      // Cleanup any ongoing operations
    });

    try {
      // Send heartbeat to check connection
      eventSender.sendEvent({ type: 'heartbeat' });

      // Build messages as in the full chatStream shown earlier
      const history = await this.chatHistoryService.getChatHistory(userId, 5);
      const messages: OpenRouterMessage[] = [
        { role: "system", content: TOOL_SELECTION_PROMPT },
        ...history,
        { role: "user", content: userMessage }
      ];

      // Tool selection and execution omitted here; see the full chatStream above
      await this.streamingService.streamResponse(messages, eventSender);
      
      clearTimeout(timeoutId);
    } catch (error) {
      clearTimeout(timeoutId);
      
      if (error.message.includes('cancelled')) {
        // Handle cancellation gracefully
        eventSender.sendEvent({
          type: 'cancelled',
          message: 'Request was cancelled'
        });
      } else {
        eventSender.sendEvent({
          type: 'error',
          message: error.message
        });
      }
    }
  }
}

The timeout ensures that long-running requests don’t consume resources indefinitely, while the cancellation handling provides immediate feedback when users change their minds. The res.raw.on('close') listener detects when clients disconnect using Fastify’s raw response object.
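On the client side, cancellation is just a matter of aborting the in-flight fetch; the AbortController created in startStream makes this a one-liner. A hypothetical handler wired to a cancel button, inside the ChatInterface class, might look like this:

// Hypothetical cancel handler on ChatInterface; aborting the fetch closes the connection,
// which in turn fires the server's res.raw 'close' listener shown above.
cancelStream() {
  if (this.currentAbortController) {
    this.currentAbortController.abort();  // surfaces as AbortError in startStream's catch block
    this.currentAbortController = null;
  }
}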

The Results

With these changes, the user experience transforms completely. Instead of a 3-5 second silent wait, users see:

  1. Immediate feedback: “Starting conversation…” appears instantly
  2. Progress updates: “Discovering tools…” then “Executing findListings…”
  3. Live response: Text appears token by token as the LLM generates it
  4. Tool transparency: Clear indication when tools are running
  5. Cancellation: Users can stop expensive operations

The perceived performance improvement is dramatic. Even though the total time might be similar, users feel like the system is responsive and working on their behalf rather than ignoring them.

Alternative Approaches

You might consider WebSockets instead of SSE. WebSockets enable bidirectional communication, which could be useful for features like interrupting the AI mid-response or collaborative sessions. However, SSE has several advantages for this use case: it works over standard HTTP (friendlier to firewalls and proxies), the browser’s EventSource API handles reconnection automatically, and error handling is simpler. Unless you genuinely need bidirectional communication, SSE is the better choice.

The frontend implementation shown uses the Fetch API with ReadableStream processing, which provides excellent browser support and clean error handling. You could also use the traditional EventSource API for simpler cases, but EventSource only issues GET requests and can’t set custom headers, so the Fetch approach is the better fit for an authenticated POST endpoint like ours and gives you full control over headers and error handling.

Beyond This Series: Production Monitoring

While we’ve built a production-ready LLM agent system with real-time streaming, there’s one crucial aspect we haven’t covered: monitoring and observability. In production, you’ll need visibility into LLM performance including token usage, response times, and model costs per user. You’ll also want to track MCP health with server availability, tool execution latencies, and error rates. User experience metrics like stream connection drops, cancellation rates, and timeout frequencies become critical for optimization.

For application monitoring, consider OpenTelemetry as the industry standard for distributed tracing, Prometheus with Grafana for metrics collection and visualization, or all-in-one solutions like DataDog or New Relic. For LLM-specific monitoring, LangSmith is purpose-built for LLM applications, Weights & Biases offers ML experiment tracking, and OpenLLMetry provides open source LLM observability.

You’d want to track metrics like llm_tokens_used_total, mcp_server_response_time_seconds, chat_session_duration_minutes, and tool_execution_success_rate. Monitoring is essential for production LLM systems, but it’s a deep topic that deserves its own dedicated coverage. The architecture we’ve built makes it straightforward to add these observability layers when you’re ready to scale.
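None of this is wired into the series’ code, but to make it concrete, registering a couple of those metrics with the prom-client package would only take a few lines (an illustrative sketch, not part of the repository):

// Illustrative only: Prometheus metrics for two of the series named above, using prom-client
import { Counter, Histogram } from 'prom-client';

export const llmTokensUsed = new Counter({
  name: 'llm_tokens_used_total',
  help: 'Total LLM tokens consumed',
  labelNames: ['model', 'userId'],
});

export const mcpResponseTime = new Histogram({
  name: 'mcp_server_response_time_seconds',
  help: 'Latency of MCP tool executions',
  labelNames: ['server', 'tool'],
});

// e.g. after a completed OpenRouter call (usage is whatever the API reports back):
// llmTokensUsed.inc({ model: 'google/gemini-2.0-flash-001', userId }, usage.total_tokens);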

Conclusion

Streaming transforms LLM applications from batch processes into interactive experiences. Users feel heard and informed rather than ignored and frustrated. The technical implementation builds naturally on the MCP architecture we’ve developed throughout this series, creating clear separation between streaming concerns and business logic.

The key insight is that modern web applications should feel conversational rather than transactional. Users expect immediate feedback and progressive disclosure of information. With Server-Sent Events and request management, your LLM agent system provides a production-ready experience that users will actually enjoy using.

You now have a complete, scalable, secure, and responsive LLM agent system. From direct tool integration to microservices to authentication to real-time streaming, each piece builds on the previous to create something genuinely production-ready. The techniques shown here apply broadly to any LLM application where user experience matters.