AI

Crie um agente de IA que salva dados no banco de dados passo a passo

Aprenda a desenvolver um agente de IA inteligente que mantém o histórico completo da conversa em um banco de dados, rastreia entidades e integra dados da web em tempo real.
40 min de leitura
Build an AI Agent that Saves Data to Database

Neste artigo, você aprenderá:

  • Como criar um agente de IA pronto para produção que persiste conversas em bancos de dados
  • Como implementar a extração inteligente de dados e o rastreamento de entidades
  • Como criar um tratamento robusto de erros com recuperação automática
  • Como aprimorar seu agente com dados da web em tempo real da Bright Data

Vamos começar!

O desafio das conversas de IA sem estado

Os agentes de IA atuais geralmente funcionam como sistemas sem estado. Eles tratam cada conversa como um evento separado. Essa falta de contexto histórico faz com que os usuários repitam informações. Como resultado, isso causa ineficiências operacionais e frustração do usuário. Além disso, as empresas perdem a oportunidade de usar dados de longo prazo para personalização ou melhoria do serviço.

A IA com persistência de dados resolve esse problema registrando todas as interações em um banco de dados estruturado. Ao manter um registro contínuo, esses sistemas podem lembrar o contexto histórico, rastrear entidades específicas ao longo do tempo e usar padrões de interação anteriores para fornecer uma experiência de usuário consistente e personalizada.

O que estamos construindo: sistema de agente de IA conectado a banco de dados

Criaremos um agente de IA pronto para produção que processa mensagens usando LangChain e GPT-4. Ele salva cada conversa no PostgreSQL. Ele extrai entidades e insights em tempo real. Ele mantém um histórico completo de conversas entre as sessões. Ele gerencia erros com sistemas de repetição automática. Ele oferece monitoramento com registro.

O sistema lidará com:

  • Esquema de banco de dados com relações e índices adequados
  • Agente LangChain com ferramentas de banco de dados personalizadas
  • Persistência automática de conversas e extração de entidades
  • Pipeline de processamento em segundo plano para coleta de dados
  • Tratamento de erros com gerenciamento de transações
  • Interface de consulta para recuperar dados históricos
  • Integração RAG com Bright Data para inteligência web

Pré-requisitos

Configure seu ambiente de desenvolvimento com:

  • Python 3.10 ou superior. Necessário para recursos assíncronos modernos e dicas de tipo
  • PostgreSQL 14+ ou SQLite 3.35+. Banco de dados para persistência de dados
  • Chave API OpenAI. Para acesso ao GPT-4. Obtenha-a na Plataforma OpenAI
    Creating an OpenAI Key
  • LangChain. Estrutura para a criação de agentes de IA. Consulte a documentação
  • Ambiente virtual Python. Mantém as dependências isoladas. Consulte a documentaçãovenv

Configuração do ambiente

Crie o diretório do seu projeto e instale as dependências:

mkdir database-agent
cd database-agent
python -m venv venv

# macOS/Linux: source venv/bin/activate
# Windows: venv\Scripts\activate

pip install langchain langchain-openai sqlalchemy psycopg2-binary python-dotenv pydantic

Crie um novo arquivo chamado agent.py e adicione as seguintes importações:

import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from queue import Queue
from threading import Thread

# Importações do SQLAlchemy
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, Float, JSON, ForeignKey, text
from sqlalchemy.orm import sessionmaker, relationship, Session, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError

# Importações LangChain
de langchain.agents import AgentExecutor, create_openai_functions_agent
de langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
de langchain.tools import Tool
de langchain_openai import ChatOpenAI
de langchain.memory import ConversationBufferMemory
de langchain.schema import HumanMessage, AIMessage, SystemMessage

# Importações RAG
from langchain_community.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
import requests

# Configuração do ambiente
from dotenv import load_dotenv
load_dotenv()

# Configurar registro
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

Crie um arquivo .env com suas credenciais:

# Configuração do banco de dados
DATABASE_URL="postgresql://username:password@localhost:5432/agent_db"
# Ou para SQLite: DATABASE_URL="sqlite:///./agent_data.db"

# Chaves API
OPENAI_API_KEY="your-openai-api-key"

# Opcional: Bright Data (para a Etapa 7)
BRIGHT_DATA_API_KEY="sua-chave-API-bright-data"

# Configurações do aplicativo
AGENT_MODEL="gpt-4-turbo-preview"
CONNECTION_POOL_SIZE=5
MAX_RETRIES=3

Você precisa de:

  • URL do banco de dados: cadeia de conexão para PostgreSQL ou SQLite
  • Chave da API OpenAI: para inteligência do agente via GPT-4
  • Chave da API Bright Data: opcional, para dados da web em tempo real na Etapa 7
    Creating a BrightData API Key

Criando seu agente de IA conectado ao banco de dados

Etapa 1: Projetando o esquema do banco de dados

Crie tabelas para usuários, conversas, mensagens e entidades extraídas. O esquema usa chaves estrangeiras e relações para manter a integridade dos dados.

Base = declarative_base()


class User(Base):
    """Tabela de perfil do usuário - armazena informações e preferências do usuário."""
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    user_id = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(255))
    email = Column(String(255))
    preferences = Column(JSON, default={})
    created_at = Column(DateTime, default=datetime.utcnow)
    last_active = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relações
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(user_id='{self.user_id}', name='{self.name}')>"


class Conversation(Base):
    """Tabela de sessões de conversação - rastreia sessões de conversação individuais."""
    __tablename__ = 'conversations'

    id = Coluna(Integer, primary_key=True)
    conversation_id = Coluna(String(255), unique=True, nullable=False, index=True)
    user_id = Coluna(Integer, ForeignKey('users.id'), nullable=False)
    title = Coluna(String(500))
    summary = Coluna(Text)
    status = Coluna(String(50), padrão='ativo')  # ativo, arquivado, excluído
meta_data = Coluna(JSON, padrão={})
criado_em = Coluna(DateTime, padrão=datetime.utcnow)
atualizado_em = Coluna(DateTime, padrão=datetime.utcnow, onupdate=datetime.utcnow)

    # Relações
    usuário = relationship("User", back_populates="conversations")
    mensagens = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
    entidades = relationship("Entity", back_populates="conversation", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Conversation(id='{self.conversation_id}', user='{self.user_id}')>"


class Message(Base):
    """Tabela de mensagens individuais - armazena cada mensagem em uma conversa."""
    __tablename__ = 'messages'

    id = Coluna(Inteiro, chave_primária=Verdadeiro)
    conversation_id = Coluna(Inteiro, Chave_estrangeira('conversations.id'), nulo=Falso, índice=Verdadeiro)
    role = Coluna(String(50), nulo=Falso)  # usuário, assistente, sistema
    content = Coluna(Texto, nulo=Falso)
    tokens = Coluna(Inteiro)
    model = Coluna(String(100))
    meta_data = Coluna(JSON, padrão={})
    created_at = Coluna(DataHora, padrão=datetime.utcnow)

    # Relações
    conversation = relação("Conversation", back_populates="messages")

    def __repr__(self):
        return f"<Message(role='{self.role}', conversation='{self.conversation_id}')>"


class Entity(Base):
    """Tabela de entidades extraídas - armazena entidades nomeadas extraídas de conversas."""
    __tablename__ = 'entities'

    id = Coluna(Inteiro, primary_key=True)
    conversation_id = Coluna(Inteiro, ForeignKey('conversations.id'), nullable=False, index=True)
    entity_type = Coluna(String(100), nullable=False, index=True)  # pessoa, organização, localização, etc.
    entity_value = Coluna(String(500), nulo=False)
    context = Coluna(Texto)
    confidence = Coluna(Float, padrão=0.0)
    meta_data = Coluna(JSON, padrão={})
    extracted_at = Coluna(DateTime, padrão=datetime.utcnow)

    # Relações
    conversation = relationship("Conversation", back_populates="entities")

    def __repr__(self):
        return f"<Entity(type='{self.entity_type}', value='{self.entity_value}')>"


class AgentLog(Base):
    """Tabela de registros de operação do agente - armazena registros operacionais para monitoramento."""
    __tablename__ = 'agent_logs'

    id = Coluna(Integer, primary_key=True)
    conversation_id = Coluna(String(255), index=True)
    level = Coluna(String(50), nullable=False)  # INFO, WARNING, ERROR
    operation = Coluna(String(255), nullable=False)
    message = Coluna(Texto, nullable=False)
    error_details = Coluna(JSON)
    execution_time = Coluna(Float)  # em segundos
    created_at = Coluna(DateTime, default=datetime.utcnow)

    def __repr__(self):
        return f"<AgentLog(level='{self.level}', operation='{self.operation}')>"

O esquema define cinco tabelas principais. O usuário armazena perfis com preferências JSON para dados flexíveis. A conversa rastreia sessões com rastreamento de status. A mensagem mantém trocas individuais com indicadores de função para mensagens do usuário versus assistente. A entidade captura informações extraídas com pontuações de confiança. O AgentLog fornece rastreamento de operações para monitoramento. Chaves estrangeiras mantêm a integridade referencial. Índices em campos frequentemente consultados otimizam o desempenho. A configuração cascade="all, delete-orphan" limpa registros relacionados quando os registros pai são excluídos.

Etapa 2: Configurando a camada de conexão do banco de dados

Configure o gerenciador de conexão de banco de dados com o SQLAlchemy. O gerenciador lida com o pool de conexões, verificações de integridade e lógica de repetição automática para garantir a confiabilidade.

class DatabaseManager:
    """
    Gerencia conexões e operações de banco de dados.

    Recursos:
    - Pool de conexões para uso eficiente de recursos
    - Verificações de integridade para verificar a conectividade do banco de dados
    - Criação automática de tabelas
    """

    def __init__(self, database_url: str, pool_size: int = 5, max_retries: int = 3):
        """
        Inicializa o gerenciador de banco de dados.

        Argumentos:
            database_url: String de conexão do banco de dados (por exemplo, 'sqlite:///./agent_data.db')
            pool_size: Número de conexões a serem mantidas no pool
            max_retries: Número máximo de tentativas de repetição para operações com falha
        """
        self.database_url = database_url
        self.max_retries = max_retries

        # Cria o mecanismo com pool de conexões
        self.engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=10,
            pool_pre_ping=True,  # Verifica as conexões antes de usar
            echo=False  # Defina como True para depuração SQL
        )

        # Crie uma fábrica de sessões
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False
        )

        logger.info(f"✓ Mecanismo de banco de dados criado com pool de conexões {pool_size}")

    def initialize_database(self):
        """Criar todas as tabelas no banco de dados."""
        try:
            Base.metadata.create_all(bind=self.engine)
            logger.info("✓ Tabelas do banco de dados criadas com sucesso")
        except Exception as e:
            logger.error(f"❌ Falha ao criar tabelas do banco de dados: {e}")
            raise

    def get_session(self) -> Session:
        """Obter uma nova sessão de banco de dados para realizar operações."""
        return self.SessionLocal()

    def health_check(self) -> bool:
        """
        Verificar a conectividade do banco de dados.

        Retorna:
            bool: True se o banco de dados estiver saudável, False caso contrário
        """
        tente:
            com self.engine.connect() como conn:
                conn.execute(text("SELECT 1"))
            logger.info("✓ Verificação de integridade do banco de dados aprovada")
            retorne Verdadeiro
        exceto Exceção como e:
            logger.error(f"❌ Falha na verificação de integridade do banco de dados: {e}")
            retorne Falso

O DatabaseManager estabelece conexões usando o pool de conexões do SQLAlchemy. Definir pool_size=5 mantém cinco conexões persistentes para maior eficiência. A opção pool_pre_ping valida as conexões antes do uso. Isso evita erros de conexão obsoletas. O mecanismo de repetição tenta operações com falha até três vezes com backoff exponencial. Ele lida com problemas de rede transitórios.

Etapa 3: Construindo o núcleo do agente LangChain

Crie o agente de IA usando o LangChain com ferramentas personalizadas que interagem com o banco de dados. O agente usa chamadas de função para salvar informações e recuperar o histórico de conversas.

class DataPersistentAgent:
    """
    Agente de IA com recursos de persistência de banco de dados.

    Este agente:
    - Lembra conversas entre sessões
    - Salva e recupera informações do usuário
    - Extrai e armazena entidades importantes
    - Fornece respostas personalizadas com base no histórico
    """

    def __init__(
        self,
        db_manager: DatabaseManager,
        model_name: str = "gpt-4-turbo-preview",
        temperature: float = 0.7
    ):
        """
        Inicializa o agente persistente de dados.

        Argumentos:
            db_manager: Instância do gerenciador de banco de dados
            model_name: Modelo LLM a ser usado (padrão: gpt-4-turbo-preview)
            temperature: Temperatura do modelo para geração de respostas
        """
        self.db_manager = db_manager
        self.model_name = model_name

        # Inicializar LLM
        self.llm = ChatOpenAI(
            model=model_name,
            temperature=temperature,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        # Criar ferramentas para o agente
        self.tools = self._create_agent_tools()

        # Criar prompt do agente
        self.prompt = self._create_agent_prompt()

        # Inicializar memória
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )

        # Criar agente
        self.agent = create_openai_functions_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=self.prompt
        )

        # Criar executor do agente
        self.agent_executor = AgentExecutor(
            agent=self.agent,
            tools=self.tools,
            memory=self.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

        logger.info(f"✓ Agente persistente de dados inicializado com {model_name}")

    def _create_agent_tools(self) -> List[Tool]:
        """Criar ferramentas personalizadas para operações de banco de dados."""

        def salvar_informações_do_usuário(dados_do_usuário: str) -> str:
            """Salvar informações do usuário no banco de dados."""
            tente:
                dados = json.loads(dados_do_usuário)
                sessão = self.db_manager.get_session()

                user = session.query(User).filter_by(user_id=data['user_id']).first()
                if not user:
                    user = User(**data)
                    session.add(user)
                else:
                    for key, value in data.items():
                        setattr(user, key, value)

                sessão.confirmar()
sessão.fechar()

retornar f"✓ Informações do usuário salvas com sucesso"
exceto Exceção como e:
logger.error(f"Falha ao salvar informações do usuário: {e}")
retornar f"❌ Erro ao salvar informações do usuário: {str(e)}"

        def recuperar_histórico_do_usuário(id_do_usuário: str) -> str:
            """Recupera o histórico de conversas do usuário."""
            tente:
                sessão = self.db_manager.get_session()

                usuário = sessão.query(User).filter_by(user_id=id_do_usuário).first()
                se não houver usuário:
                    retorne "Nenhum usuário encontrado"

                conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(5).all()

                history = []
                for conv in conversations:
                    messages = session.query(Message).filter_by(conversation_id=conv.id).all()
                    histórico.append({
                        'id_da_conversa': conv.conversation_id,
                        'criado_em': conv.created_at.isoformat(),
                        'contagem_de_mensagens': len(mensagens),
                        'resumo': conv.summary
                    })

                sessão.close()
                retornar json.dumps(histórico, indent=2)
            exceto Exception como e:
                logger.error(f"Falha ao recuperar o histórico: {e}")
                retornar f"❌ Erro ao recuperar o histórico: {str(e)}"

        def extrair_entidades(texto: str) -> str:
            """Extrair entidades do texto e salvar no banco de dados."""

            tentar:
                entidades = []
                # Extração simples de palavras-chave (substitua por NER adequado)
                palavras-chave = ['importante', 'chave', 'crítico']
                para palavra-chave em palavras-chave:
                    se palavra-chave em texto.lower():
                        entidades.append({
                            'entity_type': 'palavra-chave',
                            'entity_value': palavra-chave,
                            'confidence': 0.8
                        })

                retornar json.dumps(entidades, indent=2)
            exceto Exception como e:
                logger.error(f"Falha ao extrair entidades: {e}")
                retornar f"❌ Erro ao extrair entidades: {str(e)}"

        tools = [
            Tool(
                name="SaveUserInfo",
                func=save_user_info,
                description="Salvar informações do usuário no banco de dados. A entrada deve ser uma string JSON com detalhes do usuário."
            ),
            Tool(
                name="RetrieveUserHistory",
                func=retrieve_user_history,
                description="Recupera o histórico de conversas de um usuário do banco de dados. A entrada deve ser o user_id."
            ),
            Tool(
                name="ExtractEntities",
                func=extract_entities,
                description="Extrair entidades importantes do texto e salvar no banco de dados. A entrada deve ser o texto a ser analisado."
            )
        ]

        retornar ferramentas

    def _create_agent_prompt(self) -> ChatPromptTemplate:
        """Criar modelo de prompt do agente."""

        system_message = """Você é um assistente de IA útil, com a capacidade de lembrar e aprender com as conversas.

Você tem acesso às seguintes ferramentas:
- SaveUserInfo: Salvar informações do usuário para lembrar em conversas futuras
- RetrieveUserHistory: Pesquisar conversas anteriores com um usuário
- ExtractEntities: Extrair e salvar informações importantes das conversas

Use essas ferramentas para fornecer respostas personalizadas e contextuais. Sempre verifique se você tem conversas anteriores com um usuário antes de responder.

Seja proativo em salvar informações importantes para conversas futuras."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad")
        ])

        return prompt

    def chat(self, user_id: str, message: str, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Processe uma mensagem de chat e persista no banco de dados.

        Este método lida com:
1. Criação ou recuperação de conversas
2. Salvamento de mensagens do usuário no banco de dados
3. Geração de respostas do agente
4. Salvamento das respostas do agente no banco de dados
5. Registro de operações para monitoramento

Argumentos:
user_id: Identificador exclusivo do usuário
message: Texto da mensagem do usuário
conversation_id: ID opcional da conversa para continuar a conversa existente

        Retorna:
dict: Contém conversation_id, response e execution_time
"""
start_time = datetime.utcnow()

try:
# Obter ou criar conversa
session = self.db_manager.get_session()

            if conversation_id:
                conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
            else:
                # Cria uma nova conversa
                user = session.query(User).filter_by(user_id=user_id).first()
                if not user:
                    usuário = Usuário(user_id=user_id, nome=user_id)
                    sessão.adicionar(usuário)
                    sessão.confirmar()

                conversa = Conversa(
                    conversation_id=f"conv_{user_id}_{datetime.utcnow().timestamp()}",
                    user_id=user.id,
                    title=message[:100]
                )
sessão.adicionar(conversa)
sessão.confirmar()

# Salvar mensagem do usuário
mensagem_do_usuário = Mensagem(
conversa_id=conversa.id,
função="usuário",
conteúdo=mensagem,
modelo=self.nome_do_modelo)

sessão.adicionar(mensagem_do_usuário)
sessão.confirmar()

            # Obter resposta do agente
            response = self.agent_executor.invoke({
                "input": f"[ID do usuário: {user_id}] {message}"
            })

            # Salvar mensagem do assistente
            assistant_message = Message(
                conversation_id=conversation.id,
                role="assistant",
                content=response['output'],
                model=self.model_name
            )
            session.add(assistant_message)
            session.commit()

            # Registrar operação
            execution_time = (datetime.utcnow() - start_time).total_seconds()
            log_entry = AgentLog(
                conversation_id=conversation.conversation_id,
                level="INFO",
                operation="chat",
                message="Chat processado com sucesso",
                execution_time=execution_time
            )
            sessão.adicionar(entrada_de_registro)
sessão.confirmar()

# Extrair conversation_id antes de encerrar a sessão
resultado_conversation_id = conversation.conversation_id

sessão.encerrar()

logger.info(f"✓ Chat processado para o usuário {user_id} em {execution_time:.2f}s")

            return {
                'conversation_id': conversation_id_result,
                'response': response['output'],
                'execution_time': execution_time
            }

        except Exception as e:
            logger.error(f"❌ Erro ao processar o chat: {e}")

            # Registrar erro
sessão = self.db_manager.get_session()
error_log = AgentLog(
conversation_id=conversation_id ou "desconhecido",
level="ERROR",
operation="chat",
message=str(e),
error_details={'exception_type': type(e).__name__}
            )
            sessão.add(error_log)
            sessão.commit()
            sessão.close()

            raise

O DataPersistentAgent envolve a função de chamada do agente do LangChain com ferramentas de banco de dados. A ferramenta SaveUserInfo mantém os dados do usuário criando ou atualizando registros de usuário. A ferramenta RetrieveHistory consulta conversas anteriores para fornecer contexto. O prompt do sistema instrui o agente a ser proativo quanto ao salvamento de informações e à verificação do histórico. O ConversationBufferMemory mantém o contexto de curto prazo dentro das sessões. O armazenamento em banco de dados fornece persistência de longo prazo entre as sessões.

Data persistent AI agent output

Etapa 3.5: Criação do módulo de coleta de dados

Crie ferramentas para extrair e estruturar dados das conversas. O coletor gera resumos, extrai preferências e identifica entidades usando o LLM.

class DataCollector:
    """
    Coleta e estrutura dados de conversas do agente.

    Este módulo:
    - Gera resumos de conversas
    - Extrai preferências do usuário do histórico de conversas
    - Identifica e salva entidades nomeadas
    """

    def __init__(self, db_manager: DatabaseManager, llm: ChatOpenAI):
        """
        Inicializa o coletor de dados.

        Argumentos:
            db_manager: Instância do gerenciador de banco de dados
            llm: Modelo de linguagem para análise de texto
        """
        self.db_manager = db_manager
        self.llm = llm
        logger.info("✓ Coletor de dados inicializado")

    def extrair_resumo_da_conversa(self, id_da_conversa: str) -> str:
        """
        Gera e salva o resumo da conversa usando LLM.

        Argumentos:
            id_da_conversa: ID da conversa a ser resumida

        Retorna:
            str: Texto do resumo gerado
        """
        tente:
            sessão = self.db_manager.get_session()

            conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
            if not conversation:
                return "Conversation not found"

            messages = session.query(Message).filter_by(conversation_id=conversation.id).all()

            # Construir texto da conversa
            conv_text = "n".join([
                f"{msg.role}: {msg.content}" for msg in messages
            ])

            # Gerar resumo usando LLM
            summary_prompt = f"""Resuma a seguinte conversa em 2-3 frases, capturando os principais tópicos e resultados:

{conv_text}

Resumo:"""

            summary_response = self.llm.invoke([HumanMessage(content=summary_prompt)])
            summary = summary_response.content

            # Atualizar conversa com resumo
            conversation.summary = summary
            session.commit()
            session.close()

            logger.info(f"✓ Resumo gerado para a conversa {conversation_id}")
            return summary

        except Exception as e:
            logger.error(f"Falha ao gerar o resumo: {e}")
            return ""

    def extrair_preferências_do_usuário(self, user_id: str) -> Dict[str, Any]:
        """
        Extrair e salvar as preferências do usuário do histórico de conversas.

        Argumentos:
            user_id: ID do usuário a ser analisado

        Retorna:
            dict: Preferências extraídas
        """
        tente:
            sessão = self.db_manager.get_session()

            usuário = sessão.query(User).filter_by(user_id=user_id).first()
            if not user:
                return {}

            # Obter conversas recentes
            conversations = session.query(Conversation).filter_by(user_id=user.id).order_by(Conversation.created_at.desc()).limit(10).all()

            all_messages = []
            para conv em conversas:
                mensagens = sessão.query(Mensagem).filter_by(conversation_id=conv.id).all()
                todas_as_mensagens.extend([msg.content para msg em mensagens se msg.role == "usuário"])

            se não todas_as_mensagens:
                retornar {}

            # Analisar preferências usando LLM
            analysis_prompt = f"""Analise as seguintes mensagens de um usuário e extraia suas preferências, interesses e estilo de comunicação.

Mensagens:
{chr(10).join(all_messages[:20])}

Retorne um objeto JSON com a seguinte estrutura:
{{
    "interests": ["interest1", "interest2"],
    "communication_style": "description",
    "preferred_topics": ["topic1", "topic2"],
    "language_preference": "language"
}}"""

            response = self.llm.invoke([HumanMessage(content=analysis_prompt)])

            tente:
                # Extrair JSON da resposta
                content = response.content
                if '```json' in content:
                    content = content.split('```json')[1].split('```')[0].strip()
                elif '```' in content:
                    content = content.split('```')[1].split('```')[0].strip()

                preferences = json.loads(conteúdo)

                # Atualizar preferências do usuário
                usuário.preferências = preferências
                sessão.confirmar()

                logger.info(f"✓ Preferências extraídas para o usuário {user_id}")
                retornar preferências

            exceto json.JSONDecodeError:
                logger.warning("Falha ao analisar JSON das preferências")
                retornar {}
            finalmente:
                session.close()

        exceto Exception como e:
            logger.error(f"Falha ao extrair preferências: {e}")
            retornar {}

    def extrair_entidades_com_llm(self, conversation_id: str) -> Lista[Dict[str, Any]]:
        """
        Extrair entidades nomeadas usando LLM.

        Argumentos:
            conversation_id: ID da conversa a ser analisada

        Retorna:
            list: Lista de entidades extraídas
        """
        tente:
            session = self.db_manager.get_session()

            conversation = session.query(Conversation).filter_by(conversation_id=conversation_id).first()
            se não houver conversation:
                retorne []

            messages = session.query(Message).filter_by(conversation_id=conversation.id).all()
            conv_text = "n".join([msg.content for msg in messages])

            # Extrair entidades usando LLM
            entity_prompt = f"""Extraia entidades nomeadas da seguinte conversa. Identifique:
- Pessoas (PESSOA)
- Organizações (ORG)
- Locais (LOC)
- Datas (DATE)
- Produtos (PRODUCT)
- Tecnologias (TECH)

Conversa:
{conv_text}

Retorne uma matriz JSON de entidades com o formato:
[
    {{"type": "PERSON", "value": "John Doe", "context": "mencionado como líder da equipe"}},
    {{"type": "ORG", "value": "Acme Corp", "context": "empresa cliente"}}
]"""

            response = self.llm.invoke([HumanMessage(content=entity_prompt)])

            try:
                content = response.content
                if '```json' in content:
                    content = content.split('```json')[1].split('```')[0].strip()
                elif '```' in content:
                    content = content.split('```')[1].split('```')[0].strip()

entities_data = json.loads(content)

# Salvar entidades no banco de dados
saved_entities = []
                para entity_data em entities_data:
entidade = Entidade(
conversation_id=conversation.id,
entity_type=entity_data['type'],
entity_value=entity_data['value'],
context=entity_data.get('context', ''),
confidence=0.9  # A extração LLM tem alta confiança
                    )
sessão.adicionar(entidade)
entidades_salvas.acrescentar(entidade_dados)

sessão.confirmar()
sessão.fechar()

logger.info(f"✓ Extraiu {len(entidades_salvas)} entidades da conversa {conversation_id}")
retornar entidades_salvas

            exceto json.JSONDecodeError:
                logger.warning("Falha no Parsing de JSON das entidades")
                retornar []

        exceto Exception como e:
            logger.error(f"Falha ao extrair entidades: {e}")
            retornar []

O DataCollector usa o LLM para analisar conversas. O método extract_conversation_summary cria resumos concisos das conversas. O método extract_user_preferences analisa padrões de mensagens para identificar os interesses e estilos de comunicação dos usuários. O método extract_entities_with_llm usa prompts estruturados para extrair entidades nomeadas, como pessoas, organizações e tecnologias. Todos os dados extraídos são salvos no banco de dados para referência futura.

Etapa 4: Construindo o pipeline de processamento de dados inteligente

Implemente o processamento em segundo plano para lidar com a coleta de dados sem bloquear o agente. O pipeline usa threads de trabalho e filas para processar resumos e entidades.

class DataProcessingPipeline:
    """
    Pipeline de processamento de dados assíncrono.

    Este pipeline:
    - Processa conversas em segundo plano
    - Gera resumos
    - Extrai entidades sem bloquear o fluxo principal
    - Atualiza as preferências do usuário periodicamente
    """

    def __init__(self, db_manager: DatabaseManager, collector: DataCollector, batch_size: int = 10):
        """
        Inicializa o pipeline de processamento.

        Argumentos:
            db_manager: Instância do gerenciador de banco de dados
            collector: Coletor de dados para operações de processamento
            batch_size: Número de itens a serem processados em cada lote
        """
        self.db_manager = db_manager
        self.collector = collector
        self.batch_size = batch_size

        # Filas de processamento
        self.summary_queue = Queue()
        self.entity_queue = Queue()
        self.preference_queue = Queue()

        # Threads de trabalho
        self.workers = []
        self.running = False

        logger.info("✓ Pipeline de processamento de dados inicializado")

    def start(self):
        """Iniciar trabalhadores de processamento em segundo plano."""
        self.running = True

        # Criar threads de trabalho
        summary_worker = Thread(target=self._process_summaries, daemon=True)
        entity_worker = Thread(target=self._process_entities, daemon=True)
        preference_worker = Thread(target=self._process_preferences, daemon=True)

        summary_worker.start()
        entity_worker.start()
        preference_worker.start()

        self.workers = [summary_worker, entity_worker, preference_worker]

        logger.info("✓ Iniciados 3 trabalhadores de processamento em segundo plano")

    def stop(self):
        """Interromper trabalhadores de processamento em segundo plano."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=5)
        logger.info("✓ Interrompidos trabalhadores de processamento em segundo plano")

    def queue_conversation_for_processing(self, conversation_id: str, user_id: str):
        """
        Adicionar conversa às filas de processamento.

        Argumentos:
            conversation_id: ID da conversa a ser processada
            user_id: ID do usuário para extração de preferências
        """
        self.summary_queue.put(conversation_id)
        self.entity_queue.put(conversation_id)
        self.preference_queue.put(user_id)

        logger.info(f"✓ Conversa {conversation_id} na fila para processamento")

    def _process_summaries(self):
        """Trabalhador para processamento de resumos de conversas."""
        enquanto self.running:
tente:
se não self.summary_queue.empty():
conversation_id = self.summary_queue.get()
self.collector.extract_conversation_summary(conversation_id)
self.summary_queue.task_done()
caso contrário:
time.sleep(1)
            exceto Exception como e:
                logger.error(f"Erro no trabalhador de resumo: {e}")

    def _process_entities(self):
        """Trabalhador para processamento de extração de entidades."""
        enquanto self.running:
            tente:
                se não self.entity_queue.empty():
                    conversation_id = self.entity_queue.get()
                    self.collector.extract_entities_with_llm(conversation_id)
                    self.entity_queue.task_done()
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(f"Erro no trabalhador de entidade: {e}")

    def _process_preferences(self):
        """Trabalhador para processar as preferências do usuário."""
        enquanto self.running:
            tente:
                se não self.preference_queue.empty():
                    user_id = self.preference_queue.get()
                    self.collector.extract_user_preferences(user_id)
                    self.preference_queue.task_done()
                else:
                    time.sleep(1)
            except Exception as e:
                logger.error(f"Erro no trabalhador de preferências: {e}")

    def get_queue_status(self) -> Dict[str, int]:
        """
        Obter os tamanhos atuais das filas.

        Retorna:
            dict: Tamanhos das filas para cada tipo de processamento
        """
        return {
            'summary_queue': self.summary_queue.qsize(),
            'entity_queue': self.entity_queue.qsize(),
            'preference_queue': self.preference_queue.qsize()
        }

O ProcessingPipeline separa a coleta de dados do processamento de mensagens. Quando uma conversa é concluída, ela é adicionada às filas em vez de ser processada imediatamente. Threads de trabalho separadas extraem dessas filas e processam itens em segundo plano. Isso evita que a coleta de dados bloqueie as respostas do agente. A configuração daemon=True garante que os trabalhadores sejam encerrados quando o programa principal for fechado. O monitoramento do status da fila ajuda a rastrear atrasos no processamento.

Data processing pipeline Agent

Etapa 5: adicionar monitoramento e registro em tempo real

Crie um sistema de monitoramento para rastrear o desempenho do agente, detectar erros e gerar relatórios. O monitor analisa os registros para fornecer insights operacionais.

class AgentMonitor:
    """
    Monitoramento em tempo real e coleta de métricas.

    Este módulo:
    - Rastreia métricas de desempenho
    - Monitora a integridade do sistema
    - Gera relatórios analíticos
    """

    def __init__(self, db_manager: DatabaseManager):
        """
        Inicializa o monitor do agente.

        Args:
            db_manager: Instância do gerenciador de banco de dados
        """
        self.db_manager = db_manager
        logger.info("✓ Monitor do agente inicializado")

    def get_performance_metrics(self, hours: int = 24) -> Dict[str, Any]:
        """
        Obtém métricas de desempenho para o período de tempo especificado.

        Argumentos:
            hours: Número de horas a serem analisadas

        Retorna:
            dict: Métricas de desempenho, incluindo contagens de operações e taxas de erro
        """
        tente:
            session = self.db_manager.get_session()

            cutoff_time = datetime.utcnow() - timedelta(hours=hours)

            # Consultar logs
            logs = session.query(AgentLog).filter(
                AgentLog.created_at >= cutoff_time
            ).all()

            # Calcular métricas
total_operations = len(logs)
error_count = len([log for log in logs if log.level == "ERROR"])
avg_execution_time = sum([log.execution_time or 0 for log in logs]) / max(total_operations, 1)

            # Obter contagens de conversas
            conversations = session.query(Conversation).filter(
                Conversation.created_at >= cutoff_time
            ).count()

            messages = session.query(Message).join(Conversation).filter(
                Message.created_at >= cutoff_time
            ).count()

            session.close()

            métricas = {
                'período_horas': horas,
                'total_operações': total_operações,
                'contagem_erros': contagem_erros,
                'taxa_erros': contagem_erros / max(total_operações, 1),
                'avg_execution_time': avg_execution_time,
                'conversations_created': conversations,
                'messages_processed': messages
            }

            logger.info(f"✓ Métricas de desempenho geradas para as últimas {hours} horas")
            return metrics

        except Exception as e:
            logger.error(f"Falha ao obter métricas de desempenho: {e}")
            return {}

    def health_check(self) -> Dict[str, Any]:
        """
        Executar verificação de integridade.

        Retorna:
            dict: Status de integridade, incluindo conectividade do banco de dados e taxas de erro
        """
        try:
            # Verificar conectividade do banco de dados
            db_healthy = self.db_manager.health_check()

            # Verificar taxa de erros recentes
            metrics = self.get_performance_metrics(hours=1)
            recent_errors = metrics.get('error_count', 0)

            # Determinar integridade geral
            is_healthy = db_healthy and recent_errors < 10

            health_status = {
                'status': 'healthy' se is_healthy, caso contrário 'degraded',
                'database_connected': db_healthy,
                'recent_errors': recent_errors,
                'timestamp': datetime.utcnow().isoformat()
            }

            logger.info(f"✓ Verificação de integridade: {health_status['status']}")
            retornar health_status

        exceto Exception como e:
            logger.error(f"Falha na verificação de integridade: {e}")
            retornar {
                'status': 'unhealthy',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }

O AgentMonitor fornece observabilidade nas operações do sistema. Ele rastreia métricas como operações totais, taxas de erro e tempos médios de execução, consultando a tabela AgentLog. O método get_metrics calcula estatísticas em janelas de tempo configuráveis. O método get_error_report recupera informações detalhadas sobre erros para depuração. Esse monitoramento permite a detecção proativa de problemas. Altas taxas de erro acionam uma investigação antes que os usuários sejam afetados.

Etapa 6: Criação da interface de consulta

Crie recursos de consulta para recuperar e analisar dados armazenados. A interface fornece métodos para pesquisar conversas, rastrear entidades e gerar análises.

class DataQueryInterface:
    """
    Interface para consultar dados armazenados do agente.

    Este módulo fornece métodos para:
    - Consultar análises do usuário
    - Recuperar histórico de conversas
    - Pesquisar informações específicas
    """

    def __init__(self, db_manager: DatabaseManager):
        """
        Inicializar a interface de consulta.

        Args:
            db_manager: Instância do gerenciador de banco de dados
        """
        self.db_manager = db_manager
        logger.info("✓ Interface de consulta inicializada")

    def get_user_analytics(self, user_id: str) -> Dict[str, Any]:
        """
        Obter análises para um usuário específico.

        Argumentos:
            user_id: ID do usuário a ser analisado

        Retorna:
            dict: Análises do usuário, incluindo contagens de conversas e preferências
        """
        tente:
            session = self.db_manager.get_session()

            user = session.query(User).filter_by(user_id=user_id).first()
            se não houver usuário:
                retornar {}

            # Obter contagem de conversas
            conversation_count = session.query(Conversation).filter_by(user_id=user.id).count()

            # Obter contagem de mensagens
            message_count = session.query(Message).join(Conversation).filter(
                Conversation.user_id == user.id
            ).count()

            # Obter contagem de entidades
            entity_count = session.query(Entity).join(Conversation).filter(
                Conversation.user_id == user.id
            ).count()

            # Obter intervalo de tempo
            first_conversation = session.query(Conversation).filter_by(
                user_id=user.id
            ).order_by(Conversation.created_at).first()

            last_conversation = session.query(Conversation).filter_by(
                user_id=user.id
            ).order_by(Conversation.created_at.desc()).first()

            session.close()

            analytics = {
                'user_id': user_id,
                'name': user.name,
                'conversation_count': conversation_count,
                'message_count': message_count,
                'entity_count': entity_count,
                'preferences': user.preferences,
                'first_interaction': first_conversation.created_at.isoformat() se for a primeira conversa, caso contrário, None,
                'last_interaction': last_conversation.created_at.isoformat() se for a última conversa, caso contrário, None,
                'avg_messages_per_conversation': message_count / max(conversation_count, 1)
            }

            logger.info(f"✓ Análises geradas para o usuário {user_id}")
            return analytics

        except Exception as e:
            logger.error(f"Falha ao obter as análises do usuário: {e}")
            return {}

O QueryInterface fornece métodos para acessar dados armazenados. O método get_user_conversations recupera o histórico completo da conversa com inclusão opcional de mensagens. O método search_conversations realiza uma pesquisa de texto completo no conteúdo da mensagem usando o operador ILIKE do SQL. O método get_entity_mentions encontra todas as conversas em que entidades específicas foram mencionadas. O método get_user_analytics gera estatísticas sobre a atividade do usuário. Essas consultas permitem a criação de painéis, a geração de relatórios e a criação de experiências personalizadas.

Etapa 7: Criação de um RAG com dados da web em tempo real da Bright Data

Aprimore seu agente conectado ao banco de dados com recursos RAG da inteligência da web em tempo real da Bright Data. Essa integração combina seu histórico de conversas com dados da web atualizados para respostas melhores.

class BrightDataRAGEnhancer:
    """
    Aprimore o agente persistente de dados com a inteligência da web da Bright Data.

    Este módulo:
    - Busca dados da web em tempo real da Bright Data
    - Ingere dados da web no armazenamento vetorial para RAG
    - Aprimora o agente com conhecimento aumentado pela web
    """

    def __init__(self, api_key: str, db_manager: DatabaseManager):
        """
        Inicializa o aprimorador RAG com a Bright Data.

        Args:
            api_key: chave API da Bright Data
            db_manager: instância do gerenciador de banco de dados
        """
        self.api_key = api_key
        self.db_manager = db_manager
        self.base_url = "https://api.brightdata.com"

        # Inicializar armazenamento de vetores para RAG
        self.embeddings = OpenAIEmbeddings()
        self.vector_store = Chroma(
            embedding_function=self.embeddings,
            persist_directory="./chroma_db"
        )

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        logger.info("✓ Bright Data RAG enhancer inicializado")

    def fetch_dataset_data(
        self,
        dataset_id: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Busca dados do Bright Data Dataset Marketplace.

        Argumentos:
            dataset_id: ID do conjunto de dados a ser buscado
filtros: Filtros opcionais para os dados
limite: Número máximo de registros a serem buscados

Retorna:
lista: Registros do conjunto de dados recuperados
"""
headers = {
"Autorização": f"Bearer {self.api_key}",
"Tipo de conteúdo": "application/json"
        }

        endpoint = f"{self.base_url}/Conjuntos de datos/v3/snapshot/{dataset_id}"

        params = {
            "format": "json",
            "limit": limit
        }

        if filters:
            params["filter"] = json.dumps(filters)

        tente:
            resposta = requests.get(endpoint, headers=headers, params=params)
            resposta.raise_for_status()

            dados = resposta.json()
            logger.info(f"✓ Recuperados {len(dados)} registros do conjunto de dados Bright Data {dataset_id}")
            retorne dados

        exceto Exception como e:
            logger.error(f"Falha ao buscar o conjunto de dados Bright Data: {e}")
            return []

    def ingest_web_data_to_rag(
        self,
        dataset_records: List[Dict[str, Any]],
        text_fields: List[str],
        metadata_fields: Optional[List[str]] = None
    ) -> int:
        """
        Ingestão de dados da web no armazenamento vetorial RAG.

        Argumentos:
            dataset_records: Registros do Bright Data
            text_fields: Campos a serem usados como conteúdo de texto
            metadata_fields: Campos a serem incluídos nos metadados

        Retorna:
            int: Número de fragmentos de documentos inseridos
        """
        tente:
            documentos = []

            para registro em registros_do_conjunto_de_dados:
                # Combinar campos de texto
                conteúdo_de_texto = " ".join([
                    str(registro.get(campo, ""))
                    para campo em campos_de_texto
                    se registro.get(campo)
                ])

                if not text_content.strip():
                    continue

                # Criar metadados
                metadata = {
                    "source": "bright_data",
                    "record_id": record.get("id", "unknown"),
                    "timestamp": datetime.utcnow().isoformat()
                }

                if metadata_fields:
                    for field in metadata_fields:
                        if field in record:
                            metadata[field] = record[field]

                # Dividir o texto em partes
                chunks = self.text_splitter.split_text(text_content)

                para chunk em chunks:
                    documentos.append({
                        "conteúdo": chunk,
                        "metadados": metadata
                    })

            # Adicionar ao armazenamento vetorial
            se documentos:
                textos = [doc["conteúdo"] para doc em documentos]
                metadados = [doc["metadados"] para doc em documentos]

                self.vector_store.add_texts(
                    texts=texts,
                    metadatas=metadatas
                )

                logger.info(f"✓ Ingestão de {len(documents)} partes de documentos no RAG")

            return len(documents)

        exceto Exception como e:
            logger.error(f"Falha ao inserir dados da web no RAG: {e}")
            retornar 0

    def create_rag_enhanced_agent(
        self,
        base_agent: DataPersistentAgent
    ) -> DataPersistentAgent:
        """
        Aprimorar o agente existente com recursos RAG.

        Argumentos:
base_agent: Agente base a ser aprimorado

Retorna:
DataPersistentAgent: Agente aprimorado com a ferramenta RAG
"""
def rag_search(query: str) -> str:
"""Pesquisar tanto o histórico de conversas quanto os dados da web."""
            tente:
# Recuperar do histórico de conversas
sessão = self.db_manager.get_session()

mensagens = sessão.query(Mensagem).filter(
Mensagem.content.ilike(f'%{query}%')
).order_by(Mensagem.created_at.desc()).limit(5).all()

                resultados = []
                para msg em mensagens:
                    resultados.append({
                        'conteúdo': msg.content,
                        'fonte': 'histórico_de_conversas',
                        'relevância': 0.8
                    })

                sessão.close()

                # Recuperar do armazenamento vetorial (dados da web)
                tente:
                    resultados_vetoriais = self.vector_store.similarity_search_with_score(query, k=5)

para doc, pontuação em resultados_vetoriais:
resultados.append({
'conteúdo': doc.page_content,
'fonte': 'web_data',
'relevância': 1 - pontuação
})
                exceto Exception como e:
                    logger.error(f"Falha ao recuperar do armazenamento vetorial: {e}")

                se não houver resultados:
                    retorne "Nenhuma informação relevante encontrada."

                # Formatar contexto
                context_text = "nn".join([
                    f"[{item['source']}] {item['content'][:200]}..."
                    para item em results[:5]
                ])

                retornar f"Contexto recuperado:n{context_text}"

            exceto Exception como e:
                logger.error(f"Falha na pesquisa RAG: {e}")
                return f"Erro ao realizar a pesquisa: {str(e)}"

        # Adicionar ferramenta RAG ao agente
        rag_tool = Tool(
            name="SearchKnowledgeBase",
            func=rag_search,
            description="Pesquisar informações relevantes no histórico de conversas e nos dados da web em tempo real. A entrada deve ser uma consulta de pesquisa."
        )

        base_agent.tools.append(rag_tool)

        # Recrie o agente com novas ferramentas
        base_agent.agent = create_openai_functions_agent(
            llm=base_agent.llm,
            tools=base_agent.tools,
            prompt=base_agent.prompt
        )

        base_agent.agent_executor = AgentExecutor(
            agent=base_agent.agent,
            tools=base_agent.tools,
            memory=base_agent.memory,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=5
        )

        logger.info("✓ Agente aprimorado com recursos RAG")
return base_agent

O BrightDataEnhancer integra dados da web em tempo real ao seu agente. O método fetch_dataset recupera dados estruturados do marketplace da Bright Data. O método ingest_to_rag processa e divide esses dados. Ele os armazena em um banco de dados vetorial Chroma para pesquisa semântica. O método retrieve_context realiza uma recuperação híbrida. Ele combina o histórico do banco de dados com a pesquisa de similaridade vetorial. O método create_rag_tool empacota essa funcionalidade como uma ferramenta LangChain que o agente usa. O método enhance_agent adiciona esse recurso RAG ao seu agente existente. Ele permite que o agente responda a perguntas usando tanto o histórico de conversas internas quanto dados externos recentes.

Executando seu sistema completo de agente com persistência de dados

Reúna todos os componentes para criar um sistema funcional.

def main():
    """Fluxo de execução principal demonstrando todos os componentes trabalhando juntos."""

    print("=" * 60)
    print("Sistema de agente de IA com persistência de dados - Inicialização")
    print("=" * 60)

    # Etapa 1: Inicializar banco de dados
    print("n[Etapa 1] Configurando conexão com banco de dados...")
    db_manager = DatabaseManager(
        database_url=os.getenv("DATABASE_URL"),
        pool_size=5,
        max_retries=3
    )
    db_manager.initialize_database()

    # Etapa 2: Inicializar o agente principal
    print("n[Etapa 2] Construindo o núcleo do agente de IA...")
    agent = DataPersistentAgent(
        db_manager=db_manager,
        model_name=os.getenv("AGENT_MODEL", "gpt-4-turbo-preview")
    )

    # Passo 3: Inicializar coletor de dados
    print("n[Passo 3] Criando módulo de coleta de dados...")
    coletor = DataCollector(db_manager, agente.llm)

    # Etapa 4: Inicializar pipeline de processamento
    print("n[Etapa 4] Implementando pipeline de processamento de dados...")
    pipeline = DataProcessingPipeline(db_manager, coletor)
    pipeline.start()

    # Passo 5: Inicializar o monitoramento
    print("n[Passo 5] Adicionando monitoramento e registro...")
    monitor = AgentMonitor(db_manager)

    # Passo 6: Inicializar a interface de consulta
    print("n[Passo 6] Construindo a interface de consulta...")
    query_interface = DataQueryInterface(db_manager)

    # Etapa 7: Aprimoramento opcional do Bright Data RAG
    print("n[Etapa 7] Aprimoramento do RAG (opcional)...")
    bright_data_key = os.getenv("BRIGHT_DATA_API_KEY")
    if bright_data_key and bright_data_key != "your-bright-data-api-key":
        print("Buscando dados da web em tempo real do Bright Data...")
        enhancer = BrightDataRAGEnhancer(bright_data_key, db_manager)

        # Exemplo: Buscar e ingestão de dados da web
        web_data = enhancer.fetch_dataset_data(
            dataset_id="example_dataset_id",
            limit=100
        )

        if web_data:
            enhancer.ingest_web_data_to_rag(
                dataset_records=web_data,
                text_fields=["title", "content", "description"],
                metadata_fields=["url", "published_date"]
            )

        # Aprimorar o agente com RAG
        agent = enhancer.create_rag_enhanced_agent(agent)
        print("✓ Agente aprimorado com recursos RAG da Bright Data")
    else:
        print("⚠️ Chave API da Bright Data não encontrada - ignorando integração de dados da web")

    print("n" + "=" * 60)
    imprimir("Conversas de demonstração")
    imprimir("=" * 60)

    # Interações de usuário de demonstração
    usuário_teste = "demo_user_001"

    # Primeira conversa
    imprimir("n📝 Conversa 1:")
    resposta1 = agente.chat(
        user_id=usuário_teste,
        mensagem="Olá! Estou interessado em aprender sobre aprendizado de máquina.")

print(f"Agente: {response1['response']}n")

# Fila para processamento
pipeline.queue_conversation_for_processing(
response1['conversation_id'],
test_user
)

    # Segunda conversa
    print("📝 Conversa 2:")
    response2 = agent.chat(
        user_id=test_user,
        message="Ajude-me a entender redes neurais?",
        conversation_id=response1['conversation_id']
    )
    print(f"Agente: {response2['response']}n")

    # Aguardar processamento em segundo plano
    print("⏳ Processando dados em segundo plano...")
    time.sleep(5)

    print("n" + "=" * 60)
    print("Análise e monitoramento")
    print("=" * 60)

    # Obter métricas de desempenho
    metrics = monitor.get_performance_metrics(hours=1)
    print(f"n📊 Métricas de desempenho:")
    print(f"  - Total de operações: {metrics.get('total_operations', 0)}")
    print(f"  - Taxa de erro: {metrics.get('error_rate', 0):.2%}")
    print(f"  - Tempo médio de execução: {metrics.get('avg_execution_time', 0):.2f}s")
    print(f"  - Conversas criadas: {metrics.get('conversations_created', 0)}")
    imprimir(f"  - Mensagens processadas: {metrics.get('messages_processed', 0)}")

    # Obter análises do usuário
    analytics = query_interface.get_user_analytics(test_user)
    imprimir(f"n👤 Análises do usuário:")
    imprimir(f"  - Contagem de conversas: {analytics.get('conversation_count', 0)}")
    imprimir(f"  - Contagem de mensagens: {analytics.get('message_count', 0)}")
    imprimir(f"  - Contagem de entidades: {analytics.get('entity_count', 0)}")
    imprimir(f"  - Média de mensagens/conversa: {analytics.get('avg_messages_per_conversation', 0):.1f}")

    # Verificação de integridade
    integridade = monitor.health_check()
    imprimir(f"n🏥 Integridade do sistema: {integridade['status']}")

    # Status da fila
    queue_status = pipeline.get_queue_status()
    print(f"n📋 Filas de processamento:")
    print(f"  - Fila de resumo: {queue_status['summary_queue']}")
    print(f"  - Fila de entidades: {queue_status['entity_queue']}")
    print(f"  - Fila de preferências: {queue_status['preference_queue']}")

    # Parar pipeline
    pipeline.stop()

    imprimir("n" + "=" * 60)
    imprimir("Sistema de agente com persistência de dados - Concluído")
    imprimir("=" * 60)
    imprimir("n✓ Todos os dados persistentes no banco de dados")
    imprimir("✓ Processamento em segundo plano concluído")
    imprimir("✓ Sistema pronto para uso em produção")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("nn⚠️ Desligando normalmente...")
    except Exception as e:
        logger.error(f"Erro do sistema: {e}")
        import traceback
        traceback.print_exc()

Execute seu sistema agente conectado ao banco de dados:

python agent.py

O sistema executa o fluxo de trabalho completo. Ele inicializa o banco de dados e cria todas as tabelas. Ele configura o agente LangChain com ferramentas de banco de dados. Ele inicia os trabalhadores em segundo plano para processamento. Ele processa conversas de demonstração e salva no banco de dados. Ele extrai entidades e gera resumos em segundo plano. Ele exibe análises e métricas em tempo real.

Você verá registros detalhados à medida que cada componente inicializa e processa os dados. O agente armazena todas as mensagens. Ele extrai insights. Ele mantém o contexto completo da conversa.

Building an AI Agent that Saves Data to Database Demo

Casos de uso práticos

1. Suporte ao cliente com histórico completo

# O agente recupera interações anteriores
support_agent = DataPersistentAgent(db_manager)
response = support_agent.chat(
    user_id="customer_123",
    message="Ainda estou tendo esse problema de conexão")

# O agente vê conversas anteriores sobre problemas de conexão

2. Assistente pessoal de IA com aprendizagem

# O agente aprende as preferências ao longo do tempo
query_interface = QueryInterface(db_manager)
analytics = query_interface.get_user_analytics("user_456")
# Mostra padrões de interação, preferências, tópicos comuns

3. Assistente de pesquisa com base de conhecimento

# Combina o histórico de conversas com dados da web
enhancer = BrightDataEnhancer(api_key, db_manager)
enhancer.ingest_to_rag(research_data, ["title", "abstract", "content"])
agent = enhancer.enhance_agent(agent)
# O agente consulta discussões anteriores e pesquisas mais recentes

Resumo dos benefícios

Recurso Sem banco de dados Com banco de dados Persistência
Memória Perdida na reinicialização Armazenamento permanente
Personalização Nenhuma Com base no histórico completo
Análise Não é possível Dados completos de interação
Recuperação de erros Intervenção manual Nova tentativa automática e registro
Escalabilidade Instância única Múltiplas instâncias com estado compartilhado
Insights Perdido após a sessão Extraídos e rastreados

Conclusão

Agora você tem um sistema de agente de IA pronto para produção que mantém as conversas em bancos de dados. O sistema armazena todas as interações, extrai entidades e insights, mantém o histórico completo das conversas e fornece monitoramento com recuperação automática de erros.

Aprimore-o adicionando autenticação de usuário para acesso seguro, criando painéis para visualizar análises, implementando incorporações para pesquisa semântica, criando pontos de extremidade de API para integração ou implantando com Docker para escalabilidade. O design modular permite fácil personalização para suas necessidades específicas.

Explore padrões avançados de agentes de IA e a plataforma de inteligência da web da Bright Data para obter mais recursos.

Crie uma conta gratuita para começar a construir sistemas inteligentes que lembram e aprendem.