Parallel Execution
Added in: 0.7.0
Execute multiple graph branches concurrently and merge their results with configurable strategies. Because independent branches run at the same time, total execution time approaches that of the slowest branch rather than the sum of all branches.
Overview
Parallel execution allows you to:
- Run independent operations concurrently - Execute multiple branches at the same time
- Reduce total execution time - Time = max(branches), not sum(branches)
- Merge results intelligently - Vote, average, or custom merge strategies
- Preserve metadata - Each branch's metadata tracked and merged
- Handle failures gracefully - Fail-fast or collect partial results
Performance gain: up to N times faster for N independent branches of similar duration (about 3x for three equal-length branches)
Quick Start
Basic Example
import io.github.noailabs.spice.graph.dsl.graph
import io.github.noailabs.spice.graph.nodes.MergeStrategies
val parallelGraph = graph("parallel-example") {
// Execute branches concurrently
parallel(
id = "data-collection",
branches = mapOf(
"api" to apiAgent,
"database" to dbAgent,
"cache" to cacheAgent
)
)
// Merge results
merge(
id = "aggregate",
parallelNodeId = "data-collection"
) { results ->
combineResults(results)
}
output("result")
}
// Run graph
val runner = DefaultGraphRunner()
val result = runner.run(parallelGraph, input)
Result: Execution time ≈ slowest branch (not the sum of all branches)
Core Concepts
ParallelNode
Executes multiple branches concurrently using Kotlin coroutines:
val parallelNode = ParallelNode(
id = "parallel-processing",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB,
"branch-c" to nodeC
),
mergePolicy = MergePolicy.Namespace, // Default
failFast = true // Stop on first failure
)
Parameters:
- id: Unique identifier for this parallel node
- branches: Map of branch ID to Node
- mergePolicy: How to merge metadata (default: Namespace)
- failFast: true (default) stops on the first branch error; false collects the results of the branches that succeed
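The fan-out behavior described above can be approximated with the JDK alone. This sketch is not the library's implementation (which, per the description above, uses Kotlin coroutines); `runBranchesConcurrently` is a hypothetical name, and plain functions stand in for graph nodes. It shows the two properties that matter here: results are keyed by branch ID, and waiting on all branches costs roughly the slowest branch.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical helper: plain functions stand in for graph nodes.
// Runs every branch on its own pool thread and returns results keyed by branch ID,
// in the same order as the input map. future.get() rethrows a branch failure
// wrapped in java.util.concurrent.ExecutionException.
fun runBranchesConcurrently(branches: Map<String, () -> Any?>): Map<String, Any?> {
    if (branches.isEmpty()) return emptyMap()
    val pool = Executors.newFixedThreadPool(branches.size)
    try {
        // Submit every branch before waiting on any of them, so they overlap in time.
        val futures = branches.mapValues { (_, work) -> pool.submit(Callable<Any?> { work() }) }
        // Waiting here costs roughly the slowest branch, not the sum of all branches.
        return futures.mapValues { (_, future) -> future.get() }
    } finally {
        pool.shutdown()
    }
}
```

A coroutine version would replace the thread pool with `async` calls inside a `coroutineScope`; the thread pool is used here only so the sketch runs without extra dependencies.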
MergeNode
Aggregates results from parallel execution:
val mergeNode = MergeNode(
id = "combine",
parallelNodeId = "parallel-processing"
) { results ->
// Custom merge logic
results.values.toList()
}
Parameters:
- id: Unique identifier for this merge node
- parallelNodeId: ID of the ParallelNode whose results are merged
- merger: Function that combines the results map (branch ID to result) into a single value
Merge Policies
Control how metadata is merged across parallel branches.
Namespace (Default)
Isolate each branch's metadata in separate namespaces:
parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB
),
mergePolicy = MergePolicy.Namespace
)
// Result metadata:
// {
// "parallel.process.branch-a.confidence": 0.8,
// "parallel.process.branch-b.confidence": 0.6
// }
Use when: branches set the same keys with different meanings
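A minimal sketch of the key layout, assuming the `parallel.<parallelNodeId>.<branchId>.<key>` format shown in the example above; `namespaceMetadata` is a hypothetical helper, not part of the library API.

```kotlin
// Hypothetical helper: prefixes each branch's metadata keys so that branches
// writing the same key never overwrite each other.
// Key format follows the example above: parallel.<parallelNodeId>.<branchId>.<key>
fun namespaceMetadata(
    parallelNodeId: String,
    branchMetadata: Map<String, Map<String, Any?>>
): Map<String, Any?> =
    branchMetadata.flatMap { (branchId, metadata) ->
        metadata.map { (key, value) -> "parallel.$parallelNodeId.$branchId.$key" to value }
    }.toMap()
```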
Custom Aggregation
Define aggregation functions per metadata key:
parallel(
id = "scorers",
branches = mapOf(
"model-a" to modelA,
"model-b" to modelB,
"model-c" to modelC
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"confidence" to AggregationFunction.AVERAGE,
"label" to AggregationFunction.VOTE,
"count" to AggregationFunction.SUM
)
)
)
// Result metadata (example values from the three branches):
// {
// "confidence": 0.75, // AVERAGE: (0.8 + 0.7 + 0.75) / 3
// "label": "cat", // VOTE: "cat", "cat", "dog" -> most common
// "count": 150 // SUM: 100 + 30 + 20
// }
Available Aggregation Functions:
- AVERAGE: average of numeric values
- SUM: sum of numeric values
- VOTE: most common value (voting)
- MIN / MAX: minimum or maximum of comparable values
- FIRST / LAST: first or last value
- CONCAT_LIST: all values combined into a list
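To make the per-key behavior concrete, here is a sketch of a Custom merge under stated assumptions: `Aggregation` and `mergeCustom` are hypothetical names, MIN/MAX are numeric-only here (the library accepts any comparable value), and keys with no configured aggregator keep the last reported value, a fallback the documentation above does not specify.

```kotlin
// Hypothetical mirror of the AggregationFunction names listed above.
enum class Aggregation(val aggregate: (List<Any?>) -> Any?) {
    AVERAGE({ values -> values.filterIsInstance<Number>().map { it.toDouble() }.average() }),
    SUM({ values -> values.filterIsInstance<Number>().sumOf { it.toDouble() } }),
    // Most common value; on a tie the value seen first wins.
    VOTE({ values -> values.groupingBy { it }.eachCount().maxByOrNull { it.value }?.key }),
    // Numeric-only in this sketch.
    MIN({ values -> values.filterIsInstance<Number>().minOfOrNull { it.toDouble() } }),
    MAX({ values -> values.filterIsInstance<Number>().maxOfOrNull { it.toDouble() } }),
    FIRST({ values -> values.firstOrNull() }),
    LAST({ values -> values.lastOrNull() }),
    CONCAT_LIST({ values -> values.toList() })
}

fun mergeCustom(
    branchMetadata: List<Map<String, Any?>>,
    aggregators: Map<String, Aggregation>
): Map<String, Any?> {
    // Gather every value reported for each key, in branch order.
    val valuesByKey = LinkedHashMap<String, MutableList<Any?>>()
    for (metadata in branchMetadata) {
        for ((key, value) in metadata) {
            valuesByKey.getOrPut(key) { mutableListOf() }.add(value)
        }
    }
    // Keys without an aggregator keep the last reported value (an assumption of this sketch).
    return valuesByKey.mapValues { (key, values) ->
        val aggregator = aggregators[key]
        if (aggregator != null) aggregator.aggregate(values) else values.last()
    }
}
```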
LastWrite / FirstWrite
Simple overwrite strategies:
// LastWrite: Last branch overwrites
mergePolicy = MergePolicy.LastWrite
// FirstWrite: First branch wins
mergePolicy = MergePolicy.FirstWrite
Use when: Metadata conflicts don't matter
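The two overwrite policies differ only in which side of a map union wins. This sketch assumes branch metadata arrives as an ordered list (what "first" and "last" mean for branches that finish concurrently is not specified above); both function names are hypothetical. One of the two conflicting `k` values in the usage below is always discarded, which is why these policies only suit metadata whose conflicts don't matter.

```kotlin
// Hypothetical sketches of the two overwrite policies over an ordered list of branch metadata.

// LastWrite: later maps overwrite earlier ones on key conflicts.
fun mergeLastWrite(branchMetadata: List<Map<String, Any?>>): Map<String, Any?> =
    branchMetadata.fold(emptyMap<String, Any?>()) { acc, metadata -> acc + metadata }

// FirstWrite: earlier maps keep their values; later maps only add new keys.
fun mergeFirstWrite(branchMetadata: List<Map<String, Any?>>): Map<String, Any?> =
    branchMetadata.fold(emptyMap<String, Any?>()) { acc, metadata -> metadata + acc }
```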
Built-in Merge Strategies
Pre-built functions for common merge patterns:
import io.github.noailabs.spice.graph.nodes.MergeStrategies
// Vote: Most common result
merge("vote", "parallel", MergeStrategies.vote)
// Average: Numeric average
merge("avg", "parallel", MergeStrategies.average)
// Sum: Sum of values
merge("sum", "parallel", MergeStrategies.sum)
// Min/Max: Minimum or maximum
merge("min", "parallel", MergeStrategies.min)
merge("max", "parallel", MergeStrategies.max)
// First/Last: First or last result
merge("first", "parallel", MergeStrategies.first)
merge("last", "parallel", MergeStrategies.last)
// Concat: Combine into list
merge("list", "parallel", MergeStrategies.concatList)
// AsMap: Return all as map (no merging)
merge("all", "parallel", MergeStrategies.asMap)
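Each strategy is a function from the results map (branch ID to result) to a single value, matching the `merger` parameter of `merge()`. The object below re-implements a few of them as a hypothetical `SketchStrategies` so the behavior is visible; the real `MergeStrategies` may differ in tie-breaking and numeric return types.

```kotlin
// Hypothetical re-implementations of a few merge strategies, for illustration only.
object SketchStrategies {
    // Most common branch result; on a tie the value seen first wins.
    val vote: (Map<String, Any?>) -> Any? = { results ->
        results.values.groupingBy { it }.eachCount().maxByOrNull { it.value }?.key
    }

    // Mean of the numeric results; non-numeric results are ignored.
    val average: (Map<String, Any?>) -> Any? = { results ->
        results.values.filterIsInstance<Number>().map { it.toDouble() }.average()
    }

    // Sum of the numeric results, returned as a Double.
    val sum: (Map<String, Any?>) -> Any? = { results ->
        results.values.filterIsInstance<Number>().sumOf { it.toDouble() }
    }

    // All results in branch order.
    val concatList: (Map<String, Any?>) -> Any? = { results -> results.values.toList() }

    // No merging: hand the full branch-ID-to-result map downstream.
    val asMap: (Map<String, Any?>) -> Any? = { results -> results }
}
```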
Use Cases
Use Case 1: Multi-LLM Voting
Ask multiple LLMs and vote for consensus:
import io.github.noailabs.spice.springboot.ai.factory.SpringAIAgentFactory
import io.github.noailabs.spice.springboot.ai.factory.OpenAIConfig
import io.github.noailabs.spice.springboot.ai.factory.AnthropicConfig
// Create agents with Spring AI factory
val factory: SpringAIAgentFactory = ... // Inject
val gpt4Agent = factory.openai(
model = "gpt-4",
config = OpenAIConfig(agentId = "gpt4")
)
val claudeAgent = factory.anthropic(
model = "claude-3-5-sonnet-20241022",
config = AnthropicConfig(agentId = "claude")
)
val geminiAgent = factory.openai( // Placeholder: stands in for a Gemini agent
model = "gpt-4-turbo", // Replace with a Gemini integration when one is available
config = OpenAIConfig(agentId = "gemini")
)
val llmVotingGraph = graph("llm-voting") {
parallel(
id = "llm-ensemble",
branches = mapOf(
"gpt4" to gpt4Agent,
"claude" to claudeAgent,
"gemini" to geminiAgent
)
)
merge(
id = "vote",
parallelNodeId = "llm-ensemble",
merger = MergeStrategies.vote
)
output("consensus")
}
// Performance: ~2s (parallel) vs ~6s (sequential)
// 3x faster!
Use Case 2: Multi-Source Data Fetch
Fetch from multiple sources concurrently:
val dataFetchGraph = graph("multi-source-fetch") {
parallel(
id = "fetch",
branches = mapOf(
"api" to fetchFromAPI,
"database" to fetchFromDB,
"cache" to fetchFromCache,
"file" to fetchFromFile
)
)
merge("aggregate", "fetch") { results ->
val totalRecords = results.values
.filterIsInstance<List<*>>()
.sumOf { it.size }
mapOf(
"sources" to results.keys.toList(),
"totalRecords" to totalRecords,
"data" to results
)
}
output("result")
}
Use Case 3: Parallel Validation
Run multiple validators concurrently:
val validationGraph = graph("parallel-validation") {
parallel(
id = "validators",
branches = mapOf(
"schema" to schemaValidator,
"business" to businessValidator,
"security" to securityValidator,
"format" to formatValidator
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"isValid" to AggregationFunction.MIN // All must pass
)
)
)
merge("result", "validators") { results ->
val allValid = results.values.all { it == true }
val failed = results.filter { (_, valid) -> valid != true }.keys
mapOf(
"isValid" to allValid,
"failedValidators" to failed.toList()
)
}
output("validationResult")
}
Use Case 4: Confidence Averaging
Average scores from multiple models:
val confidenceGraph = graph("confidence-averaging") {
parallel(
id = "models",
branches = mapOf(
"model-a" to modelA,
"model-b" to modelB,
"model-c" to modelC
),
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"confidence" to AggregationFunction.AVERAGE
)
)
)
merge("avg-score", "models", MergeStrategies.average)
output("result")
}
GraphBuilder DSL
Convenient DSL for parallel workflows:
parallel()
fun parallel(
id: String,
branches: Map<String, Node>,
mergePolicy: MergePolicy = MergePolicy.Namespace,
failFast: Boolean = true
)
Example:
graph("my-graph") {
parallel(
id = "process",
branches = mapOf(
"step-a" to nodeA,
"step-b" to nodeB
),
mergePolicy = MergePolicy.Namespace,
failFast = true
)
}
merge()
fun merge(
id: String,
parallelNodeId: String,
merger: (Map<String, Any?>) -> Any?
)
Example:
graph("my-graph") {
parallel("process", branches)
merge("combine", "process") { results ->
// Custom merge logic
results.values.toList()
}
}
Error Handling
Fail-Fast Mode (Default)
Stop on first branch failure:
parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB, // This fails
"branch-c" to nodeC
),
failFast = true // Default
)
// Result: Entire parallel execution fails
// Error from branch-b propagated
Collect-All Mode
Collect partial results even if some branches fail:
parallel(
id = "process",
branches = mapOf(
"branch-a" to nodeA,
"branch-b" to nodeB, // This fails
"branch-c" to nodeC
),
failFast = false
)
// Result: Successful branches included
// Failed branches skipped
// Results: { "branch-a": ..., "branch-c": ... }
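Both modes can be sketched with `java.util.concurrent.ExecutorCompletionService`, which hands back futures in completion order so that fail-fast mode reacts to the earliest failure. This is an illustration under assumptions, not the library's code; `runWithFailurePolicy` is a hypothetical name. In coroutine code the same split typically maps to `coroutineScope`, where one failing `async` child cancels its siblings, versus `supervisorScope`, where siblings keep running.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors

// Hypothetical helper.
// failFast = true: rethrow the first failure to complete; shutdownNow() then interrupts the rest.
// failFast = false: drop failed branches and return the successful results.
fun runWithFailurePolicy(
    branches: Map<String, () -> Any?>,
    failFast: Boolean
): Map<String, Any?> {
    if (branches.isEmpty()) return emptyMap()
    val pool = Executors.newFixedThreadPool(branches.size)
    val completion = ExecutorCompletionService<Pair<String, Any?>>(pool)
    try {
        for ((id, work) in branches) {
            completion.submit(Callable<Pair<String, Any?>> { id to work() })
        }
        val succeeded = HashMap<String, Any?>()
        repeat(branches.size) {
            // take() yields futures in completion order, so the earliest failure surfaces first.
            val future = completion.take()
            try {
                val (id, value) = future.get()
                succeeded[id] = value
            } catch (e: ExecutionException) {
                if (failFast) throw e.cause ?: e
                // Collect-all mode: skip the failed branch and keep waiting for the others.
            }
        }
        // Report successes in branch declaration order.
        return branches.keys.filter { it in succeeded }.associateWith { succeeded[it] }
    } finally {
        pool.shutdownNow()
    }
}
```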
Performance
Execution Time
Sequential:
Total time = T(branch1) + T(branch2) + T(branch3)
Example: 2s + 2s + 2s = 6 seconds
Parallel:
Total time = max(T(branch1), T(branch2), T(branch3))
Example: max(2s, 2s, 2s) = 2 seconds
Speedup: 6s / 2s = 3x
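The idealized model above reduces to two aggregates over the branch durations. This small sketch (hypothetical names, ignoring scheduling overhead and contention) also shows why the speedup is bounded: it can never exceed the number of branches, and a single slow branch caps it.

```kotlin
// Idealized timing model: sequential = sum of branch times, parallel = slowest branch.
data class TimingEstimate(val sequentialSeconds: Double, val parallelSeconds: Double) {
    val speedup: Double get() = sequentialSeconds / parallelSeconds
}

fun estimate(branchSeconds: List<Double>): TimingEstimate {
    require(branchSeconds.isNotEmpty()) { "need at least one branch" }
    return TimingEstimate(branchSeconds.sum(), branchSeconds.maxOf { it })
}
```

For example, three branches of 1s, 1s, and 4s take 6s sequentially and 4s in parallel, a speedup of only 1.5x.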
Real-World Benchmarks
| Use Case | Sequential | Parallel | Speedup |
|---|---|---|---|
| Multi-LLM Voting | 6.2s | 2.1s | 2.95x |
| Multi-Source Fetch | 4.8s | 1.6s | 3.0x |
| Parallel Validation | 3.2s | 0.9s | 3.56x |
Average speedup across these benchmarks: about 3.2x ((2.95 + 3.0 + 3.56) / 3)
Best Practices
Do's
- Parallelize independent operations:
// Good: Independent data sources
parallel("fetch", branches = mapOf(
"api" to apiCall,
"db" to dbQuery,
"cache" to cacheRead
))
- Use Namespace for conflicting metadata:
parallel("process",
branches = mapOf(...),
mergePolicy = MergePolicy.Namespace
)
- Choose an appropriate merge strategy:
// Voting for consensus
merge("vote", "llm-ensemble", MergeStrategies.vote)
// Averaging for scores
merge("avg", "scorers", MergeStrategies.average)
- Handle failures appropriately:
parallel("fetch",
branches = mapOf(...),
failFast = false // Collect partial results
)
Don'ts
- Don't parallelize dependent operations:
// Bad: B depends on A's result
parallel("bad", branches = mapOf(
"step-a" to computeA,
"step-b" to computeB // Uses A's output!
))
- Don't ignore merge conflicts:
// Bad: LastWrite with important data
MergePolicy.LastWrite // Can lose data!
// Good: Namespace or Custom
MergePolicy.Namespace // Preserve all
- Don't overload resources:
// Bad: 100 parallel HTTP calls
parallel("fetch", branches = (1..100).associate {
"api-$it" to apiCall
})
// Good: Batch with reasonable limit
parallel("fetch", branches = (1..10).associate {
"batch-$it" to batchCall
})
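One way to follow the batching advice is to split the work list so that the number of branches stays fixed regardless of input size. `toBatches` is a hypothetical helper; each resulting batch would become one branch in a `parallel(...)` call.

```kotlin
// Hypothetical helper: split many work items into at most maxBranches batches,
// so that each batch (not each item) becomes one parallel branch.
fun <T> toBatches(items: List<T>, maxBranches: Int): Map<String, List<T>> {
    require(maxBranches > 0) { "maxBranches must be positive" }
    if (items.isEmpty()) return emptyMap()
    val batchSize = (items.size + maxBranches - 1) / maxBranches // ceiling division
    return items.chunked(batchSize)
        .withIndex()
        .associate { (index, batch) -> "batch-${index + 1}" to batch }
}
```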
Advanced Examples
Weighted Voting
Weight votes by confidence scores:
merge("weighted-vote", "llm-ensemble") { results ->
val responses = results.mapValues { (_, data) ->
data as Map<String, Any>
}
// Weight by confidence
val weighted = responses.mapValues { (_, response) ->
val answer = response["answer"] as String
val confidence = response["confidence"] as Double
answer to confidence
}
// Sum confidences per answer
val scores = weighted.values
.groupBy { it.first }
.mapValues { (_, pairs) -> pairs.sumOf { it.second } }
// Winner = highest total confidence
val winner = scores.maxByOrNull { it.value }!!
mapOf(
"answer" to winner.key,
"totalConfidence" to winner.value
)
}
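A self-contained variant of the weighted vote, under the same assumption that each branch returns a map with `answer` and `confidence` entries. Unlike the snippet above, it skips malformed branch results and returns `null` instead of throwing when no branch produced a usable answer; `weightedVote` and `WeightedWinner` are hypothetical names.

```kotlin
// Hypothetical result type for the weighted vote.
data class WeightedWinner(val answer: String, val totalConfidence: Double)

// Sums confidence per distinct answer and returns the answer with the highest total.
// Branch results that are not maps with a String "answer" and a numeric "confidence" are ignored.
fun weightedVote(results: Map<String, Any?>): WeightedWinner? =
    results.values
        .filterIsInstance<Map<*, *>>()
        .mapNotNull { response ->
            val answer = response["answer"] as? String
            val confidence = (response["confidence"] as? Number)?.toDouble()
            if (answer != null && confidence != null) answer to confidence else null
        }
        .groupBy({ it.first }, { it.second })
        .mapValues { (_, confidences) -> confidences.sum() }
        .maxByOrNull { it.value }
        ?.let { WeightedWinner(it.key, it.value) }
```

Weighting lets two moderately confident agreeing branches outvote one highly confident outlier, which a plain majority vote over three different answers could not resolve.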
Conditional Branching + Parallel
Combine with conditional edges:
graph("hybrid-flow") {
agent("classifier", classifierAgent)
// Conditional: High confidence → parallel processing
edge("classifier", "parallel-process") { result ->
val confidence = getConfidence(result)
confidence > 0.8
}
// Conditional: Low confidence → single processor
edge("classifier", "single-process") { result ->
val confidence = getConfidence(result)
confidence <= 0.8
}
parallel(
id = "parallel-process",
branches = mapOf(
"fast" to fastProcessor,
"accurate" to accurateProcessor
)
)
agent("single-process", slowButSafeProcessor)
merge("vote", "parallel-process", MergeStrategies.vote)
output("result")
}
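The two edge predicates above are meant to be complementary. Collapsing them into one hypothetical routing function makes that explicit and covers an edge case: for a NaN confidence, both `> 0.8` and `<= 0.8` are false, so neither edge would fire, whereas an if/else always picks a route.

```kotlin
// Hypothetical helper mirroring the two edge predicates above.
val CONFIDENCE_THRESHOLD = 0.8

// Returns the ID of the node to run next. NaN fails the ">" comparison,
// so it falls through to the safe single-processor route.
fun routeFor(confidence: Double): String =
    if (confidence > CONFIDENCE_THRESHOLD) "parallel-process" else "single-process"
```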
Testing Parallel Execution
Verify Timing
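import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

@Test
fun `test parallel execution is concurrent`() = runBlocking {
// runBlocking measures real time; under runTest, delay() on the test dispatcher
// is skipped via virtual time, which can make this check pass trivially
val runner = DefaultGraphRunner()
val graph = graph("timing-test") {
parallel("parallel", branches = mapOf(
"a" to DelayNode("a", 100, "A"), // test helper: waits ~100 ms, then returns "A"
"b" to DelayNode("b", 100, "B"),
"c" to DelayNode("c", 100, "C")
))
output("result")
}
val executionTime = measureTimeMillis {
runner.run(graph, emptyMap())
}
// Should be ~100ms (parallel) not ~300ms (sequential)
assertTrue(executionTime < 150)
}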
Verify Results
@Test
fun `test merge produces correct result`() = runTest {
val graph = graph("merge-test") {
parallel("parallel", branches = mapOf(
"a" to SimpleNode("a", 10),
"b" to SimpleNode("b", 20),
"c" to SimpleNode("c", 30)
))
merge("sum", "parallel", MergeStrategies.sum)
output("result")
}
val result = runner.run(graph, emptyMap()).getOrThrow()
assertEquals(60.0, result.result) // 10 + 20 + 30
}
Troubleshooting
Issue: "No results found for ParallelNode"
Problem: MergeNode can't find parallel results
Solution: Ensure MergeNode comes after ParallelNode in graph
// Good
parallel("parallel", branches)
merge("merge", "parallel", merger) // After parallel
// Bad
merge("merge", "parallel", merger) // Before parallel!
parallel("parallel", branches)
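Issue: Slow execution despite parallelism
Problem: Branches are not truly independent (for example, they share mutable state, a lock, or a single connection), so they wait on each other or race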
Solution: Check for hidden dependencies
// Bad: Shared mutable state
val sharedList = mutableListOf<String>()
parallel("bad", branches = mapOf(
"a" to NodeThatModifies(sharedList), // Race condition!
"b" to NodeThatModifies(sharedList)
))
// Good: Independent operations
parallel("good", branches = mapOf(
"a" to NodeA(), // No shared state
"b" to NodeB()
))
Issue: Metadata conflicts
Problem: Branches setting same keys
Solution: Use Namespace or Custom merge policy
// Solution 1: Namespace
mergePolicy = MergePolicy.Namespace
// Solution 2: Custom aggregation
mergePolicy = MergePolicy.Custom(
aggregators = mapOf(
"conflictingKey" to AggregationFunction.VOTE
)
)
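Migration from Sequential
Only steps that do not consume each other's output can move into a parallel block. The example below assumes fetch, validate, and transform each work from the same input; in a typical pipeline where validate reads fetch's output, those steps must stay sequential.
Before (Sequential)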
val graph = graph("sequential") {
agent("fetch", fetchAgent)
agent("validate", validateAgent)
agent("transform", transformAgent)
output("result")
}
// Time: T(fetch) + T(validate) + T(transform)
After (Parallel)
val graph = graph("parallel") {
parallel(
id = "process",
branches = mapOf(
"fetch" to fetchAgent,
"validate" to validateAgent,
"transform" to transformAgent
)
)
merge("combine", "process") { results ->
mapOf(
"fetched" to results["fetch"],
"validated" to results["validate"],
"transformed" to results["transform"]
)
}
output("result")
}
// Time: max(T(fetch), T(validate), T(transform))
// Up to 3x faster when the three steps take similar time
Next Steps
- Explore Graph Patterns for more workflow designs
- Learn Graph Middleware for cross-cutting concerns
- Review Error Handling for robust workflows
- Check Performance Optimization for tips