Parallelization
Parallelization involves executing multiple components, such as LLM calls, tool invocations, or even entire sub-agents, concurrently. Instead of waiting for one step to complete before starting the next, parallel execution lets independent tasks run at the same time, significantly reducing overall execution time for workloads that can be broken into independent parts.
Practical Applications & Use Cases
Parallelization is a powerful pattern for optimizing agent performance across various applications:
- Information Gathering and Research
- Data Processing and Analysis
- Multi-API or Tool Interaction
- Content Generation with Multiple Components
- Validation and Verification
- Multi-Modal Processing
- A/B Testing or Multiple Options Generation
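Most of these use cases reduce to the same shape: fan out several independent I/O-bound calls and collect all the results. A minimal framework-free sketch using Python's asyncio (the source names such as news_api are illustrative placeholders, not real services):

```python
import asyncio
import time

async def fetch_source(name: str, delay: float) -> str:
    # Stand-in for an I/O-bound call such as an API request or database query.
    await asyncio.sleep(delay)
    return f"result from {name}"

async def gather_results() -> list[str]:
    # asyncio.gather fans out all three coroutines and waits for every one,
    # so total wall-clock time tracks the slowest task, not the sum.
    return await asyncio.gather(
        fetch_source("news_api", 0.2),
        fetch_source("weather_api", 0.2),
        fetch_source("papers_db", 0.2),
    )

start = time.perf_counter()
results = asyncio.run(gather_results())
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.2s, not the 0.6s a sequential loop would take
```

The same fan-out/gather structure underlies the framework examples later in this section.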
Summary
What: Many agentic workflows involve multiple sub-tasks that must be completed to achieve a final goal. A purely sequential execution, where each task waits for the previous one to finish, is often inefficient and slow. This latency becomes a significant bottleneck when tasks depend on external I/O operations, such as calling different APIs or querying multiple databases. Without a mechanism for concurrent execution, the total processing time is the sum of all individual task durations, hindering the system's overall performance and responsiveness.
Why: The Parallelization pattern provides a standardized solution by enabling the simultaneous execution of independent tasks. It works by identifying components of a workflow, like tool usages or LLM calls, that do not rely on each other's immediate outputs. Agentic frameworks like LangChain and the Google ADK provide built-in constructs to define and manage these concurrent operations. For instance, a main process can invoke several sub-tasks that run in parallel and wait for all of them to complete before proceeding to the next step. By running these independent tasks at the same time rather than one after another, this pattern drastically reduces the total execution time.
Rule of thumb: Use this pattern when a workflow contains multiple independent operations that can run simultaneously, such as fetching data from several APIs, processing different chunks of data, or generating multiple pieces of content for later synthesis.
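When the independent operations are blocking calls rather than coroutines, a thread pool achieves the same effect. A minimal sketch, assuming hypothetical tool names, where each simulated tool call sleeps to mimic network latency:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def call_tool(tool_name: str) -> str:
    # Stand-in for a blocking tool call (HTTP request, database query, etc.).
    time.sleep(0.2)  # simulated network latency
    return f"{tool_name}: done"

tool_names = ["web_search", "stock_api", "weather_api"]

start = time.perf_counter()
# All three calls run concurrently; pool.map preserves input order in the results.
with ThreadPoolExecutor(max_workers=len(tool_names)) as pool:
    results = list(pool.map(call_tool, tool_names))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

Because the threads wait on I/O rather than compute, wall-clock time stays near a single call's latency regardless of how many tools are fanned out.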
Key Takeaways
- Parallelization is a pattern for executing independent tasks concurrently to improve efficiency.
- It is particularly useful when tasks involve waiting for external resources, such as API calls.
- The adoption of a concurrent or parallel architecture introduces substantial complexity and cost, impacting key development phases such as design, debugging, and system logging.
- Frameworks like LangChain and Google ADK provide built-in support for defining and managing parallel execution.
- In LangChain Expression Language (LCEL), RunnableParallel is a key construct for running multiple runnables side-by-side.
- Google ADK can facilitate parallel execution through LLM-Driven Delegation, where a Coordinator agent's LLM identifies independent sub-tasks and triggers their concurrent handling by specialized sub-agents.
- Parallelization helps reduce overall latency and makes agentic systems more responsive for complex tasks.
Code Examples
LangChain Implementation
parallelization_langchain.py
import os
import asyncio
from typing import Optional

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
from dotenv import load_dotenv

load_dotenv()  # Load environment variables from .env file

MODEL_ID = os.getenv("MODEL_ID")

try:
    llm: Optional[ChatOpenAI] = ChatOpenAI(temperature=0.7, model=MODEL_ID)
except Exception as e:
    print(f"Error initializing ChatOpenAI: {e}")
    llm = None

# --- Define Independent Chains ---
# These three chains represent distinct tasks that can be executed in parallel.
summarize_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Summarize the following topic concisely:"),
        ("user", "{topic}"),
    ])
    | llm
    | StrOutputParser()
)

questions_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Generate three interesting questions about the following topic:"),
        ("user", "{topic}"),
    ])
    | llm
    | StrOutputParser()
)

terms_chain: Runnable = (
    ChatPromptTemplate.from_messages([
        ("system", "Identify 5-10 key terms related to the following topic, separated by commas:"),
        ("user", "{topic}"),
    ])
    | llm
    | StrOutputParser()
)

# --- Build the Parallel + Synthesis Chain ---
# 1. Define the block of tasks to run in parallel. The results of these,
#    along with the original topic, will be fed into the next step.
map_chain = RunnableParallel(
    {
        "summary": summarize_chain,
        "questions": questions_chain,
        "key_terms": terms_chain,
        "topic": RunnablePassthrough(),  # Pass the original topic through
    }
)

# 2. Define the final synthesis prompt which will combine the parallel results.
synthesis_prompt = ChatPromptTemplate.from_messages([
    ("system", """Based on the following information:
Summary: {summary}
Related Questions: {questions}
Key Terms: {key_terms}
Synthesize a comprehensive answer."""),
    ("user", "Original topic: {topic}"),
])

# 3. Construct the full chain by piping the parallel results directly into
#    the synthesis prompt, followed by the LLM and output parser.
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()

# --- Run the Chain ---
async def run_parallel_example(topic: str) -> None:
    """Asynchronously invokes the parallel processing chain with a specific
    topic and prints the synthesized result.

    Args:
        topic: The input topic to be processed by the LangChain chains.
    """
    if llm is None:
        print("LLM is not initialized. Cannot run the chain.")
        return

    print(f"\n--- Running parallel chain for topic: '{topic}' ---")
    try:
        # The input to `ainvoke` is the single 'topic' string,
        # which is then passed to each runnable in the `map_chain`.
        response = await full_parallel_chain.ainvoke(topic)
        print("\n--- Final Synthesized Response ---")
        print(response)
    except Exception as e:
        print(f"An error occurred while running the parallel chain: {e}")

if __name__ == "__main__":
    test_topic = "The history of space exploration"
    asyncio.run(run_parallel_example(test_topic))
Google ADK Implementation
parallelization_adk.py
import asyncio
import os
import uuid

from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.runners import Runner
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.tools import google_search
from google.genai.types import Content, Part
from dotenv import load_dotenv

load_dotenv()  # Load environment variables from .env file

MODEL_ID = os.getenv("GOOGLE_MODEL_ID")  # Ensure this is set in your .env file, e.g., "gemini-2.0-flash"

# --- 1. Define Researcher Sub-Agents (to run in parallel) ---

# Researcher 1: Renewable Energy
researcher_agent_1 = LlmAgent(
    name="RenewableEnergyResearcher",
    model=MODEL_ID,
    instruction="""You are an AI Research Assistant specializing in energy.
Research the latest advancements in 'renewable energy sources'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.""",
    description="Researches renewable energy sources.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="renewable_energy_result",
)

# Researcher 2: Electric Vehicles
researcher_agent_2 = LlmAgent(
    name="EVResearcher",
    model=MODEL_ID,
    instruction="""You are an AI Research Assistant specializing in transportation.
Research the latest developments in 'electric vehicle technology'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.""",
    description="Researches electric vehicle technology.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="ev_technology_result",
)

# Researcher 3: Carbon Capture
researcher_agent_3 = LlmAgent(
    name="CarbonCaptureResearcher",
    model=MODEL_ID,
    instruction="""You are an AI Research Assistant specializing in climate solutions.
Research the current state of 'carbon capture methods'.
Use the Google Search tool provided.
Summarize your key findings concisely (1-2 sentences).
Output *only* the summary.""",
    description="Researches carbon capture methods.",
    tools=[google_search],
    # Store result in state for the merger agent
    output_key="carbon_capture_result",
)

# --- 2. Create the ParallelAgent (Runs researchers concurrently) ---
# This agent orchestrates the concurrent execution of the researchers.
# It finishes once all researchers have completed and stored their results in state.
parallel_research_agent = ParallelAgent(
    name="ParallelWebResearchAgent",
    sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
    description="Runs multiple research agents in parallel to gather information.",
)

# --- 3. Define the Merger Agent (Runs *after* the parallel agents) ---
# This agent takes the results stored in the session state by the parallel agents
# and synthesizes them into a single, structured response with attributions.
merger_agent = LlmAgent(
    name="SynthesisAgent",
    model=MODEL_ID,  # Or potentially a more powerful model if needed for synthesis
    instruction="""You are an AI Assistant responsible for combining research findings into a structured report.

Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly.

**Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.**

**Input Summaries:**

* **Renewable Energy:**
{renewable_energy_result}

* **Electric Vehicles:**
{ev_technology_result}

* **Carbon Capture:**
{carbon_capture_result}

**Output Format:**

## Summary of Recent Sustainable Technology Advancements

### Renewable Energy Findings
(Based on RenewableEnergyResearcher's findings)
[Synthesize and elaborate *only* on the renewable energy input summary provided above.]

### Electric Vehicle Findings
(Based on EVResearcher's findings)
[Synthesize and elaborate *only* on the EV input summary provided above.]

### Carbon Capture Findings
(Based on CarbonCaptureResearcher's findings)
[Synthesize and elaborate *only* on the carbon capture input summary provided above.]

### Overall Conclusion
[Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.]

Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content.""",
    description=(
        "Combines research findings from parallel agents into a structured, cited report, "
        "strictly grounded on provided inputs."
    ),
    # No tools needed for merging
    # No output_key needed here, as its direct response is the final output of the sequence
)

# --- 4. Create the SequentialAgent (Orchestrates the overall flow) ---
# This is the main agent that will be run. It first executes the ParallelAgent
# to populate the state, and then executes the MergerAgent to produce the final output.
sequential_pipeline_agent = SequentialAgent(
    name="ResearchAndSynthesisPipeline",
    # Run parallel research first, then merge
    sub_agents=[parallel_research_agent, merger_agent],
    description="Coordinates parallel research and synthesizes the results.",
)

# For ADK tools compatibility, the root agent must be named `root_agent`
root_agent = sequential_pipeline_agent

# =========================
# Runnable entrypoint
# =========================

APP_NAME = "parallel_web_research_app"

async def run_once(user_text: str) -> str:
    """Runs the root_agent once for a single user message and returns the final response text."""
    # You must set GOOGLE_API_KEY for Gemini usage.
    # Example: export GOOGLE_API_KEY="..."
    if not os.getenv("GOOGLE_API_KEY"):
        raise RuntimeError(
            'Missing GOOGLE_API_KEY. Set it first, e.g. `export GOOGLE_API_KEY="..."`.'
        )

    session_service = InMemorySessionService()
    runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)

    user_id = "user_1"
    session_id = f"session_{uuid.uuid4().hex[:8]}"

    # Create a session (stores state/history)
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )

    # Build the user message
    new_message = Content(role="user", parts=[Part(text=user_text)])

    final_text = ""
    async for event in runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=new_message,
    ):
        # Print streaming output if you want (optional)
        # print(event)
        # Capture the final response
        if event.is_final_response() and event.content and event.content.parts:
            # Usually the first part is the text response
            final_text = event.content.parts[0].text or ""

    return final_text

def main() -> None:
    prompt = (
        "Research renewable energy sources, electric vehicle technology, and carbon capture methods, "
        "then produce the structured report."
    )
    result = asyncio.run(run_once(prompt))
    print("\n===== FINAL OUTPUT =====\n")
    print(result)

if __name__ == "__main__":
    main()