#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""code-helper - Pipecat Voice Agent

This example bot demonstrates using an LLMTextProcessor along with TTS and
RTVI translations to handle special text segments like code snippets,
credit card numbers, and URLs. By prompting the LLM to wrap these segments in
specific tags, we can then process them accordingly for better speech synthesis.

This bot uses a cascade pipeline: Speech-to-Text → LLM → Text-to-Speech

Generated by Pipecat CLI

Required AI services:
- Deepgram (Speech-to-Text)
- OpenAI (LLM)
- Cartesia (Text-to-Speech)

Run the bot using::

    uv run bot.py
"""

import os

from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame, TranscriptionMessage, TranscriptionUpdateFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.turns.user_stop.turn_analyzer_user_turn_stop_strategy import (
    TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.utils.text.pattern_pair_aggregator import MatchAction, PatternPairAggregator

# Load settings from a local .env file into the process environment before any
# service below reads its API key via os.getenv. override=True asks
# python-dotenv to replace variables that are already set in the environment.
load_dotenv(override=True)


async def fetch_credit_card_info(params: FunctionCallParams):
    """Stand-in tool handler used to demonstrate function calling.

    It exists to show how sensitive information such as credit card details
    can be handled. No real lookup takes place: a fixed set of dummy card
    details is handed to the function call's result callback.

    Args:
        params: Parameters of the function call; only ``result_callback``
            is used here.
    """
    dummy_card = {
        "card_number": "1234-5678-9012-3456",
        "expiration_date": "12/24",
        "cvv": "123",
    }
    await params.result_callback(dummy_card)


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Assemble the STT -> LLM -> TTS pipeline and run it on ``transport``.

    The LLM is prompted to wrap code snippets, credit card numbers, and URLs
    in tags. An LLMTextProcessor aggregates those tagged segments separately
    so that TTS and RTVI can each treat them differently.

    Args:
        transport: Transport supplying the pipeline's input and output
            processors and the client connect/disconnect events.
        runner_args: Runner arguments; ``handle_sigint`` is forwarded to the
            PipelineRunner.
    """
    logger.info("Starting bot")

    # The system prompt tells the LLM which tag to wrap each special segment in:
    #   code snippets        -> <code></code>
    #   credit card numbers  -> <card></card>
    #   urls                 -> <link></link>
    system_prompt = (
        "You are a friendly AI assistant. "
        "All code snippets should be wrapped in <code></code> blocks. "
        "All credit card numbers should be wrapped in <card></card> blocks. "
        "All urls should be wrapped in <link></link> blocks."
    )

    # One aggregation pattern per tagged segment type. The type names given
    # here are the same strings used further down when registering the TTS
    # and RTVI transformers.
    segment_aggregator = PatternPairAggregator()
    for segment_type, tag in (("code", "code"), ("credit_card", "card"), ("link", "link")):
        segment_aggregator.add_pattern(
            type=segment_type,
            start_pattern=f"<{tag}>",
            end_pattern=f"</{tag}>",
            action=MatchAction.AGGREGATE,
        )
    llm_text_processor = LLMTextProcessor(text_aggregator=segment_aggregator)

    # Speech-to-Text
    stt = DeepgramSTTService(api_key=os.environ.get("DEEPGRAM_API_KEY"))

    # Text-to-Speech; "code" segments are listed in skip_aggregator_types so
    # that code blocks are not spoken out loud.
    tts = CartesiaTTSService(
        api_key=os.environ.get("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",  # British Reading Lady
        skip_aggregator_types=["code"],
    )

    # --- TTS text transformers -------------------------------------------

    async def spell_out_text(text: str, type: str) -> str:
        # Wrap the text in Cartesia's spell tags.
        return CartesiaTTSService.SPELL(text)

    async def strip_url_protocol(text: str, type: str) -> str:
        # Drop at most one leading scheme, then a leading "www.", for cleaner speech.
        for scheme in ("http://", "https://"):
            if text.startswith(scheme):
                text = text[len(scheme) :]
                break
        www_prefix = "www."
        if text.startswith(www_prefix):
            text = text[len(www_prefix) :]
        return text

    # Links get their protocol stripped and credit card numbers get spelled
    # out. The type strings match the pattern types registered above.
    tts.add_text_transformer(strip_url_protocol, "link")
    tts.add_text_transformer(spell_out_text, "credit_card")

    # LLM service, with the credit card lookup registered as a callable function.
    llm = OpenAILLMService(api_key=os.environ.get("OPENAI_API_KEY"))
    llm.register_function("get_credit_card_info", fetch_credit_card_info)

    # Conversation history handed to the LLM context. The connect handler
    # below appends to this same list.
    conversation = [{"role": "system", "content": system_prompt}]

    card_lookup_schema = FunctionSchema(
        name="get_credit_card_info",
        description="Get credit card information for the user.",
        properties={},
        required=[],
    )
    llm_context = LLMContext(conversation, ToolsSchema(standard_tools=[card_lookup_schema]))

    # The end of a user turn is decided by the local smart-turn analyzer.
    turn_stop = TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())
    user_params = LLMUserAggregatorParams(
        user_turn_strategies=UserTurnStrategies(stop=[turn_stop]),
    )
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
        llm_context,
        user_params=user_params,
    )

    # Transcription processor
    transcript_processor = TranscriptProcessor()

    async def obfuscate_credit_card(text: str, type: str) -> str:
        # Keep only the last four characters of the card number visible.
        return f"XXXX-XXXX-XXXX-{text[-4:]}"

    # RTVI processor and observer; credit card segments in the bot's output
    # go through the obfuscating transformer.
    rtvi = RTVIProcessor()
    rtvi_observer = RTVIObserver(rtvi)
    rtvi_observer.add_bot_output_transformer(obfuscate_credit_card, "credit_card")

    # A typical STT -> LLM -> TTS + RTVI pipeline, with the LLMTextProcessor
    # inserted between the LLM and TTS to pre-aggregate LLM text for the
    # custom segment handling.
    processors = [
        transport.input(),
        rtvi,
        stt,
        transcript_processor.user(),
        user_aggregator,
        llm,
        llm_text_processor,
        tts,
        transport.output(),
        transcript_processor.assistant(),
        assistant_aggregator,
    ]
    pipeline = Pipeline(processors)

    task_params = PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    task = PipelineTask(pipeline, params=task_params, observers=[rtvi_observer])

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Client connected")
        # Start the conversation: add an instruction to the history and
        # queue a frame that runs the LLM.
        kickoff = {"role": "system", "content": "Say hello and briefly introduce yourself."}
        conversation.append(kickoff)
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Client disconnected")
        await task.cancel()

    @transcript_processor.event_handler("on_transcript_update")
    async def on_transcript_update(processor: TranscriptProcessor, frame: TranscriptionUpdateFrame):
        # Log each transcription message, prefixed by its timestamp when it has one.
        for msg in frame.messages:
            if not isinstance(msg, TranscriptionMessage):
                continue
            prefix = f"[{msg.timestamp}] " if msg.timestamp else ""
            logger.info(f"Transcript: {prefix}{msg.role}: {msg.content}")

    @rtvi.event_handler("on_client_message")
    async def on_message(rtvi, msg):
        logger.info(f"Received unknown message from client: {msg.type} | {msg.data}")

    await PipelineRunner(handle_sigint=runner_args.handle_sigint).run(task)


async def bot(runner_args: RunnerArguments):
    """Create the transport for this session and run the bot on it.

    Args:
        runner_args: Runner arguments, passed to ``create_transport`` and
            then on to ``run_bot``.
    """

    def webrtc_params():
        # Wrapped in a callable so that objects such as SileroVADAnalyzer are
        # not instantiated up front; the factory is called when the desired
        # transport gets selected.
        return TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
        )

    # Transport name -> factory producing that transport's parameters.
    param_factories = {"webrtc": webrtc_params}

    transport = await create_transport(runner_args, param_factories)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    # Imported here rather than at the top of the file, so the runner's entry
    # point is only loaded when this file is executed directly (for example
    # with `uv run bot.py`) and not when the module is imported.
    from pipecat.runner.run import main

    main()
