Skip to main content

Node API

Graph execution system for orchestrating multi-step workflows with state management.

Overview

The Node API is the core abstraction for Spice's graph-based execution system. Nodes represent units of work that can be:

  • Executed sequentially or in parallel
  • Connected through edges to form workflows
  • Paused and resumed with checkpointing
  • Integrated with human-in-the-loop patterns

Core Interfaces

Node

The fundamental building block of graph execution:

interface Node {
val id: String
suspend fun run(ctx: NodeContext): SpiceResult<NodeResult>
}

Every node must:

  • Have a unique id within the graph
  • Implement run() to perform its work
  • Return SpiceResult<NodeResult> for error handling

NodeContext

Execution context passed to each node:

data class NodeContext(
val graphId: String,
val state: Map<String, Any?>, // ✅ v0.9.0: Standard Map interface
val context: ExecutionContext
)

Fields:

  • graphId: Unique identifier for the current graph execution
  • state: Immutable state map shared across nodes (read-only access)
  • context: Execution metadata (tenantId, userId, correlationId, etc.)

Helper Methods:

// Create new context with updated state
fun withState(key: String, value: Any?): NodeContext
fun withState(updates: Map<String, Any?>): NodeContext

// Update execution context
fun withContext(newContext: ExecutionContext): NodeContext

// Preserve metadata for NodeResult
fun preserveMetadata(additional: Map<String, Any> = emptyMap()): Map<String, Any>

NodeResult

Result of node execution:

data class NodeResult private constructor(
val data: Any?,
val metadata: Map<String, Any>, // Non-nullable (nulls filtered)
val nextEdges: List<String> = emptyList()
)

Factory Methods:

// Explicit metadata
NodeResult.create(
data = myData,
metadata = mapOf("status" to "complete"),
nextEdges = listOf("next-node")
)

// Preserve context metadata
NodeResult.fromContext(
ctx = ctx,
data = result,
additional = mapOf("phase" to "processing")
)

// From HumanResponse (auto-propagates metadata)
NodeResult.fromHumanResponse(
ctx = ctx,
response = humanResponse
)

Built-in Node Types

AgentNode

Execute an agent with automatic Comm conversion:

AgentNode(
id = "analyzer",
agent = myAgent,
inputExtractor = { ctx ->
ctx.state["user_input"]?.toString() ?: "Default query"
}
)

Features:

  • Automatically converts state to Comm for agent
  • Extracts result from agent response
  • Propagates ExecutionContext

ToolNode

Execute a tool with parameters from state:

ToolNode(
id = "search",
tool = searchTool,
parameterExtractor = { ctx ->
mapOf(
"query" to ctx.state["search_query"],
"limit" to 10
)
}
)

Features:

  • Type-safe parameter extraction
  • Automatic error handling
  • Metadata propagation

HumanNode

Pause execution for human input:

HumanNode(
id = "approval",
question = "Approve this action?",
options = listOf(
HumanOption("approve", "Approve"),
HumanOption("reject", "Reject")
),
metadata = mapOf("required" to true)
)

Features:

  • Checkpoint creation
  • Resume with human response
  • Metadata propagation to next node

OutputNode

Format final output:

OutputNode(
id = "result",
formatter = { ctx ->
val result = ctx.state["analysis_result"]
"Analysis complete: $result"
}
)

Features:

  • Access to full state
  • Custom formatting logic
  • Final result extraction

ResponseNode

Structured response generation:

ResponseNode(
id = "response",
extractors = mapOf(
"workflow_message" to { ctx -> ctx.state["message"]?.toString() },
"reasoning" to { ctx -> ctx.state["reasoning"]?.toString() },
"data" to { ctx -> ctx.state["result_data"] }
)
)

Features:

  • Multiple field extraction
  • Type-safe access to state
  • Structured JSON output

State Management

Immutability Pattern

State is immutable - modifications create new NodeContext:

override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
// ❌ Cannot modify state directly
// ctx.state["key"] = value // Compile error

// ✅ Create result with new state
return SpiceResult.success(
NodeResult.fromContext(
ctx = ctx,
data = "processed",
additional = mapOf(
"processed_at" to System.currentTimeMillis(),
"status" to "complete"
)
)
)
}

State Access Patterns

// Read from state
val input = ctx.state["user_input"]?.toString() ?: "default"
val count = (ctx.state["count"] as? Number)?.toInt() ?: 0
val enabled = ctx.state["enabled"] as? Boolean ?: false

// Pattern matching
when (ctx.state["action"]) {
"approve" -> handleApproval(ctx)
"reject" -> handleRejection(ctx)
else -> handleUnknown(ctx)
}

// Check existence
if (ctx.state.containsKey("optional_field")) {
// Handle optional data
}

State Propagation

State flows through the graph automatically:

graph {
node(AgentNode(
id = "analyzer",
agent = analyzer,
inputExtractor = { it.state["input"]?.toString() ?: "" }
))

node(ToolNode(
id = "enrich",
tool = enrichTool,
parameterExtractor = { ctx ->
// Access previous node's result
mapOf("data" to ctx.state["_previous"])
}
))

edge("analyzer", "enrich")
}

Special state keys:

  • _previous: Result of the previous node
  • workflow_message: Human-readable message
  • _checkpoint_id: Current checkpoint (if paused)

Metadata Propagation

Metadata flows through ExecutionContext:

// Initial context
val ctx = ExecutionContext(
tenantId = "tenant-123",
userId = "user-456",
metadata = mapOf("session_id" to "abc-123")
)

// Accessible in all nodes
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
val tenantId = ctx.context.tenantId // "tenant-123"
val sessionId = ctx.context["session_id"] // "abc-123"

// Add more metadata
return SpiceResult.success(
NodeResult.fromContext(
ctx,
data = result,
additional = mapOf(
"node_id" to id,
"execution_time" to System.currentTimeMillis()
)
)
)
}

Custom Node Implementation

Basic Custom Node

class ProcessingNode(
override val id: String,
private val processor: (String) -> String
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
return try {
val input = ctx.state["input"]?.toString()
?: return SpiceResult.failure("Missing input")

val result = processor(input)

SpiceResult.success(
NodeResult.fromContext(
ctx = ctx,
data = result,
additional = mapOf(
"processed" to true,
"length" to result.length
)
)
)
} catch (e: Exception) {
SpiceResult.failure("Processing failed: ${e.message}")
}
}
}

Using Custom Nodes in Graph DSL

Use the node() function to add custom node instances:

val graph = graph("my-workflow") {
// Built-in nodes
agent("analyzer", analyzerAgent)

// ✅ Custom node
node(ProcessingNode(
id = "processor",
processor = { input -> input.uppercase() }
))

// Another custom node
node(ConditionalNode(
id = "decision",
condition = { ctx -> ctx.state["score"] as? Int ?: 0 > 50 },
trueEdge = "success",
falseEdge = "retry"
))

output("result")
}

Conditional Node

class ConditionalNode(
override val id: String,
private val condition: (NodeContext) -> Boolean,
private val trueEdge: String,
private val falseEdge: String
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
val nextEdge = if (condition(ctx)) trueEdge else falseEdge

return SpiceResult.success(
NodeResult.fromContext(
ctx = ctx,
data = "Condition evaluated",
additional = mapOf("branch" to nextEdge)
).copy(nextEdges = listOf(nextEdge))
)
}
}

Async Processing Node

class AsyncProcessingNode(
override val id: String,
private val service: ExternalService
) : Node {
override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
return try {
val data = ctx.state["data"]?.toString()
?: return SpiceResult.failure("Missing data")

// Async call
val result = withTimeout(30.seconds) {
service.process(data)
}

SpiceResult.success(
NodeResult.fromContext(
ctx = ctx,
data = result,
additional = mapOf(
"async" to true,
"service" to service.name
)
)
)
} catch (e: TimeoutCancellationException) {
SpiceResult.failure("Service timeout")
} catch (e: Exception) {
SpiceResult.failure("Service error: ${e.message}")
}
}
}

Error Handling

Node-Level Errors

override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
// Validation errors
val input = ctx.state["input"]?.toString()
?: return SpiceResult.failure("Missing required input")

// Processing errors
return try {
val result = dangerousOperation(input)
SpiceResult.success(NodeResult.fromContext(ctx, result))
} catch (e: ValidationException) {
SpiceResult.failure("Validation failed: ${e.message}")
} catch (e: Exception) {
SpiceResult.failure("Processing failed: ${e.message}")
}
}

Graph-Level Error Actions

Configure error handling at graph level:

graph {
errorAction = ErrorAction.CONTINUE // Skip failed nodes
// or ErrorAction.FAIL (default)
// or ErrorAction.RETRY(maxAttempts = 3)
}

Best Practices

1. State Access

// ✅ GOOD: Safe with defaults
val count = (ctx.state["count"] as? Number)?.toInt() ?: 0

// ✅ GOOD: Explicit error
val id = ctx.state["id"]?.toString()
?: return SpiceResult.failure("Missing ID")

// ❌ BAD: Unsafe cast
val count = ctx.state["count"] as Int // Runtime exception risk

2. Metadata Size

Keep metadata under 5KB:

// ✅ GOOD: Small, focused metadata
mapOf(
"status" to "complete",
"count" to 42,
"timestamp" to System.currentTimeMillis()
)

// ❌ BAD: Large payload in metadata
mapOf(
"full_result" to largeJsonString // Use state instead!
)

3. Node Composition

// ✅ GOOD: Small, focused nodes
graph {
node(ValidateNode("validate"))
node(ProcessNode("process"))
node(FormatNode("format"))

edge("validate", "process")
edge("process", "format")
}

// ❌ BAD: One giant node
graph {
node(DoEverythingNode("all")) // Hard to test, reuse, debug
}

4. Context Propagation

// ✅ GOOD: Preserve and extend
NodeResult.fromContext(
ctx = ctx,
data = result,
additional = mapOf("phase" to "complete")
)

// ❌ BAD: Lose context
NodeResult.create(
data = result,
metadata = mapOf("phase" to "complete") // Lost tenantId, userId, etc!
)

See Also