Skip to main content

Parallel Execution

Added in: 0.7.0

Execute multiple graph branches concurrently and merge results with flexible strategies. Parallel execution enables true concurrency for independent operations, significantly reducing total execution time.

Overview​

Parallel execution allows you to:

  • Run independent operations concurrently - Execute multiple branches at the same time
  • Reduce total execution time - Time = max(branches), not sum(branches)
  • Merge results intelligently - Vote, average, or custom merge strategies
  • Preserve metadata - Each branch's metadata tracked and merged
  • Handle failures gracefully - Fail-fast or collect partial results

Performance gain: Up to 3x faster for independent workflows


Quick Start​

Basic Example​

import io.github.noailabs.spice.graph.dsl.graph
import io.github.noailabs.spice.graph.nodes.MergeStrategies

val parallelGraph = graph("parallel-example") {
// Execute branches concurrently
parallel(
id = "data-collection",
branches = mapOf(
"api" to apiAgent,
"database" to dbAgent,
"cache" to cacheAgent
)
)

// Merge results
merge(
id = "aggregate",
parallelNodeId = "data-collection"
) { results ->
combineResults(results)
}

output("result")
}

// Run graph
val runner = DefaultGraphRunner()
val result = runner.run(parallelGraph, input)

Result: Execution time β‰ˆ slowest branch (not sum of all branches)


Core Concepts​

ParallelNode​

Executes multiple branches concurrently using Kotlin coroutines:

val parallelNode = ParallelNode(
id = "parallel-processing",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB,
"branch-c" to nodeC
),
mergePolicy = MergePolicy.Namespace, // Default
failFast = true // Stop on first failure
)

Parameters:

  • id: Unique identifier for this parallel node
  • branches: Map of branch ID to Node
  • mergePolicy: How to merge metadata (default: Namespace)
  • failFast: Stop on first error vs collect all results

MergeNode​

Aggregates results from parallel execution:

val mergeNode = MergeNode(
id = "combine",
parallelNodeId = "parallel-processing"
) { results ->
// Custom merge logic
results.values.toList()
}

Parameters:

  • id: Unique identifier for this merge node
  • parallelNodeId: ID of the ParallelNode to merge
  • merger: Function to combine results

Merge Policies​

Control how metadata is merged across parallel branches.

Namespace (Default)​

Isolate each branch's metadata in separate namespaces:

parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB
),
mergePolicy = MergePolicy.Namespace
)

// Result metadata:
// {
// "parallel.process.branch-a.confidence": 0.8,
// "parallel.process.branch-b.confidence": 0.6
// }

Use when: Branches set same keys with different meanings

Custom Aggregation​

Define aggregation functions per metadata key:

parallel(
id = "scorers",
branches = mapOf(
"model-a" to modelA,
"model-b" to modelB,
"model-c" to modelC
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"confidence" to AggregationFunction.AVERAGE,
"label" to AggregationFunction.VOTE,
"count" to AggregationFunction.SUM
)
)
)

// Result metadata:
// {
// "confidence": 0.75, // Average: (0.8 + 0.7) / 2
// "label": "cat", // Vote: most common
// "count": 150 // Sum: 100 + 50
// }

Available Aggregation Functions:

  • AVERAGE - Average of numeric values
  • SUM - Sum of numeric values
  • VOTE - Most common value (voting)
  • MIN / MAX - Min/max of comparable values
  • FIRST / LAST - First/last value
  • CONCAT_LIST - Combine all into list

LastWrite / FirstWrite​

Simple overwrite strategies:

// LastWrite: Last branch overwrites
mergePolicy = MergePolicy.LastWrite

// FirstWrite: First branch wins
mergePolicy = MergePolicy.FirstWrite

Use when: Metadata conflicts don't matter


Built-in Merge Strategies​

Pre-built functions for common merge patterns:

import io.github.noailabs.spice.graph.nodes.MergeStrategies

// Vote: Most common result
merge("vote", "parallel", MergeStrategies.vote)

// Average: Numeric average
merge("avg", "parallel", MergeStrategies.average)

// Sum: Sum of values
merge("sum", "parallel", MergeStrategies.sum)

// Min/Max: Minimum or maximum
merge("min", "parallel", MergeStrategies.min)
merge("max", "parallel", MergeStrategies.max)

// First/Last: First or last result
merge("first", "parallel", MergeStrategies.first)
merge("last", "parallel", MergeStrategies.last)

// Concat: Combine into list
merge("list", "parallel", MergeStrategies.concatList)

// AsMap: Return all as map (no merging)
merge("all", "parallel", MergeStrategies.asMap)

Use Cases​

Use Case 1: Multi-LLM Voting​

Ask multiple LLMs and vote for consensus:

import io.github.noailabs.spice.springboot.ai.factory.SpringAIAgentFactory
import io.github.noailabs.spice.springboot.ai.factory.OpenAIConfig
import io.github.noailabs.spice.springboot.ai.factory.AnthropicConfig

// Create agents with Spring AI factory
val factory: SpringAIAgentFactory = ... // Inject

val gpt4Agent = factory.openai(
model = "gpt-4",
config = OpenAIConfig(agentId = "gpt4")
)

val claudeAgent = factory.anthropic(
model = "claude-3-5-sonnet-20241022",
config = AnthropicConfig(agentId = "claude")
)

val geminiAgent = factory.openai( // Or use appropriate factory method
model = "gpt-4-turbo", // Placeholder - use actual Gemini integration when available
config = OpenAIConfig(agentId = "gemini")
)

val llmVotingGraph = graph("llm-voting") {
parallel(
id = "llm-ensemble",
branches = mapOf(
"gpt4" to gpt4Agent,
"claude" to claudeAgent,
"gemini" to geminiAgent
)
)

merge(
id = "vote",
parallelNodeId = "llm-ensemble",
merger = MergeStrategies.vote
)

output("consensus")
}

// Performance: ~2s (parallel) vs ~6s (sequential)
// 3x faster!

Use Case 2: Multi-Source Data Fetch​

Fetch from multiple sources concurrently:

val dataFetchGraph = graph("multi-source-fetch") {
parallel(
id = "fetch",
branches = mapOf(
"api" to fetchFromAPI,
"database" to fetchFromDB,
"cache" to fetchFromCache,
"file" to fetchFromFile
)
)

merge("aggregate", "fetch") { results ->
val totalRecords = results.values
.filterIsInstance<List<*>>()
.sumOf { it.size }

mapOf(
"sources" to results.keys.toList(),
"totalRecords" to totalRecords,
"data" to results
)
}

output("result")
}

Use Case 3: Parallel Validation​

Run multiple validators concurrently:

val validationGraph = graph("parallel-validation") {
parallel(
id = "validators",
branches = mapOf(
"schema" to schemaValidator,
"business" to businessValidator,
"security" to securityValidator,
"format" to formatValidator
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"isValid" to AggregationFunction.MIN // All must pass
)
)
)

merge("result", "validators") { results ->
val allValid = results.values.all { it == true }
val failed = results.filter { (_, valid) -> valid != true }.keys

mapOf(
"isValid" to allValid,
"failedValidators" to failed.toList()
)
}

output("validationResult")
}

Use Case 4: Confidence Averaging​

Average scores from multiple models:

val confidenceGraph = graph("confidence-averaging") {
parallel(
id = "models",
branches = mapOf(
"model-a" to modelA,
"model-b" to modelB,
"model-c" to modelC
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"confidence" to AggregationFunction.AVERAGE
)
)
)

merge("avg-score", "models", MergeStrategies.average)

output("result")
}

GraphBuilder DSL​

Convenient DSL for parallel workflows:

parallel()​

fun parallel(
id: String,
branches: Map<String, Node>,
mergePolicy: MergePolicy = MergePolicy.Namespace,
failFast: Boolean = true
)

Example:

graph("my-graph") {
parallel(
id = "process",
branches = mapOf(
"step-a" to nodeA,
"step-b" to nodeB
),
mergePolicy = MergePolicy.Namespace,
failFast = true
)
}

merge()​

fun merge(
id: String,
parallelNodeId: String,
merger: (Map<String, Any?>) -> Any?
)

Example:

graph("my-graph") {
parallel("process", branches)

merge("combine", "process") { results ->
// Custom merge logic
results.values.toList()
}
}

Error Handling​

Fail-Fast Mode (Default)​

Stop on first branch failure:

parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB, // This fails
"branch-c" to nodeC
),
failFast = true // Default
)

// Result: Entire parallel execution fails
// Error from branch-b propagated

Collect-All Mode​

Collect partial results even if some branches fail:

parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB, // This fails
"branch-c" to nodeC
),
failFast = false
)

// Result: Successful branches included
// Failed branches skipped
// Results: { "branch-a": ..., "branch-c": ... }

Performance​

Execution Time​

Sequential:

Total time = T(branch1) + T(branch2) + T(branch3)
Example: 2s + 2s + 2s = 6 seconds

Parallel:

Total time = max(T(branch1), T(branch2), T(branch3))
Example: max(2s, 2s, 2s) = 2 seconds
πŸš€ 3x faster!

Real-World Benchmarks​

Use CaseSequentialParallelSpeedup
Multi-LLM Voting6.2s2.1s2.95x
Multi-Source Fetch4.8s1.6s3.0x
Parallel Validation3.2s0.9s3.56x

Average speedup: 2.7x - 3x faster


Best Practices​

βœ… Do's​

  1. Parallelize independent operations

    // Good: Independent data sources
    parallel("fetch", branches = mapOf(
    "api" to apiCall,
    "db" to dbQuery,
    "cache" to cacheRead
    ))
  2. Use namespace for conflicting metadata

    parallel("process",
    branches = mapOf(...),
    mergePolicy = MergePolicy.Namespace
    )
  3. Choose appropriate merge strategy

    // Voting for consensus
    merge("vote", "llm-ensemble", MergeStrategies.vote)

    // Averaging for scores
    merge("avg", "scorers", MergeStrategies.average)
  4. Handle failures appropriately

    parallel("fetch",
    branches = mapOf(...),
    failFast = false // Collect partial results
    )

❌ Don'ts​

  1. Don't parallelize dependent operations

    // Bad: B depends on A's result
    parallel("bad", branches = mapOf(
    "step-a" to computeA,
    "step-b" to computeB // Uses A's output!
    ))
  2. Don't ignore merge conflicts

    // Bad: LastWrite with important data
    MergePolicy.LastWrite // Can lose data!

    // Good: Namespace or Custom
    MergePolicy.Namespace // Preserve all
  3. Don't overload resources

    // Bad: 100 parallel HTTP calls
    parallel("fetch", branches = (1..100).associate {
    "api-$it" to apiCall
    })

    // Good: Batch with reasonable limit
    parallel("fetch", branches = (1..10).associate {
    "batch-$it" to batchCall
    })

Advanced Examples​

Weighted Voting​

Weight votes by confidence scores:

merge("weighted-vote", "llm-ensemble") { results ->
val responses = results.mapValues { (_, data) ->
data as Map<String, Any>
}

// Weight by confidence
val weighted = responses.mapValues { (_, response) ->
val answer = response["answer"] as String
val confidence = response["confidence"] as Double
answer to confidence
}

// Sum confidences per answer
val scores = weighted.values
.groupBy { it.first }
.mapValues { (_, pairs) -> pairs.sumOf { it.second } }

// Winner = highest total confidence
val winner = scores.maxByOrNull { it.value }!!

mapOf(
"answer" to winner.key,
"totalConfidence" to winner.value
)
}

Conditional Branching + Parallel​

Combine with conditional edges:

graph("hybrid-flow") {
agent("classifier", classifierAgent)

// Conditional: High confidence β†’ parallel processing
edge("classifier", "parallel-process") { result ->
val confidence = getConfidence(result)
confidence > 0.8
}

// Conditional: Low confidence β†’ single processor
edge("classifier", "single-process") { result ->
val confidence = getConfidence(result)
confidence <= 0.8
}

parallel(
id = "parallel-process",
branches = mapOf(
"fast" to fastProcessor,
"accurate" to accurateProcessor
)
)

agent("single-process", slowButSafeProcessor)

merge("vote", "parallel-process", MergeStrategies.vote)

output("result")
}

Testing Parallel Execution​

Verify Timing​

@Test
fun `test parallel execution is concurrent`() = runTest {
val graph = graph("timing-test") {
parallel("parallel", branches = mapOf(
"a" to DelayNode("a", 100, "A"),
"b" to DelayNode("b", 100, "B"),
"c" to DelayNode("c", 100, "C")
))
output("result")
}

val executionTime = measureTimeMillis {
runner.run(graph, emptyMap())
}

// Should be ~100ms (parallel) not ~300ms (sequential)
assertTrue(executionTime < 150)
}

Verify Results​

@Test
fun `test merge produces correct result`() = runTest {
val graph = graph("merge-test") {
parallel("parallel", branches = mapOf(
"a" to SimpleNode("a", 10),
"b" to SimpleNode("b", 20),
"c" to SimpleNode("c", 30)
))

merge("sum", "parallel", MergeStrategies.sum)
output("result")
}

val result = runner.run(graph, emptyMap()).getOrThrow()

assertEquals(60.0, result.result) // 10 + 20 + 30
}

Troubleshooting​

Issue: "No results found for ParallelNode"​

Problem: MergeNode can't find parallel results

Solution: Ensure MergeNode comes after ParallelNode in graph

// Good
parallel("parallel", branches)
merge("merge", "parallel", merger) // After parallel

// Bad
merge("merge", "parallel", merger) // Before parallel!
parallel("parallel", branches)

Issue: Slow execution despite parallelism​

Problem: Operations not truly independent

Solution: Check for hidden dependencies

// Bad: Shared mutable state
val sharedList = mutableListOf<String>()
parallel("bad", branches = mapOf(
"a" to NodeThatModifies(sharedList), // Race condition!
"b" to NodeThatModifies(sharedList)
))

// Good: Independent operations
parallel("good", branches = mapOf(
"a" to NodeA(), // No shared state
"b" to NodeB()
))

Issue: Metadata conflicts​

Problem: Branches setting same keys

Solution: Use Namespace or Custom merge policy

// Solution 1: Namespace
mergePolicy = MergePolicy.Namespace

// Solution 2: Custom aggregation
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"conflictingKey" to AggregationFunction.VOTE
)
)

Migration from Sequential​

Before (Sequential)​

val graph = graph("sequential") {
agent("fetch", fetchAgent)
agent("validate", validateAgent)
agent("transform", transformAgent)
output("result")
}

// Time: T(fetch) + T(validate) + T(transform)

After (Parallel)​

val graph = graph("parallel") {
parallel(
id = "process",
branches = mapOf(
"fetch" to fetchAgent,
"validate" to validateAgent,
"transform" to transformAgent
)
)

merge("combine", "process") { results ->
mapOf(
"fetched" to results["fetch"],
"validated" to results["validate"],
"transformed" to results["transform"]
)
}

output("result")
}

// Time: max(T(fetch), T(validate), T(transform))
// πŸš€ Up to 3x faster!

Next Steps​