# mirror of https://github.com/dapr/dapr-agents.git
from dapr_agents.document.reader.pdf.pypdf import PyPDFReader
|
|
from dapr.ext.workflow import DaprWorkflowContext
|
|
from dapr_agents import WorkflowApp
|
|
from urllib.parse import urlparse, unquote
|
|
from dotenv import load_dotenv
|
|
from typing import Dict, Any, List
|
|
from pydantic import BaseModel
|
|
from pathlib import Path
|
|
from dapr_agents import OpenAIAudioClient
|
|
from dapr_agents.types.llm import AudioSpeechRequest
|
|
from pydub import AudioSegment
|
|
import io
|
|
import requests
|
|
import logging
|
|
|
|
# Configure root logging once for the whole process; tasks below use this logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables (e.g. the OpenAI API key read by OpenAIAudioClient)
load_dotenv()

# Initialize the WorkflowApp — registers the workflow and task decorators used below
wfapp = WorkflowApp()
|
|
|
|
|
|
# Define structured output models
|
|
# Define structured output models
class SpeakerEntry(BaseModel):
    """One turn of podcast dialogue: the speaker and their spoken line."""

    # Speaker display name (the host or one of the participants)
    name: str
    # The text spoken during this turn
    text: str
|
|
|
|
|
|
class PodcastDialogue(BaseModel):
    """Structured LLM output: an ordered list of dialogue turns for one chunk."""

    # All turns produced for this chunk, in speaking order.
    # NOTE(review): the field name is "participants" but entries include the
    # host's turns as well — confirm against generate_transcript's prompt.
    participants: List[SpeakerEntry]
|
|
|
|
|
|
# Define Workflow logic
|
|
# Define Workflow logic
@wfapp.workflow(name="doc2podcast")
def doc2podcast(ctx: DaprWorkflowContext, input: Dict[str, Any]):
    """
    Orchestrate the document-to-podcast pipeline.

    Steps: assign voices -> download + read the PDF -> per-chunk prompt and
    dialogue generation -> write the transcript -> render the audio file.

    NOTE: this is a Dapr workflow; it must stay deterministic across replays,
    so all I/O and LLM calls happen inside activities via ctx.call_activity.

    Args:
        ctx: Dapr workflow context used to schedule activities.
        input: Pre-validated payload built in __main__ (podcast_name, host,
            participants, max_rounds, pdf_url, output paths, audio_model).
    """
    # Extract pre-validated input
    podcast_name = input["podcast_name"]
    host_config = input["host"]
    participant_configs = input["participants"]
    max_rounds = input["max_rounds"]
    file_input = input["pdf_url"]
    output_transcript_path = input["output_transcript_path"]
    output_audio_path = input["output_audio_path"]
    audio_model = input["audio_model"]

    # Step 1: Assign voices to the team
    team_config = yield ctx.call_activity(
        assign_podcast_voices,
        input={
            "host_config": host_config,
            "participant_configs": participant_configs,
        },
    )

    # Step 2: Read PDF and get documents
    file_path = yield ctx.call_activity(download_pdf, input=file_input)
    documents = yield ctx.call_activity(read_pdf, input={"file_path": file_path})

    # Step 3: Initialize context and transcript parts
    # accumulated_context carries prior dialogue forward so each chunk's
    # prompt can continue the conversation instead of restarting it.
    accumulated_context = ""
    transcript_parts = []
    total_iterations = len(documents)

    for chunk_index, document in enumerate(documents):
        # Generate the intermediate prompt
        document_with_context = {
            "text": document["text"],
            # 1-based so generate_prompt can detect the first/last chunk
            "iteration_index": chunk_index + 1,
            "total_iterations": total_iterations,
            "context": accumulated_context,
            "participants": [p["name"] for p in team_config["participants"]],
        }
        generated_prompt = yield ctx.call_activity(
            generate_prompt, input=document_with_context
        )

        # Use the prompt to generate the structured dialogue
        prompt_parameters = {
            "podcast_name": podcast_name,
            "host_name": team_config["host"]["name"],
            "prompt": generated_prompt,
            "max_rounds": max_rounds,
        }
        dialogue_entry = yield ctx.call_activity(
            generate_transcript, input=prompt_parameters
        )

        # Update context and transcript parts
        conversations = dialogue_entry["participants"]
        for participant in conversations:
            accumulated_context += f" {participant['name']}: {participant['text']}"
            transcript_parts.append(participant)

    # Step 4: Write the final transcript to a file
    yield ctx.call_activity(
        write_transcript_to_file,
        input={
            "podcast_dialogue": transcript_parts,
            "output_path": output_transcript_path,
        },
    )

    # Step 5: Convert transcript to audio using team_config
    yield ctx.call_activity(
        convert_transcript_to_audio,
        input={
            "transcript_parts": transcript_parts,
            "output_path": output_audio_path,
            "voices": team_config,
            "model": audio_model,
        },
    )
|
|
|
|
|
|
@wfapp.task
|
|
def assign_podcast_voices(
    host_config: Dict[str, Any], participant_configs: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Assign voices to the podcast host and participants.

    Voices explicitly specified by the user are respected and registered up
    front, so auto-assignment can never hand out a duplicate. (Previously only
    auto-assigned voices were tracked, so a participant could silently receive
    the same voice a later speaker had pre-selected.) Inputs are copied rather
    than mutated.

    Args:
        host_config: Dictionary containing the host's configuration (name and optionally a voice).
        participant_configs: List of dictionaries containing participants' configurations (name and optionally a voice).

    Returns:
        A dictionary with the updated `host` and `participants`, including their assigned voices.

    Raises:
        ValueError: If there are more speakers needing a voice than allowed voices.
    """
    allowed_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]

    # Work on copies so the activity does not mutate its input payload.
    host = dict(host_config)
    participants = [dict(p) for p in participant_configs]

    # Register every pre-specified voice first to avoid duplication.
    assigned_voices = {
        speaker["voice"] for speaker in [host, *participants] if "voice" in speaker
    }

    def _next_free_voice() -> str:
        # First allowed voice not yet claimed; explicit error beats a bare StopIteration.
        for voice in allowed_voices:
            if voice not in assigned_voices:
                return voice
        raise ValueError(
            f"Not enough distinct voices for all speakers ({len(allowed_voices)} available)."
        )

    # Assign voice to the host if not already specified
    if "voice" not in host:
        host["voice"] = _next_free_voice()
        assigned_voices.add(host["voice"])

    # Assign voices to participants, ensuring no duplicates
    for participant in participants:
        if "voice" not in participant:
            participant["voice"] = _next_free_voice()
            assigned_voices.add(participant["voice"])

    # Return the updated host and participants
    return {
        "host": host,
        "participants": participants,
    }
|
|
|
|
|
|
@wfapp.task
|
|
def download_pdf(pdf_url: str, local_directory: str = ".") -> str:
    """
    Downloads a PDF file from a URL and saves it locally, automatically determining the filename.

    Args:
        pdf_url: URL of the PDF; the filename is derived from the URL path.
        local_directory: Directory to save into (created if missing).

    Returns:
        Absolute path of the downloaded (or already-present) file as a string.

    Raises:
        ValueError: If no filename can be derived from the URL.
        requests.RequestException: On HTTP errors or timeout.
    """
    try:
        parsed_url = urlparse(pdf_url)
        filename = unquote(Path(parsed_url.path).name)

        if not filename:
            raise ValueError("Invalid URL: Cannot determine filename from the URL.")

        # Spaces in filenames are awkward for downstream tooling.
        filename = filename.replace(" ", "_")
        local_directory_path = Path(local_directory).resolve()
        local_directory_path.mkdir(parents=True, exist_ok=True)
        local_file_path = local_directory_path / filename

        if not local_file_path.exists():
            logger.info(f"Downloading PDF from {pdf_url}...")
            # Fix: a missing timeout could hang this activity forever.
            response = requests.get(pdf_url, timeout=60)
            response.raise_for_status()
            local_file_path.write_bytes(response.content)
            logger.info(f"PDF saved to {local_file_path}")
        else:
            # Cached from a previous run — skip the download.
            logger.info(f"PDF already exists at {local_file_path}")

        return str(local_file_path)
    except Exception as e:
        logger.error(f"Error downloading PDF: {e}")
        raise
|
|
|
|
|
|
@wfapp.task
|
|
def read_pdf(file_path: str) -> List[dict]:
    """
    Extract the contents of a PDF into a list of serializable dictionaries.

    Args:
        file_path: Local path of the PDF to load.

    Returns:
        One dict per loaded document chunk (pydantic ``model_dump`` output).

    Raises:
        Exception: Propagated from the reader after being logged.
    """
    try:
        loaded = PyPDFReader().load(file_path)
        return [chunk.model_dump() for chunk in loaded]
    except Exception as e:
        logger.error(f"Error reading document: {e}")
        raise
|
|
|
|
|
|
@wfapp.task
|
|
def generate_prompt(
    text: str,
    iteration_index: int,
    total_iterations: int,
    context: str,
    participants: List[str],
    podcast_name: str = "Podcast Name",
) -> str:
    """
    Generate a prompt dynamically for the chunk.

    The instructions differ for the first chunk (welcome/introductions), the
    last chunk (summary/farewells), and everything in between (continuation).

    Args:
        text: Raw text of the current document chunk.
        iteration_index: 1-based index of this chunk.
        total_iterations: Total number of chunks.
        context: Accumulated dialogue from previous chunks ("" for none).
        participants: Participant names; empty means a host-only episode.
        podcast_name: Title used in the welcome line. Defaults to the literal
            placeholder "Podcast Name" for backward compatibility — callers
            should pass the real title.

    Returns:
        The full instruction prompt with the chunk text appended.
    """
    logger.info(f"Processing iteration {iteration_index} of {total_iterations}.")
    instructions = f"""
    CONTEXT:
    - Previous conversation: {context.strip() or "No prior context available."}
    - This is iteration {iteration_index} of {total_iterations}.
    """

    if participants:
        participant_names = ", ".join(participants)
        instructions += f"\nPARTICIPANTS: {participant_names}"
    else:
        instructions += "\nPARTICIPANTS: None (Host-only conversation)"

    if iteration_index == 1:
        # Fix: the welcome line previously hard-coded the placeholder
        # 'Podcast Name'; it is now parameterized (default preserves old output).
        instructions += f"""
    INSTRUCTIONS:
    - Begin with a warm welcome to the podcast titled '{podcast_name}'.
    - Introduce the host and the participants (if available).
    - Provide an overview of the topics to be discussed in this episode.
    """
    elif iteration_index == total_iterations:
        instructions += """
    INSTRUCTIONS:
    - Conclude the conversation with a summary of the discussion.
    - Include farewell messages from the host and participants.
    """
    else:
        instructions += """
    INSTRUCTIONS:
    - Continue the conversation smoothly without re-introducing the podcast.
    - Follow up on the previous discussion points and introduce the next topic naturally.
    """

    instructions += """
    TASK:
    - Use the provided TEXT to guide this part of the conversation.
    - Alternate between speakers, ensuring a natural conversational flow.
    - Keep responses concise and aligned with the context.
    """
    return f"{instructions}\nTEXT:\n{text.strip()}"
|
|
|
|
|
|
# NOTE: the triple-quoted string passed to @wfapp.task is the LLM prompt
# template; {podcast_name}, {host_name}, {max_rounds} and {prompt} are filled
# in from the function's arguments by the task runtime. It is a runtime
# string, not a docstring.
@wfapp.task(
    """
    Generate a structured podcast dialogue based on the context and text provided.
    The podcast is titled '{podcast_name}' and is hosted by {host_name}.
    If participants are available, each speaker is limited to a maximum of {max_rounds} turns per iteration.
    A "round" is defined as one turn by the host followed by one turn by a participant.
    The podcast should alternate between the host and participants.
    If participants are not available, the host drives the conversation alone.
    Keep the dialogue concise and ensure a natural conversational flow.
    {prompt}
    """
)
def generate_transcript(
    podcast_name: str, host_name: str, prompt: str, max_rounds: int
) -> PodcastDialogue:
    # Intentionally empty: the decorator turns this signature into an LLM call
    # whose output is parsed into the PodcastDialogue return type.
    pass
|
|
|
|
|
|
@wfapp.task
|
|
def write_transcript_to_file(
    podcast_dialogue: List[Dict[str, Any]], output_path: str
) -> None:
    """
    Write the final structured transcript to a file.

    Args:
        podcast_dialogue: Ordered dialogue turns ({"name": ..., "text": ...}).
        output_path: Destination path for the JSON transcript.

    Raises:
        OSError: If the file cannot be written (logged, then re-raised).
    """
    # Local import: json is not imported at module level in this file.
    # (Previously this import sat inside the `with` block.)
    import json

    try:
        with open(output_path, "w", encoding="utf-8") as file:
            json.dump(podcast_dialogue, file, ensure_ascii=False, indent=4)
        # Log success only after the file handle is closed and flushed.
        logger.info(f"Podcast dialogue successfully written to {output_path}")
    except Exception as e:
        logger.error(f"Error writing podcast dialogue to file: {e}")
        raise
|
|
|
|
|
|
@wfapp.task
|
|
def convert_transcript_to_audio(
    transcript_parts: List[Dict[str, Any]],
    output_path: str,
    voices: Dict[str, Any],
    model: str = "tts-1",
) -> None:
    """
    Render a structured transcript into a single MP3 using the OpenAI Audio
    Client, stitching the per-speaker clips together with pydub.

    Args:
        transcript_parts: List of dictionaries containing speaker and text.
        output_path: File path to save the final audio.
        voices: Dictionary containing "host" and "participants" with their assigned voices.
        model: TTS model to use (default: "tts-1").
    """
    try:
        tts_client = OpenAIAudioClient()

        # Map each speaker's name to their assigned voice.
        speaker_voices = {voices["host"]["name"]: voices["host"]["voice"]}
        for member in voices["participants"]:
            speaker_voices[member["name"]] = member["voice"]

        # Begin with a brief lead-in silence.
        podcast_audio = AudioSegment.silent(duration=500)

        for entry in transcript_parts:
            speaker_name = entry["name"]
            # Fall back to "alloy" for any speaker missing from the mapping.
            assigned_voice = speaker_voices.get(speaker_name, "alloy")

            # Log assigned voice for debugging
            logger.info(
                f"Generating audio for {speaker_name} using voice '{assigned_voice}'."
            )

            # Build and submit the TTS request for this turn.
            tts_request = AudioSpeechRequest(
                model=model,
                input=entry["text"],
                voice=assigned_voice,
                response_format="mp3",
            )
            speech_bytes = tts_client.create_speech(request=tts_request)

            # Decode the returned bytes and append, with a 300 ms pause between turns.
            clip = AudioSegment.from_file(
                io.BytesIO(speech_bytes), format=tts_request.response_format
            )
            podcast_audio = podcast_audio + clip + AudioSegment.silent(duration=300)

        # Export the stitched episode to disk.
        podcast_audio.export(output_path, format="mp3")
        logger.info(f"Podcast audio successfully saved to {output_path}")

    except Exception as e:
        logger.error(f"Error during audio generation: {e}")
        raise
|
|
|
|
|
|
if __name__ == "__main__":
    import argparse
    import json
    import yaml

    def load_config(file_path: str) -> dict:
        """Load configuration from a JSON or YAML file."""
        with open(file_path, "r") as file:
            if file_path.endswith(".yaml") or file_path.endswith(".yml"):
                return yaml.safe_load(file)
            elif file_path.endswith(".json"):
                return json.load(file)
            else:
                raise ValueError("Unsupported file format. Use JSON or YAML.")

    # CLI Argument Parser
    parser = argparse.ArgumentParser(description="Document to Podcast Workflow")
    parser.add_argument("--config", type=str, help="Path to a JSON/YAML config file.")
    parser.add_argument("--pdf_url", type=str, help="URL of the PDF document.")
    parser.add_argument("--podcast_name", type=str, help="Name of the podcast.")
    parser.add_argument("--host_name", type=str, help="Name of the host.")
    parser.add_argument("--host_voice", type=str, help="Voice for the host.")
    parser.add_argument(
        "--participants", type=str, nargs="+", help="List of participant names."
    )
    # Fix: defaults are None so that values from the config file are reachable.
    # With the old argparse defaults (4 / "tts-1"), `args.X or config.get(...)`
    # always took the CLI default and silently ignored the config file.
    parser.add_argument(
        "--max_rounds", type=int, default=None, help="Number of turns per round."
    )
    parser.add_argument(
        "--output_transcript_path", type=str, help="Path to save the output transcript."
    )
    parser.add_argument(
        "--output_audio_path", type=str, help="Path to save the final audio file."
    )
    parser.add_argument(
        "--audio_model", type=str, default=None, help="Audio model for TTS."
    )

    args = parser.parse_args()

    # Load config file if provided
    config = load_config(args.config) if args.config else {}

    # Merge CLI and Config inputs: CLI wins, config is the fallback,
    # then a hard-coded default.
    user_input = {
        "pdf_url": args.pdf_url or config.get("pdf_url"),
        "podcast_name": args.podcast_name
        or config.get("podcast_name", "Default Podcast"),
        "host": {
            "name": args.host_name or config.get("host", {}).get("name", "Host"),
            "voice": args.host_voice or config.get("host", {}).get("voice", "alloy"),
        },
        "participants": config.get("participants", []),
        "max_rounds": args.max_rounds
        if args.max_rounds is not None
        else config.get("max_rounds", 4),
        "output_transcript_path": args.output_transcript_path
        or config.get("output_transcript_path", "podcast_dialogue.json"),
        "output_audio_path": args.output_audio_path
        or config.get("output_audio_path", "final_podcast.mp3"),
        "audio_model": args.audio_model or config.get("audio_model", "tts-1"),
    }

    # Add participants from CLI if provided (appended after any from config)
    if args.participants:
        user_input["participants"].extend({"name": name} for name in args.participants)

    # Validate inputs
    if not user_input["pdf_url"]:
        raise ValueError("PDF URL must be provided via CLI or config file.")

    # Run the workflow
    wfapp.run_and_monitor_workflow_sync(workflow=doc2podcast, input=user_input)
|