Checkpoint & Resume
Added in: 0.5.0
The Checkpoint system enables resilient, long-running workflows by saving execution state at regular intervals. Resume from failures without re-executing completed nodes.
Overview
Checkpointing provides:
- Save execution state during graph execution
- Resume from failure points
- Automatic cleanup on success
- Configurable triggers (time, node count, error)
- Context preservation including AgentContext
Quick Start
val store = InMemoryCheckpointStore()
val config = CheckpointConfig(
    saveEveryNNodes = 5,
    saveOnError = true
)
// Run with checkpointing
val result = runner.runWithCheckpoint(
    graph = graph,
    input = input,
    store = store,
    config = config
)
// If it fails, resume from the latest checkpoint
if (result.isFailure) {
    val checkpoints = store.listByGraph(graph.id).getOrThrow()
    // Assumes the store lists the most recent checkpoint first
    val latestCheckpoint = checkpoints.first()
    val resumeResult = runner.resume(graph, latestCheckpoint.id, store)
}
Checkpoint Configuration
data class CheckpointConfig(
    val saveEveryNNodes: Int? = null,     // Save every N nodes
    val saveEveryNSeconds: Long? = null,  // Save every N seconds
    val maxCheckpointsPerRun: Int = 10,   // Max checkpoints to keep
    val saveOnError: Boolean = true       // Save on error
)
Configuration Strategies
1. Node-based Checkpointing
val config = CheckpointConfig(
    saveEveryNNodes = 5 // Checkpoint after every 5 nodes
)
2. Time-based Checkpointing
val config = CheckpointConfig(
    saveEveryNSeconds = 60 // Checkpoint every minute
)
3. Error-only Checkpointing
val config = CheckpointConfig(
    saveOnError = true // Only save when errors occur
)
4. Combined Strategy
val config = CheckpointConfig(
    saveEveryNNodes = 10,
    saveEveryNSeconds = 300, // 5 minutes
    saveOnError = true
)
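How the runner combines these triggers is not spelled out on this page. The sketch below shows one plausible reading, in which a checkpoint is due as soon as either threshold is reached. `TriggerConfig`, `shouldCheckpoint`, and the OR semantics are assumptions made for illustration, not the library's implementation.

```kotlin
// Hypothetical stand-in for the two interval fields of CheckpointConfig.
data class TriggerConfig(
    val saveEveryNNodes: Int? = null,
    val saveEveryNSeconds: Long? = null
)

// Returns true when either configured threshold has been reached since the
// last checkpoint. Assumption: triggers combine with OR; a null field is off.
fun shouldCheckpoint(
    config: TriggerConfig,
    nodesSinceLastSave: Int,
    secondsSinceLastSave: Long
): Boolean {
    val nodeDue = config.saveEveryNNodes?.let { nodesSinceLastSave >= it } ?: false
    val timeDue = config.saveEveryNSeconds?.let { secondsSinceLastSave >= it } ?: false
    return nodeDue || timeDue
}
```

Under this reading, the combined strategy (10 nodes, 300 seconds) saves after the tenth node of a fast run, or after five minutes of a slow run that has completed fewer nodes.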
Checkpoint Data
data class Checkpoint(
    val id: String,
    val runId: String,
    val graphId: String,
    val currentNodeId: String,
    val state: Map<String, Any?>,
    val agentContext: AgentContext? = null,
    val timestamp: Instant = Instant.now(),
    val metadata: Map<String, Any> = emptyMap()
)
CheckpointStore Interface
Implement custom storage backends:
interface CheckpointStore {
    suspend fun save(checkpoint: Checkpoint): SpiceResult<String>
    suspend fun load(checkpointId: String): SpiceResult<Checkpoint>
    suspend fun listByRun(runId: String): SpiceResult<List<Checkpoint>>
    suspend fun listByGraph(graphId: String): SpiceResult<List<Checkpoint>>
    suspend fun delete(checkpointId: String): SpiceResult<Unit>
    suspend fun deleteByRun(runId: String): SpiceResult<Unit>
}
Built-in Stores
InMemoryCheckpointStore
For testing and single-process applications:
val store = InMemoryCheckpointStore()
// Store operations
store.save(checkpoint)
store.load(checkpointId)
store.listByGraph(graphId)
store.delete(checkpointId)
// Utility methods
store.clear() // Clear all checkpoints
store.size() // Get count
Note: Checkpoints are lost when the process terminates. Use a custom store for persistence.
Custom Checkpoint Stores
Example: Redis Store
⚠️ IMPORTANT: Use CheckpointSerializer for production stores to ensure nested Map/List structures are preserved correctly.
import io.github.noailabs.spice.graph.checkpoint.CheckpointSerializer
class RedisCheckpointStore(
    private val redis: RedisClient
) : CheckpointStore {
    override suspend fun save(checkpoint: Checkpoint): SpiceResult<String> {
        return SpiceResult.catching {
            // ✅ Use CheckpointSerializer for type-safe serialization
            val json = CheckpointSerializer.serialize(checkpoint)
            redis.set("checkpoint:${checkpoint.id}", json)
            checkpoint.id
        }
    }
    override suspend fun load(checkpointId: String): SpiceResult<Checkpoint> {
        return SpiceResult.catching {
            val json = redis.get("checkpoint:$checkpointId")
                ?: throw NoSuchElementException("Checkpoint not found")
            // ✅ Deserialize with type preservation
            CheckpointSerializer.deserialize(json)
        }
    }
    // ... other methods
}
Example: Database Store
import io.github.noailabs.spice.graph.checkpoint.CheckpointSerializer
class DatabaseCheckpointStore(
    private val database: Database
) : CheckpointStore {
    override suspend fun save(checkpoint: Checkpoint): SpiceResult<String> {
        return SpiceResult.catching {
            database.transaction {
                CheckpointTable.insert {
                    it[id] = checkpoint.id
                    it[runId] = checkpoint.runId
                    it[graphId] = checkpoint.graphId
                    it[currentNodeId] = checkpoint.currentNodeId
                    // ✅ Use CheckpointSerializer for the entire checkpoint
                    it[checkpointJson] = CheckpointSerializer.serialize(checkpoint)
                    it[timestamp] = checkpoint.timestamp
                }
            }
            checkpoint.id
        }
    }
    override suspend fun load(checkpointId: String): SpiceResult<Checkpoint> {
        return SpiceResult.catching {
            database.transaction {
                val json = CheckpointTable
                    .select { CheckpointTable.id eq checkpointId }
                    .firstOrNull()
                    ?.get(CheckpointTable.checkpointJson)
                    ?: throw NoSuchElementException("Checkpoint not found")
                // ✅ Deserialize with preserved types
                CheckpointSerializer.deserialize(json)
            }
        }
    }
    // ... other methods
}
Type-Safe Checkpoint Serialization
Added in: 0.9.5
The Problem: Type Loss with Default Jackson
When storing checkpoints in Redis or databases, default Jackson serialization loses nested type information:
// ❌ BEFORE: Default Jackson (Type Loss)
val checkpoint = Checkpoint(
    // id, runId, graphId and currentNodeId omitted for brevity
    state = mapOf(
        "structured_data" to mapOf("key" to "value") // Map<String, String>
    )
)
// After Redis → Jackson deserialize, the nested value comes back as:
// state["structured_data"] = LinkedHashMap<String, Any?>  (type lost!)
// When the app tries to use it:
val data = state["structured_data"] as Map<*, *>
val json = objectMapper.writeValueAsString(data)
// Result: "{key=value}", which is NOT proper JSON! 💥
This causes issues when:
- Workflows use nested data structures in state
- Agents store complex configurations in checkpoint
- HITL workflows pass structured data between resume sessions
The Solution: CheckpointSerializer
CheckpointSerializer uses Kotlin Serialization to preserve nested structures:
import io.github.noailabs.spice.graph.checkpoint.CheckpointSerializer
// ✅ Type-safe serialization
val json = CheckpointSerializer.serialize(checkpoint)
val restored = CheckpointSerializer.deserialize(json)
// Nested structures are preserved!
val data = restored.state["structured_data"] as Map<*, *>
// ✅ Proper Map restored, not LinkedHashMap<String, Any?>!
How It Works
CheckpointSerializer:
- Recursively converts nested Maps/Lists to JSON elements
- Preserves structure through JSON serialization
- Reconstructs typed objects during deserialization
- Handles special types (Instant, ExecutionContext, HumanInteraction)
Important Notes:
- Number Type Caveat: JSON doesn't distinguish Int/Long or Float/Double
  - Int becomes Long after round-trip
  - Float becomes Double after round-trip
  - This is a JSON spec limitation, not a bug
- Null Value Handling: Explicit nulls in Map<String, Any?> may not round-trip perfectly
  - Acceptable as storing explicit nulls in state is rare
  - Prefer omitting keys instead of storing null values
- InMemoryCheckpointStore: Now uses JSON serialization internally
  - Ensures consistent behavior between in-memory and persistent stores
  - Tests verify nested structure preservation
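Given the number caveat above, code that reads numeric values from restored state is safer going through `Number` than casting straight to `Int`. The following is a minimal sketch; the `intOrNull` helper is hypothetical and not part of Spice, and `restoredState` only simulates what a round-trip might return.

```kotlin
// Hypothetical helper: reads a numeric state value regardless of whether it
// came back as Int, Long, Float or Double after a JSON round-trip.
fun Map<String, Any?>.intOrNull(key: String): Int? =
    (this[key] as? Number)?.toInt()

// Simulates a restored state in which an Int was widened to Long.
val restoredState: Map<String, Any?> = mapOf("retries" to 3L, "label" to "x")

val retries: Int? = restoredState.intOrNull("retries")  // 3
val missing: Int? = restoredState.intOrNull("absent")   // null (key omitted)
val notANumber: Int? = restoredState.intOrNull("label") // null (wrong type)
```

A direct `restoredState["retries"] as Int` would throw a `ClassCastException` here, because the stored value is a `Long`.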
Migration Guide
If you have existing custom CheckpointStore implementations:
// ❌ BEFORE (Dangerous - loses type information)
class MyCheckpointStore : CheckpointStore {
    private val objectMapper = jacksonObjectMapper()
    override suspend fun save(checkpoint: Checkpoint): SpiceResult<String> {
        return SpiceResult.catching {
            val json = objectMapper.writeValueAsString(checkpoint) // ❌ Type loss!
            redis.set(checkpoint.id, json)
            checkpoint.id
        }
    }
}
// ✅ AFTER (Safe - preserves nested structures)
class MyCheckpointStore : CheckpointStore {
    override suspend fun save(checkpoint: Checkpoint): SpiceResult<String> {
        return SpiceResult.catching {
            val json = CheckpointSerializer.serialize(checkpoint) // ✅ Type-safe!
            redis.set(checkpoint.id, json)
            checkpoint.id
        }
    }
    override suspend fun load(checkpointId: String): SpiceResult<Checkpoint> {
        return SpiceResult.catching {
            val json = redis.get(checkpointId) ?: error("Not found")
            CheckpointSerializer.deserialize(json) // ✅ Restores types!
        }
    }
}
Testing Nested Structures
Verify your custom store preserves nested structures:
@Test
fun `test nested structures survive serialization`() = runTest {
    val store = MyCustomCheckpointStore()
    val checkpoint = Checkpoint(
        id = "test",
        runId = "run-1",
        graphId = "graph-1",
        currentNodeId = "node1",
        state = mapOf(
            "nested" to mapOf(
                "list" to listOf(
                    mapOf("id" to "1", "data" to "value1"),
                    mapOf("id" to "2", "data" to "value2")
                )
            )
        )
    )
    // Save → Load
    store.save(checkpoint).getOrThrow()
    val restored = store.load("test").getOrThrow()
    // Verify nested structure preserved
    val nested = restored.state["nested"] as Map<*, *>
    val list = nested["list"] as List<*>
    val firstItem = list[0] as Map<*, *>
    assertEquals("value1", firstItem["data"]) // ✅ Preserved!
}
Usage Examples
Example 1: Long-Running Workflow
val longWorkflow = graph("data-processing") {
    // 20 nodes total
    repeat(20) { i ->
        agent("step-$i", processingAgent)
    }
    output("result") { it.state["step-19"] }
}
val config = CheckpointConfig(
    saveEveryNNodes = 5 // Checkpoint after every 5 nodes
)
val result = runner.runWithCheckpoint(
    graph = longWorkflow,
    input = input,
    store = InMemoryCheckpointStore(),
    config = config
)
Example 2: Resume After Failure
val store = InMemoryCheckpointStore()
// First attempt (fails at node 15/20)
val result1 = runner.runWithCheckpoint(
    graph = workflow,
    input = input,
    store = store,
    config = CheckpointConfig(saveEveryNNodes = 5, saveOnError = true)
)
if (result1.isFailure) {
    println("Workflow failed. Finding last checkpoint...")
    // Get checkpoints for this graph (assumes the most recent is listed first)
    val checkpoints = store.listByGraph(workflow.id).getOrThrow()
    val latestCheckpoint = checkpoints.first()
    println("Resuming from node: ${latestCheckpoint.currentNodeId}")
    // Resume execution
    val result2 = runner.resume(
        graph = workflow,
        checkpointId = latestCheckpoint.id,
        store = store
    )
    println("Resume result: ${result2.isSuccess}")
}
Example 3: Context Preservation
// Set tenant context
val agentContext = AgentContext.of(
    "tenantId" to "tenant-123",
    "userId" to "user-456",
    "sessionId" to "session-789"
)
val store = InMemoryCheckpointStore()
// Run with context
withContext(agentContext) {
    runner.runWithCheckpoint(
        graph = graph,
        input = input,
        store = store,
        config = CheckpointConfig(saveEveryNNodes = 5)
    )
}
// Later: Resume without context (restored from checkpoint)
val checkpoints = store.listByGraph(graph.id).getOrThrow()
val result = runner.resume(graph, checkpoints.first().id, store)
// Context is automatically restored!
Example 4: Manual Checkpoint Management
val store = InMemoryCheckpointStore()
// Run with checkpointing
runner.runWithCheckpoint(graph, input, store, config)
// List all checkpoints for a graph
val checkpoints = store.listByGraph("my-graph").getOrThrow()
checkpoints.forEach { checkpoint ->
    println("Checkpoint: ${checkpoint.id}")
    println(" Node: ${checkpoint.currentNodeId}")
    println(" Time: ${checkpoint.timestamp}")
    println(" State keys: ${checkpoint.state.keys}")
}
// Delete old checkpoints, keeping the first five in the list
checkpoints.drop(5).forEach { checkpoint ->
    store.delete(checkpoint.id)
}
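The cleanup above depends on the order in which `listByGraph` returns checkpoints, which this page does not specify. A hedged alternative is to sort by `timestamp` explicitly before deciding what to keep. `CheckpointRef`, `idsToPrune`, and `latest` are hypothetical names; `CheckpointRef` carries only the two `Checkpoint` fields the logic needs.

```kotlin
import java.time.Instant

// Hypothetical stand-in for Checkpoint with only the fields used here.
data class CheckpointRef(val id: String, val timestamp: Instant)

// Returns the ids of every checkpoint except the `keep` most recent ones.
fun idsToPrune(checkpoints: List<CheckpointRef>, keep: Int): List<String> =
    checkpoints
        .sortedByDescending { it.timestamp } // newest first, whatever the input order
        .drop(keep)
        .map { it.id }

// Returns the most recent checkpoint, or null when the list is empty.
fun latest(checkpoints: List<CheckpointRef>): CheckpointRef? =
    checkpoints.maxByOrNull { it.timestamp }
```

Each id returned by `idsToPrune` could then be passed to `store.delete`, and `latest` could replace `checkpoints.first()` when choosing a resume point.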
Checkpoint Lifecycle
1. Graph Execution Starts
↓
2. Node Executes Successfully
↓
3. Check if checkpoint needed:
   - Every N nodes?
   - Every N seconds elapsed?
↓
4. Save Checkpoint:
   - Current node ID
   - Full state
   - AgentContext
   - Timestamp
↓
5. Continue to Next Node
↓
6. (If error) Save Error Checkpoint (if configured)
↓
7. Graph Completes Successfully
↓
8. Clean Up Checkpoints (automatic)
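The lifecycle can be illustrated with a small, self-contained simulation. Nothing here is Spice API: `simulateRun` and `RunOutcome` are hypothetical, the node-count trigger is the only one modelled, and the choice to record the last completed node in the error checkpoint is an assumption. The real runner also stores state, context, and timestamps.

```kotlin
// Outcome of the simulated run: whether it completed and which node ids
// still have a checkpoint afterwards.
data class RunOutcome(val succeeded: Boolean, val checkpoints: List<String>)

// Walks the nodes in order, saving a checkpoint every `saveEveryNNodes`
// completed nodes, saving once more on error if `saveOnError` is set,
// and clearing all checkpoints when the run completes.
fun simulateRun(
    nodes: List<String>,
    saveEveryNNodes: Int,
    saveOnError: Boolean,
    failsAt: String? = null // hypothetical: id of the node that fails
): RunOutcome {
    val saved = mutableListOf<String>()
    var lastCompleted: String? = null
    for ((index, node) in nodes.withIndex()) {
        if (node == failsAt) {
            // Step 6 (assumption): the error checkpoint records the last
            // node that finished, unless that node is already checkpointed.
            if (saveOnError && lastCompleted != null && saved.lastOrNull() != lastCompleted) {
                saved += lastCompleted
            }
            return RunOutcome(succeeded = false, checkpoints = saved)
        }
        lastCompleted = node                                   // step 2
        if ((index + 1) % saveEveryNNodes == 0) saved += node  // steps 3-4
    }
    saved.clear()                                              // step 8
    return RunOutcome(succeeded = true, checkpoints = saved)
}
```

In this model, a 12-node run that checkpoints every 5 nodes and fails at the eighth node leaves checkpoints for nodes 5 and 7, while a successful run leaves none.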
Resume Behavior
When resuming from a checkpoint:
- Load checkpoint with full state
- Restore AgentContext from checkpoint
- Skip to next node after checkpoint (don't re-execute)
- Continue execution normally
- Preserve run ID from original execution
// Original run: Executes nodes 1-10, fails at 11
// Checkpoint saved at node 10
// Resume: Starts at node 11, continues 11-20
val result = runner.resume(graph, checkpointId, store)
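For a linear graph, the skip-ahead rule can be sketched as a lookup of the checkpointed node followed by a slice. This is an illustration under assumptions: `nodesToResume` is hypothetical, real graphs may branch, and the validation the runner actually performs is not shown on this page.

```kotlin
// Returns the nodes that still need to run, given the graph's node order and
// the node recorded in the checkpoint. Fails if the checkpointed node is no
// longer part of the graph (for example after the graph was edited).
fun nodesToResume(graphNodes: List<String>, checkpointNodeId: String): Result<List<String>> {
    val index = graphNodes.indexOf(checkpointNodeId)
    return if (index < 0) {
        Result.failure(IllegalStateException("Node '$checkpointNodeId' not found in graph"))
    } else {
        // The checkpointed node already completed, so start at the next one.
        Result.success(graphNodes.drop(index + 1))
    }
}
```

With 20 nodes and a checkpoint at the tenth, the function returns nodes 11 through 20; a checkpoint that names a removed node yields a failure instead of a partial run.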
Best Practices
✅ Do's
- Checkpoint frequently in long-running workflows
- Use persistent stores for production (Redis, DB)
- Use CheckpointSerializer for custom stores (0.9.5+)
- Clean up old checkpoints periodically
- Test resume scenarios thoroughly
- Test nested structure preservation in custom stores
- Log checkpoint IDs for debugging
- Include metadata for context
❌ Don'ts
- Don't checkpoint too frequently (for example, after every node); each save adds overhead
- Don't store large objects in state - use references
- Don't rely on in-memory store in production
- Don't use default Jackson for custom stores - use CheckpointSerializer
- Don't forget to handle resume failures
- Don't modify checkpoint data manually
- Don't assume Int/Float types are preserved - they become Long/Double
Error Scenarios
Scenario 1: Checkpoint Save Fails
// Checkpoint save failure doesn't stop execution
// Error is logged, execution continues
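The behavior described above amounts to treating a checkpoint save as best-effort. A minimal sketch of that pattern follows; `saveBestEffort` and the `save` lambda are hypothetical stand-ins, and the runner's real logging is not shown on this page.

```kotlin
// Runs `save` and reports whether it succeeded. A failure is logged to stderr
// and swallowed so that the surrounding workflow can keep executing.
fun saveBestEffort(checkpointId: String, save: (String) -> Unit): Boolean =
    runCatching { save(checkpointId) }
        .onFailure { e -> System.err.println("Checkpoint $checkpointId not saved: ${e.message}") }
        .isSuccess
```

Note that `runCatching` catches every `Throwable`; in suspend code a `CancellationException` should be rethrown rather than swallowed.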
Scenario 2: Resume with Invalid ID
val result = runner.resume(graph, "invalid-id", store)
// Returns SpiceResult.Failure with CheckpointError
Scenario 3: State Corruption
// Graph structure changed after checkpoint was saved
// Resume may fail if nodes don't exist
// Validation catches this early
Performance Considerations
Checkpoint Overhead:
- Node-based: Low overhead, predictable
- Time-based: Medium overhead, depends on execution speed
- Every node: High overhead, not recommended
Storage Size:
- State map size matters
- AgentContext is small (~100 bytes)
- Use compression for large states
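For the compression suggestion, one option is to gzip the serialized JSON string before writing it to the store. The sketch uses only `java.util.zip` from the JDK; the `gzip` and `gunzip` helper names are hypothetical, and the input would be whatever string the serializer produces.

```kotlin
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Compresses a UTF-8 string into gzip bytes.
fun gzip(text: String): ByteArray {
    val buffer = ByteArrayOutputStream()
    GZIPOutputStream(buffer).use { it.write(text.toByteArray(Charsets.UTF_8)) }
    return buffer.toByteArray()
}

// Restores the original string from gzip bytes.
fun gunzip(bytes: ByteArray): String =
    GZIPInputStream(bytes.inputStream()).use { it.readBytes().toString(Charsets.UTF_8) }
```

A custom store would call `gzip` on the serialized checkpoint in `save` and `gunzip` in `load` before deserializing. The gain depends on the data; repetitive JSON state tends to compress well, while very small states may not shrink at all.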
Recommended Settings:
- Short workflows (under 10 nodes): No checkpointing or error-only
- Medium workflows (10-50 nodes): Every 10-20 nodes
- Long workflows (over 50 nodes): Every 20-30 nodes + time-based
Next Steps
- Understand Graph Validation
- Explore Graph Middleware
- Review Performance Optimization