Cancellation, Progress, and Backpressure in MCP Streams

Streaming responses, long-running tools, and multi-step agent pipelines all share a common challenge: what happens when the client stops listening? Without proper cancellation propagation, cancelled client connections leave expensive operations running on the server indefinitely. This lesson covers three related mechanisms: request cancellation using AbortSignal, progress reporting with real-time updates, and backpressure strategies that prevent fast producers from overwhelming slow consumers.

[Figure: cancellation propagation diagram, from client disconnect through AbortSignal to tool cleanup and resource release]
Cancellation must propagate through the entire call chain: from client disconnect to every active resource.

AbortSignal in MCP Tool Handlers

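When a client disconnects or cancels a request, the MCP SDK aborts the AbortSignal it hands to the request handler, exposed as the signal property of the handler's second argument. Tool handlers should check this signal and pass it to every expensive operation so that work stops early: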

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

const server = new McpServer({ name: 'streaming-server', version: '1.0.0' });

server.tool('search_large_dataset', {
  query: z.string(),
  maxResults: z.number().default(100),
}, async ({ query, maxResults }, { signal }) => {
  // Pass signal to database query
  const results = await db.search(query, { maxResults, signal });

  // Pass signal to downstream HTTP calls
  const enriched = await Promise.all(
    results.map(r =>
      fetch(`https://enrichment.api/v1/${r.id}`, { signal })
        .then(res => res.json())
        .catch(err => {
          if (err.name === 'AbortError') throw err;  // Re-throw cancellation
          return r;  // Return unenriched on other errors
        })
    )
  );

  return { content: [{ type: 'text', text: JSON.stringify(enriched) }] };
});
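// In database clients that support AbortSignal
// (assumes a node-postgres style pool with connect(), query(), and release())
async function search(query, { maxResults, signal } = {}) {
  const client = await pool.connect();

  // Capture this connection's backend PID so a different connection can cancel it
  const { rows: [{ pid }] } = await client.query('SELECT pg_backend_pid() AS pid');

  // Register cancellation on signal abort. The cancel request goes through
  // a separate pooled connection: sent on the busy connection, it would
  // queue behind the very query it is meant to cancel.
  const cancel = () => {
    pool.query('SELECT pg_cancel_backend($1)', [pid]).catch(() => {});
  };

  signal?.addEventListener('abort', cancel, { once: true });

  try {
    const result = await client.query(
      'SELECT * FROM products WHERE to_tsvector(description) @@ plainto_tsquery($1) LIMIT $2',
      [query, maxResults]
    );
    return result.rows;
  } finally {
    signal?.removeEventListener('abort', cancel);
    // Release exactly once, here; releasing in the abort handler as well
    // would return the same client to the pool twice
    client.release();
  }
}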
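
To see the propagation chain in isolation, here is a minimal sketch that needs no MCP server or database. The names abortableSleep, slowTool, and demo are hypothetical stand-ins, not SDK functions; the sketch assumes a Node.js version where controller.abort() sets an AbortError as the signal's reason (Node 18 or later).

```javascript
// Hypothetical helper: a sleep that rejects as soon as the signal aborts.
function abortableSleep(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) return reject(signal.reason);

    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    function onAbort() {
      clearTimeout(timer);    // release the pending resource
      reject(signal.reason);  // surface the cancellation to the caller
    }

    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// Hypothetical stand-in for a tool handler that forwards its signal downstream.
async function slowTool({ steps }, { signal }) {
  let completed = 0;
  for (let i = 0; i < steps; i++) {
    await abortableSleep(50, signal);
    completed++;
  }
  return { completed };
}

// Simulate a client that cancels 120 ms into a 5 second job.
async function demo() {
  const controller = new AbortController();
  setTimeout(() => controller.abort(), 120);
  try {
    await slowTool({ steps: 100 }, { signal: controller.signal });
    return 'finished';
  } catch (err) {
    if (err.name === 'AbortError') return 'cancelled';
    throw err;
  }
}
```

Because every layer either forwards the signal or listens to it, one abort() call at the top stops the pending timer at the bottom. The same shape applies when the bottom layer is a fetch call or a database query.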

Progress Reporting via Streaming Tool Results

// MCP tools can emit progress events using the server's notification mechanism.
// The example below reports progress through logging notifications; the task
// polling pattern from Lesson 45 and streaming text content updates are alternatives

server.tool('process_batch', {
  items: z.array(z.string()).max(1000),
}, async ({ items }, { signal }) => {
  const results = [];
  const total = items.length;

  for (let i = 0; i < items.length; i++) {
    if (signal?.aborted) {
      return {
        content: [{ type: 'text', text: JSON.stringify({
          status: 'cancelled',
          processed: i,
          total,
          results,
        }) }],
      };
    }

    const result = await processItem(items[i]);
    results.push(result);

    // Emit progress via logs/notification (visible in MCP Inspector)
    // after every 50 completed items and once more after the final item
    const done = i + 1;
    if (done % 50 === 0 || done === total) {
      server.server.sendLoggingMessage({
        level: 'info',
        data: `Progress: ${done}/${total} (${Math.round((done / total) * 100)}%)`,
      });
    }
  }

  return { content: [{ type: 'text', text: JSON.stringify({ status: 'complete', results }) }] };
});
[Figure: progress reporting pattern, a batch processing loop that checks the AbortSignal and emits log notifications at intervals]
Batch tools check the AbortSignal on each iteration and emit progress via logging notifications.
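
The interval logic can be pulled out of the loop so it is easy to test on its own. The following is a small sketch under that assumption; createProgressReporter and its report callback are hypothetical names, not part of the MCP SDK.

```javascript
// Hypothetical helper: wraps a report callback so that it fires after every
// `every` completed items and once more when the last item completes.
function createProgressReporter(total, every, report) {
  return function onItemDone(done) {
    if (done % every === 0 || done === total) {
      report({ done, total, percent: Math.round((done / total) * 100) });
    }
  };
}

// Usage: collect the messages that a 120-item batch would emit.
const messages = [];
const onItemDone = createProgressReporter(120, 50, (p) => {
  messages.push(`Progress: ${p.done}/${p.total} (${p.percent}%)`);
});

for (let i = 0; i < 120; i++) {
  onItemDone(i + 1);
}
// messages now holds three entries: 50/120 (42%), 100/120 (83%), 120/120 (100%)
```

Inside a tool handler, the report callback would be the place to call server.server.sendLoggingMessage, as the batch example does.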

Backpressure in Streaming Tool Results

// When a tool generates large amounts of streaming data,
// wrap the source in an async generator: it is pull-based, so the
// producer only advances when the consumer asks for the next chunk

server.tool('stream_logs', {
  service: z.string(),
  since: z.string(),
}, async ({ service, since }, { signal }) => {
  // Generator-based streaming with backpressure
  async function* generateLogs() {
    const logStream = await getLiveLogStream(service, since, { signal });
    let buffer = [];

    for await (const logLine of logStream) {
      if (signal?.aborted) break;
      buffer.push(logLine);

      // Yield batches of 50 lines to avoid overwhelming the response
      if (buffer.length >= 50) {
        yield buffer.join('\n');
        buffer = [];
        // Yield to the event loop between batches so that other work
        // (including abort events) is not starved by a fast log source
        await new Promise(r => setImmediate(r));
      }
    }

    if (buffer.length > 0) yield buffer.join('\n');
  }

  // Collect all chunks (in practice, return first N lines for tool calls)
  const chunks = [];
  let totalLines = 0;

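  for await (const chunk of generateLogs()) {
    chunks.push(chunk);
    totalLines += chunk.split('\n').length;
    // Use >= so that collection stops at 500 lines rather than one batch later
    if (totalLines >= 500) {
      chunks.push('[...truncated, 500 line limit reached]');
      break;  // Leaving the loop also closes the generator and its log stream
    }
  }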

  return { content: [{ type: 'text', text: chunks.join('\n') }] };
});
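
The pull-based behaviour that the log example relies on can be observed directly. This is a minimal sketch with hypothetical names (producer, consume): it records the order of events to show that the producer never runs ahead of the consumer and that breaking out of the loop runs the generator's cleanup.

```javascript
// Hypothetical producer: an endless source that logs each item it creates.
async function* producer(events) {
  try {
    for (let i = 1; ; i++) {
      events.push(`produce ${i}`);
      yield i; // execution pauses here until the consumer pulls again
    }
  } finally {
    // Runs when the consumer breaks out of its for await loop
    events.push('cleanup');
  }
}

// Hypothetical consumer: stops after `limit` items and returns the event log.
async function consume(limit) {
  const events = [];
  for await (const value of producer(events)) {
    events.push(`consume ${value}`);
    if (value >= limit) break;
  }
  return events;
}
```

Calling consume(2) yields the sequence produce 1, consume 1, produce 2, consume 2, cleanup. Item 3 is never produced, which is the property that keeps a fast source from flooding a slow consumer.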

Handling SSE Client Disconnections

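// For Streamable HTTP servers, detect client disconnections via res.on('close')
app.post('/mcp', async (req, res) => {
  const transport = getOrCreateTransport(req);

  // Create an AbortController for this request
  const controller = new AbortController();

  // Listen on the response, not req.socket: a keep-alive socket is shared
  // across requests. 'close' also fires after a normal response, so abort
  // only when the response had not finished.
  res.on('close', () => {
    if (!res.writableFinished) controller.abort();
  });

  // Pass the signal along with the request. Assumption: your transport accepts
  // an options argument here. If its handleRequest takes only (req, res, body),
  // the extra argument is ignored and the signal must be wired to handlers directly.
  await transport.handleRequest(req, res, req.body, { signal: controller.signal });
});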
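
The disconnect check can be isolated into a small helper. This is a sketch, not an SDK feature: signalFromResponse is a hypothetical name, and it only assumes an object with the once method and writableFinished property that Node.js http.ServerResponse provides.

```javascript
// Hypothetical helper: derive an AbortSignal from a ServerResponse-like object.
function signalFromResponse(res) {
  const controller = new AbortController();

  res.once('close', () => {
    // writableFinished is true only once the full response has been flushed,
    // so a 'close' before that means the client went away early.
    if (!res.writableFinished) controller.abort();
  });

  return controller.signal;
}
```

In a route handler, the returned signal can be passed to any long-running work started for that request. For an open SSE stream the response never finishes, so every 'close' counts as a disconnect.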

What to Build Next

  • Add signal?.addEventListener('abort', cleanup) to your longest-running tool handler. Test it by disconnecting the client mid-execution and verify resources are released.
  • Add a per-tool timeout using AbortSignal.timeout(ms) to prevent any single tool call from running indefinitely.
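
As a starting point for the timeout exercise, here is a minimal sketch that combines the caller's signal with a deadline. runWithTimeout and slowWork are hypothetical names; AbortSignal.timeout and AbortSignal.any are standard APIs, with the latter assumed to be available (Node.js 20.3 or later).

```javascript
// Hypothetical wrapper: run work(signal) with a deadline layered on top of
// the caller's own signal. Whichever aborts first cancels the work.
function runWithTimeout(work, parentSignal, ms) {
  const signals = [AbortSignal.timeout(ms)];
  if (parentSignal) signals.push(parentSignal);
  return work(AbortSignal.any(signals));
}

// Hypothetical long-running operation that honours its signal.
function slowWork(signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) return reject(signal.reason);

    const timer = setTimeout(() => resolve('done'), 1000);

    signal.addEventListener('abort', () => {
      clearTimeout(timer);    // stop the pending work
      reject(signal.reason);  // TimeoutError for the deadline, AbortError for a cancel
    }, { once: true });
  });
}
```

A handler could call runWithTimeout(slowWork, signal, 30000) and inspect err.name in its catch block to tell a timeout (TimeoutError) apart from a client cancellation (AbortError).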

nJoy 😉
