Graph API Reference
Complete API reference for the Spice 0.5.0 Graph System, an orchestration layer inspired by the Microsoft Agent Framework.
Core Types​
Graph​
/**
 * A graph of nodes connected by edges.
 *
 * @property id Unique identifier for the graph.
 * @property nodes Map of node ID to Node instance.
 * @property edges List of edges connecting nodes.
 * @property entryPoint ID of the starting node.
 * @property middleware List of middleware to intercept execution; empty by default.
 */
data class Graph(
val id: String,
val nodes: Map<String, Node>,
val edges: List<Edge>,
val entryPoint: String,
val middleware: List<Middleware> = emptyList()
)
Properties:
- `id` - Unique identifier for the graph
- `nodes` - Map of node ID to Node instance
- `edges` - List of edges connecting nodes
- `entryPoint` - ID of the starting node
- `middleware` - List of middleware to intercept execution
Example:
// Two-node workflow: an agent node followed by a tool node.
val graph = Graph(
    id = "my-workflow",
    nodes = mapOf(
        "start" to AgentNode("start", myAgent),
        // ToolNode declares no default for paramMapper, so one must be supplied.
        // Passing the graph state through mirrors the default used by GraphBuilder.tool.
        "process" to ToolNode("process", myTool) { ctx -> ctx.state },
    ),
    edges = listOf(
        // Unconditional edge from the agent node to the tool node.
        Edge("start", "process"),
    ),
    entryPoint = "start",
)
Node​
/**
 * A unit of work that can be placed in a Graph.
 *
 * Built-in implementations in this reference: AgentNode, ToolNode, OutputNode and HumanNode.
 */
interface Node {
/** Identifier of this node. */
val id: String
/** Runs this node with the given [ctx] and returns its NodeResult wrapped in a SpiceResult. */
suspend fun run(ctx: NodeContext): SpiceResult<NodeResult>
}
Built-in Node Types:
AgentNode​
/**
 * Executes a Spice Agent within a graph.
 *
 * @property id Identifier of this node.
 * @property agent The agent this node executes.
 */
class AgentNode(
override val id: String,
val agent: Agent
) : Node
Executes a Spice Agent within a graph.
ToolNode​
/**
 * Executes a Spice Tool within a graph.
 *
 * @property id Identifier of this node.
 * @property tool The tool this node executes.
 * @property paramMapper Builds a parameter map from the node context. No default is declared
 *   on this constructor, so callers must supply one.
 *   NOTE(review): presumably the resulting map is passed to [tool] as its arguments - confirm.
 */
class ToolNode(
override val id: String,
val tool: Tool,
val paramMapper: (NodeContext) -> Map<String, Any?>
) : Node
Executes a Spice Tool within a graph.
OutputNode​
/**
 * Selects and returns final output from graph state.
 *
 * @property id Identifier of this node.
 * @property selector Picks the output value from the node context; may return null.
 */
class OutputNode(
override val id: String,
val selector: (NodeContext) -> Any?
) : Node
Selects and returns final output from graph state.
HumanNode (HITL)​
/**
 * Pauses graph execution for human input (human-in-the-loop).
 *
 * @property id Identifier of this node.
 * @property prompt Prompt text for the human.
 * @property options Options offered to the human; empty by default.
 * @property timeout Optional timeout; null by default.
 * @property validator Optional predicate over the human's response; null by default.
 *   NOTE(review): how a rejected response is handled is not shown here - confirm.
 * @property allowFreeText Defaults to true exactly when [options] is empty.
 */
class HumanNode(
override val id: String,
val prompt: String,
val options: List<HumanOption> = emptyList(),
val timeout: Duration? = null,
val validator: ((HumanResponse) -> Boolean)? = null,
val allowFreeText: Boolean = options.isEmpty()
) : Node
Pauses graph execution for human input.
Edge​
/**
 * A directed connection between two nodes.
 *
 * @property from Source node ID.
 * @property to Destination node ID.
 * @property condition Predicate to determine if the edge should be followed;
 *   the default always returns true.
 */
data class Edge(
val from: String,
val to: String,
val condition: (NodeResult) -> Boolean = { true }
)
Properties:
- `from` - Source node ID
- `to` - Destination node ID
- `condition` - Predicate to determine if edge should be followed
Example:
// Unconditional edge: the default condition always returns true.
Edge("agent1", "agent2")

// Conditional edge: followed only when the human selected the "approve" option.
Edge(
    from = "decision",
    to = "approved",
    condition = { nodeResult ->
        val response = nodeResult.data as? HumanResponse
        response?.selectedOption == "approve"
    },
)
NodeContext​
Added in: 0.5.0
Breaking Change in: 0.6.0
/**
 * Context passed to Node.run.
 *
 * Added in 0.5.0; breaking change in 0.6.0.
 *
 * @property graphId ID of the graph being executed.
 * @property state Immutable state; use withState to obtain a modified copy.
 * @property context Unified execution context (tenant, user, custom metadata).
 */
data class NodeContext(
val graphId: String,
val state: PersistentMap<String, Any?>, // Immutable!
val context: ExecutionContext // Unified context
)
Properties:
- `graphId` - ID of the graph being executed
- `state` - Immutable state (use `withState` to modify)
- `context` - Unified execution context (tenant, user, custom metadata)
Usage:
/** Example node: reads from state and context, then returns a result with extra metadata. */
class MyNode : Node {
    override suspend fun run(ctx: NodeContext): SpiceResult<NodeResult> {
        // Look up a value in the immutable graph state.
        val previousResult = ctx.state["previous-node"]

        // Read context: typed accessors plus a generic key lookup.
        val executionContext = ctx.context
        val tenantId = executionContext.tenantId
        val userId = executionContext.userId
        val customValue = executionContext.get("customKey")

        // State updates travel through the result metadata.
        val nodeResult = NodeResult.fromContext(
            ctx,
            data = "result",
            additional = mapOf("myKey" to "value"),
        )
        return SpiceResult.success(nodeResult)
    }
}
Factory & Builders:
// Create a NodeContext from a graph ID, an initial state map and an execution context.
val ctx = NodeContext.create(
graphId = "graph-id",
state = mapOf("input" to "data"),
context = ExecutionContext.of(mapOf("tenantId" to "tenant-123"))
)
// Update state: withState returns a new NodeContext instead of mutating ctx.
val updated = ctx.withState("key", "value")
// Update context.
// NOTE(review): newExecutionContext is not defined in this snippet; it stands for an
// ExecutionContext built elsewhere.
val enriched = ctx.withContext(newExecutionContext)
NodeResult​
Breaking Change in: 0.6.0
// Constructor is now private - use factories!
/**
 * Result produced by a node.
 *
 * Breaking change in 0.6.0: the constructor is private, so instances are created through
 * the factory methods NodeResult.fromContext and NodeResult.create.
 *
 * @property data Result data from node execution.
 * @property metadata Execution metadata (propagated to next node).
 * @property nextEdges Optional edge IDs to follow; empty by default.
 */
data class NodeResult private constructor(
val data: Any?,
val metadata: Map<String, Any>,
val nextEdges: List<String> = emptyList()
)
Factory Methods:
// Preferred: preserve context metadata. `additional` supplies extra metadata entries.
NodeResult.fromContext(
ctx = ctx,
data = result,
additional = mapOf("key" to "value")
)
// Explicit metadata: the caller provides the whole metadata map.
NodeResult.create(
data = result,
metadata = mapOf("key" to "value")
)
Properties:
- `data` - Result data from node execution
- `metadata` - Execution metadata (propagated to next node)
- `nextEdges` - Optional edge IDs to follow
Size Policies:
// Metadata size policies.
// Default: warn at 5KB, no hard limit
NodeResult.METADATA_WARN_THRESHOLD // 5000
// NOTE(review): the two lines below assign to members of NodeResult, so these are
// presumably mutable, process-wide settings - confirm before relying on them.
NodeResult.HARD_LIMIT = 10_000 // Optional hard limit
NodeResult.onOverflow = NodeResult.OverflowPolicy.WARN // WARN | FAIL | IGNORE
Graph Execution​
GraphRunner​
/**
 * Executes graphs, with optional checkpointing and human-in-the-loop resumption.
 *
 * Default implementation: DefaultGraphRunner.
 */
interface GraphRunner {
/** Runs [graph] with the given [input] and returns a report of the run. */
suspend fun run(
graph: Graph,
input: Map<String, Any?>
): SpiceResult<RunReport>
/** Runs [graph] with checkpointing, using [store] and the checkpoint settings in [config]. */
suspend fun runWithCheckpoint(
graph: Graph,
input: Map<String, Any?>,
store: CheckpointStore,
config: CheckpointConfig = CheckpointConfig()
): SpiceResult<RunReport>
/** Resumes [graph] from the checkpoint identified by [checkpointId] in [store]. */
suspend fun resume(
graph: Graph,
checkpointId: String,
store: CheckpointStore,
config: CheckpointConfig = CheckpointConfig()
): SpiceResult<RunReport>
/**
 * Resumes [graph] from [checkpointId], supplying the human's [response].
 * NOTE(review): presumably intended for runs paused at a HumanNode - confirm.
 */
suspend fun resumeWithHumanResponse(
graph: Graph,
checkpointId: String,
response: HumanResponse,
store: CheckpointStore
): SpiceResult<RunReport>
/** Returns the human interactions pending for the checkpoint [checkpointId] in [store]. */
suspend fun getPendingInteractions(
checkpointId: String,
store: CheckpointStore
): SpiceResult<List<HumanInteraction>>
}
Default Implementation: DefaultGraphRunner
Example:
val runner = DefaultGraphRunner()

// Basic execution
val result = runner.run(
    graph = myGraph,
    input = mapOf("key" to "value"),
).getOrThrow()

// With checkpointing. The result gets its own name so it does not redeclare `result` above.
val checkpointStore = InMemoryCheckpointStore()
val checkpointedResult = runner.runWithCheckpoint(
    graph = myGraph,
    input = mapOf("key" to "value"),
    store = checkpointStore,
    config = CheckpointConfig(saveEveryNNodes = 5),
).getOrThrow()

// Resume from checkpoint
val resumed = runner.resume(
    graph = myGraph,
    checkpointId = "checkpoint-id",
    store = checkpointStore,
).getOrThrow()
RunReport​
/**
 * Report of a graph run.
 *
 * @property graphId ID of executed graph.
 * @property status Execution status (SUCCESS, FAILED, CANCELLED, PAUSED).
 * @property result Final result from graph.
 * @property duration Total execution time.
 * @property nodeReports List of individual node execution reports.
 * @property error Exception if failed; null by default.
 * @property checkpointId Checkpoint ID if paused (HITL); null by default.
 */
data class RunReport(
val graphId: String,
val status: RunStatus,
val result: Any?,
val duration: Duration,
val nodeReports: List<NodeReport>,
val error: Throwable? = null,
val checkpointId: String? = null
)
Properties:
- `graphId` - ID of executed graph
- `status` - Execution status (SUCCESS, FAILED, CANCELLED, PAUSED)
- `result` - Final result from graph
- `duration` - Total execution time
- `nodeReports` - List of individual node execution reports
- `error` - Exception if failed
- `checkpointId` - Checkpoint ID if paused (HITL)
RunStatus​
/** Overall execution status of a graph run, reported in RunReport.status. */
enum class RunStatus {
SUCCESS, // Completed successfully
FAILED, // Failed with error
CANCELLED, // Cancelled by user
PAUSED // Paused for human input (HITL)
}
NodeReport​
/**
 * Execution report for an individual node; RunReport.nodeReports holds a list of these.
 *
 * NOTE(review): this reference does not describe the properties; the notes below are read
 * off the names and types - confirm against the implementation.
 *
 * @property nodeId ID of the reported node.
 * @property startTime Instant associated with the start of the node's execution.
 * @property duration Duration of the node's execution.
 * @property status Outcome of the node (SUCCESS, FAILED, SKIPPED).
 * @property output Output of the node; may be null.
 */
data class NodeReport(
val nodeId: String,
val startTime: Instant,
val duration: Duration,
val status: NodeStatus,
val output: Any?
)
NodeStatus:
/** Outcome of a single node execution, reported in NodeReport.status. */
enum class NodeStatus {
SUCCESS, // Executed successfully
FAILED, // Failed with error
SKIPPED // Skipped due to middleware
}
Checkpointing​
Checkpoint​
/**
 * Snapshot of a graph run, stored in and loaded from a CheckpointStore.
 *
 * @property id Unique checkpoint ID.
 * @property runId ID of graph execution run.
 * @property graphId ID of graph.
 * @property currentNodeId Node where checkpoint was created.
 * @property state Snapshot of graph state.
 * @property agentContext Multi-tenant context; null by default.
 * @property timestamp When checkpoint was created; defaults to Instant.now().
 * @property metadata Not described in this reference; empty by default.
 * @property executionState Graph state (RUNNING, WAITING_FOR_HUMAN, etc.); defaults to RUNNING.
 * @property pendingInteraction Human interaction if paused; null by default.
 * @property humanResponse Human's response if resuming; null by default.
 */
data class Checkpoint(
val id: String,
val runId: String,
val graphId: String,
val currentNodeId: String,
val state: Map<String, Any?>,
val agentContext: AgentContext? = null,
val timestamp: Instant = Instant.now(),
val metadata: Map<String, Any> = emptyMap(),
val executionState: GraphExecutionState = GraphExecutionState.RUNNING,
val pendingInteraction: HumanInteraction? = null,
val humanResponse: HumanResponse? = null
)
Properties:
- `id` - Unique checkpoint ID
- `runId` - ID of graph execution run
- `graphId` - ID of graph
- `currentNodeId` - Node where checkpoint was created
- `state` - Snapshot of graph state
- `agentContext` - Multi-tenant context
- `timestamp` - When checkpoint was created
- `executionState` - Graph state (RUNNING, WAITING_FOR_HUMAN, etc.)
- `pendingInteraction` - Human interaction if paused
- `humanResponse` - Human's response if resuming
CheckpointStore​
/**
 * Storage for checkpoints.
 *
 * Built-in implementation: InMemoryCheckpointStore (for development/testing). Custom stores
 * can be implemented for production.
 */
interface CheckpointStore {
/** Saves [checkpoint]. NOTE(review): the returned String is presumably the checkpoint ID - confirm. */
suspend fun save(checkpoint: Checkpoint): SpiceResult<String>
/** Loads the checkpoint identified by [checkpointId]. */
suspend fun load(checkpointId: String): SpiceResult<Checkpoint>
/** Deletes the checkpoint identified by [checkpointId]. */
suspend fun delete(checkpointId: String): SpiceResult<Unit>
/** Lists the checkpoints of the run identified by [runId]. */
suspend fun listByRun(runId: String): SpiceResult<List<Checkpoint>>
/** Deletes the checkpoints of the run identified by [runId]. */
suspend fun deleteByRun(runId: String): SpiceResult<Unit>
}
Built-in Implementations:
- `InMemoryCheckpointStore` - For development/testing
- Custom stores can be implemented for production
CheckpointConfig​
/**
 * Controls when checkpoints are saved.
 *
 * @property saveEveryNNodes Save a checkpoint every N nodes; null by default.
 * @property saveEveryNSeconds Save a checkpoint every N seconds; null by default.
 * @property maxCheckpointsPerRun Defaults to 10.
 *   NOTE(review): presumably a cap on checkpoints kept per run - confirm.
 * @property saveOnError Defaults to true.
 *   NOTE(review): presumably saves a checkpoint when execution fails - confirm.
 */
data class CheckpointConfig(
val saveEveryNNodes: Int? = null,
val saveEveryNSeconds: Long? = null,
val maxCheckpointsPerRun: Int = 10,
val saveOnError: Boolean = true
)
Example:
// Save checkpoint every 5 nodes (all other settings keep their defaults)
CheckpointConfig(saveEveryNNodes = 5)
// Save checkpoint every 60 seconds
CheckpointConfig(saveEveryNSeconds = 60)
// Combine both triggers and override maxCheckpointsPerRun (default 10) with 20
CheckpointConfig(
saveEveryNNodes = 10,
saveEveryNSeconds = 120,
maxCheckpointsPerRun = 20
)
Middleware​
Middleware​
/**
 * Lifecycle hooks that intercept graph execution.
 *
 * Every hook has a default implementation, so an implementor overrides only what it needs.
 */
interface Middleware {
/** Called before graph execution starts. The default just invokes [next]. */
suspend fun onStart(
ctx: RunContext,
next: suspend () -> Unit
) { next() }
/**
 * Called for each node execution; can modify the request or the result.
 * The default passes [req] to [next] unchanged and returns its result.
 */
suspend fun onNode(
req: NodeRequest,
next: suspend (NodeRequest) -> SpiceResult<NodeResult>
): SpiceResult<NodeResult> = next(req)
/** Called when node execution fails. The default returns ErrorAction.PROPAGATE. */
suspend fun onError(
error: Throwable,
ctx: RunContext
): ErrorAction = ErrorAction.PROPAGATE
/** Called after graph execution completes. The default does nothing. */
suspend fun onFinish(report: RunReport) { }
}
Lifecycle Hooks:
- `onStart` - Called before graph execution starts
- `onNode` - Called for each node execution (can modify request/result)
- `onError` - Called when node execution fails
- `onFinish` - Called after graph execution completes
ErrorAction​
/** Action returned from Middleware.onError to say how a node failure is handled. */
sealed class ErrorAction {
data object PROPAGATE : ErrorAction() // Throw error, fail graph
data object RETRY : ErrorAction() // Retry failed node
data object SKIP : ErrorAction() // Skip failed node, continue
data class CONTINUE(val result: Any?) : ErrorAction() // Use fallback result
}
Example:
/** Retries a node that failed with a TemporaryException; any other error is propagated. */
class RetryMiddleware : Middleware {
    override suspend fun onError(
        error: Throwable,
        ctx: RunContext,
    ): ErrorAction = when (error) {
        is TemporaryException -> ErrorAction.RETRY
        else -> ErrorAction.PROPAGATE
    }
}
RunContext​
/**
 * Context of a graph run, passed to Middleware.onStart and Middleware.onError and carried
 * by NodeRequest.
 *
 * @property graphId ID of the graph.
 * @property runId ID of the run.
 * @property agentContext Optional agent context; null by default.
 */
data class RunContext(
val graphId: String,
val runId: String,
val agentContext: AgentContext? = null
)
NodeRequest​
/**
 * Request passed to Middleware.onNode for a node execution.
 *
 * @property nodeId ID of the node.
 * @property input Input for the node; may be null.
 * @property context Context of the run this request belongs to.
 */
data class NodeRequest(
val nodeId: String,
val input: Any?,
val context: RunContext
)
HITL (Human-in-the-Loop)​
HumanResponse​
/**
 * A human's response to a human-in-the-loop interaction.
 *
 * @property nodeId ID of the node the response is for.
 * @property selectedOption ID of the chosen option, for multiple-choice responses; null by default.
 * @property text Free-text answer; null by default.
 * @property metadata Extra string metadata; empty by default.
 * @property timestamp Defaults to Instant.now().toString(), evaluated at construction.
 */
data class HumanResponse(
val nodeId: String,
val selectedOption: String? = null,
val text: String? = null,
val metadata: Map<String, String> = emptyMap(),
val timestamp: String = Instant.now().toString()
) {
companion object {
/** Factory for a multiple-choice response (signature only; body omitted in this reference). */
fun choice(nodeId: String, optionId: String): HumanResponse
/** Factory for a free-text response (signature only; body omitted in this reference). */
fun text(nodeId: String, text: String): HumanResponse
}
}
Example:
// Multiple choice
val choiceResponse = HumanResponse.choice(
    nodeId = "review",
    optionId = "approve",
)

// Free text. Distinct names keep the two declarations from conflicting in one scope.
val textResponse = HumanResponse.text(
    nodeId = "feedback",
    text = "Please add more examples",
)
HumanInteraction​
/**
 * A pending request for human input; returned by GraphRunner.getPendingInteractions and
 * stored in Checkpoint.pendingInteraction.
 *
 * @property nodeId ID of the node requesting input.
 * @property prompt Prompt text for the human.
 * @property options Options offered to the human.
 * @property pausedAt NOTE(review): presumably a timestamp string of when execution paused - confirm format.
 * @property expiresAt NOTE(review): presumably a timestamp string of when the interaction expires - confirm; null by default.
 * @property allowFreeText Defaults to false.
 */
data class HumanInteraction(
val nodeId: String,
val prompt: String,
val options: List<HumanOption>,
val pausedAt: String,
val expiresAt: String? = null,
val allowFreeText: Boolean = false
)
HumanOption​
/**
 * One option offered to a human.
 *
 * @property id Option identifier; the examples in this reference compare
 *   HumanResponse.selectedOption against values such as "approve".
 * @property label Label of the option.
 * @property description Optional description; null by default.
 */
data class HumanOption(
val id: String,
val label: String,
val description: String? = null
)
GraphExecutionState​
/** State of a graph execution, stored in Checkpoint.executionState. */
enum class GraphExecutionState {
RUNNING, // Normal execution
WAITING_FOR_HUMAN, // Paused for human input
COMPLETED, // Completed successfully
FAILED, // Failed with error
CANCELLED // Cancelled
}
DSL​
graph()​
/**
 * DSL entry point: takes a graph [id] and a [block] with a GraphBuilder receiver, and returns a Graph.
 * (Signature only; the body is omitted in this reference.)
 */
fun graph(id: String, block: GraphBuilder.() -> Unit): Graph
Example:
// Build a graph with four nodes through the DSL.
// NOTE(review): no edges are declared here; presumably nodes are linked in declaration order - confirm.
val myGraph = graph("my-workflow") {
agent("step1", myAgent)
tool("step2", myTool) // paramMapper is left at its default
humanNode("review", "Please review")
output("final") { ctx -> ctx.state["result"] }
}
GraphBuilder​
/**
 * Receiver of the graph() DSL block.
 * (Signatures only; bodies are omitted in this reference.)
 *
 * @property id ID of the graph being built.
 */
class GraphBuilder(val id: String) {
/** Adds an agent node. */
fun agent(id: String, agent: Agent)
/** Adds a tool node; [paramMapper] defaults to passing the node context's state. */
fun tool(
id: String,
tool: Tool,
paramMapper: (NodeContext) -> Map<String, Any?> = { it.state }
)
/** Adds a human-input node; options default to empty, timeout and validator to null. */
fun humanNode(
id: String,
prompt: String,
options: List<HumanOption> = emptyList(),
timeout: Duration? = null,
validator: ((HumanResponse) -> Boolean)? = null
)
/** Adds an output node; [id] defaults to "output" and [selector] to reading state["result"]. */
fun output(
id: String = "output",
selector: (NodeContext) -> Any? = { it.state["result"] }
)
/** Adds an edge from [from] to [to]; the default [condition] always returns true. */
fun edge(
from: String,
to: String,
condition: (NodeResult) -> Boolean = { true }
)
/** Registers a middleware. */
fun middleware(middleware: Middleware)
/** Returns the Graph. */
fun build(): Graph
}
Example:
// Approval workflow: draft, human review, then publish when approved.
// NOTE(review): no edge from "draft" to "review" is declared; presumably nodes are linked
// in declaration order unless an explicit edge is given - confirm.
val graph = graph("approval-flow") {
agent("draft", draftAgent)
humanNode(
id = "review",
prompt = "Approve or reject?",
options = listOf(
HumanOption("approve", "Approve"),
HumanOption("reject", "Reject")
)
)
// Conditional edge: followed only when the selected option is "approve".
edge("review", "publish") { result ->
(result.data as? HumanResponse)?.selectedOption == "approve"
}
agent("publish", publishAgent)
// Register the built-in middleware.
middleware(LoggingMiddleware())
middleware(MetricsMiddleware())
}
Validation​
GraphValidator​
/**
 * Validation helpers for a Graph.
 * (Signatures only; bodies are omitted in this reference.)
 */
object GraphValidator {
/** Validates [graph]; the outcome is reported as a SpiceResult. */
fun validate(graph: Graph): SpiceResult<Unit>
/** Returns the cycles found in [graph]. NOTE(review): each inner list is presumably the node IDs of one cycle - confirm. */
fun findCycles(graph: Graph): List<List<String>>
/** Returns the unreachable nodes of [graph]. NOTE(review): presumably node IDs not reachable from the entry point - confirm. */
fun findUnreachableNodes(graph: Graph): Set<String>
/** Returns the terminal nodes of [graph]. NOTE(review): presumably node IDs with no outgoing edges - confirm. */
fun findTerminalNodes(graph: Graph): Set<String>
}
Example:
val graph = graph("my-graph") {
    // ... build graph
}

// Validate before execution; getOrThrow surfaces a validation failure.
GraphValidator.validate(graph).getOrThrow()

// Or check specific issues: report cycles only when at least one was found.
GraphValidator.findCycles(graph)
    .takeIf { it.isNotEmpty() }
    ?.let { cycles -> println("Found cycles: $cycles") }
Built-in Middleware​
LoggingMiddleware​
/**
 * Logs graph execution events.
 *
 * Overrides the onStart, onNode and onFinish hooks; onError is not overridden.
 * (Signatures only; bodies are omitted in this reference.)
 */
class LoggingMiddleware : Middleware {
override suspend fun onStart(ctx: RunContext, next: suspend () -> Unit)
override suspend fun onNode(req: NodeRequest, next: suspend (NodeRequest) -> SpiceResult<NodeResult>): SpiceResult<NodeResult>
override suspend fun onFinish(report: RunReport)
}
Logs graph execution events.
MetricsMiddleware​
/**
 * Collects execution metrics.
 * (Signatures only; bodies are omitted in this reference.)
 */
class MetricsMiddleware : Middleware {
/** Returns the metrics for the node identified by [nodeId]. */
fun getNodeMetrics(nodeId: String): NodeMetrics
/** Returns graph-level metrics. NOTE(review): GraphMetrics is not defined in this reference. */
fun getGraphMetrics(): GraphMetrics
}
/**
 * Per-node metrics returned by MetricsMiddleware.getNodeMetrics.
 *
 * NOTE(review): the time unit of the Long fields is not stated in this reference
 * (presumably milliseconds) - confirm.
 *
 * @property executionCount Execution count.
 * @property averageExecutionTime Average execution time.
 * @property minExecutionTime Minimum execution time.
 * @property maxExecutionTime Maximum execution time.
 */
data class NodeMetrics(
val executionCount: Int,
val averageExecutionTime: Long,
val minExecutionTime: Long,
val maxExecutionTime: Long
)
Collects execution metrics.