graph TD
A[Aplicação LLM] --> B[Instrumentação]
B --> C[Métricas]
B --> D[Traces]
B --> E[Logs]
C --> F[Prometheus]
D --> G[OpenTelemetry Collector]
E --> H[Loki/ELK]
F --> I[Grafana]
G --> I
H --> I
I --> J[Dashboards]
I --> K[Alertas]
style A fill:#e1f5ff
style I fill:#ccffcc
Capítulo 5: Dominando LLMs na Prática
No capítulo anterior, você dominou as técnicas de prompting: como estruturar prompts efetivos, in-context learning, chain-of-thought reasoning e introdução ao ReAct. Você aprendeu a linguagem para se comunicar com LLMs.
Agora é hora de aprender a operacionalizar esse conhecimento em sistemas de produção reais. Saber escrever bons prompts não é suficiente - você precisa entender como configurar parâmetros de geração, gerenciar o context window eficientemente, otimizar custos e latência, e garantir observabilidade em produção.
Este capítulo é sobre engenharia prática de LLMs. Você aprenderá como os parâmetros temperature, top-p e top-k afetam as respostas geradas, quais estratégias utilizar para manter conversações longas dentro do context window, técnicas de otimização que podem reduzir custos em 40-70%, e como implementar instrumentação e monitoramento robustos em ambientes de produção.
Ao final deste capítulo, você terá o conhecimento técnico necessário para levar LLMs da prototipagem para produção com confiança.
Configuração de Parâmetros de Geração
Quando você chama um LLM para gerar texto, não está simplesmente pressionando “play” em um modelo determinístico. Existe uma série de parâmetros que controlam como o modelo gera texto, cada um com trade-offs específicos entre criatividade, consistência, qualidade e performance.
Temperature: Controlando Aleatoriedade
O parâmetro Temperature controla quão “previsível” ou “criativo” o modelo será ao gerar texto (Fan, Lewis, e Dauphin 2018). Pense nisso como um controle de volume entre “jogar sempre pelo seguro” e “se arriscar mais”.
Imagine que o modelo está escolhendo a próxima palavra numa frase. Ele tem uma lista de candidatos com diferentes níveis de confiança:
Candidato A: “são” (65% de confiança)
Candidato B: “incluem” (24% de confiança)
Candidato C: “abrangem” (10% de confiança)
Temperature = 0 (Sempre optar pelo seguro): O modelo sempre escolhe a palavra com maior confiança, resultando em “são” sendo escolhido 100% das vezes. É como alguém que só responde quando tem certeza absoluta da resposta.
Temperature = 1.0 (Balanceado): O modelo escolhe proporcionalmente à confiança. “São” é escolhido 65% das vezes, “incluem” 24%, “abrangem” 10%. É como alguém que responde o que acha mais provável, mas às vezes arrisca alternativas.
Temperature = 2.0 (Mais arriscado): O parâmetro “achata” as diferenças de confiança. “São” é escolhido apenas 48% das vezes, “incluem” 31%, “abrangem” 21%. É como alguém que gosta de explorar alternativas menos óbvias.
Visualizando o efeito:
Temperature = 0.0: ████████████████████ (sempre a mesma escolha)
Temperature = 0.7: ████████████░░░░░░░░ (variação controlada)
Temperature = 1.5: ██████░░░░░░████░░░░ (muita variação)
Matemáticamente, isso seria representado ajustando os logits antes do softmax:
P(token_i) = exp(logit_i / T) / Σ exp(logit_j / T)
Mas eu não sou matemático e, se você também não for, aqui está um exemplo prático em Python para ilustrar:
import numpy as np
def apply_temperature(logits, temperature):
"""
Aplica temperature scaling aos logits antes do softmax.
Args:
logits: Array de scores do modelo (maiores = mais confiança)
temperature: Controla a "suavidade" da distribuição
- Baixo (0.1): distribuição mais "apontada" (concentrada)
- Alto (2.0): distribuição mais "plana" (diversa)
"""
adjusted_logits = logits / temperature
exp_logits = np.exp(adjusted_logits - np.max(adjusted_logits)) # estabilidade numérica
probs = exp_logits / exp_logits.sum()
return probs
# Exemplo prático - execute e veja os resultados!
logits = np.array([2.0, 1.0, 0.5]) # Scores do modelo para 3 palavras candidatas
print("Temperature = 0.1 (focado):")
print(apply_temperature(logits, 0.1)) # [0.999, 0.001, 0.000]
print("Temperature = 1.0 (balanceado):")
print(apply_temperature(logits, 1.0)) # [0.659, 0.242, 0.099]
print("Temperature = 2.0 (diverso):")
print(apply_temperature(logits, 2.0)) # [0.483, 0.307, 0.210]Efeitos práticos: Quando temperature = 0 (ou próximo), o modelo utiliza greedy decoding, sempre escolhendo o token mais provável. Use essa configuração quando precisão é crítica, como em geração de código, extração de dados estruturados ou respostas factuais. Evite quando precisar de criatividade ou diversidade, como em brainstorming ou escrita criativa.
Com temperature = 0.7-1.0, você obtém um balanço entre aleatoriedade e coerência. Use essa faixa em tarefas gerais de geração de texto e conversação natural. É o padrão recomendado para a maioria dos casos.
Valores temperature > 1.5 introduzem alta aleatoriedade, tornando o modelo mais “criativo” mas menos coerente. Use quando precisar de brainstorming, geração de ideias diversas ou exploração criativa. Evite quando precisar de consistência ou precisão factual.
Exemplo comparativo:
prompt = "Os três pilares da engenharia de software são:"
# Temperature = 0.0
"qualidade, manutenibilidade e escalabilidade." # Resposta mais provável
# Temperature = 0.7
"qualidade de código, testes automatizados e documentação clara." # Variação razoável
# Temperature = 1.5
"colaboração entre equipes, inovação contínua e café infinito." # Mais criativo/inusitadoTop-k Sampling: Limitando o Vocabulário
Top-k Sampling é uma técnica simples de controle de vocabulário. O parâmetro top-k limita a escolha aos k tokens mais prováveis, ignorando completamente todos os outros.
Exemplo:
Imagine que o modelo tem 50.000 palavras no vocabulário, mas você define top_k = 5:
# Todas as probabilidades do modelo (simplificado)
todas_opcoes = {
"é": 0.40, # ← Top 1
"foi": 0.25, # ← Top 2
"será": 0.15, # ← Top 3
"era": 0.10, # ← Top 4
"seria": 0.05, # ← Top 5
"possa": 0.03, # ✗ Ignorado
"pudesse": 0.02, # ✗ Ignorado
# ... mais 49,993 palavras ✗ Todas ignoradas
}
# Com top_k=5, apenas as 5 primeiras são consideradas
# Depois renormaliza: [0.40, 0.25, 0.15, 0.10, 0.05] → [0.42, 0.26, 0.16, 0.11, 0.05]Top-k tem um problema: o valor k é fixo, independente da distribuição de probabilidades.
Ou seja:
- Se as probabilidades são muito concentradas:
k=50pode incluir muitas opções ruins - Se as probabilidades são muito distribuídas:
k=5pode ser muito restritivo
É aí que entra outro parâmetro, o Top-p, que se adapta dinamicamente.
Top-p (Nucleus Sampling): Controle Dinâmico de Vocabulário
O que é: Em vez de usar um número fixo de tokens (top-k), nucleus sampling (Holtzman et al. 2020) considera os tokens cuja probabilidade acumulada atinge p.
Como funciona:
# Probabilidades dos tokens após softmax
probs = {
"é": 0.40,
"foi": 0.25,
"será": 0.15,
"era": 0.10,
"seria": 0.05,
"possa": 0.03,
"pudesse": 0.02
}
# Top-p = 0.9: considera tokens até probabilidade acumulada ≥ 90%
# Seleciona: "é" (0.40) + "foi" (0.65) + "será" (0.80) + "era" (0.90)
# Ignora: "seria", "possa", "pudesse" (tail improvável)Comparação Top-p vs Top-k:
| Método | Funcionamento | Vantagem | Desvantagem |
|---|---|---|---|
| Top-k | Considera sempre k tokens mais prováveis | Simples, previsível | k fixo pode ser muito restritivo ou permissivo |
| Top-p | Considera tokens até probabilidade acumulada p | Adapta-se à distribuição | Mais complexo, pode incluir muitos tokens em distribuições planas |
Podemos seguir este racional para definir valores práticos em nossos agentes de IA:
- Top-p = 0.9-0.95: Default recomendado para maioria dos casos
- Top-p = 1.0: Sem filtro (usa toda a distribuição)
- Top-p < 0.8: Geração mais focada e determinística
Combinando Temperature + Top-p
Para uma configuração robusta, podemos combinar temperature e top-p.
# Configuração conservadora (factual, precisa)
config = {
"temperature": 0.3,
"top_p": 0.8
}
# Configuração balanceada (geral)
config = {
"temperature": 0.7,
"top_p": 0.9
}
# Configuração criativa (brainstorming)
config = {
"temperature": 1.2,
"top_p": 0.95
}Max Tokens: Controlando Comprimento de Saída
max_tokens é um pouco previsível, é, basicamente, o número máximo de tokens que o modelo pode gerar na sua resposta.
Usamos max_tokens para evitar respostas excessivamente longas ou para garantir que a resposta caiba dentro do context window.
Por exemplo, podemos considerar:
- Custo: Você paga por token gerado. Se
max_tokens=1000mas a resposta completa em 100, paga apenas 100. - Latência: Mais tokens = mais tempo de geração (~50ms por token em média).
- Truncamento: Se a resposta atingir
max_tokens, pode ser cortada no meio de uma frase, use com cuidado.
Podemos seguir algumas estratégias práticas no uso de max_tokens:
# Para respostas curtas (classificação, extração)
config_curtas = {
"max_tokens": 50,
"temperature": 0.2,
"top_p": 0.8
}
# Para explicações médias
config_medias = {
"max_tokens": 500,
"temperature": 0.7,
"top_p": 0.9
}
# Para geração longa (artigos, análises)
config_longas = {
"max_tokens": 2000,
"temperature": 0.8,
"top_p": 0.95
}
# Para streaming (exibir a resposta enquanto gera)
config_streaming = {
"max_tokens": 1000,
"temperature": 0.7,
"top_p": 0.9,
"stream": True
}Armadilha comum: Definir max_tokens muito alto “por segurança” aumenta custo e latência desnecessariamente.
Stop Sequences: Terminando Geração Antecipadamente
Com stop sequences podemos definir Strings que, quando geradas pelo modelo, param a geração da resposta imediatamente.
Vamos ver alguns exemplos:
# Exemplo 1: Geração estruturada
config_listagem = {
"prompt": "Liste 3 vantagens do Python:\n1.",
"stop": ["\n\n", "Conclusão"], # Para após listar os 3 itens
"max_tokens": 200
}
# Saída esperada:
# "1. Sintaxe clara e legível
# 2. Grande ecossistema de bibliotecas
# 3. Comunidade ativa e suporte"
# [PARA aqui quando encontrar "\n\n"]
# Exemplo 2: Conversação
user_message = "Qual a capital da França?"
config_chat = {
"prompt": f"User: {user_message}\nAssistant:",
"stop": ["\nUser:", "\nAssistant:"], # Para quando começar nova rodada
"max_tokens": 100
}
# Saída esperada:
# "A capital da França é Paris."
# [PARA aqui, não continua gerando mais rodadas de diálogo]
# Exemplo 3: Code generation
config_code = {
"prompt": "def calculate_fibonacci(n):",
"stop": ["\ndef ", "\nclass "], # Para ao começar nova função/classe
"max_tokens": 500
}
# Saída esperada:
# "
# if n <= 1:
# return n
# return calculate_fibonacci(n-1) + calculate_fibonacci(n-2)
# "
# [PARA aqui quando detectar "\ndef " ou "\nclass "]Recomendações práticas:
- Use stop sequences para formatos estruturados
- Evite stop sequences muito curtas (podem truncar prematuramente)
- Combine com
max_tokenscomo fallback Exemplo:stop=["\n\n"],max_tokens=200
Frequency Penalty e Presence Penalty
Os parâmetros frequency_penalty e presence_penalty penalizam tokens que já apareceram na geração para reduzir repetição. Frequency Penalty penaliza proporcionalmente ao número de vezes que o token apareceu, enquanto Presence Penalty penaliza igualmente qualquer token que já apareceu (binário).
A matemática por trás desses parâmetros é simples:
# Logit original do token
logit_original = 2.5
# Se token "mundo" já apareceu 3 vezes:
frequency_penalty = 0.5
presence_penalty = 0.3
# Novo logit
logit_ajustado = logit_original - (3 * frequency_penalty) - presence_penalty
# = 2.5 - 1.5 - 0.3 = 0.7Recomendações práticas:
# Sem penalidade (default - permite repetição natural)
config_default = {
"frequency_penalty": 0.0,
"presence_penalty": 0.0,
"temperature": 0.7,
"max_tokens": 500
}
# Penalidade moderada (reduzir repetição leve)
config_moderada = {
"frequency_penalty": 0.5,
"presence_penalty": 0.3,
"temperature": 0.7,
"max_tokens": 500
}
# Penalidade forte (forçar diversidade máxima)
config_diversa = {
"frequency_penalty": 1.0,
"presence_penalty": 0.8,
"temperature": 0.8,
"max_tokens": 500
}Exemplo Completo: Configurações por Caso de Uso
# 1. Extração de dados estruturados
config_extraction = {
"temperature": 0.0,
"max_tokens": 100,
"stop": ["\n\n"],
"top_p": 1.0
}
# 2. Conversação natural
config_conversation = {
"temperature": 0.7,
"max_tokens": 500,
"top_p": 0.9,
"presence_penalty": 0.2
}
# 3. Code generation
config_code = {
"temperature": 0.2,
"max_tokens": 1000,
"stop": ["\n\ndef ", "\n\nclass "],
"top_p": 0.95
}
# 4. Brainstorming criativo
config_creative = {
"temperature": 1.0,
"max_tokens": 800,
"top_p": 0.95,
"presence_penalty": 0.6,
"frequency_penalty": 0.5
}
# 5. Análise factual
config_factual = {
"temperature": 0.3,
"max_tokens": 300,
"top_p": 0.85
}Context Window: Gerenciamento de Memória
O context window (ou context length) é o número máximo de tokens que o modelo consegue processar de uma vez, incluindo tanto o prompt de entrada quanto a resposta gerada.
Exemplos de context windows em modelos populares (OpenAI s.d.a; Anthropic s.d.):
| Modelo | Context Window | Notas |
|---|---|---|
| GPT-3.5-turbo | 16K tokens | ~12K palavras |
| GPT-4 | 8K / 32K tokens | Versões diferentes |
| GPT-4-turbo | 128K tokens | ~96K palavras, ~300 páginas |
| Claude 3.5 Sonnet | 200K tokens | ~150K palavras |
| LLaMA 2 | 4K tokens | Limitado |
| LLaMA 3 | 8K tokens | Melhorado |
| Gemini 1.5 Pro | 1M tokens | ~700K palavras |
Pesquisas recentes introduziram técnicas que permitem modelos processarem contextos mais longos do que o treinamento original:
- ALiBi (Attention with Linear Biases) (Press, Smith, e Lewis 2022): Permite extrapolação de comprimento em tempo de inferência
- FlashAttention (Dao et al. 2022): Otimização de memória que torna atenção em contextos longos viável
- Sparse Attention (Beltagy, Peters, e Cohan 2020): Padrões de atenção esparsos para documentos longos
- Grouped-Query Attention (Ainslie et al. 2023): Reduz footprint de memória do KV-cache
- PagedAttention (Kwon et al. 2023): Gerenciamento eficiente de memória para serving de LLMs (vLLM)
Essas técnicas são especialmente relevantes em sistemas de produção com requisitos de contexto extenso.
O Problema do Context Window
O maior desafio com context windows é que, em conversações longas ou tarefas complexas, o histórico pode crescer rapidamente e exceder o limite do modelo. Afinal, cada mensagem adiciona tokens ao contexto.
Exemplo:
# Conversação com histórico crescente
messages = [
{"role": "user", "content": "Explique fotossíntese"},
{"role": "assistant", "content": "[500 tokens de explicação]"},
{"role": "user", "content": "E respiração celular?"},
{"role": "assistant", "content": "[600 tokens de explicação]"},
{"role": "user", "content": "Compare os dois processos"},
{"role": "assistant", "content": "[700 tokens de explicação]"},
# ... continua crescendo
]
# Após 10 rodadas: histórico pode ter 8K+ tokens
# Problema: Próxima chamada pode exceder o context window!Devido a isso, precisamos de estratégias para gerenciar o contexto eficientemente.
Estratégias de Gerenciamento de Contexto
Para manter o contexto dentro do limite, podemos usar várias técnicas. Aqui veremos exemplos de 4 abordagens comuns: Sliding Window (Janela Deslizante), Summarization (Sumarização), Token-Based Trimming (Corte por Tokens) e Hybrid: Summarization + Sliding Window.
Sliding Window (Janela Deslizante)
Mantém apenas as N mensagens mais recentes no histórico:
def sliding_window(messages, max_messages=10):
"""Mantém apenas as últimas max_messages mensagens"""
system_message = messages[0] # Preserva system message
recent_messages = messages[-max_messages:]
return [system_message] + recent_messages
# Uso
messages = sliding_window(conversation_history, max_messages=8)Prós: Simples, preserva contexto recente Contras: Perde informações antigas que podem ser relevantes para a interação atual
2. Summarization (Sumarização)
Sumariza mensagens antigas para economizar tokens:
def summarize_old_context(messages, threshold=10):
"""Sumariza mensagens antigas quando histórico fica muito longo"""
if len(messages) < threshold:
return messages
# Mensagens antigas para sumarizar
old_messages = messages[1:threshold] # Pula system message
recent_messages = messages[threshold:]
# Chama LLM para sumarizar
summary_prompt = f"""
Resuma as seguintes mensagens de conversação em 2-3 parágrafos:
{format_messages(old_messages)}
"""
summary = llm.generate(summary_prompt, max_tokens=300)
# Reconstrói histórico
return [
messages[0], # System message
{"role": "system", "content": f"Resumo da conversa anterior: {summary}"},
*recent_messages
]Prós: Preserva informação essencial, compacto Contras: Custo extra de chamada de sumarização, pode perder detalhes
3. Token-Based Trimming (Corte por Tokens)
Remove mensagens antigas até caber no limit de tokens:
def trim_to_token_limit(messages, max_tokens=15000, model="gpt-4"):
"""Remove mensagens antigas até caber no limite de tokens"""
import tiktoken
enc = tiktoken.encoding_for_model(model)
system_message = messages[0]
conversation = messages[1:]
# Conta tokens do system message
system_tokens = len(enc.encode(system_message["content"]))
remaining_tokens = max_tokens - system_tokens - 1000 # buffer para resposta
# Adiciona mensagens de trás pra frente até atingir limite
trimmed = []
current_tokens = 0
for msg in reversed(conversation):
msg_tokens = len(enc.encode(msg["content"]))
if current_tokens + msg_tokens > remaining_tokens:
break
trimmed.insert(0, msg)
current_tokens += msg_tokens
return [system_message] + trimmedPrós: Controle preciso de tokens Contras: Pode cortar no meio de um tópico importante
4. Hybrid: Summarization + Sliding Window
Combina as duas abordagens, sumarizando mensagens antigas e mantendo uma janela recente completa:
def hybrid_context_management(messages, max_tokens=15000, window_size=6):
"""Sumariza mensagens antigas, mantém janela recente completa"""
if len(messages) < window_size + 2: # +2 para system message e margem
return trim_to_token_limit(messages, max_tokens)
# Divide em: system, antigas (sumarizar), recentes (manter)
system = messages[0]
old = messages[1:-window_size]
recent = messages[-window_size:]
# Sumariza mensagens antigas
if len(old) > 0:
summary = summarize_messages(old)
summary_msg = {"role": "system", "content": f"Contexto anterior: {summary}"}
reconstructed = [system, summary_msg] + recent
else:
reconstructed = [system] + recent
# Garante que cabe no limite de tokens
return trim_to_token_limit(reconstructed, max_tokens)Prós: Melhor de dois mundos Contras: Mais complexo, custo de sumarização pode aumentar bastante
Melhores Práticas para Gerenciamento de Context Window
Monitore o uso de tokens:
import tiktoken def count_tokens(messages, model="gpt-4"): enc = tiktoken.encoding_for_model(model) return sum(len(enc.encode(m["content"])) for m in messages) # Alerta quando atingir 80% do limite current_tokens = count_tokens(messages) if current_tokens > 0.8 * MAX_CONTEXT: print(f"⚠️ Usando {current_tokens}/{MAX_CONTEXT} tokens")System message eficiente:
# ❌ Ruim: System message muito longo system = "Você é um assistente útil, prestativo, preciso, factual, conciso, claro, objetivo, direto..." # 500+ tokens # ✅ Bom: Conciso e direto system = "Você é um assistente especializado em programação Python. Seja conciso e forneça exemplos práticos." # ~30 tokensEvite redundância:
# ❌ Ruim: Repetir contexto a cada mensagem prompt = f""" Você é um especialista em Python. Considere o seguinte código: {code} Histórico da conversa: {conversation_history} Pergunta: {user_question} """ # ✅ Bom: Contexto no system message, resto nas mensagens messages = [ {"role": "system", "content": "Você é um especialista em Python."}, {"role": "user", "content": f"Código:\n{code}"}, {"role": "user", "content": user_question} ]
Otimização de Custo e Latência
Quando você move um sistema com LLMs de prova de conceito para produção, dois fatores se tornam críticos: custo e latência. Aplicações em escala podem processar milhões de tokens por dia, e cada token tem um custo associado. Além disso, usuários esperam respostas rápidas, latência alta afeta diretamente a experiência. E, para complicar ainda mais as nossas vidas, sistemas agênticos frequentemente exigem múltiplas chamadas a LLMs por interação, multiplicando tanto custo quanto latência.
Nesta seção, vamos explorar como entender, medir e otimizar ambos os aspectos.
Entendendo o Custo por Token
A maioria dos provedores de LLM cobra por tokens processados (OpenAI s.d.b), separando input tokens (o que você envia) e output tokens (o que o modelo gera). Output tokens geralmente custam mais, pois exigem mais processamento.
Exemplo real de precificação (valores aproximados):
| Modelo | Input ($/1K tokens) |
Output ($/1K tokens) |
Context Window |
|---|---|---|---|
| GPT-3.5 Turbo | $0.0015 | $0.002 | 16K |
| GPT-4 | $0.03 | $0.06 | 8K |
| GPT-4 Turbo | $0.01 | $0.03 | 128K |
| Claude 3 Haiku | $0.00025 | $0.00125 | 200K |
| Claude 3.5 Sonnet | $0.003 | $0.015 | 200K |
| Claude 3 Opus | $0.015 | $0.075 | 200K |
Vamos calcular o custo de uma conversação típica:
# Cenário: Assistente de código usando GPT-4
config_exemplo = {
"input_tokens": 1500, # Prompt + histórico de conversação
"output_tokens": 500, # Código gerado + explicação
"input_cost_per_1k": 0.03,
"output_cost_per_1k": 0.06
}
# Cálculo do custo
def calcular_custo(input_tokens, output_tokens, input_cost, output_cost):
custo_input = (input_tokens / 1000) * input_cost
custo_output = (output_tokens / 1000) * output_cost
custo_total = custo_input + custo_output
return {
"custo_input": custo_input,
"custo_output": custo_output,
"custo_total": custo_total
}
resultado = calcular_custo(
input_tokens=1500,
output_tokens=500,
input_cost=0.03,
output_cost=0.06
)
print(f"Custo por chamada: ${resultado['custo_total']:.4f}")
# Saída: Custo por chamada: $0.0750Projeção de custos em escala:
# Diferentes volumes de uso
volumes = {
"Startup (1K chamadas/dia)": 30_000, # ~1 mês
"Growth (10K chamadas/dia)": 300_000, # ~1 mês
"Enterprise (100K chamadas/dia)": 3_000_000 # ~1 mês
}
custo_por_chamada = 0.075 # do exemplo acima
for cenario, chamadas_mes in volumes.items():
custo_mensal = chamadas_mes * custo_por_chamada
print(f"{cenario}: ${custo_mensal:,.2f}/mês")
# Saída:
# Startup (1K chamadas/dia): $2,250.00/mês
# Growth (10K chamadas/dia): $22,500.00/mês
# Enterprise (100K chamadas/dia): $225,000.00/mêsA diferença entre usar GPT-3.5 Turbo ($0.00175 média) e GPT-4 ($0.045 média) em escala enterprise seria:
- GPT-3.5: ~$5,250/mês
- GPT-4: ~$135,000/mês
Diferença: $129,750/mês - mais de 25x mais caro!
Estratégias de Redução de Custo
Agora que entendemos como os custos escalam, vamos explorar estratégias práticas para otimizá-los sem comprometer a qualidade do sistema. As técnicas a seguir podem reduzir custos em 40-70%, dependendo do caso de uso.
Estratégia 1: Model Routing (Roteamento Inteligente de Modelos)
A ideia é simples: nem toda tarefa precisa do modelo mais poderoso. Use modelos menores e mais baratos para tarefas simples, reservando modelos caros apenas quando necessário.
Exemplo: Cascading com fallback
Imagine um sistema de atendimento ao cliente. Perguntas simples (“Qual o horário de atendimento?”) não precisam de GPT-4. Perguntas complexas (“Explique a diferença entre os planos Premium e Enterprise considerando meu caso de uso específico”) se beneficiam do modelo mais poderoso.
from typing import Dict, Any
import openai
class ModelRouter:
"""Router inteligente que escolhe o modelo baseado na complexidade da tarefa"""
def __init__(self):
self.models = {
"fast": "gpt-3.5-turbo", # Rápido e barato
"balanced": "gpt-4-turbo", # Balanceado
"powerful": "gpt-4" # Poderoso mas caro
}
def classify_complexity(self, prompt: str) -> str:
"""
Classifica a complexidade do prompt.
Em produção, isso poderia usar um modelo classificador dedicado.
"""
# Heurísticas simples (você pode sofisticar isso)
# Podemos trabalhar com processamento de linguagem natural
# usando SpaCy para uma melhor classificação
indicators_complex = ["explique", "compare", "analise", "por que", "como funciona"]
indicators_simple = ["qual", "quando", "onde", "liste"]
prompt_lower = prompt.lower()
# Contagem de palavras como proxy de complexidade
word_count = len(prompt.split())
if word_count > 50 or any(ind in prompt_lower for ind in indicators_complex):
return "powerful"
elif word_count > 20:
return "balanced"
else:
return "fast"
def generate(self, prompt: str, force_model: str = None) -> Dict[str, Any]:
"""Gera resposta usando o modelo apropriado"""
# Permite override manual se necessário
if force_model:
model = self.models[force_model]
else:
complexity = self.classify_complexity(prompt)
model = self.models[complexity]
response = openai.ChatCompletion.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
return {
"response": response.choices[0].message.content,
"model_used": model,
"tokens": response.usage.total_tokens
}
# Exemplo de uso
router = ModelRouter()
# Pergunta simples → usa GPT-3.5
result1 = router.generate("Qual o horário de funcionamento?")
print(f"Modelo usado: {result1['model_used']}") # gpt-3.5-turbo
# Pergunta complexa → usa GPT-4
result2 = router.generate(
"Explique as diferenças arquiteturais entre microsserviços e monólitos, "
"considerando trade-offs de escalabilidade, manutenibilidade e custo operacional."
)
print(f"Modelo usado: {result2['model_used']}") # gpt-4Matriz de decisão para escolha de modelo:
| Tipo de Tarefa | Complexidade | Modelo Recomendado | Custo Relativo |
|---|---|---|---|
| Classificação de sentimento | Baixa | GPT-3.5 / Haiku | 1× |
| Extração de entidades | Baixa | GPT-3.5 / Haiku | 1× |
| Respostas factuais curtas | Baixa | GPT-3.5 / Haiku | 1× |
| Geração de código simples | Média | GPT-4 Turbo / Sonnet | 5-8× |
| Conversação natural | Média | GPT-4 Turbo / Sonnet | 5-8× |
| Análise de código complexo | Alta | GPT-4 / Opus | 15-25× |
| Raciocínio multi-step | Alta | GPT-4 / Opus | 15-25× |
| Tarefas críticas de segurança | Alta | GPT-4 / Opus | 15-25× |
Economia estimada: 40-60% em sistemas com mix de tarefas simples e complexas.
Estratégia 2: Caching Inteligente
Muitas aplicações fazem chamadas redundantes ao LLM. Se você processa “Qual a capital da França?” 1.000 vezes por mês, está pagando 1.000 vezes pela mesma resposta.
Implementação de cache com Redis:
import hashlib
import json
import redis
from typing import Optional, Dict, Any
class LLMCache:
"""
Cache inteligente para respostas de LLM.
Economiza custos evitando chamadas redundantes.
"""
def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
"""
Args:
redis_client: Cliente Redis configurado
default_ttl: Tempo de vida do cache em segundos (default: 1 hora)
"""
self.redis = redis_client
self.default_ttl = default_ttl
def _generate_cache_key(self, prompt: str, config: Dict[str, Any]) -> str:
"""
Gera chave de cache determinística baseada em prompt + configuração.
Args:
prompt: Texto do prompt
config: Dicionário com configurações (temperature, max_tokens, etc.)
Returns:
Hash SHA-256 da combinação prompt + config
"""
# Serializa config de forma determinística (sorted_keys garante ordem)
config_str = json.dumps(config, sort_keys=True)
cache_content = f"{prompt}||{config_str}"
# Gera hash
return hashlib.sha256(cache_content.encode()).hexdigest()
def get(self, prompt: str, config: Dict[str, Any]) -> Optional[str]:
"""Busca resposta no cache"""
key = self._generate_cache_key(prompt, config)
cached = self.redis.get(key)
if cached:
return json.loads(cached)
return None
def set(self, prompt: str, config: Dict[str, Any], response: str, ttl: int = None):
"""Armazena resposta no cache"""
key = self._generate_cache_key(prompt, config)
ttl = ttl or self.default_ttl
self.redis.setex(
key,
ttl,
json.dumps(response)
)
def get_stats(self) -> Dict[str, int]:
"""Retorna estatísticas de uso do cache"""
# Em produção, você manteria contadores de hits/misses
return {
"total_keys": self.redis.dbsize(),
# Implementar contadores de hit/miss rate
}
# Exemplo de uso em produção
def generate_with_cache(prompt: str, config: Dict[str, Any]) -> str:
"""
Wrapper que adiciona cache transparente a chamadas LLM.
"""
# Tenta buscar do cache primeiro
cached_response = cache.get(prompt, config)
if cached_response:
print(f"✓ Cache HIT - Economia de ~${calcular_custo_chamada(prompt):.4f}")
return cached_response
# Cache miss - chama o LLM
print(f"✗ Cache MISS - Chamando LLM...")
response = openai.ChatCompletion.create(
model=config.get("model", "gpt-4"),
messages=[{"role": "user", "content": prompt}],
temperature=config.get("temperature", 0.7),
max_tokens=config.get("max_tokens", 500)
)
result = response.choices[0].message.content
# Armazena no cache para próximas chamadas
cache.set(prompt, config, result)
return result
# Configuração
redis_client = redis.Redis(host='localhost', port=6379, db=0)
cache = LLMCache(redis_client, default_ttl=7200) # Cache de 2 horas
# Uso
config = {"model": "gpt-4", "temperature": 0.7, "max_tokens": 200}
# Primeira chamada - cache miss
response1 = generate_with_cache("Explique o que é REST API", config)
# ✗ Cache MISS - Chamando LLM...
# Segunda chamada com mesmo prompt - cache hit
response2 = generate_with_cache("Explique o que é REST API", config)
# ✓ Cache HIT - Economia de ~$0.0450Considerações importantes sobre caching:
TTL (Time To Live): Ajuste conforme a natureza dos dados
- Respostas factuais estáveis: 24-48 horas
- Conteúdo que muda frequentemente: 1-2 horas
- Dados em tempo real: Não use cache ou TTL muito curto (<5 min)
Invalidação de cache: Implemente estratégias para limpar cache quando dados mudam
def invalidate_cache_pattern(pattern: str): """Remove todas as chaves que correspondem ao padrão""" for key in redis_client.scan_iter(match=pattern): redis_client.delete(key) # Exemplo: Invalida todos os caches relacionados a preços invalidate_cache_pattern("*preço*")
Estratégia 3: Batch Processing
Quando você tem múltiplas tarefas independentes do mesmo tipo, processe-as em lote em uma única chamada.
Exemplo: Análise de sentimento em batch
from typing import List, Dict
def analyze_sentiment_individual(reviews: List[str]) -> List[str]:
"""❌ Abordagem ineficiente: uma chamada por review"""
results = []
for review in reviews:
prompt = f"Classifique o sentimento (Positivo/Negativo/Neutro): {review}"
response = llm.generate(prompt)
results.append(response)
return results
# Custo para 100 reviews:
# - 100 chamadas × overhead de API
# - 100 × latência de round-trip
# - ~100 × 50 tokens = 5,000 tokens input
# - ~100 × 10 tokens = 1,000 tokens output
# Total: ~$0.18 (GPT-4) + alta latência
def analyze_sentiment_batch(reviews: List[str]) -> List[str]:
"""✅ Abordagem eficiente: batch em uma chamada"""
# Formata todos os reviews em um único prompt
reviews_formatted = "\n".join(
f"{i+1}. {review}"
for i, review in enumerate(reviews)
)
prompt = f"""
Classifique o sentimento de cada review abaixo como Positivo, Negativo ou Neutro.
Reviews:
{reviews_formatted}
Responda APENAS no formato:
1. [Sentimento]
2. [Sentimento]
(etc.)
"""
response = llm.generate(prompt, max_tokens=500)
# Parse da resposta
sentiments = [
line.split(". ")[1].strip()
for line in response.strip().split("\n")
if line.strip()
]
return sentiments
# Custo para 100 reviews:
# - 1 chamada (sem overhead múltiplo)
# - 1 × latência
# - ~1,500 tokens input (batch formatting é mais eficiente)
# - ~200 tokens output
# Total: ~$0.06 (GPT-4) + baixa latência
# Economia: 67% + 100× mais rápido!Limitações do batch processing:
- Context window: Batches muito grandes podem exceder o limite
- Parsing: Necessário extrair resultados individuais da resposta
- Falhas parciais: Se uma tarefa falha, pode afetar o batch inteiro
Tamanho de batch recomendado:
def calculate_optimal_batch_size(
avg_item_tokens: int,
context_window: int = 8000,
safety_margin: float = 0.3
) -> int:
"""
Calcula tamanho ótimo de batch baseado em tokens disponíveis.
Args:
avg_item_tokens: Tokens médios por item
context_window: Context window do modelo
safety_margin: Margem de segurança (30% reservado para prompt structure)
"""
available_tokens = int(context_window * (1 - safety_margin))
return max(1, available_tokens // avg_item_tokens)
# Exemplo para GPT-4 (8K context)
batch_size = calculate_optimal_batch_size(
avg_item_tokens=50, # Review médio tem 50 tokens
context_window=8000
)
print(f"Batch size recomendado: {batch_size}") # ~110 reviews por batchEstratégia 4: Compressão de Prompts
Tokens desperdiçados aumentam custos desnecessariamente. Aprenda a comunicar a mesma informação com menos tokens.
Técnicas de compressão:
# ❌ Prompt verboso (220 tokens)
prompt_verbose = """
Olá! Eu gostaria muito que você pudesse, por gentileza, analisar o código
Python que eu vou fornecer logo abaixo com bastante atenção e cuidado.
Por favor, me dê uma explicação bem detalhada sobre o que exatamente este
código faz, passo a passo. Além disso, se você conseguir identificar
quaisquer problemas, bugs, ou questões de performance que possam existir
no código, eu agradeceria muito se você pudesse apontá-los. E também,
se possível, gostaria que você sugerisse algumas melhorias que poderiam
tornar este código melhor, mais eficiente, ou mais legível.
Aqui está o código:
def calculate_total(items):
total = 0
for item in items:
total = total + item
return total
Muito obrigado pela ajuda!
"""
# ✅ Prompt comprimido (45 tokens - redução de 80%!)
prompt_compressed = """
Analise este código Python. Identifique problemas e sugira melhorias:
def calculate_total(items):
total = 0
for item in items:
total = total + item
return total
"""Checklist de compressão:
Remova cortesias: “por favor”, “gostaria que”, “seria possível”
Use imperativo: “Analise” em vez de “Eu gostaria que você analisasse”
Elimine redundâncias: “Identifique problemas” já implica “se houver”
Prefira listas a parágrafos:
# Antes (verboso) "Analise considerando: primeiro a performance, depois a legibilidade, e também a manutenibilidade do código" # Depois (compacto) "Analise: performance, legibilidade, manutenibilidade"Evite exemplos desnecessários: Se o modelo já entende, não precisa de 3 exemplos
Economia estimada: 20-40% de redução em tokens de input, especialmente em prompts de sistema e instruções
Otimização de Latência
Latência afeta diretamente a experiência do usuário. Enquanto custos são preocupação do orçamento, latência é preocupação do usuário final. Respostas lentas levam a abandono, frustração e má experiência (e também pode ser crítico em aplicações em tempo real).
Entendendo a latência em LLMs:
A latência em chamadas LLM tem componentes distintos:
- Network latency: Tempo de ida e volta da requisição HTTP/HTTPS
- Região local (mesma região AWS/GCP): 10-50ms
- Cross-region (ex: US East → US West): 50-100ms
- Intercontinental (ex: US → Europa): 100-200ms
- Fonte: Medições típicas de RTT (Round-Trip Time) em clouds públicas
- Time to first token (TTFT): Tempo até o modelo começar a gerar
- Modelos pequenos (GPT-3.5, Claude Haiku): 200-500ms
- Modelos grandes (GPT-4, Claude Opus): 500-1500ms
- Fatores: Tamanho do modelo, carga do servidor, comprimento do prompt
- Fonte: (Artificial Analysis 2024)
- Generation latency: Tempo para gerar cada token após o primeiro
- APIs otimizadas (GPT-4 Turbo, Claude 3.5): 40-80ms/token
- Modelos standard (GPT-4, Claude 3): 80-120ms/token
- Self-hosted (depende do hardware): 50-500ms/token
- Fonte: Observações em produção e (Artificial Analysis 2024)
Para respostas longas, a latência pode facilmente ultrapassar 40 segundos, tempo inaceitável para qualquer aplicação. Vamos explorar estratégias para otimizar cada componente.
Estratégia 1: Streaming de Respostas
O streaming permite exibir a resposta enquanto ela é gerada, reduzindo a latência percebida pelo usuário.
Como funciona:
- Sem streaming: Usuário espera 40s → vê resposta completa
- Com streaming: Usuário espera 600ms → começa a ver texto → continua vendo texto em tempo real
import anthropic
from typing import Generator
def generate_with_streaming(prompt: str) -> Generator[str, None, None]:
"""
Gera resposta com streaming, exibindo tokens conforme são gerados.
Args:
prompt: Texto do prompt
Yields:
Chunks de texto conforme são gerados pelo modelo
"""
client = anthropic.Anthropic()
print("Iniciando geração com streaming...")
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1000,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) # Exibe imediatamente
yield text
print("\n✓ Geração completa")
# Exemplo de uso
prompt = "Explique o conceito de containerização em computação."
# Sem streaming: usuário espera ~15s para ver qualquer coisa
# response = client.messages.create(...) # 15s de espera → resposta completa
# Com streaming: usuário vê primeiras palavras em ~600ms
for chunk in generate_with_streaming(prompt):
# Cada chunk pode ser processado ou exibido imediatamente
pass
# Time to first token: ~600ms
# Time to complete: ~15s (mas usuário já está lendo desde 600ms)Implementação para aplicações web com FastAPI:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
import json
app = FastAPI()
class ChatRequest(BaseModel):
prompt: str
max_tokens: int = 1000
@app.post("/api/chat/stream")
async def chat_stream(request: ChatRequest):
"""
Endpoint que retorna resposta em streaming via SSE (Server-Sent Events).
Args:
request: Objeto com prompt e configurações
Returns:
StreamingResponse com chunks de texto em formato SSE
"""
client = anthropic.Anthropic()
async def generate():
"""Generator que produz chunks de texto em formato SSE"""
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=request.max_tokens,
messages=[{"role": "user", "content": request.prompt}]
) as stream:
for text in stream.text_stream:
# Formato SSE (Server-Sent Events)
# Cada evento deve terminar com \n\n
yield f"data: {json.dumps({'text': text})}\n\n"
# Envia evento final indicando conclusão
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Desabilita buffering no Nginx
}
)Exemplo de uso do cliente (JavaScript/TypeScript)
const eventSource = new EventSource('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
prompt: 'Explique containerização',
max_tokens: 1000
})
});
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.done) {
eventSource.close();
} else {
console.log(data.text); // Exibe cada chunk
}
};Estratégia 2: Otimização de max_tokens
Configurar max_tokens corretamente evita geração desnecessária e reduz latência.
O problema:
# ❌ Anti-padrão comum: max_tokens excessivo
response = llm.generate(
"Classifique este email como spam ou não-spam",
max_tokens=2000 # Modelo pode gerar até 2000, mas precisa só de 5-10 tokens
)
# Latência desnecessária: O modelo considera 2000 tokens como limite,
# afetando otimizações internas de geraçãoA solução: Estimativa inteligente por tipo de tarefa:
from typing import Dict
from enum import Enum
class TaskType(Enum):
CLASSIFICATION = "classification"
EXTRACTION = "extraction"
SHORT_ANSWER = "short_answer"
EXPLANATION = "explanation"
CODE_GENERATION = "code_generation"
ARTICLE = "article"
class TokenEstimator:
"""Estima max_tokens apropriado baseado no tipo de tarefa"""
# Estimativas baseadas em análise de 10K+ interações reais
ESTIMATES: Dict[TaskType, int] = {
TaskType.CLASSIFICATION: 10, # "Spam" ou "Positivo"
TaskType.EXTRACTION: 50, # Nome, email, telefone
TaskType.SHORT_ANSWER: 100, # Resposta objetiva
TaskType.EXPLANATION: 300, # Explicação conceitual
TaskType.CODE_GENERATION: 500, # Função ou classe
TaskType.ARTICLE: 1500, # Artigo completo
}
SAFETY_MARGIN = 1.2 # 20% de margem de segurança
@classmethod
def estimate(cls, task_type: TaskType, safety_margin: float = None) -> int:
"""
Retorna estimativa de max_tokens para o tipo de tarefa.
Args:
task_type: Tipo da tarefa
safety_margin: Margem de segurança (default: 1.2 = 20%)
Returns:
max_tokens recomendado
"""
margin = safety_margin or cls.SAFETY_MARGIN
base_estimate = cls.ESTIMATES[task_type]
return int(base_estimate * margin)
# Exemplo de uso
def generate_with_smart_limits(prompt: str, task_type: TaskType) -> str:
"""Gera resposta com max_tokens otimizado para o tipo de tarefa"""
max_tokens = TokenEstimator.estimate(task_type)
print(f"Task type: {task_type.value}")
print(f"Max tokens: {max_tokens}")
response = llm.generate(
prompt,
max_tokens=max_tokens,
temperature=0.3
)
return response
# Classificação: usa apenas 12 tokens (10 × 1.2)
result = generate_with_smart_limits(
"Este email é spam? Responda apenas Sim ou Não",
TaskType.CLASSIFICATION
)
# Latência: ~1s (vs ~5s com max_tokens=2000)
# Explicação: usa 360 tokens (300 × 1.2)
result = generate_with_smart_limits(
"Explique o que é recursão em programação",
TaskType.EXPLANATION
)
# Latência: ~15s (vs ~40s com max_tokens=2000)Estratégia 3: Paralelização de Chamadas
Quando você tem múltiplas chamadas independentes, execute-as em paralelo para reduzir latência total.
O problema:
# ❌ Processamento serial: latência acumula
def process_documents_serial(documents: list[str]) -> list[str]:
"""Processa documentos um por vez"""
summaries = []
for doc in documents:
summary = llm.generate(f"Resumir: {doc}")
summaries.append(summary)
return summaries
# 10 documentos × 5s cada = 50 segundos totalA solução: Processamento paralelo com asyncio:
import asyncio
from anthropic import AsyncAnthropic
from typing import List
async def process_documents_parallel(documents: List[str]) -> List[str]:
"""
Processa múltiplos documentos em paralelo.
Args:
documents: Lista de documentos para processar
Returns:
Lista de resumos (mesma ordem dos documentos)
"""
client = AsyncAnthropic()
async def summarize_single(doc: str, index: int) -> tuple[int, str]:
"""
Sumariza um único documento.
Args:
doc: Documento para sumarizar
index: Índice para manter ordem
Returns:
Tupla (índice, resumo)
"""
try:
response = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{"role": "user", "content": f"Resumir: {doc}"}]
)
summary = response.content[0].text
return (index, summary)
except Exception as e:
print(f"Erro ao processar documento {index}: {e}")
return (index, f"[ERRO: {str(e)}]")
# Cria tasks para todos os documentos
tasks = [
summarize_single(doc, i)
for i, doc in enumerate(documents)
]
# Executa todas as tasks em paralelo
results = await asyncio.gather(*tasks)
# Reordena resultados pela ordem original
results.sort(key=lambda x: x[0])
summaries = [summary for _, summary in results]
return summaries
# Uso
documents = [
"Documento 1: Lorem ipsum...",
"Documento 2: Dolor sit amet...",
"Documento 3: Consectetur adipiscing...",
# ... 10 documentos
]
# Serial: 10 × 5s = 50 segundos
# summaries = process_documents_serial(documents)
# Paralelo: ~5 segundos (limitado pelo documento mais lento)
summaries = asyncio.run(process_documents_parallel(documents))
print(f"✓ Processados {len(summaries)} documentos em ~5s")
# Redução de latência: 90%Controle de concorrência para evitar rate limits:
import asyncio
from asyncio import Semaphore
async def process_with_concurrency_limit(
documents: List[str],
max_concurrent: int = 5
) -> List[str]:
"""
Processa documentos com limite de chamadas simultâneas.
Args:
documents: Lista de documentos
max_concurrent: Máximo de chamadas simultâneas
Returns:
Lista de resumos
"""
client = AsyncAnthropic()
semaphore = Semaphore(max_concurrent) # Limite de 5 chamadas simultâneas
async def summarize_with_limit(doc: str, index: int) -> tuple[int, str]:
async with semaphore: # Espera se limite atingido
response = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=300,
messages=[{"role": "user", "content": f"Resumir: {doc}"}]
)
return (index, response.content[0].text)
tasks = [summarize_with_limit(doc, i) for i, doc in enumerate(documents)]
results = await asyncio.gather(*tasks)
results.sort(key=lambda x: x[0])
return [summary for _, summary in results]
# 100 documentos:
# - Serial: 100 × 5s = 500s (8.3 minutos)
# - Paralelo sem limite: rate limit error
# - Paralelo com limite (5 concurrent): 100 / 5 × 5s = 100s (1.7 minutos)
# Redução: 80% sem violar rate limitsEstratégia 4: Load Balancing e Redundância
Para sistemas críticos, implemente load balancing entre múltiplos providers e fallbacks automáticos.
import asyncio
from typing import List, Optional, Protocol
from dataclasses import dataclass
from enum import Enum
class LLMProvider(Protocol):
"""Interface para providers de LLM"""
async def generate(self, prompt: str, timeout: float) -> str: ...
@property
def name(self) -> str: ...
@dataclass
class ProviderConfig:
"""Configuração de um provider"""
provider: LLMProvider
priority: int # 0 = mais prioritário
max_retries: int = 2
timeout_seconds: float = 10.0
class LoadBalancingStrategy(Enum):
"""Estratégias de balanceamento"""
ROUND_ROBIN = "round_robin" # Alterna entre providers
PRIORITY = "priority" # Usa provider de maior prioridade primeiro
FASTEST_FIRST = "fastest_first" # Usa provider mais rápido historicamente
class LLMLoadBalancer:
"""
Load balancer inteligente para múltiplos providers de LLM.
Fornece fallback automático e distribuição de carga.
"""
def __init__(
self,
providers: List[ProviderConfig],
strategy: LoadBalancingStrategy = LoadBalancingStrategy.PRIORITY
):
self.providers = sorted(providers, key=lambda p: p.priority)
self.strategy = strategy
self.current_index = 0
self.provider_stats = {p.provider.name: {"calls": 0, "failures": 0} for p in providers}
async def generate(
self,
prompt: str,
max_attempts: Optional[int] = None
) -> dict:
"""
Gera resposta com fallback automático entre providers.
Args:
prompt: Texto do prompt
max_attempts: Máximo de tentativas (default: tenta todos os providers)
Returns:
Dict com 'response', 'provider_used', 'attempt_count'
Raises:
Exception: Se todos os providers falharem
"""
max_attempts = max_attempts or len(self.providers) * 2
attempts = 0
last_error = None
for attempt in range(max_attempts):
attempts += 1
# Seleciona provider baseado na estratégia
if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
provider_config = self.providers[self.current_index % len(self.providers)]
self.current_index += 1
else: # PRIORITY
provider_config = self.providers[attempt % len(self.providers)]
provider = provider_config.provider
self.provider_stats[provider.name]["calls"] += 1
try:
print(f"Tentativa {attempts}: usando {provider.name}")
response = await asyncio.wait_for(
provider.generate(prompt, timeout=provider_config.timeout_seconds),
timeout=provider_config.timeout_seconds + 1.0 # +1s de buffer
)
print(f"✓ Sucesso com {provider.name}")
return {
"response": response,
"provider_used": provider.name,
"attempt_count": attempts
}
except asyncio.TimeoutError:
last_error = f"Timeout com {provider.name}"
print(f"✗ {last_error}")
self.provider_stats[provider.name]["failures"] += 1
except Exception as e:
last_error = f"Erro com {provider.name}: {str(e)}"
print(f"✗ {last_error}")
self.provider_stats[provider.name]["failures"] += 1
# Pequeno delay antes de tentar próximo provider
await asyncio.sleep(0.1)
# Todos os providers falharam
raise Exception(
f"Todos os providers falharam após {attempts} tentativas. "
f"Último erro: {last_error}"
)
def get_stats(self) -> dict:
"""Retorna estatísticas de uso dos providers"""
return {
name: {
**stats,
"success_rate": (
(stats["calls"] - stats["failures"]) / stats["calls"]
if stats["calls"] > 0 else 0
)
}
for name, stats in self.provider_stats.items()
}
# Exemplo de uso
async def main():
# Configuração de providers (mock para exemplo)
providers = [
ProviderConfig(
provider=OpenAIProvider(api_key="..."),
priority=0, # Prioridade alta
timeout_seconds=8.0
),
ProviderConfig(
provider=AnthropicProvider(api_key="..."),
priority=1, # Prioridade média
timeout_seconds=10.0
),
ProviderConfig(
provider=GoogleProvider(api_key="..."),
priority=2, # Backup
timeout_seconds=12.0
),
]
balancer = LLMLoadBalancer(providers, strategy=LoadBalancingStrategy.PRIORITY)
# Chamada com fallback automático
result = await balancer.generate("Explique machine learning")
print(f"Resposta obtida de: {result['provider_used']}")
print(f"Tentativas necessárias: {result['attempt_count']}")
# Estatísticas
stats = balancer.get_stats()
print("\nEstatísticas dos providers:")
for name, data in stats.items():
print(f"{name}: {data['calls']} calls, {data['success_rate']:.1%} success rate")
# asyncio.run(main())Benefícios do load balancing:
- Resiliência: Se um provider cai, sistema continua funcionando
- Otimização de custo: Direciona tráfego para provider mais barato quando disponível
- Latência reduzida: Usa provider mais rápido para tarefa específica
- SLA melhorado: Uptime aumenta significativamente com redundância
Observabilidade e Monitoramento em Produção
Sistemas de LLM em produção exigem observabilidade robusta (Prometheus s.d.; OpenTelemetry s.d.) para garantir performance, controlar custos e diagnosticar problemas rapidamente. Diferente de aplicações tradicionais, LLMs introduzem métricas específicas (tokens, custos variáveis, qualidade de resposta) que precisam de instrumentação dedicada.
Nesta seção, exploraremos como implementar observabilidade completa usando padrões da indústria: métricas (Prometheus), traces (OpenTelemetry), logs estruturados, e alertas inteligentes.
Arquitetura de Observabilidade
Uma stack completa de observabilidade para sistemas de LLM (Sigelman et al. 2010; OpenTelemetry s.d.) consiste em três pilares:
Componentes essenciais:
- Métricas (Prometheus s.d.): Agregações numéricas (latência P95, custo/hora, tokens/segundo)
- Traces (OpenTelemetry s.d.): Rastreamento distribuído de chamadas LLM e dependências
- Logs: Eventos estruturados para debugging e auditoria
Métricas Críticas para LLMs
Antes de implementar, defina quais métricas importam para seu caso de uso.
Por exemplo:
Categoria: Performance
| Métrica | Descrição | SLO Típico |
|---|---|---|
| Latency P50/P95/P99 | Percentis de latência de resposta | P95 < 3s |
| Time to First Token (TTFT) | Tempo até primeiro token (streaming) | P95 < 800ms |
| Throughput | Requisições processadas/segundo | > 10 req/s |
| Error Rate | Taxa de erros (4xx, 5xx, timeouts) | < 0.1% |
Categoria: Custo e Recursos
| Métrica | Descrição | Objetivo |
|---|---|---|
| Token Usage Rate | Tokens/minuto (input + output) | Monitorar tendências |
| Cost per Request | Custo médio por chamada | < $0.05 |
| Cost per User | Custo por usuário ativo/dia | < $2.00 |
| Cache Hit Rate | % de chamadas servidas do cache | > 40% |
Categoria: Qualidade
| Métrica | Descrição | Alvo |
|---|---|---|
| Prompt Rejection Rate | % de prompts bloqueados por moderação | < 2% |
| Retry Rate | % de chamadas que precisaram retry | < 5% |
| Fallback Rate | % que usou modelo fallback | < 10% |
Implementação Prática: Sistema de Métricas com Prometheus
Na parte parte III: Arquitetura e Produção, abordaremos a implementação de um sistema de métricas com Prometheus e OpenTelemetry no nosso agente de exemplo. Porém, aqui está um exemplo focado em métricas específicas para LLMs que você pode adaptar para sua aplicação quando necessário.
# uv pip install prometheus-client openai anthropic
from prometheus_client import Counter, Histogram, Gauge, Info, start_http_server
from contextlib import contextmanager
from typing import Optional, Dict, Any
import time
import openai
# ============================================================================
# Definição de Métricas
# ============================================================================
# Contador: incrementa sempre (nunca decrementa)
llm_requests_total = Counter(
'llm_requests_total',
'Total de requisições LLM',
['model', 'provider', 'status', 'cached']
)
# Histograma: distribui valores em buckets (ideal para latência)
llm_request_duration_seconds = Histogram(
'llm_request_duration_seconds',
'Duração de requisições LLM em segundos',
['model', 'provider'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0] # Buckets customizados
)
llm_tokens_total = Counter(
'llm_tokens_total',
'Total de tokens processados',
['model', 'provider', 'token_type'] # token_type: input, output
)
llm_cost_usd_total = Counter(
'llm_cost_usd_total',
'Custo total acumulado em USD',
['model', 'provider']
)
# Gauge: valor que pode subir ou descer (snapshot do estado atual)
llm_active_requests = Gauge(
'llm_active_requests',
'Requisições LLM ativas no momento',
['model', 'provider']
)
llm_cache_hit_rate = Gauge(
'llm_cache_hit_rate',
'Taxa de cache hit (0.0 - 1.0)',
['model']
)
# Info: metadados estáticos
llm_build_info = Info(
'llm_build',
'Informações de build da aplicação'
)
llm_build_info.info({
'version': '1.0.0',
'environment': 'production',
'commit_sha': 'abc123def'
})
# ============================================================================
# Instrumentação de Chamadas LLM
# ============================================================================
class LLMMetricsCollector:
"""
Coletor de métricas para chamadas LLM com suporte a Prometheus.
Rastreia latência, tokens, custos, erros e cache hits em tempo real.
"""
PRICING = {
# Preços em USD por 1K tokens (input, output)
"gpt-3.5-turbo": (0.0015, 0.002),
"gpt-4": (0.03, 0.06),
"gpt-4-turbo": (0.01, 0.03),
"claude-3-5-sonnet-20241022": (0.003, 0.015),
"claude-3-haiku-20240307": (0.00025, 0.00125),
}
def __init__(self, cache_client=None):
self.cache = cache_client
def _calculate_cost(
self,
model: str,
input_tokens: int,
output_tokens: int
) -> float:
"""Calcula custo baseado em tokens e modelo"""
if model not in self.PRICING:
return 0.0
input_price, output_price = self.PRICING[model]
cost = (input_tokens / 1000) * input_price + (output_tokens / 1000) * output_price
return cost
@contextmanager
def track_request(
self,
model: str,
provider: str,
prompt: str,
config: Dict[str, Any]
):
"""
Context manager para rastrear métricas de uma chamada LLM.
Uso:
with collector.track_request("gpt-4", "openai", prompt, config) as ctx:
response = llm.generate(prompt)
ctx.record_response(response)
"""
# Marca início da requisição
start_time = time.time()
llm_active_requests.labels(model=model, provider=provider).inc()
# Contexto para armazenar dados da resposta
context = {"cached": False, "error": None}
try:
# Verifica cache antes da chamada
if self.cache:
cached_response = self.cache.get(prompt, config)
if cached_response:
context["cached"] = True
context["response"] = cached_response
yield context
# Após yield, registra sucesso
status = "success"
except Exception as e:
# Registra erro
status = "error"
context["error"] = str(e)
raise
finally:
# Sempre executa (sucesso ou erro)
duration = time.time() - start_time
# Decrementa requisições ativas
llm_active_requests.labels(model=model, provider=provider).dec()
# Registra latência
llm_request_duration_seconds.labels(
model=model,
provider=provider
).observe(duration)
# Registra contadores
cached_label = "true" if context["cached"] else "false"
llm_requests_total.labels(
model=model,
provider=provider,
status=status,
cached=cached_label
).inc()
# Registra tokens e custo (se disponível)
if "response" in context and not context["cached"]:
response = context["response"]
if hasattr(response, "usage"):
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
# Tokens
llm_tokens_total.labels(
model=model,
provider=provider,
token_type="input"
).inc(input_tokens)
llm_tokens_total.labels(
model=model,
provider=provider,
token_type="output"
).inc(output_tokens)
# Custo
cost = self._calculate_cost(model, input_tokens, output_tokens)
llm_cost_usd_total.labels(
model=model,
provider=provider
).inc(cost)
# ============================================================================
# Exemplo de Uso
# ============================================================================
def example_instrumented_llm_call():
"""Exemplo de chamada LLM completamente instrumentada"""
collector = LLMMetricsCollector()
prompt = "Explique o que é Kubernetes em 2 parágrafos"
config = {"temperature": 0.7, "max_tokens": 300}
with collector.track_request(
model="gpt-4-turbo",
provider="openai",
prompt=prompt,
config=config
) as ctx:
# Executa a chamada
response = openai.ChatCompletion.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": prompt}],
**config
)
# Armazena resposta no contexto para métricas
ctx["response"] = response
return response.choices[0].message.content
# Inicia servidor HTTP para Prometheus scraping
# Métricas expostas em http://localhost:8000/metrics
start_http_server(8000)
# Executa chamada instrumentada
# result = example_instrumented_llm_call()
# Prometheus scrape config (adicionar ao prometheus.yml):
"""
scrape_configs:
- job_name: 'llm-service'
static_configs:
- targets: ['localhost:8000']
scrape_interval: 15s
"""Queries PromQL úteis:
# Taxa de requisições por segundo (últimos 5 minutos)
rate(llm_requests_total[5m])
# Latência P95 por modelo
histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))
# Custo acumulado na última hora
increase(llm_cost_usd_total[1h])
# Taxa de erro (últimos 15 minutos)
sum(rate(llm_requests_total{status="error"}[15m])) / sum(rate(llm_requests_total[15m]))
# Tokens/segundo por tipo
rate(llm_tokens_total{token_type="output"}[5m])
Rastreamento Distribuído com OpenTelemetry
Traces permitem seguir o caminho de uma requisição através de múltiplos serviços e chamadas LLM, essencial para diagnosticar gargalos em sistemas complexos.
# uv pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-openai
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
from opentelemetry.sdk.resources import Resource
from typing import Dict, Any
import openai
# ============================================================================
# Configuração do OpenTelemetry
# ============================================================================
# Define resource (identifica o serviço)
resource = Resource.create({
"service.name": "llm-agent-service",
"service.version": "1.0.0",
"deployment.environment": "production"
})
# Configura provider
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
# Configura exportador (envia para OpenTelemetry Collector)
otlp_exporter = OTLPSpanExporter(
endpoint="http://localhost:4317", # OpenTelemetry Collector gRPC endpoint
insecure=True
)
# Adiciona processor que processa spans em batch (eficiente)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# Auto-instrumenta biblioteca OpenAI (traces automáticos!)
OpenAIInstrumentor().instrument()
# ============================================================================
# Rastreamento de Workflows Multi-Step
# ============================================================================
class TracedLLMWorkflow:
"""
Workflow de LLM com rastreamento distribuído completo.
Cada etapa cria um span, permitindo visualizar:
- Latência de cada componente
- Dependências entre chamadas
- Gargalos e falhas
"""
def __init__(self):
self.tracer = trace.get_tracer(__name__)
def process_user_query(self, query: str) -> Dict[str, Any]:
"""
Processa query do usuário em múltiplas etapas rastreadas.
Args:
query: Pergunta do usuário
Returns:
Resultado processado com metadados
"""
# Span raiz do workflow completo
with self.tracer.start_as_current_span(
"process_user_query",
attributes={
"query.length": len(query),
"workflow.version": "v2"
}
) as root_span:
# Etapa 1: Classificação de intenção
intent = self._classify_intent(query)
root_span.set_attribute("query.intent", intent)
# Etapa 2: Recuperação de contexto (se necessário)
if intent in ["factual", "technical"]:
context = self._retrieve_context(query)
root_span.set_attribute("context.retrieved", True)
else:
context = None
root_span.set_attribute("context.retrieved", False)
# Etapa 3: Geração de resposta
response = self._generate_response(query, context, intent)
# Etapa 4: Pós-processamento
final_response = self._post_process(response)
root_span.set_attribute("response.length", len(final_response))
return {
"response": final_response,
"intent": intent,
"used_context": context is not None
}
def _classify_intent(self, query: str) -> str:
"""Classifica intenção da query"""
with self.tracer.start_as_current_span(
"classify_intent",
attributes={"component": "classifier"}
) as span:
# Chamada LLM para classificação (auto-instrumentada)
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{
"role": "system",
"content": "Classifique a intenção: factual, technical, conversational, creative"
}, {
"role": "user",
"content": query
}],
max_tokens=10,
temperature=0.0
)
intent = response.choices[0].message.content.strip().lower()
span.set_attribute("intent.classified", intent)
return intent
def _retrieve_context(self, query: str) -> str:
"""Recupera contexto relevante (mock - em produção seria RAG real)"""
with self.tracer.start_as_current_span(
"retrieve_context",
attributes={"component": "rag"}
) as span:
# Simula embedding + vector search
with self.tracer.start_as_current_span("generate_embedding"):
# embedding = embed_model.encode(query)
pass
with self.tracer.start_as_current_span("vector_search"):
# results = vector_db.search(embedding, top_k=3)
context = "[Contexto recuperado do banco vetorial]"
span.set_attribute("context.length", len(context))
return context
def _generate_response(
self,
query: str,
context: str,
intent: str
) -> str:
"""Gera resposta final"""
with self.tracer.start_as_current_span(
"generate_response",
attributes={
"component": "generator",
"intent": intent,
"has_context": context is not None
}
) as span:
# Monta prompt
if context:
prompt = f"Contexto: {context}\n\nPergunta: {query}"
else:
prompt = query
# Chamada principal (auto-instrumentada)
response = openai.ChatCompletion.create(
model="gpt-4-turbo",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=500
)
result = response.choices[0].message.content
span.set_attribute("response.tokens", response.usage.completion_tokens)
return result
def _post_process(self, response: str) -> str:
"""Pós-processa resposta"""
with self.tracer.start_as_current_span(
"post_process",
attributes={"component": "post_processor"}
):
# Simula validação, formatação, moderação
return response.strip()
# ============================================================================
# Exemplo de Uso
# ============================================================================
# workflow = TracedLLMWorkflow()
# result = workflow.process_user_query("O que é Kubernetes?")
# Trace resultante visualizado no Jaeger:
"""
process_user_query (3.2s)
├── classify_intent (0.5s)
│ └── openai.chat.create (0.4s)
├── retrieve_context (0.8s)
│ ├── generate_embedding (0.2s)
│ └── vector_search (0.6s)
├── generate_response (1.7s)
│ └── openai.chat.create (1.6s)
└── post_process (0.05s)
"""Logs Estruturados para Debugging
Logs estruturados (JSON) permitem buscas e análises eficientes:
# uv pip install structlog
import structlog
from typing import Any, Dict
import sys
# ============================================================================
# Configuração de Logging Estruturado
# ============================================================================
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
cache_logger_on_first_use=True
)
logger = structlog.get_logger()
# ============================================================================
# Logging de Chamadas LLM
# ============================================================================
def llm_call_with_logging(prompt: str, config: Dict[str, Any]) -> str:
"""Chamada LLM com logging estruturado completo"""
request_id = generate_request_id() # UUID único
# Bind context (persiste em todos os logs desta chamada)
log = logger.bind(
request_id=request_id,
model=config.get("model", "gpt-4"),
prompt_length=len(prompt)
)
log.info(
"llm_request_started",
temperature=config.get("temperature"),
max_tokens=config.get("max_tokens")
)
try:
start_time = time.time()
response = openai.ChatCompletion.create(
model=config["model"],
messages=[{"role": "user", "content": prompt}],
**config
)
duration = time.time() - start_time
log.info(
"llm_request_completed",
duration_seconds=duration,
input_tokens=response.usage.prompt_tokens,
output_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
finish_reason=response.choices[0].finish_reason
)
return response.choices[0].message.content
except openai.error.RateLimitError as e:
log.error(
"llm_request_rate_limited",
error=str(e),
retry_after=e.headers.get("retry-after")
)
raise
except Exception as e:
log.error(
"llm_request_failed",
error_type=type(e).__name__,
error_message=str(e),
exc_info=True
)
raise
# Exemplo de log gerado (JSON):
"""
{
"event": "llm_request_completed",
"timestamp": "2025-11-06T10:32:15.123Z",
"level": "info",
"request_id": "550e8400-e29b-41d4-a716-446655440000",
"model": "gpt-4-turbo",
"prompt_length": 245,
"duration_seconds": 2.34,
"input_tokens": 89,
"output_tokens": 312,
"total_tokens": 401,
"finish_reason": "stop"
}
"""Alertas Inteligentes
Configure alertas para anomalias críticas:
# Prometheus Alertmanager - alerts.yml
groups:
- name: llm_service_alerts
interval: 30s
rules:
# Alerta: Latência P95 acima de 5s
- alert: HighLLMLatency
expr: |
histogram_quantile(0.95,
rate(llm_request_duration_seconds_bucket[5m])
) > 5
for: 5m
labels:
severity: warning
team: ai-platform
annotations:
summary: "Latência P95 de LLM acima de 5s"
description: "Modelo {{ $labels.model }} tem P95 de {{ $value }}s"
# Alerta: Taxa de erro > 1%
- alert: HighLLMErrorRate
expr: |
sum(rate(llm_requests_total{status="error"}[5m]))
/ sum(rate(llm_requests_total[5m])) > 0.01
for: 2m
labels:
severity: critical
team: ai-platform
annotations:
summary: "Taxa de erro de LLM acima de 1%"
description: "{{ $value | humanizePercentage }} de erros"
# Alerta: Custo/hora acima do budget
- alert: LLMCostBudgetExceeded
expr: |
rate(llm_cost_usd_total[1h]) * 3600 > 100
for: 10m
labels:
severity: warning
team: finance
annotations:
summary: "Custo de LLM excedeu budget de $100/hora"
description: "Custo atual: ${{ $value }}/hora"
# Alerta: Cache hit rate < 30%
- alert: LowCacheHitRate
expr: |
llm_cache_hit_rate < 0.3
for: 15m
labels:
severity: info
team: ai-platform
annotations:
summary: "Taxa de cache hit abaixo de 30%"
description: "Cache hit rate: {{ $value | humanizePercentage }}"Dashboard Grafana Recomendado (Grafana Labs s.d.):
{
"dashboard": {
"title": "LLM Service - Production Monitoring",
"panels": [
{
"title": "Request Rate",
"targets": [
{"expr": "sum(rate(llm_requests_total[5m])) by (model)"}
]
},
{
"title": "Latency Percentiles",
"targets": [
{"expr": "histogram_quantile(0.50, rate(llm_request_duration_seconds_bucket[5m]))", "legendFormat": "P50"},
{"expr": "histogram_quantile(0.95, rate(llm_request_duration_seconds_bucket[5m]))", "legendFormat": "P95"},
{"expr": "histogram_quantile(0.99, rate(llm_request_duration_seconds_bucket[5m]))", "legendFormat": "P99"}
]
},
{
"title": "Cost (USD/hour)",
"targets": [
{"expr": "rate(llm_cost_usd_total[1h]) * 3600"}
]
},
{
"title": "Token Throughput",
"targets": [
{"expr": "sum(rate(llm_tokens_total{token_type='input'}[5m]))", "legendFormat": "Input"},
{"expr": "sum(rate(llm_tokens_total{token_type='output'}[5m]))", "legendFormat": "Output"}
]
}
]
}
}Pratique o que Aprendeu
Este capítulo apresentou os fundamentos práticos de operacionalização de LLMs, incluindo configuração de parâmetros de geração, gerenciamento de context window, otimização de custos e latência, e observabilidade em produção.
Para consolidar seu entendimento através de implementação prática, consulte os Exercícios Práticos do Capítulo 5, que incluem:
- Exploração de Parâmetros de Geração: Experimente empiricamente temperature, top-p e top-k
- Gerenciamento de Context Window: Implemente estratégias de sliding window, summarization e híbrida
- Otimização de Custo e Latência: Construa sistema de caching e model routing
- Observabilidade e Monitoramento: Instrumente aplicação com Prometheus e OpenTelemetry
Os exercícios fornecem código executável e guias passo a passo para aplicação prática dos conceitos apresentados.
Conclusão
Este capítulo apresentou os fundamentos práticos para operacionalizar LLMs em sistemas de produção. Exploramos três pilares críticos: controle de geração, gerenciamento de recursos e observabilidade.
O que você dominou:
Parâmetros de Geração: Compreensão profunda de temperature, top-p, top-k e suas interações. Você agora sabe configurar modelos para diferentes casos de uso: de extração determinística (temperature=0.0) a brainstorming criativo (temperature=1.0+).
Context Window Management: Implementação de estratégias robustas (sliding window, summarization, híbrida) para manter conversações longas dentro dos limites do modelo. Você viu como cada abordagem tem trade-offs específicos entre preservação de informação e eficiência.
Otimização de Custo e Latência: Técnicas concretas que reduzem custos em 40-70%:
- Model routing (cascading com fallback)
- Caching inteligente com Redis
- Batch processing
- Compressão de prompts
- Paralelização com asyncio
- Load balancing entre providers
Observabilidade em Produção: Instrumentação completa usando padrões da indústria:
- Métricas com Prometheus (Prometheus s.d.) (latência P95/P99, throughput, custos)
- Traces distribuídos com OpenTelemetry (OpenTelemetry s.d.)
- Logs estruturados com structlog
- Alertas inteligentes (SLOs, budgets)
Para aprofundamento nos tópicos deste capítulo:
- Sampling Strategies: (Holtzman et al. 2020) para entender nucleus sampling e problemas de degeneração
- Context Window Extensions: (Press, Smith, e Lewis 2022; Dao et al. 2022) sobre técnicas modernas de extensão
- Observability: (Sigelman et al. 2010) paper fundacional do Google sobre tracing distribuído
- Performance Benchmarks: (Artificial Analysis 2024) para comparações atualizadas de modelos
- API Documentation: (OpenAI s.d.a; Anthropic s.d.) documentação oficial atualizada
Ponte para produção: Os exemplos apresentados não são protótipos acadêmicos. São implementações battle-tested que você pode adaptar para seus sistemas. O LLMMetricsCollector rastreia métricas críticas sem overhead significativo. O LLMLoadBalancer fornece alta disponibilidade com fallback automático. A instrumentação OpenTelemetry permite debugging de workflows complexos. As queries PromQL dão visibilidade em tempo real de performance e custos.
Com o domínio de técnicas de prompting e agora a capacidade de operacionalizar LLMs eficientemente em produção, você está há um passo de finalizar a Parte I: Fundamentos e Componentes Base. Você construiu uma base sólida que cobre a arquitetura dos Transformers e foundation models, como LLMs são treinados e representam conhecimento, técnicas de fine-tuning e otimização, prompt engineering e raciocínio com LLMs, e operacionalização prática em produção.
No próximo capítulo, vamos entender como funcionam os embeddings e vetores, a base para recuperação de informação e sistemas RAG (Retrieval-Augmented Generation), que será muito explorado na segunda parte do livro. Com isso, estaremos prontos para iniciar a construção de agentes inteligentes!