Overview
Pipecat provides a straightforward way to collect conversation transcriptions using turn events. When a user or assistant turn ends, the corresponding event includes the complete transcript for that turn.
The key events for transcription collection are:
on_user_turn_stopped - Provides the user’s complete transcript via UserTurnStoppedMessage
on_assistant_turn_stopped - Provides the assistant’s complete transcript via AssistantTurnStoppedMessage
Basic Example
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    UserTurnStoppedMessage,
    AssistantTurnStoppedMessage,
)

# Create context aggregator (`context` is the LLM context you already built for your pipeline)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

# Handle user transcriptions
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    print(f"[USER] {message.content}")

# Handle assistant transcriptions
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    # Skip empty turns, but still report an interruption if one occurred
    if message.content:
        print(f"[ASSISTANT] {message.content}")
    if message.interrupted:
        print("[ASSISTANT] (interrupted)")
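If you want to unit-test the formatting without running a pipeline, the print logic can be pulled into a plain function. This is a minimal sketch, not part of Pipecat: `format_turn` is a hypothetical helper name, and it assumes the content and interrupted checks are independent of each other.

```python
from typing import List, Optional


def format_turn(role: str, content: Optional[str], interrupted: bool = False) -> List[str]:
    """Return the transcript lines for one turn (hypothetical helper)."""
    label = role.upper()
    lines = []
    if content:
        # Only emit a content line when the turn produced text
        lines.append(f"[{label}] {content}")
    if interrupted:
        # Report the interruption even if no text was produced
        lines.append(f"[{label}] (interrupted)")
    return lines
```

An event handler could then call `format_turn("assistant", message.content, message.interrupted)` and print or log each returned line.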
Saving Transcripts to a File
import json
from datetime import datetime

transcript_log = []

@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    transcript_log.append({
        "role": "user",
        "content": message.content,
        "timestamp": message.timestamp,
        "user_id": message.user_id,
    })

@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    transcript_log.append({
        "role": "assistant",
        "content": message.content,
        "timestamp": message.timestamp,
        "interrupted": message.interrupted,
    })

# Save transcript when session ends
async def save_transcript():
    # Use a timestamp without ":" so the filename is also valid on Windows
    stamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    with open(f"transcript_{stamp}.json", "w", encoding="utf-8") as f:
        json.dump(transcript_log, f, indent=2)
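Keeping the whole transcript in memory means nothing is written if the process exits before the save step runs. One alternative is to append each turn to a JSON Lines file as it arrives. The sketch below uses only the standard library; `transcript_path`, `append_turn`, and `load_turns` are hypothetical helper names, not Pipecat APIs.

```python
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import List


def transcript_path(directory: Path, started_at: datetime) -> Path:
    """Build a filename with no ':' characters so it is valid on Windows too."""
    stamp = started_at.strftime("%Y%m%dT%H%M%SZ")
    return directory / f"transcript_{stamp}.jsonl"


def append_turn(path: Path, entry: dict) -> None:
    """Append one turn as a single JSON line."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")


def load_turns(path: Path) -> List[dict]:
    """Read every turn back, skipping blank lines."""
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Each handler would call `append_turn(path, {...})` with the same dictionary it would otherwise add to `transcript_log`. The file write is synchronous, so for slow disks consider handing it off with `asyncio.to_thread`.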
Sending Transcripts to an External Service
import aiohttp

async def send_to_service(role: str, content: str, timestamp: str):
    async with aiohttp.ClientSession() as session:
        await session.post(
            "https://api.example.com/transcripts",
            json={"role": role, "content": content, "timestamp": timestamp}
        )

@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
    await send_to_service("user", message.content, message.timestamp)

@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
    await send_to_service("assistant", message.content, message.timestamp)
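Awaiting the HTTP call inside the handler ties each turn event to the latency of the external service. If that becomes a problem, one option is to queue entries and send them from a background task. The following is a sketch under stated assumptions: `TranscriptForwarder` is a hypothetical class built only on `asyncio`, and the `send` callable stands in for a function like `send_to_service`.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Sender = Callable[[dict], Awaitable[None]]


class TranscriptForwarder:
    """Queue transcript entries and send them from a background task (hypothetical helper)."""

    def __init__(self, send: Sender, max_retries: int = 2):
        self._send = send
        self._max_retries = max_retries
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Must be called from inside a running event loop
        self._task = asyncio.create_task(self._run())

    def submit(self, entry: dict) -> None:
        # Returns immediately; the worker task does the sending
        self._queue.put_nowait(entry)

    async def stop(self) -> None:
        # Wait until every queued entry has been processed, then stop the worker
        await self._queue.join()
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def _run(self) -> None:
        while True:
            entry = await self._queue.get()
            try:
                for attempt in range(self._max_retries + 1):
                    try:
                        await self._send(entry)
                        break
                    except Exception:
                        if attempt == self._max_retries:
                            print(f"Dropping transcript entry after {attempt + 1} attempts")
            finally:
                self._queue.task_done()
```

Handlers would call `forwarder.submit({...})` instead of awaiting the request directly, and the application would call `await forwarder.stop()` during shutdown so queued entries are flushed. Retries here are immediate; a production version would likely add a delay between attempts.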
Message Types
For details on UserTurnStoppedMessage and AssistantTurnStoppedMessage fields, see Turn Events - Message Types.