Streaming responses give users a faster perceived experience, but they make token counting harder. With a non-streaming call, you get a neat usage object in the response. With streaming, tokens arrive one chunk at a time and you need a strategy to track them. Here's how to do it across the major APIs.

The Streaming Token Problem

In a non-streaming API call, the response includes exact token counts:

// Non-streaming response
{
  "usage": {
    "prompt_tokens": 52,
    "completion_tokens": 183,
    "total_tokens": 235
  }
}

With streaming, the response arrives as a series of Server-Sent Events (SSE), each containing a small chunk of text. Most chunks don't include usage data — it only appears in the final chunk, if at all. If your connection drops or you cancel the stream early, you may never get the usage data.

OpenAI: stream_options for Usage

OpenAI added a stream_options parameter that includes usage in the final streamed chunk:

from openai import OpenAI

client = OpenAI()
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain TCP/IP"}],
    stream=True,
    stream_options={"include_usage": True}
)

full_response = ""
usage = None

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        full_response += content
        print(content, end="", flush=True)

    # Usage appears in the final chunk
    if chunk.usage:
        usage = chunk.usage

print(f"\n\nTokens — input: {usage.prompt_tokens}, "
      f"output: {usage.completion_tokens}")

The key is stream_options={"include_usage": True}. Without it, you get no usage data at all during streaming. The usage object arrives in a final chunk after the last content chunk.

Anthropic: message_start and message_delta

Anthropic's streaming API provides token counts in two events: message_start (input tokens) and message_delta (output tokens at the end):

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain TCP/IP"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

# After the stream completes
message = stream.get_final_message()
print(f"\nInput tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")

Anthropic's SDK makes this clean — the get_final_message() method returns the complete message with usage data after streaming finishes.

Google Gemini: usage_metadata

Google's Gemini API includes usage metadata in streaming responses:

import google.generativeai as genai

model = genai.GenerativeModel("gemini-1.5-pro")
response = model.generate_content(
    "Explain TCP/IP",
    stream=True
)

for chunk in response:
    print(chunk.text, end="", flush=True)

# Usage available after iteration completes
print(f"\nInput: {response.usage_metadata.prompt_token_count}")
print(f"Output: {response.usage_metadata.candidates_token_count}")

Fallback: Client-Side Counting

If the API doesn't provide usage data (or the stream is interrupted), count tokens client-side as a fallback:

import tiktoken

class StreamingTokenCounter:
    def __init__(self, model="gpt-4o"):
        self.enc = tiktoken.encoding_for_model(model)
        self.chunks = []
        self.output_tokens = 0

    def on_chunk(self, text):
        """Call this for each streamed chunk."""
        self.chunks.append(text)
        # Count tokens in this chunk
        self.output_tokens += len(self.enc.encode(text))

    def get_full_text(self):
        return "".join(self.chunks)

    def get_accurate_count(self):
        """Re-count the full text for accuracy.
        Chunk-by-chunk counting can differ from
        full-text counting due to token boundaries."""
        full = self.get_full_text()
        return len(self.enc.encode(full))

counter = StreamingTokenCounter()
for chunk in stream:
    if chunk.choices[0].delta.content:
        text = chunk.choices[0].delta.content
        counter.on_chunk(text)
        print(text, end="")

# Chunk-by-chunk count (fast, approximate)
print(f"\nApprox tokens: {counter.output_tokens}")
# Full-text count (accurate)
print(f"Exact tokens: {counter.get_accurate_count()}")

Note that counting tokens chunk-by-chunk can give slightly different results than counting the full concatenated text. This is because token boundaries may span across chunks. For billing estimates, always re-count the full text or use the API-provided usage data.

Handling Interrupted Streams

When a user cancels a streaming response or the connection drops, you still get billed for all tokens generated up to that point. Track partial usage:

try:
    for chunk in stream:
        counter.on_chunk(chunk.text)
        if should_cancel():
            stream.close()
            break
except Exception as e:
    logger.error(f"Stream interrupted: {e}")
finally:
    # Log what we consumed, even if incomplete
    logger.info(f"Partial output tokens: ~{counter.output_tokens}")
    track_usage(counter.get_accurate_count())
Always use the API's built-in usage reporting when available. Fall back to client-side counting only when the stream is interrupted or the API doesn't support usage in streaming mode.