Graph System
Added in: 0.5.0
The Graph System is an orchestration framework inspired by Microsoft's Agent Framework. It lets you build multi-step AI workflows with fine-grained control over execution flow, error handling, and state management.
Overview
The Graph System provides three core abstractions:
- Node: A unit of work (Agent, Tool, or custom logic)
- Graph: A directed acyclic graph (DAG) connecting nodes
- Runner: Executes the graph with middleware support
// Simple example
val graph = graph("my-workflow") {
    agent("analyzer", analysisAgent)
    tool("processor", processorTool) { mapOf("input" to it.state["analyzer"]) }
    output("result") { it.state["processor"] }
}
val report = DefaultGraphRunner().run(graph, mapOf("input" to "data")).getOrThrow()
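To make the three abstractions concrete, here is a minimal, self-contained sketch of how a node, a graph, and a runner could fit together. SketchNode, SketchGraph, and SketchRunner are hypothetical stand-ins written for illustration only; they are not the Spice classes, and this sketch ignores edges, middleware, and error handling.

```kotlin
// Hypothetical, simplified stand-ins; not the Spice API.
fun interface SketchNode {
    fun run(state: Map<String, Any?>): Any?
}

class SketchGraph(
    val id: String,
    val nodes: LinkedHashMap<String, SketchNode> // insertion order = execution order
)

class SketchRunner {
    // Runs nodes in declaration order; each result is stored in state under the node's ID.
    fun run(graph: SketchGraph, input: Map<String, Any?>): Map<String, Any?> {
        val state = LinkedHashMap<String, Any?>(input)
        for ((nodeId, node) in graph.nodes) {
            state[nodeId] = node.run(state)
        }
        return state
    }
}

fun main() {
    val graph = SketchGraph(
        id = "my-workflow",
        nodes = linkedMapOf(
            "analyzer" to SketchNode { s -> "analyzed(${s["input"]})" },
            "processor" to SketchNode { s -> "processed(${s["analyzer"]})" },
            "result" to SketchNode { s -> s["processor"] }
        )
    )
    val finalState = SketchRunner().run(graph, mapOf("input" to "data"))
    println(finalState["result"]) // prints "processed(analyzed(data))"
}
```

Storing each node's result under its own ID is what lets later nodes read it.state["analyzer"] in the example above.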
Key Features
Flexible Node Types
- AgentNode: Execute any Spice Agent
- ToolNode: Execute any Spice Tool
- OutputNode: Transform and output results
- Custom Nodes: Implement the Node interface
Smart Execution Flow
- Sequential execution with automatic state management
- Conditional edges for dynamic routing
- Multiple paths from a single node
Robust Error Handling
- ErrorAction system (RETRY, SKIP, CONTINUE, PROPAGATE)
- Automatic retry with configurable limits
- Graceful degradation with SKIP/CONTINUE
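The retry and skip behaviour listed above can be sketched as a small loop. This is an illustration under stated assumptions, not the Spice implementation: SketchErrorAction and runWithPolicy are hypothetical names, a skipped node is assumed to yield null, and CONTINUE is left out because this page does not say how it differs from SKIP.

```kotlin
// Hypothetical mirror of three of the four actions named above.
enum class SketchErrorAction { RETRY, SKIP, PROPAGATE }

// Runs `block`, consulting `decide` on each failure.
// Returns null when the node is skipped (an assumed convention for this sketch).
fun <T> runWithPolicy(
    maxRetries: Int,
    decide: (Throwable) -> SketchErrorAction,
    block: () -> T
): T? {
    var attempts = 0
    while (true) {
        try {
            return block()
        } catch (e: Exception) {
            when (decide(e)) {
                SketchErrorAction.RETRY -> {
                    if (attempts >= maxRetries) throw e // retry budget exhausted
                    attempts++
                }
                SketchErrorAction.SKIP -> return null
                SketchErrorAction.PROPAGATE -> throw e
            }
        }
    }
}

fun main() {
    var calls = 0
    val result = runWithPolicy(maxRetries = 3, decide = { SketchErrorAction.RETRY }) {
        calls++
        if (calls < 3) throw IllegalStateException("transient failure")
        "ok after $calls calls"
    }
    println(result) // prints "ok after 3 calls"

    val skipped = runWithPolicy<String>(maxRetries = 0, decide = { SketchErrorAction.SKIP }) {
        throw IllegalStateException("bad input")
    }
    println(skipped) // prints "null"
}
```

Capping retries inside the loop keeps a policy that always answers RETRY from spinning forever.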
Checkpoint & Resume
- Save execution state at any point
- Resume from failure without re-executing completed nodes
- Configurable checkpointing (every N nodes, on error, time-based)
Graph Validation
- Pre-execution validation catches errors early
- Cycle detection ensures DAG structure
- Unreachable node detection
- Invalid reference checking
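The three checks above (invalid references, unreachable nodes, cycles) can be sketched over a plain node set and edge list. validateGraph is a hypothetical helper, not the Spice validator; it uses a depth-first walk for reachability and Kahn's algorithm for cycle detection, which may differ from what the library does internally.

```kotlin
// Hypothetical validator over plain collections; not the Spice validation API.
fun validateGraph(
    nodes: Set<String>,
    edges: List<Pair<String, String>>,
    entryPoint: String
): List<String> {
    val errors = mutableListOf<String>()

    // 1. Invalid references: the entry point and every edge endpoint must be a known node.
    if (entryPoint !in nodes) errors += "Unknown entry point: $entryPoint"
    for ((from, to) in edges) {
        if (from !in nodes) errors += "Edge references unknown node: $from"
        if (to !in nodes) errors += "Edge references unknown node: $to"
    }
    val validEdges = edges.filter { it.first in nodes && it.second in nodes }
    val successors = validEdges.groupBy({ it.first }, { it.second })

    // 2. Unreachable nodes: depth-first walk from the entry point.
    val reachable = mutableSetOf<String>()
    val stack = ArrayDeque<String>()
    if (entryPoint in nodes) stack.addLast(entryPoint)
    while (stack.isNotEmpty()) {
        val current = stack.removeLast()
        if (reachable.add(current)) {
            for (next in successors[current].orEmpty()) stack.addLast(next)
        }
    }
    for (node in nodes - reachable) errors += "Unreachable node: $node"

    // 3. Cycles: Kahn's algorithm. Nodes that never reach in-degree 0 sit on or behind a cycle.
    val inDegree = nodes.associateWith { 0 }.toMutableMap()
    for ((_, to) in validEdges) inDegree[to] = inDegree.getValue(to) + 1
    val ready = ArrayDeque(nodes.filter { inDegree.getValue(it) == 0 })
    var processed = 0
    while (ready.isNotEmpty()) {
        val current = ready.removeFirst()
        processed++
        for (next in successors[current].orEmpty()) {
            val remaining = inDegree.getValue(next) - 1
            inDegree[next] = remaining
            if (remaining == 0) ready.addLast(next)
        }
    }
    if (processed < nodes.size) errors += "Cycle detected"

    return errors
}

fun main() {
    val ok = validateGraph(
        nodes = setOf("classifier", "route-a", "result"),
        edges = listOf("classifier" to "route-a", "route-a" to "result"),
        entryPoint = "classifier"
    )
    println(ok) // prints "[]"

    val bad = validateGraph(
        nodes = setOf("a", "b", "orphan"),
        edges = listOf("a" to "b", "b" to "a", "b" to "missing"),
        entryPoint = "a"
    )
    bad.forEach(::println) // reports the unknown node, the orphan, and the cycle
}
```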
Middleware System
- Intercept execution at graph and node level
- Metrics collection with onNode hooks
- Custom error handling with onError hooks
- Lifecycle hooks: onStart, onFinish
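As a rough illustration of how such hooks can wrap node execution, the sketch below chains middleware around a node body and collects per-node timings. SketchMiddleware, TimingMiddleware, and runNode are hypothetical; the hook names follow the list above, but the signatures are assumptions and onError is omitted for brevity.

```kotlin
// Hypothetical hook interface with no-op defaults; signatures are assumptions,
// not the Spice Middleware interface.
interface SketchMiddleware {
    fun onStart(graphId: String) {}
    fun onNode(nodeId: String, next: () -> Any?): Any? = next()
    fun onFinish(graphId: String) {}
}

// Records elapsed time per node by wrapping the `next` call.
class TimingMiddleware : SketchMiddleware {
    val nanosByNode = linkedMapOf<String, Long>()
    override fun onNode(nodeId: String, next: () -> Any?): Any? {
        val started = System.nanoTime()
        try {
            return next()
        } finally {
            nanosByNode[nodeId] = System.nanoTime() - started
        }
    }
}

// Folds the middleware list around the node body so the first middleware is outermost.
fun runNode(nodeId: String, chain: List<SketchMiddleware>, body: () -> Any?): Any? {
    val wrapped = chain.foldRight(body) { mw, next -> { mw.onNode(nodeId, next) } }
    return wrapped()
}

fun main() {
    val timing = TimingMiddleware()
    val log = mutableListOf<String>()
    val logging = object : SketchMiddleware {
        override fun onStart(graphId: String) { log += "start $graphId" }
        override fun onFinish(graphId: String) { log += "finish $graphId" }
    }
    val chain: List<SketchMiddleware> = listOf(logging, timing)

    chain.forEach { it.onStart("demo") }
    val result = runNode("analyzer", chain) { "analysis" }
    chain.forEach { it.onFinish("demo") }

    println(result)                  // prints "analysis"
    println(log)                     // prints "[start demo, finish demo]"
    println(timing.nanosByNode.keys) // prints "[analyzer]"
}
```

Passing next into onNode lets a middleware measure, short-circuit, or decorate the node without the node knowing about it.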
Context Propagation
- AgentContext flows through all nodes automatically
- Multi-tenant support built-in
- Correlation IDs for distributed tracing
Architecture
┌─────────────────────────────────────────┐
│               GraphRunner               │
│ ┌─────────────────────────────────────┐ │
│ │  Middleware Chain                   │ │
│ │  • onStart                          │ │
│ │  • onNode (for each node)           │ │
│ │  • onError (on failures)            │ │
│ │  • onFinish                         │ │
│ └─────────────────────────────────────┘ │
│                                         │
│ ┌─────────────────────────────────────┐ │
│ │  Graph Execution                    │ │
│ │  • Validate graph structure         │ │
│ │  • Execute nodes sequentially       │ │
│ │  • Handle errors with ErrorAction   │ │
│ │  • Save checkpoints (optional)      │ │
│ └─────────────────────────────────────┘ │
└─────────────────────────────────────────┘
                     ↓
          ┌─────────────────────┐
          │   Node Execution    │
          │  • AgentNode        │
          │  • ToolNode         │
          │  • OutputNode       │
          │  • Custom Node      │
          └─────────────────────┘
                     ↓
          ┌─────────────────────┐
          │    AgentContext     │
          │  (auto-propagated)  │
          └─────────────────────┘
Quick Start
1. Define a Simple Graph
val graph = graph("greeting-workflow") {
    // Agent node: processes input
    agent("greeter", greetingAgent)
    // Output node: transforms result
    output("result") { ctx ->
        ctx.state["greeter"]
    }
}
2. Execute the Graph
val runner = DefaultGraphRunner()
val result = runner.run(
    graph = graph,
    input = mapOf("name" to "Alice")
)
when (result) {
    is SpiceResult.Success -> println("Result: ${result.value.result}")
    is SpiceResult.Failure -> println("Error: ${result.error.message}")
}
3. Access Execution Report
val report = result.getOrThrow()
println("Graph: ${report.graphId}")
println("Status: ${report.status}")
println("Duration: ${report.duration}")
println("Nodes executed: ${report.nodeReports.size}")
report.nodeReports.forEach { nodeReport ->
    println(" - ${nodeReport.nodeId}: ${nodeReport.status} (${nodeReport.duration})")
}
Multi-Step Workflow Example
val workflow = graph("data-processing") {
    // Step 1: Validate input
    tool("validator", validationTool) { ctx ->
        mapOf("data" to ctx.state["input"])
    }
    // Step 2: Process with AI
    agent("processor", processingAgent)
    // Step 3: Store results
    tool("storage", storageTool) { ctx ->
        mapOf(
            "validation" to ctx.state["validator"],
            "processed" to ctx.state["processor"]
        )
    }
    // Output combined result
    output("summary") { ctx ->
        mapOf(
            "validation" to ctx.state["validator"],
            "processing" to ctx.state["processor"],
            "storage" to ctx.state["storage"]
        )
    }
}
Conditional Routing
val graph = graph("conditional-workflow") {
    // Decision node
    agent("classifier", classificationAgent)
    // Route A
    agent("route-a", routeAAgent)
    // Route B
    agent("route-b", routeBAgent)
    output("result") { it.state["_previous"] }
    // Custom edges with conditions
    edges {
        edge("classifier", "route-a") { result ->
            result.data == "type-a"
        }
        edge("classifier", "route-b") { result ->
            result.data == "type-b"
        }
        edge("route-a", "result")
        edge("route-b", "result")
    }
}
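Conceptually, routing comes down to evaluating the outgoing edges of the node that just finished and following one whose condition holds. The sketch below shows that idea with hypothetical names (SketchEdge, nextNode); it assumes the first matching edge wins and that no match ends the run, neither of which this page confirms for Spice.

```kotlin
// Hypothetical edge type: an unconditional edge uses a predicate that always returns true.
data class SketchEdge(
    val from: String,
    val to: String,
    val condition: (Any?) -> Boolean = { true }
)

// Returns the target of the first outgoing edge whose condition accepts the node's result,
// or null when no edge matches (assumed here to mean the run ends).
fun nextNode(edges: List<SketchEdge>, current: String, result: Any?): String? =
    edges.firstOrNull { it.from == current && it.condition(result) }?.to

fun main() {
    val edges = listOf(
        SketchEdge("classifier", "route-a") { it == "type-a" },
        SketchEdge("classifier", "route-b") { it == "type-b" },
        SketchEdge("route-a", "result"),
        SketchEdge("route-b", "result")
    )
    println(nextNode(edges, "classifier", "type-b")) // prints "route-b"
    println(nextNode(edges, "route-b", "anything"))  // prints "result"
    println(nextNode(edges, "classifier", "type-c")) // prints "null"
}
```

The last call shows why a classifier with conditional edges usually needs a fallback edge: an unexpected label would otherwise match nothing.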
Error Handling with Middleware
// Middleware that picks an ErrorAction based on the error message
val retryMiddleware = object : Middleware {
    override suspend fun onError(
        err: Throwable,
        ctx: RunContext
    ): ErrorAction {
        return when {
            err.message?.contains("retry") == true -> ErrorAction.RETRY
            err.message?.contains("skip") == true -> ErrorAction.SKIP
            else -> ErrorAction.PROPAGATE
        }
    }
}
// `nodes` and `edges` are assumed to be defined elsewhere
val graph = Graph(
    id = "resilient-workflow",
    nodes = nodes,
    edges = edges,
    entryPoint = "start",
    middleware = listOf(retryMiddleware)
)
Checkpoint & Resume
val store = InMemoryCheckpointStore()
val config = CheckpointConfig(
    saveEveryNNodes = 5,
    saveOnError = true
)
// Run with checkpointing
val result = runner.runWithCheckpoint(graph, input, store, config)
// Later: resume from failure
if (result.isFailure) {
    val checkpoints = store.listByGraph(graph.id).getOrThrow()
    // Assumes the store lists the most recent checkpoint first
    val latestCheckpoint = checkpoints.first()
    val resumeResult = runner.resume(graph, latestCheckpoint.id, store)
}
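The idea behind resuming without re-executing completed nodes can be sketched in a few lines. Everything here is hypothetical (SketchStep, SketchCheckpoint, runSteps) and much simpler than the real store: it saves after every node rather than applying a policy such as saveEveryNNodes, and it keeps the checkpoint in a local variable.

```kotlin
// Hypothetical step and checkpoint types; not the Spice checkpoint API.
class SketchStep(val id: String, val run: (Map<String, Any?>) -> Any?)
data class SketchCheckpoint(val completed: List<String>, val state: Map<String, Any?>)

// Runs `steps` in order, skipping any node already recorded in `from`.
// `save` is called after every node that completes.
fun runSteps(
    steps: List<SketchStep>,
    from: SketchCheckpoint?,
    save: (SketchCheckpoint) -> Unit
): Map<String, Any?> {
    val state = LinkedHashMap<String, Any?>(from?.state ?: emptyMap())
    val completed = from?.completed.orEmpty().toMutableList()
    for (step in steps) {
        if (step.id in completed) continue // finished before the failure; do not re-run
        state[step.id] = step.run(state)   // may throw, leaving the last checkpoint intact
        completed += step.id
        save(SketchCheckpoint(completed.toList(), state.toMap()))
    }
    return state
}

fun main() {
    var latest: SketchCheckpoint? = null
    var storageUp = false
    val executions = mutableListOf<String>()
    val steps = listOf(
        SketchStep("validator") { executions += "validator"; "valid" },
        SketchStep("storage") {
            executions += "storage"
            if (!storageUp) throw IllegalStateException("storage unavailable")
            "stored"
        }
    )

    val first = runCatching { runSteps(steps, null) { latest = it } }
    println(first.isFailure)    // prints "true"
    println(latest?.completed)  // prints "[validator]"

    storageUp = true
    val resumed = runSteps(steps, latest) { latest = it }
    println(resumed["storage"]) // prints "stored"
    println(executions)         // prints "[validator, storage, storage]"
}
```

The validator runs once across both attempts, which is the property that makes checkpointing worthwhile for expensive agent calls.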
Best Practices
Do's
- Validate graphs before deployment
- Use meaningful node IDs for debugging
- Leverage middleware for cross-cutting concerns
- Enable checkpointing for long-running workflows
- Use conditional edges for dynamic routing
- Propagate context for multi-tenant scenarios
Don'ts
- Don't create cycles - graphs must be DAGs
- Don't ignore validation errors - they catch issues early
- Don't skip error handling - use ErrorAction appropriately
- Don't forget to clean up checkpoints from failed runs - they're cleaned automatically on success
Next Steps
- Learn about Graph Nodes
- Explore Middleware System
- Master Checkpoint & Resume
- Understand Graph Validation