from dapr_agents.document.reader.pdf.pypdf import PyPDFReader
from dapr.ext.workflow import DaprWorkflowContext
from dapr_agents import WorkflowApp
from urllib.parse import urlparse, unquote
from dotenv import load_dotenv
from typing import Dict, Any, List
from pydantic import BaseModel
from pathlib import Path
from dapr_agents import OpenAIAudioClient
from dapr_agents.types.llm import AudioSpeechRequest
from pydub import AudioSegment
import io
import json
import requests
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Initialize the WorkflowApp
wfapp = WorkflowApp()


# Define structured output models
class SpeakerEntry(BaseModel):
    name: str
    text: str


class PodcastDialogue(BaseModel):
    participants: List[SpeakerEntry]


# Define Workflow logic
@wfapp.workflow(name="doc2podcast")
def doc2podcast(ctx: DaprWorkflowContext, input: Dict[str, Any]):
    # Extract pre-validated input
    podcast_name = input["podcast_name"]
    host_config = input["host"]
    participant_configs = input["participants"]
    max_rounds = input["max_rounds"]
    file_input = input["pdf_url"]
    output_transcript_path = input["output_transcript_path"]
    output_audio_path = input["output_audio_path"]
    audio_model = input["audio_model"]

    # Step 1: Assign voices to the team
    team_config = yield ctx.call_activity(
        assign_podcast_voices,
        input={
            "host_config": host_config,
            "participant_configs": participant_configs,
        },
    )

    # Step 2: Read PDF and get documents
    file_path = yield ctx.call_activity(download_pdf, input=file_input)
    documents = yield ctx.call_activity(read_pdf, input={"file_path": file_path})

    # Step 3: Initialize context and transcript parts
    accumulated_context = ""
    transcript_parts = []
    total_iterations = len(documents)

    for chunk_index, document in enumerate(documents):
        # Generate the intermediate prompt
        document_with_context = {
            "text": document["text"],
            "iteration_index": chunk_index + 1,
            "total_iterations": total_iterations,
            "context": accumulated_context,
            "participants": [p["name"] for p in team_config["participants"]],
        }
        generated_prompt = yield ctx.call_activity(
            generate_prompt, input=document_with_context
        )

        # Use the prompt to generate the structured dialogue
        prompt_parameters = {
            "podcast_name": podcast_name,
            "host_name": team_config["host"]["name"],
            "prompt": generated_prompt,
            "max_rounds": max_rounds,
        }
        dialogue_entry = yield ctx.call_activity(
            generate_transcript, input=prompt_parameters
        )

        # Update context and transcript parts
        conversations = dialogue_entry["participants"]
        for participant in conversations:
            accumulated_context += f" {participant['name']}: {participant['text']}"
            transcript_parts.append(participant)

    # Step 4: Write the final transcript to a file
    yield ctx.call_activity(
        write_transcript_to_file,
        input={
            "podcast_dialogue": transcript_parts,
            "output_path": output_transcript_path,
        },
    )

    # Step 5: Convert transcript to audio using team_config
    yield ctx.call_activity(
        convert_transcript_to_audio,
        input={
            "transcript_parts": transcript_parts,
            "output_path": output_audio_path,
            "voices": team_config,
            "model": audio_model,
        },
    )
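
# For reference, a minimal workflow input looks like the following (a sketch:
# the keys mirror the payload assembled in __main__ below; the values are
# illustrative):
#
#   {
#       "pdf_url": "https://example.com/paper.pdf",
#       "podcast_name": "Default Podcast",
#       "host": {"name": "Host", "voice": "alloy"},
#       "participants": [{"name": "Ben"}, {"name": "Cole"}],
#       "max_rounds": 4,
#       "output_transcript_path": "podcast_dialogue.json",
#       "output_audio_path": "final_podcast.mp3",
#       "audio_model": "tts-1",
#   }
#
# The task below fills in any missing voices deterministically, walking
# allowed_voices in order and skipping voices already taken. Conceptually
# (hypothetical speaker names):
#
#   assign_podcast_voices({"name": "Ava"}, [{"name": "Ben"}, {"name": "Cole"}])
#   # -> {"host": {"name": "Ava", "voice": "alloy"},
#   #     "participants": [{"name": "Ben", "voice": "echo"},
#   #                      {"name": "Cole", "voice": "fable"}]}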
@wfapp.task
def assign_podcast_voices(
    host_config: Dict[str, Any], participant_configs: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Assign voices to the podcast host and participants.

    Args:
        host_config: Dictionary containing the host's configuration (name and optionally a voice).
        participant_configs: List of dictionaries containing participants' configurations (name and optionally a voice).

    Returns:
        A dictionary with the updated `host` and `participants`, including their assigned voices.
    """
    allowed_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
    assigned_voices = set()  # Track assigned voices to avoid duplication

    # Assign voice to the host if not already specified
    if "voice" not in host_config:
        host_config["voice"] = next(
            voice for voice in allowed_voices if voice not in assigned_voices
        )
    assigned_voices.add(host_config["voice"])

    # Assign voices to participants, ensuring no duplicates
    updated_participants = []
    for participant in participant_configs:
        if "voice" not in participant:
            participant["voice"] = next(
                voice for voice in allowed_voices if voice not in assigned_voices
            )
        assigned_voices.add(participant["voice"])
        updated_participants.append(participant)

    # Return the updated host and participants
    return {
        "host": host_config,
        "participants": updated_participants,
    }


@wfapp.task
def download_pdf(pdf_url: str, local_directory: str = ".") -> str:
    """
    Downloads a PDF file from a URL and saves it locally, automatically determining the filename.
    """
    try:
        parsed_url = urlparse(pdf_url)
        filename = unquote(Path(parsed_url.path).name)
        if not filename:
            raise ValueError("Invalid URL: Cannot determine filename from the URL.")

        filename = filename.replace(" ", "_")
        local_directory_path = Path(local_directory).resolve()
        local_directory_path.mkdir(parents=True, exist_ok=True)
        local_file_path = local_directory_path / filename

        if not local_file_path.exists():
            logger.info(f"Downloading PDF from {pdf_url}...")
            response = requests.get(pdf_url)
            response.raise_for_status()
            with open(local_file_path, "wb") as pdf_file:
                pdf_file.write(response.content)
            logger.info(f"PDF saved to {local_file_path}")
        else:
            logger.info(f"PDF already exists at {local_file_path}")

        return str(local_file_path)
    except Exception as e:
        logger.error(f"Error downloading PDF: {e}")
        raise


@wfapp.task
def read_pdf(file_path: str) -> List[dict]:
    """
    Reads and extracts text from a PDF document.
    """
    try:
        reader = PyPDFReader()
        documents = reader.load(file_path)
        return [doc.model_dump() for doc in documents]
    except Exception as e:
        logger.error(f"Error reading document: {e}")
        raise


@wfapp.task
def generate_prompt(
    text: str,
    iteration_index: int,
    total_iterations: int,
    context: str,
    participants: List[str],
) -> str:
    """
    Generate a prompt dynamically for the chunk.
    """
    logger.info(f"Processing iteration {iteration_index} of {total_iterations}.")

    instructions = f"""
    CONTEXT:
    - Previous conversation: {context.strip() or "No prior context available."}
    - This is iteration {iteration_index} of {total_iterations}.
    """

    if participants:
        participant_names = ", ".join(participants)
        instructions += f"\nPARTICIPANTS: {participant_names}"
    else:
        instructions += "\nPARTICIPANTS: None (Host-only conversation)"

    if iteration_index == 1:
        instructions += """
        INSTRUCTIONS:
        - Begin with a warm welcome to the podcast titled 'Podcast Name'.
        - Introduce the host and the participants (if available).
        - Provide an overview of the topics to be discussed in this episode.
        """
    elif iteration_index == total_iterations:
        instructions += """
        INSTRUCTIONS:
        - Conclude the conversation with a summary of the discussion.
        - Include farewell messages from the host and participants.
        """
    else:
        instructions += """
        INSTRUCTIONS:
        - Continue the conversation smoothly without re-introducing the podcast.
        - Follow up on the previous discussion points and introduce the next topic naturally.
        """

    instructions += """
    TASK:
    - Use the provided TEXT to guide this part of the conversation.
    - Alternate between speakers, ensuring a natural conversational flow.
    - Keep responses concise and aligned with the context.
    """
    return f"{instructions}\nTEXT:\n{text.strip()}"


@wfapp.task(
    """
    Generate a structured podcast dialogue based on the context and text provided.
    The podcast is titled '{podcast_name}' and is hosted by {host_name}.
    If participants are available, each speaker is limited to a maximum of {max_rounds} turns per iteration.
    A "round" is defined as one turn by the host followed by one turn by a participant.
    The podcast should alternate between the host and participants.
    If participants are not available, the host drives the conversation alone.
    Keep the dialogue concise and ensure a natural conversational flow.
    {prompt}
    """
)
def generate_transcript(
    podcast_name: str, host_name: str, prompt: str, max_rounds: int
) -> PodcastDialogue:
    pass


@wfapp.task
def write_transcript_to_file(
    podcast_dialogue: List[Dict[str, Any]], output_path: str
) -> None:
    """
    Write the final structured transcript to a file.
    """
    try:
        with open(output_path, "w", encoding="utf-8") as file:
            json.dump(podcast_dialogue, file, ensure_ascii=False, indent=4)
        logger.info(f"Podcast dialogue successfully written to {output_path}")
    except Exception as e:
        logger.error(f"Error writing podcast dialogue to file: {e}")
        raise


@wfapp.task
def convert_transcript_to_audio(
    transcript_parts: List[Dict[str, Any]],
    output_path: str,
    voices: Dict[str, Any],
    model: str = "tts-1",
) -> None:
    """
    Converts a transcript into a single audio file using the OpenAI Audio Client and pydub for concatenation.

    Args:
        transcript_parts: List of dictionaries containing speaker and text.
        output_path: File path to save the final audio.
        voices: Dictionary containing "host" and "participants" with their assigned voices.
        model: TTS model to use (default: "tts-1").
    """
    try:
        client = OpenAIAudioClient()
        combined_audio = AudioSegment.silent(duration=500)  # Start with a short silence

        # Build voice mapping
        voice_mapping = {voices["host"]["name"]: voices["host"]["voice"]}
        voice_mapping.update({p["name"]: p["voice"] for p in voices["participants"]})

        for part in transcript_parts:
            speaker_name = part["name"]
            speaker_text = part["text"]
            assigned_voice = voice_mapping.get(
                speaker_name, "alloy"
            )  # Default to "alloy" if not found

            # Log assigned voice for debugging
            logger.info(
                f"Generating audio for {speaker_name} using voice '{assigned_voice}'."
            )

            # Create TTS request
            tts_request = AudioSpeechRequest(
                model=model,
                input=speaker_text,
                voice=assigned_voice,
                response_format="mp3",
            )

            # Generate the audio
            audio_bytes = client.create_speech(request=tts_request)

            # Create an AudioSegment from the audio bytes
            audio_chunk = AudioSegment.from_file(
                io.BytesIO(audio_bytes), format=tts_request.response_format
            )

            # Append the audio to the combined segment
            combined_audio += audio_chunk + AudioSegment.silent(duration=300)

        # Export the combined audio to the output file
        combined_audio.export(output_path, format="mp3")
        logger.info(f"Podcast audio successfully saved to {output_path}")
    except Exception as e:
        logger.error(f"Error during audio generation: {e}")
        raise
if __name__ == "__main__":
    import argparse

    import yaml

    def load_config(file_path: str) -> dict:
        """Load configuration from a JSON or YAML file."""
        with open(file_path, "r") as file:
            if file_path.endswith(".yaml") or file_path.endswith(".yml"):
                return yaml.safe_load(file)
            elif file_path.endswith(".json"):
                return json.load(file)
            else:
                raise ValueError("Unsupported file format. Use JSON or YAML.")

    # CLI Argument Parser
    parser = argparse.ArgumentParser(description="Document to Podcast Workflow")
    parser.add_argument("--config", type=str, help="Path to a JSON/YAML config file.")
    parser.add_argument("--pdf_url", type=str, help="URL of the PDF document.")
    parser.add_argument("--podcast_name", type=str, help="Name of the podcast.")
    parser.add_argument("--host_name", type=str, help="Name of the host.")
    parser.add_argument("--host_voice", type=str, help="Voice for the host.")
    parser.add_argument(
        "--participants", type=str, nargs="+", help="List of participant names."
    )
    parser.add_argument(
        "--max_rounds",
        type=int,
        default=4,
        help="Maximum number of host/participant rounds per document chunk.",
    )
    parser.add_argument(
        "--output_transcript_path", type=str, help="Path to save the output transcript."
    )
    parser.add_argument(
        "--output_audio_path", type=str, help="Path to save the final audio file."
    )
    parser.add_argument(
        "--audio_model", type=str, default="tts-1", help="Audio model for TTS."
    )
    args = parser.parse_args()

    # Load config file if provided
    config = load_config(args.config) if args.config else {}

    # Merge CLI and Config inputs
    user_input = {
        "pdf_url": args.pdf_url or config.get("pdf_url"),
        "podcast_name": args.podcast_name
        or config.get("podcast_name", "Default Podcast"),
        "host": {
            "name": args.host_name or config.get("host", {}).get("name", "Host"),
            "voice": args.host_voice or config.get("host", {}).get("voice", "alloy"),
        },
        "participants": config.get("participants", []),
        "max_rounds": args.max_rounds or config.get("max_rounds", 4),
        "output_transcript_path": args.output_transcript_path
        or config.get("output_transcript_path", "podcast_dialogue.json"),
        "output_audio_path": args.output_audio_path
        or config.get("output_audio_path", "final_podcast.mp3"),
        "audio_model": args.audio_model or config.get("audio_model", "tts-1"),
    }

    # Add participants from CLI if provided
    if args.participants:
        user_input["participants"].extend({"name": name} for name in args.participants)

    # Validate inputs
    if not user_input["pdf_url"]:
        raise ValueError("PDF URL must be provided via CLI or config file.")

    # Run the workflow
    wfapp.run_and_monitor_workflow_sync(workflow=doc2podcast, input=user_input)
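
# Example invocations (a sketch: the script name, URL, and speaker names are
# illustrative; an OpenAI API key is assumed to be available via .env, and the
# workflow runtime expects the app to run alongside a Dapr sidecar, e.g. via
# `dapr run`):
#
#   python app.py --pdf_url https://example.com/paper.pdf \
#       --podcast_name "My Podcast" --host_name Ava --participants Ben Cole
#
#   # or drive everything from a config file:
#   python app.py --config podcast.yaml
#
#   # podcast.yaml (illustrative):
#   # pdf_url: https://example.com/paper.pdf
#   # participants:
#   #   - name: Ben
#   #   - name: Cole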