Streaming responses give users a faster perceived experience, but they make token counting harder. With a non-streaming call, you get a neat usage object in the response. With streaming, tokens arrive one chunk at a time and you need a strategy to track them. Here's how to do it across the major APIs.
The Streaming Token Problem
In a non-streaming API call, the response includes exact token counts:
// Non-streaming response
{
  "usage": {
    "prompt_tokens": 52,
    "completion_tokens": 183,
    "total_tokens": 235
  }
}
With streaming, the response arrives as a series of Server-Sent Events (SSE), each containing a small chunk of text. Most chunks don't include usage data — it only appears in the final chunk, if at all. If your connection drops or you cancel the stream early, you may never get the usage data.
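Concretely, a streamed chat completion arrives as a sequence of data: events, most carrying only a few characters of text and no usage field. An abridged, illustrative sketch of the wire format (field values are made up):

data: {"object": "chat.completion.chunk", "choices": [{"delta": {"content": "TCP/IP"}, "finish_reason": null}]}

data: {"object": "chat.completion.chunk", "choices": [{"delta": {"content": " is a suite of"}, "finish_reason": null}]}

... many more content chunks, none of them with usage ...

data: [DONE]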
OpenAI: stream_options for Usage
OpenAI added a stream_options parameter that includes usage in the final streamed chunk:
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain TCP/IP"}],
    stream=True,
    stream_options={"include_usage": True}
)

full_response = ""
usage = None

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        full_response += content
        print(content, end="", flush=True)
    # Usage appears in the final chunk
    if chunk.usage:
        usage = chunk.usage

print(f"\n\nTokens — input: {usage.prompt_tokens}, "
      f"output: {usage.completion_tokens}")
The key is stream_options={"include_usage": True}. Without it, you get no usage data at all during streaming. The usage object arrives in a final chunk after the last content chunk.
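For reference, that final chunk looks roughly like this (abridged; note the empty choices array):

data: {"object": "chat.completion.chunk", "choices": [], "usage": {"prompt_tokens": 52, "completion_tokens": 183, "total_tokens": 235}}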
Anthropic: message_start and message_delta
Anthropic's streaming API provides token counts in two events: message_start (input tokens) and message_delta (output tokens at the end):
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain TCP/IP"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # After the stream completes
    message = stream.get_final_message()

print(f"\nInput tokens: {message.usage.input_tokens}")
print(f"Output tokens: {message.usage.output_tokens}")
Anthropic's SDK makes this clean — the get_final_message() method returns the complete message with usage data after streaming finishes.
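If you need the numbers as they arrive, or you are consuming raw events rather than text_stream, the counts are also on the events themselves. A minimal sketch, assuming the SDK yields events with the types named above (message_start carries input tokens, message_delta carries the final output token count); treat the exact attribute paths as an assumption:

input_tokens = 0
output_tokens = 0

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain TCP/IP"}]
) as stream:
    for event in stream:
        if event.type == "message_start":
            # Input tokens are known as soon as the message starts
            input_tokens = event.message.usage.input_tokens
        elif event.type == "content_block_delta" and event.delta.type == "text_delta":
            print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            # Output token count arrives with the final delta
            output_tokens = event.usage.output_tokens

print(f"\nInput tokens: {input_tokens}, output tokens: {output_tokens}")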
Google Gemini: usage_metadata
Google's Gemini API includes usage metadata in streaming responses:
import google.generativeai as genai

model = genai.GenerativeModel("gemini-1.5-pro")

response = model.generate_content(
    "Explain TCP/IP",
    stream=True
)

for chunk in response:
    print(chunk.text, end="", flush=True)

# Usage available after iteration completes
print(f"\nInput: {response.usage_metadata.prompt_token_count}")
print(f"Output: {response.usage_metadata.candidates_token_count}")
Fallback: Client-Side Counting
If the API doesn't provide usage data (or the stream is interrupted), count tokens client-side as a fallback:
import tiktoken

class StreamingTokenCounter:
    def __init__(self, model="gpt-4o"):
        self.enc = tiktoken.encoding_for_model(model)
        self.chunks = []
        self.output_tokens = 0

    def on_chunk(self, text):
        """Call this for each streamed chunk."""
        self.chunks.append(text)
        # Count tokens in this chunk
        self.output_tokens += len(self.enc.encode(text))

    def get_full_text(self):
        return "".join(self.chunks)

    def get_accurate_count(self):
        """Re-count the full text for accuracy.
        Chunk-by-chunk counting can differ from
        full-text counting due to token boundaries."""
        full = self.get_full_text()
        return len(self.enc.encode(full))
counter = StreamingTokenCounter()

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        text = chunk.choices[0].delta.content
        counter.on_chunk(text)
        print(text, end="")

# Chunk-by-chunk count (fast, approximate)
print(f"\nApprox tokens: {counter.output_tokens}")
# Full-text count (accurate)
print(f"Exact tokens: {counter.get_accurate_count()}")
Note that counting tokens chunk-by-chunk can give slightly different results than counting the full concatenated text. This is because token boundaries may span across chunks. For billing estimates, always re-count the full text or use the API-provided usage data.
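You can see the boundary effect directly with a toy example (exact counts depend on the tokenizer, so treat the numbers as illustrative):

import tiktoken

enc = tiktoken.encoding_for_model("gpt-4o")

# A word split across two chunks can encode into more tokens
# than the same word encoded in one piece.
print(len(enc.encode("Hel")) + len(enc.encode("lo")))  # typically 2
print(len(enc.encode("Hello")))                        # typically 1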
Handling Interrupted Streams
When a user cancels a streaming response or the connection drops, you still get billed for all tokens generated up to that point. Track partial usage:
try:
    for chunk in stream:
        counter.on_chunk(chunk.text)
        if should_cancel():
            stream.close()
            break
except Exception as e:
    logger.error(f"Stream interrupted: {e}")
finally:
    # Log what we consumed, even if incomplete
    logger.info(f"Partial output tokens: ~{counter.output_tokens}")
    track_usage(counter.get_accurate_count())
Always use the API's built-in usage reporting when available. Fall back to client-side counting only when the stream is interrupted or the API doesn't support usage in streaming mode.
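A small helper makes that policy explicit. This is a sketch, assuming usage is the object captured from the final chunk (or None if the stream never delivered it) and counter is the StreamingTokenCounter above:

def resolve_output_tokens(usage, counter):
    """Prefer the API-reported count; fall back to client-side counting."""
    if usage is not None:
        return usage.completion_tokens
    return counter.get_accurate_count()

output_tokens = resolve_output_tokens(usage, counter)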