Graph Nodes
Added in: 0.5.0
Nodes are the building blocks of graphs. Each node represents a unit of work that can be executed within a graph workflow.
Node Interfaceβ
All nodes implement the Node interface:
interface Node {
val id: String
suspend fun run(ctx: NodeContext): SpiceResult<NodeResult>
}
NodeContextβ
Breaking Change in: 0.6.0
The context passed to each node during execution:
data class NodeContext(
val graphId: String,
val state: PersistentMap<String, Any?>, // Immutable!
val context: ExecutionContext // Unified!
)
- graphId: Unique identifier for the graph
- state: Immutable state (use
withStateto modify) - context: Unified execution context (tenant, user, custom metadata)
Key Changes from 0.5.x:
metadata+agentContextβ unifiedcontext: ExecutionContextstateis now immutable (PersistentMap)- Use
ctx.withState()instead of direct mutation
See ExecutionContext API for details.
NodeResultβ
Breaking Change in: 0.6.0
The result returned by node execution:
// Constructor is private - use factories!
data class NodeResult private constructor(
val data: Any?,
val metadata: Map<String, Any>,
val nextEdges: List<String> = emptyList()
)
Factory methods:
// Preferred: preserve context
NodeResult.fromContext(ctx, data = result, additional = mapOf("key" to "value"))
// Explicit metadata
NodeResult.create(data = result, metadata = mapOf("key" to "value"))
Built-in Node Typesβ
AgentNodeβ
Executes a Spice Agent within the graph.
class AgentNode(
override val id: String,
val agent: Agent,
val inputKey: String? = null // Which key from state to use as input
) : Node
Usage:
// Using DSL
val graph = graph("my-graph") {
agent("analyzer", analysisAgent) // Uses "_previous" or "input" from state
agent("processor", processorAgent, inputKey = "analyzer") // Uses specific key
}
// Manually
val node = AgentNode(
id = "my-agent",
agent = myAgent,
inputKey = "custom-input"
)
How it works:
- Retrieves input from
state[inputKey](or_previous/inputif not specified) - Creates a
Commwith the input content - Passes
AgentContextfrom node context to the agent - Returns agent's response in
NodeResult
Critical: AgentNode stores only Comm.content in state for downstream nodes, but automatically propagates Comm.data metadata across the graph!
agent("processor", myAgent) // Agent returns Comm("result text", data = mapOf(...))
// What's stored in state:
// state["processor"] = "result text" // β
String (content only)
// state["_previousComm"] = Comm(...) // β
Full Comm (for metadata)
// NOT: state["processor"] = Comm(...) // β Content stored as string
Full conversion process with metadata:
// Internal AgentNode implementation (simplified):
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
// 1οΈβ£ Get input as String
val inputContent = ctx.state["_previous"]?.toString() ?: ""
// 2οΈβ£ Extract previous metadata
val previousComm = ctx.state["_previousComm"] as? Comm
val previousData = previousComm?.data ?: emptyMap()
// 3οΈβ£ Create Comm with propagated metadata
val comm = Comm(
content = inputContent,
from = "graph-${ctx.graphId}",
context = ctx.agentContext, // β¨ Auto-propagates context
data = previousData // β¨ Auto-propagates metadata!
)
// 4οΈβ£ Call agent
return agent.processComm(comm) // SpiceResult<Comm>
.map { response ->
// 5οΈβ£ Store full Comm for next node
ctx.state["_previousComm"] = response
// 6οΈβ£ Extract content for state
NodeResult(
data = response.content, // β οΈ Content string only!
metadata = mapOf(
"agentId" to agent.id,
"tenantId" to ctx.agentContext?.tenantId
)
)
} // Returns SpiceResult<NodeResult>
}
Chain behavior with metadata:
val graph = graph("chain") {
agent("step1", agent1) // returns Comm("result1", data = mapOf("key1" to "value1"))
agent("step2", agent2) // receives "result1" as content + metadata from step1
agent("step3", agent3) // receives accumulated metadata from step1 & step2
}
// Internal flow:
// step1: processComm(Comm("input", data = {}))
// β Comm("result1", data = {"key1": "value1"})
// β state["step1"] = "result1"
// β state["_previousComm"] = Comm("result1", data = {"key1": "value1"})
// step2: processComm(Comm("result1", data = {"key1": "value1"})) β metadata propagated!
// β Comm("result2", data = {"key1": "value1", "key2": "value2"})
// β state["step2"] = "result2"
// β state["_previousComm"] = Comm("result2", data = {"key1": "value1", "key2": "value2"})
// step3: processComm(Comm("result2", data = {"key1": "value1", "key2": "value2"}))
// β All metadata from previous agents is available!
Example: Using metadata across agents
val enricherAgent = object : Agent {
override val id = "enricher"
override suspend fun processComm(comm: Comm): SpiceResult<Comm> {
// Add metadata to response
return SpiceResult.success(
comm.reply(
content = "Enriched: ${comm.content}",
from = id,
data = mapOf("enrichedAt" to System.currentTimeMillis().toString())
)
)
}
// ... other methods
}
val consumerAgent = object : Agent {
override val id = "consumer"
override suspend fun processComm(comm: Comm): SpiceResult<Comm> {
// Access metadata from previous agent
val enrichedAt = comm.data["enrichedAt"]
return SpiceResult.success(
comm.reply("Processed at $enrichedAt: ${comm.content}", id)
)
}
// ... other methods
}
val graph = graph("metadata-example") {
agent("enricher", enricherAgent)
agent("consumer", consumerAgent) // Automatically receives metadata!
}
If you need the full Comm object in state:
// Use a custom node
class FullCommAgentNode(
override val id: String,
val agent: Agent
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
val previousComm = ctx.state["_previousComm"] as? Comm
?: ctx.state["_previous"] as? Comm
?: Comm("", "")
return agent.processComm(previousComm)
.map { response ->
ctx.state["_previousComm"] = response
NodeResult(data = response) // β
Store full Comm in node state too
}
}
}
Initializing with metadata:
There are three ways to initialize a graph with metadata:
Method 1: Using "comm" key (Recommended)
// Pass initial Comm with metadata via "comm" key
val initialComm = Comm(
content = "Start",
from = "user",
data = mapOf("sessionId" to "session-123", "priority" to "high")
)
val initialState = mapOf(
"input" to initialComm.content,
"comm" to initialComm // β
First node picks up metadata automatically
)
val report = runner.run(graph, initialState).getOrThrow()
// All agents in the graph can access sessionId and priority!
Method 2: Using "_previousComm" key
// Alternative: use _previousComm (same as previous node pattern)
val initialState = mapOf(
"input" to "Start",
"_previousComm" to initialComm // β
Also works
)
Method 3: Using "metadata" map directly
// Pass metadata as a direct map (fallback pattern)
val initialState = mapOf(
"input" to "Start",
"metadata" to mapOf(
"sessionId" to "session-123",
"priority" to "high"
)
)
Priority Order:
AgentNode checks for metadata in this order:
_previousComm(from previous node)comm(initial Comm from graph input)metadata(direct metadata map)
Recommendation: Use "comm" for clarity - it makes it obvious you're passing a complete Comm object with metadata.
Example:
val greetingAgent = object : Agent {
override val id = "greeter"
// ... implementation
override suspend fun processComm(comm: Comm): SpiceResult<Comm> {
return SpiceResult.success(
comm.reply("Hello, ${comm.content}!", id)
)
}
}
val graph = graph("greeting") {
agent("greeter", greetingAgent)
output("result") { it.state["greeter"] }
}
// Input "Alice" -> Output "Hello, Alice!"
ToolNodeβ
Executes a Spice Tool within the graph.
class ToolNode(
override val id: String,
val tool: Tool,
val paramMapper: (NodeContext) -> Map<String, Any?> = { it.state }
) : Node
Usage:
// Using DSL
val graph = graph("my-graph") {
tool("processor", processorTool) { ctx ->
mapOf(
"input" to ctx.state["data"],
"format" to "json"
)
}
}
// Manually
val node = ToolNode(
id = "my-tool",
tool = myTool,
paramMapper = { ctx ->
mapOf("param" to ctx.state["value"])
}
)
How it works:
- Maps node context to tool parameters using
paramMapper - Filters out null values
- Passes
AgentContextto tool (if available) - Returns tool result in
NodeResult
Example:
val calculatorTool = object : Tool {
override val name = "calculator"
// ... implementation
override suspend fun execute(
parameters: Map<String, Any>,
context: ToolContext
): SpiceResult<ToolResult> {
val a = parameters["a"] as Int
val b = parameters["b"] as Int
return SpiceResult.success(
ToolResult(success = true, result = a + b)
)
}
}
val graph = graph("calculation") {
tool("add", calculatorTool) { ctx ->
mapOf("a" to 5, "b" to 3)
}
output("result") { it.state["add"] }
}
// Output: 8
OutputNodeβ
Transforms and outputs the final result from the graph.
class OutputNode(
override val id: String,
val transformer: (NodeContext) -> Any? = { it.state["_previous"] }
) : Node
Is OutputNode Required?
No! OutputNode is completely optional. Here's how it works:
// β
Without OutputNode - returns last node's result
val simpleGraph = graph("simple") {
agent("processor", processorAgent)
// No output() needed
}
val report = runner.run(simpleGraph, input).getOrThrow()
// report.result = processor's NodeResult.data
// β
With OutputNode - for transformation/selection
val advancedGraph = graph("advanced") {
agent("step1", agent1)
agent("step2", agent2)
output("custom") { ctx ->
// Return step1 instead of step2
ctx.state["step1"]
}
}
val report = runner.run(advancedGraph, input).getOrThrow()
// report.result = step1's NodeResult.data (from output selector)
When to use OutputNode:
- Need to select specific node results (not the last one)
- Want to combine multiple node results
- Need to transform the final output
- Want explicit control over return value
When to skip OutputNode:
- Simple linear workflows
- Last node's result is exactly what you need
- No transformation required
Usage:
// Using DSL
val graph = graph("my-graph") {
agent("step1", agent1)
agent("step2", agent2)
// Simple output (uses specific node's result)
output("result") { it.state["step2"] }
// Complex transformation
output("summary") { ctx ->
mapOf(
"step1_result" to ctx.state["step1"],
"step2_result" to ctx.state["step2"],
"total_steps" to 2
)
}
}
Example:
val graph = graph("analytics") {
agent("analyzer", analysisAgent)
tool("processor", processorTool) { mapOf("data" to it.state["analyzer"]) }
output("report") { ctx ->
mapOf(
"analysis" to ctx.state["analyzer"],
"processed" to ctx.state["processor"],
"timestamp" to System.currentTimeMillis(),
"graph_id" to ctx.graphId
)
}
}
Custom Nodesβ
Create custom nodes by implementing the Node interface:
class DelayNode(
override val id: String,
val delayMs: Long
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
return SpiceResult.catchingSuspend {
kotlinx.coroutines.delay(delayMs)
NodeResult(
data = "Delayed for ${delayMs}ms",
metadata = ctx.metadata // π₯ Always preserve metadata!
)
}
}
}
// Usage
val graph = Graph(
id = "delayed-workflow",
nodes = mapOf(
"delay" to DelayNode("delay", delayMs = 1000),
"output" to OutputNode("output")
),
edges = listOf(Edge("delay", "output")),
entryPoint = "delay"
)
Advanced Custom Node Exampleβ
class ConditionalSplitNode(
override val id: String,
val condition: (Any?) -> Boolean
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
return SpiceResult.catching {
val input = ctx.state["_previous"]
val result = if (condition(input)) "path-a" else "path-b"
NodeResult(
data = result,
metadata = ctx.metadata + mapOf( // π₯ Preserve existing metadata!
"condition_met" to condition(input),
"input" to input
)
)
}
}
}
val graph = graph("conditional") {
// Would need to use Graph constructor with custom nodes
}
// With manual construction
val graph = Graph(
id = "split-workflow",
nodes = mapOf(
"splitter" to ConditionalSplitNode("splitter") { it is String && it.startsWith("A") },
"path-a" to OutputNode("path-a"),
"path-b" to OutputNode("path-b"),
"result" to OutputNode("result")
),
edges = listOf(
Edge("splitter", "path-a") { it.data == "path-a" },
Edge("splitter", "path-b") { it.data == "path-b" },
Edge("path-a", "result"),
Edge("path-b", "result")
),
entryPoint = "splitter"
)
Node State Managementβ
Accessing Stateβ
val node = object : Node {
override val id = "my-node"
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
// Access previous node's output
val previousResult = ctx.state["_previous"]
// Access specific node's output
val step1Result = ctx.state["step1"]
// Access initial input
val input = ctx.state["input"]
return SpiceResult.success(NodeResult(
data = "processed",
metadata = ctx.metadata // π₯ Always preserve metadata!
))
}
}
Modifying Stateβ
val node = object : Node {
override val id = "counter"
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
// Read current count
val count = (ctx.state["count"] as? Int) ?: 0
// Update state (shared across all nodes)
ctx.state["count"] = count + 1
ctx.state["last_updated"] = System.currentTimeMillis()
return SpiceResult.success(NodeResult(
data = count + 1,
metadata = ctx.metadata // π₯ Always preserve metadata!
))
}
}
Context Propagationβ
AgentContext automatically propagates through all nodes:
// Set context in coroutine scope
val agentContext = AgentContext.of(
"tenantId" to "tenant-123",
"userId" to "user-456"
)
withContext(agentContext) {
runner.run(graph, input)
}
// Nodes automatically receive context
class ContextAwareNode(override val id: String) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
val tenantId = ctx.agentContext?.tenantId
val userId = ctx.agentContext?.userId
println("Processing for tenant: $tenantId, user: $userId")
return SpiceResult.success(NodeResult(
data = "processed",
metadata = ctx.metadata // π₯ Always preserve metadata!
))
}
}
Node Execution Reportβ
After graph execution, you can inspect each node's execution:
val report = runner.run(graph, input).getOrThrow()
report.nodeReports.forEach { nodeReport ->
println("Node: ${nodeReport.nodeId}")
println(" Status: ${nodeReport.status}") // SUCCESS, FAILED, SKIPPED
println(" Duration: ${nodeReport.duration}")
println(" Output: ${nodeReport.output}")
}
Best Practicesβ
β Do'sβ
- Keep nodes focused - Each node should do one thing well
- Use meaningful IDs - Helps with debugging and state access
- Handle errors gracefully - Return
SpiceResult.failure()instead of throwing - Document custom nodes - Explain what they do and their expected inputs
- Use state wisely - Only store what's needed for downstream nodes
β Don'tsβ
- Don't mutate external state - Nodes should be side-effect free (except via state)
- Don't throw exceptions - Use
SpiceResultfor error handling - Don't create tight coupling - Nodes shouldn't know about each other's internals
- Don't store large objects in state - Can impact checkpoint performance
Next Stepsβ
- Explore Graph Middleware
- Learn about Checkpoint & Resume
- Understand Graph Validation