Tool Pipeline DSL
Fluent, type-safe API for building multi-step tool pipelines with automatic data flow between steps.
Overviewβ
The Tool Pipeline DSL provides a clean, intuitive way to chain multiple tools together, automatically managing data flow between steps. It combines the power of ModernToolChain with a fluent API that makes complex pipelines easy to read and maintain.
Key Features:
- Fluent API: Method chaining with
.output(),.input(),.named() - Type-Safe: Use Tool objects directly instead of string names
- Auto Data Flow: Outputs automatically flow to next steps
- Context-Aware: Access previous step outputs via
requireOutputOf() - Backward Compatible: Works alongside traditional chain syntax
Quick Startβ
Basic Pipelineβ
val pipeline = toolChain("product-lookup") {
name = "Product Information Pipeline"
description = "Resolve SKU and fetch product specs"
// Step 1: Resolve product name to SKU
+step(resolveTool).output("sku").input {
mapOf("text" to "iPhone 15 Pro")
}
// Step 2: Get specs using resolved SKU
+step(getSpecsTool).input { context ->
val sku = context.requireOutputOf("sku")
mapOf("sku" to sku)
}
// Step 3: Format results
+step(formatTool).input { context ->
val specs = context.requireOutputOf("step-2") // Can use step ID or output name
mapOf("data" to specs)
}
}
// Execute
val result = pipeline.execute(emptyMap())
Simple Sequential Pipelineβ
val pipeline = toolChain("simple-math") {
name = "Math Pipeline"
// Steps use initial parameters automatically
+step(multiplyTool).output("product")
+step(addTool).output("sum")
+step(formatTool)
}
// Provide parameters at execution
val result = pipeline.execute(mapOf(
"a" to 5,
"b" to 3
))
Core Conceptsβ
Step Builderβ
The StepBuilder provides a fluent interface for configuring individual steps:
+step(tool) // Create step
.named("custom-id") // Set step ID (optional)
.output("result-name") // Name the output (optional)
.input { context -> // Provide input parameters (optional)
mapOf("param" to value)
}
Order matters:
named()- Set step IDoutput()- Name the outputinput()- Provide parameters
But all are optional!
Unary Plus Operatorβ
The + operator adds the configured step to the chain:
+step(myTool) // Add step to chain
Without +, the step won't be added:
step(myTool) // β This does nothing!
Output Namingβ
Named outputs are stored in ChainContext and can be accessed by subsequent steps:
// Step 1: Store output as "sku"
+step(resolveTool).output("sku").input {
mapOf("text" to "MacBook Pro")
}
// Step 2: Access "sku" from context
+step(lookupTool).input { context ->
val sku = context.requireOutputOf("sku")
mapOf("product_id" to sku)
}
If you don't specify an output name, the step's result is stored with its step ID (e.g., "step-1", "step-2").
Input Parametersβ
The input() block provides dynamic, context-aware parameters:
+step(tool).input { context ->
// Access previous outputs
val prev = context.requireOutputOf("previous-step")
// Access shared data
val shared = context.sharedData["key"]
// Access last result
val last = context.getLastResult()?.result
// Return parameters map
mapOf(
"param1" to prev,
"param2" to "static value"
)
}
Step IDsβ
By default, steps get auto-generated IDs: "step-1", "step-2", etc.
Set custom IDs with named():
+step(tool).named("resolve").output("sku")
+step(tool).named("lookup").input { context ->
val sku = context.requireOutputOf("sku")
// OR use step ID
val sku2 = context.getOutputOf("resolve")
// ...
}
Usage Patternsβ
Pattern 1: Simple Sequential Chainβ
Steps execute in order using shared data:
val chain = toolChain("sequential") {
name = "Sequential Processing"
+step(step1Tool).output("value1")
+step(step2Tool).output("value2")
+step(step3Tool)
}
val result = chain.execute(initialParams)
Pattern 2: Named Step Referencesβ
Use named steps for clarity:
val chain = toolChain("named-steps") {
name = "Named Step Pipeline"
+step(resolveTool)
.named("resolve")
.output("sku")
.input { mapOf("text" to "iPhone") }
+step(lookupTool)
.named("lookup")
.output("product")
.input { context ->
mapOf("sku" to context.requireOutputOf("sku"))
}
+step(formatTool)
.named("format")
.input { context ->
mapOf("data" to context.requireOutputOf("product"))
}
}
Pattern 3: Conditional Data Flowβ
Use context to make conditional decisions:
val chain = toolChain("conditional") {
name = "Conditional Pipeline"
+step(checkTool).output("status")
+step(processTool).input { context ->
val status = context.requireOutputOf("status")
// Different parameters based on status
if (status == "premium") {
mapOf("level" to "high", "priority" to 1)
} else {
mapOf("level" to "standard", "priority" to 5)
}
}
}
Pattern 4: Complex Data Transformationβ
Transform and combine outputs from multiple steps:
val chain = toolChain("complex") {
name = "Complex Data Pipeline"
+step(fetchUserTool)
.named("user")
.output("userData")
+step(fetchOrdersTool)
.named("orders")
.output("orderData")
+step(mergeTool)
.named("merge")
.input { context ->
val user = context.requireOutputOf("userData")
val orders = context.requireOutputOf("orderData")
mapOf(
"user" to user,
"orders" to orders,
"timestamp" to System.currentTimeMillis()
)
}
}
Pattern 5: Mixed Traditional and Fluentβ
Combine fluent and traditional syntax:
val chain = toolChain("mixed") {
name = "Mixed Syntax Pipeline"
// Traditional syntax
stepWithOutput("calc1", multiplyTool, "value", mapOf("a" to 5, "b" to 4))
// Fluent syntax
+step(formatTool).input { context ->
val value = context.requireOutputOf("value")
mapOf("data" to value)
}
// Traditional syntax again
step("final", finalizeTool)
}
Pattern 6: Parallel-Style Branchingβ
While steps execute sequentially, you can simulate branching with conditional logic:
val chain = toolChain("branching") {
name = "Branching Pipeline"
+step(classifyTool).output("type")
// Branch A processing
+step(processATool).input { context ->
val type = context.requireOutputOf("type")
if (type == "A") {
mapOf("data" to context.sharedData["input"])
} else {
mapOf("skip" to true) // Skip this step's logic
}
}
// Branch B processing
+step(processBTool).input { context ->
val type = context.requireOutputOf("type")
if (type == "B") {
mapOf("data" to context.sharedData["input"])
} else {
mapOf("skip" to true)
}
}
}
API Referenceβ
ToolChainBuilderβ
class ToolChainBuilder(val id: String) {
var name: String
var description: String
var debugEnabled: Boolean
// Create fluent step builder
fun step(tool: Tool): StepBuilder
// Add configured step to chain
operator fun StepBuilder.unaryPlus()
// Traditional methods still available
fun step(stepId: String, toolName: String, parameters: Map<String, Any>)
fun step(stepId: String, tool: Tool, parameters: Map<String, Any>)
// Convenience methods for named outputs
fun stepWithOutput(stepId: String, tool: Tool, outputName: String, parameters: Map<String, Any>)
fun stepWithOutput(stepId: String, toolName: String, outputName: String, parameters: Map<String, Any>)
// Transform methods
fun stepWithTransform(stepId: String, toolName: String, parameters: Map<String, Any>, transformer: (ToolResult, ChainContext) -> Map<String, Any>)
fun stepWithTransform(stepId: String, tool: Tool, parameters: Map<String, Any>, transformer: (ToolResult, ChainContext) -> Map<String, Any>)
}
StepBuilderβ
class StepBuilder(
val tool: Tool,
var stepId: String? = null
) {
// Set step ID
fun named(id: String): StepBuilder
// Name the output
fun output(name: String): StepBuilder
// Provide input parameters (provider is a suspend lambda)
fun input(provider: suspend (ChainContext) -> Map<String, Any>): StepBuilder
}
Note: The input() provider is a suspend lambda, which means it can call other suspend functions. However, when using it in code, you don't need to explicitly write the suspend keyword:
// β
Correct usage (implicit suspend)
+step(tool).input { context ->
val value = context.requireOutputOf("previous")
mapOf("param" to value)
}
// β Don't do this (explicit suspend not needed in lambda body)
+step(tool).input { context: ChainContext -> // Type annotation optional
suspend { // β Not needed!
val value = context.requireOutputOf("previous")
mapOf("param" to value)
}
}
ChainContextβ
data class ChainContext(
val chainId: String,
var currentStep: Int,
val results: MutableList<ToolResult>,
val sharedData: MutableMap<String, Any>,
val stepOutputs: MutableMap<String, Any>
) {
fun addResult(result: ToolResult)
fun getLastResult(): ToolResult?
fun setStepOutput(stepId: String, output: Any)
}
// Extension functions
fun ChainContext.getOutputOf(stepId: String): Any?
fun ChainContext.requireOutputOf(stepId: String): Any // Throws if not found
Building and Executingβ
// Build chain
fun toolChain(id: String, init: ToolChainBuilder.() -> Unit): ModernToolChain
// Execute chain
suspend fun ModernToolChain.execute(initialParameters: Map<String, Any>): ChainResult
// Result
data class ChainResult(
val success: Boolean,
val result: String,
val error: String,
val executionTime: Long,
val stepResults: List<ToolResult>
)
Best Practicesβ
1. Use Named Outputs for Clarityβ
Do:
+step(resolveTool).output("sku").input { ... }
+step(lookupTool).input { context ->
mapOf("sku" to context.requireOutputOf("sku")) // Clear reference
}
Don't:
+step(resolveTool)
+step(lookupTool).input { context ->
mapOf("sku" to context.requireOutputOf("step-1")) // Fragile!
}
2. Use Named Steps for Complex Pipelinesβ
Do:
+step(tool1).named("fetch-user")
+step(tool2).named("fetch-orders")
+step(tool3).named("merge").input { context ->
// Clear what we're accessing
val user = context.requireOutputOf("fetch-user")
val orders = context.requireOutputOf("fetch-orders")
// ...
}
3. Prefer requireOutputOf Over getOutputOfβ
Do:
+step(tool).input { context ->
val sku = context.requireOutputOf("sku") // Fails fast with clear error
// ...
}
Don't:
+step(tool).input { context ->
val sku = context.getOutputOf("sku") // Might be null, silent failures
// ...
}
4. Keep Input Blocks Simpleβ
Do:
+step(tool).input { context ->
val sku = context.requireOutputOf("sku")
mapOf("sku" to sku)
}
Don't:
+step(tool).input { context ->
// Complex business logic in input block
val sku = context.requireOutputOf("sku")
val processed = complexTransformation(sku)
val validated = validate(processed)
// ... more logic ...
mapOf("sku" to validated)
}
Extract complex logic to separate functions or tools.
5. Use Debug Mode During Developmentβ
val chain = toolChain("my-pipeline") {
name = "My Pipeline"
debugEnabled = true // Enable debug logging
+step(tool1).output("out1")
+step(tool2).output("out2")
}
Debug mode logs:
- Step execution start/completion
- Skipped steps (conditions not met)
- Output values
6. Handle Errors Gracefullyβ
+step(tool).input { context ->
val sku = context.getOutputOf("sku")
if (sku == null) {
// Provide default or skip
mapOf("skip" to true)
} else {
mapOf("sku" to sku)
}
}
7. Don't Forget the Unary Plus!β
Do:
+step(tool).output("result") // β
Added to chain
Don't:
step(tool).output("result") // β Not added! No-op!
Examplesβ
Example 1: Product Lookup Pipelineβ
val productPipeline = toolChain("product-pipeline") {
name = "Product Information Pipeline"
description = "Resolve product and fetch specifications"
debugEnabled = false
// Step 1: Resolve product name to SKU
+step(resolveTool)
.named("resolve")
.output("sku")
.input { mapOf("text" to "MacBook Pro 16-inch") }
// Step 2: Fetch product specifications
+step(getSpecsTool)
.named("specs")
.output("specifications")
.input { context ->
val sku = context.requireOutputOf("sku")
mapOf("sku" to sku, "include_pricing" to true)
}
// Step 3: Format for display
+step(formatTool)
.named("format")
.input { context ->
val specs = context.requireOutputOf("specifications")
mapOf(
"data" to specs,
"format" to "markdown"
)
}
}
// Execute
val result = productPipeline.execute(emptyMap())
println(result.result)
Example 2: Data Processing Pipelineβ
val dataPipeline = toolChain("data-processing") {
name = "Data Processing Pipeline"
// Extract
+step(extractTool)
.named("extract")
.output("rawData")
.input { mapOf("source" to "database") }
// Transform
+step(transformTool)
.named("transform")
.output("cleanData")
.input { context ->
val raw = context.requireOutputOf("rawData")
mapOf(
"data" to raw,
"operations" to listOf("normalize", "deduplicate", "validate")
)
}
// Load
+step(loadTool)
.named("load")
.input { context ->
val clean = context.requireOutputOf("cleanData")
mapOf(
"data" to clean,
"destination" to "warehouse"
)
}
}
Example 3: Multi-Source Aggregationβ
val aggregationPipeline = toolChain("multi-source-agg") {
name = "Multi-Source Aggregation"
// Fetch from multiple sources
+step(fetchSourceATool)
.named("source-a")
.output("dataA")
.input { mapOf("source" to "api-a") }
+step(fetchSourceBTool)
.named("source-b")
.output("dataB")
.input { mapOf("source" to "api-b") }
+step(fetchSourceCTool)
.named("source-c")
.output("dataC")
.input { mapOf("source" to "api-c") }
// Merge all sources
+step(mergeTool)
.named("merge")
.output("merged")
.input { context ->
mapOf(
"sources" to listOf(
context.requireOutputOf("dataA"),
context.requireOutputOf("dataB"),
context.requireOutputOf("dataC")
)
)
}
// Analyze merged data
+step(analyzeTool)
.named("analyze")
.input { context ->
val merged = context.requireOutputOf("merged")
mapOf("data" to merged, "type" to "comprehensive")
}
}
Testingβ
Unit Testing Stepsβ
@Test
fun `test product pipeline`() = runBlocking {
// Create test tools
val resolveTool = SimpleTool("resolve", ...) { params ->
ToolResult.success("SKU-12345")
}
val getSpecsTool = SimpleTool("get_specs", ...) { params ->
val sku = params["sku"]
ToolResult.success("Specs for $sku")
}
// Register tools
ToolRegistry.register(resolveTool)
ToolRegistry.register(getSpecsTool)
// Build pipeline
val pipeline = toolChain("test-pipeline") {
name = "Test Pipeline"
+step(resolveTool).output("sku").input {
mapOf("text" to "iPhone")
}
+step(getSpecsTool).input { context ->
mapOf("sku" to context.requireOutputOf("sku"))
}
}
// Execute
val result = pipeline.execute(emptyMap())
// Assert
assertTrue(result.success)
assertEquals(2, result.stepResults.size)
assertEquals("SKU-12345", result.stepResults[0].result)
assertEquals("Specs for SKU-12345", result.stepResults[1].result)
}
Integration Testingβ
@Test
fun `test full pipeline integration`() = runBlocking {
// Setup real tools
setupToolRegistry()
// Build pipeline
val pipeline = createProductPipeline()
// Execute with real data
val result = pipeline.execute(mapOf(
"product_query" to "MacBook Pro 16-inch"
))
// Verify end-to-end behavior
assertTrue(result.success)
assertTrue(result.result.contains("MacBook"))
assertTrue(result.executionTime < 5000)
}
Troubleshootingβ
Step Not Executingβ
Problem: Step is configured but not running.
Solution: Make sure you're using the unary plus operator:
// β Wrong
step(tool).output("result")
// β
Correct
+step(tool).output("result")
Output Not Foundβ
Problem: requireOutputOf() throws "Output not found" error.
Solution:
- Check step order - output must be created before it's accessed
- Verify output name matches:
.output("sku")βrequireOutputOf("sku") - Use debug mode to see what outputs are available
val chain = toolChain("debug") {
debugEnabled = true // See step execution details
// ...
}
Type Mismatchβ
Problem: input { context -> ... } has type mismatch errors.
Solution: Ensure lambda returns Map<String, Any> (non-nullable Any):
// β Wrong - nullable
+step(tool).input { context ->
mapOf("value" to context.sharedData["key"]) // Any?
}
// β
Correct
+step(tool).input { context ->
mapOf("value" to (context.sharedData["key"] ?: "default")) // Any
}
Chain Builds But Fails at Runtimeβ
Problem: Chain builds successfully but fails during execution.
Solution:
- Check tool registration - all tools must be in
ToolRegistry - Verify parameter names match tool expectations
- Use
.named()to track which step is failing
Migration from Traditional Syntaxβ
Before (Traditional)β
val chain = toolChain("old-style") {
name = "Old Style"
step("step1", "resolve-tool", mapOf("text" to "iPhone"))
stepWithTransform("step2", "lookup-tool", mapOf()) { result, context ->
mapOf("sku" to result.result)
}
step("step3", "format-tool")
}
After (Fluent)β
val chain = toolChain("new-style") {
name = "New Style"
+step(resolveTool)
.named("step1")
.output("sku")
.input { mapOf("text" to "iPhone") }
+step(lookupTool)
.named("step2")
.output("product")
.input { context ->
mapOf("sku" to context.requireOutputOf("sku"))
}
+step(formatTool)
.named("step3")
.input { context ->
mapOf("data" to context.requireOutputOf("product"))
}
}
Benefits:
- Type-safe tool references
- Clearer data flow
- No manual transformers needed
- More readable
See Alsoβ
- Context-Aware Tools - Build tools that use AgentContext
- Multi-Agent Orchestration - Coordinate multiple agents
- Tool Caching - Cache tool results
- Output Validation - Validate tool outputs
Summaryβ
The Tool Pipeline DSL provides a fluent, type-safe way to build complex tool chains:
β
Method Chaining: .named(), .output(), .input()
β
Context-Aware: Access previous outputs with requireOutputOf()
β
Type-Safe: Use Tool objects directly
β
Readable: Clear data flow between steps
β
Flexible: Mix fluent and traditional syntax
β
Backward Compatible: Existing chains still work
Start building powerful tool pipelines today! π