workflow

The workflow package provides the core types for defining and executing agentic workflows in Microbus. It is used by both task authors (who interact with Flow) and workflow authors (who build Graph definitions).
Flow is the carrier object passed to every task endpoint. It provides access to the workflow’s identity, state, and control signals.
- FlowID - unique identifier for this flow execution
- WorkflowName - the workflow graph’s name
- TaskName - the current task being executed
- StepNum - the current step number in the flow

Tasks read input from state and write output back:
// Field-based access (one-off reads/writes)
name := flow.GetString("name")
score := flow.GetFloat("score")
flow.Set("approved", true)
// Check for existence
if flow.Has("delay") {
    delay := flow.GetFloat("delay")
    // ...
}
// Struct-based access (parse/diff pattern)
snap := flow.Snapshot() // returns map[string]any
var state MyState
flow.ParseState(&state)
// ... modify state ...
flow.SetChanges(state, snap) // only records fields that differ from snap
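The parse/diff pattern above can be illustrated without the Microbus runtime. The following is a minimal sketch, not the package's implementation: `parseState` and `diff` are hypothetical helpers that stand in for `flow.ParseState` and `flow.SetChanges`, and the JSON round trip is an assumption about how a state map could be mapped onto a struct.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// MyState is a hypothetical typed view over a flow's state.
type MyState struct {
	Name     string  `json:"name"`
	Score    float64 `json:"score"`
	Approved bool    `json:"approved"`
}

// parseState fills out from the state map via a JSON round trip.
func parseState(state map[string]any, out any) error {
	data, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

// diff returns only the fields of modified whose values differ from snap.
// It assumes snap holds JSON-decoded values (for example float64 for numbers).
func diff(modified any, snap map[string]any) (map[string]any, error) {
	data, err := json.Marshal(modified)
	if err != nil {
		return nil, err
	}
	var after map[string]any
	if err := json.Unmarshal(data, &after); err != nil {
		return nil, err
	}
	changes := map[string]any{}
	for k, v := range after {
		if old, ok := snap[k]; !ok || !reflect.DeepEqual(old, v) {
			changes[k] = v
		}
	}
	return changes, nil
}

func main() {
	snap := map[string]any{"name": "alice", "score": 0.5, "approved": false}
	var state MyState
	if err := parseState(snap, &state); err != nil {
		panic(err)
	}
	state.Approved = true // the only modification
	changes, err := diff(state, snap)
	if err != nil {
		panic(err)
	}
	fmt.Println(changes) // prints map[approved:true]
}
```

Recording only the differing fields keeps a task from overwriting state it merely read, which matters when parallel branches are merged later.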
Tasks can influence execution flow:
flow.Goto(myserviceapi.OtherTask.URL()) // override next transition
flow.Retry() // re-execute this task
flow.Sleep(5 * time.Second) // delay before next execution
flow.Interrupt(payload) // pause for external input
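To show how a task body might choose among these signals, here is a sketch written against a hypothetical `controls` interface. Only the method names come from the snippet above. The signatures, the `recorder` stand-in, the `pollJob` task, the example URL, and the idea of pairing `Sleep` with `Retry` are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// controls mirrors the method names shown above; the signatures are assumed.
type controls interface {
	Goto(url string)
	Retry()
	Sleep(d time.Duration)
	Interrupt(payload any)
}

// recorder is a stand-in for Flow that records the signals a task raises.
type recorder struct{ signals []string }

func (r *recorder) Goto(url string)       { r.signals = append(r.signals, "goto "+url) }
func (r *recorder) Retry()                { r.signals = append(r.signals, "retry") }
func (r *recorder) Sleep(d time.Duration) { r.signals = append(r.signals, "sleep "+d.String()) }
func (r *recorder) Interrupt(p any)       { r.signals = append(r.signals, fmt.Sprintf("interrupt %v", p)) }

// pollJob is a hypothetical task body that polls an external job.
func pollJob(flow controls, status string, attempts int) {
	switch {
	case status == "done":
		// No signal: let the graph's declared transitions decide what runs next.
	case status == "needs_approval":
		flow.Interrupt("approval required") // pause for external input
	case attempts >= 3:
		flow.Goto("https://example.invalid/give-up") // hypothetical task URL
	default:
		flow.Sleep(5 * time.Second) // wait, then
		flow.Retry()                // run this task again
	}
}

func main() {
	r := &recorder{}
	pollJob(r, "pending", 0)
	fmt.Println(r.signals) // prints [sleep 5s retry]
}
```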
Graph defines the structure of a workflow: tasks, transitions between them, and reducers for fan-in state merging.
graph := workflow.NewGraph(myserviceapi.CreateOrder.URL())
graph.AddTransition(myserviceapi.Validate.URL(), myserviceapi.Charge.URL())
graph.AddTransitionWhen(myserviceapi.Validate.URL(), myserviceapi.Reject.URL(), "valid != true")
graph.AddTransition(myserviceapi.Charge.URL(), myserviceapi.Fulfill.URL())
graph.AddTransition(myserviceapi.Fulfill.URL(), workflow.END)
| Method | Description |
|---|---|
| AddTransition(from, to) | Unconditional transition, always taken |
| AddTransitionWhen(from, to, when) | Taken when the when expression matches state |
| AddTransitionGoto(from, to) | Only taken when the task calls flow.Goto(to) |
| AddTransitionForEach(from, to, forEach, as) | Dynamic fan-out: iterates over a state array field |
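The first three transition kinds in the table can be read as a selection rule over a task's outgoing edges. The sketch below is one possible reading, not the foreman's algorithm: `edge`, `evalWhen`, and `next` are hypothetical, the expression evaluator handles only `field == value` and `field != value`, and it assumes an explicit goto selects the outgoing edge with the matching target and suppresses the others.

```go
package main

import (
	"fmt"
	"strings"
)

// edge mirrors a subset of the transition properties for this sketch.
type edge struct {
	From, To string
	When     string
	WithGoto bool
}

// evalWhen is a deliberately tiny stand-in for a real expression evaluator.
// It understands only "<field> == <value>" and "<field> != <value>".
func evalWhen(expr string, state map[string]any) bool {
	parts := strings.Fields(expr)
	if len(parts) != 3 {
		return false
	}
	actual := fmt.Sprint(state[parts[0]])
	switch parts[1] {
	case "==":
		return actual == parts[2]
	case "!=":
		return actual != parts[2]
	}
	return false
}

// next returns the targets to run after task from. gotoTarget is the URL the
// task passed to Goto, or "" if it did not call Goto.
func next(edges []edge, from string, state map[string]any, gotoTarget string) []string {
	var out []string
	for _, e := range edges {
		if e.From != from {
			continue
		}
		switch {
		case gotoTarget != "":
			// Assumed: an explicit goto overrides every other outgoing edge.
			if e.To == gotoTarget {
				out = append(out, e.To)
			}
		case e.WithGoto:
			// Goto-only edges are never taken implicitly.
		case e.When == "" || evalWhen(e.When, state):
			out = append(out, e.To)
		}
	}
	return out
}

func main() {
	edges := []edge{
		{From: "a", To: "b"},
		{From: "a", To: "c", When: "valid != true"},
		{From: "a", To: "d", WithGoto: true},
	}
	fmt.Println(next(edges, "a", map[string]any{"valid": true}, ""))  // prints [b]
	fmt.Println(next(edges, "a", map[string]any{"valid": false}, "")) // prints [b c]
	fmt.Println(next(edges, "a", nil, "d"))                           // prints [d]
}
```

Note that under this reading an unconditional edge is taken alongside any matching conditional edge, so mutually exclusive branches need complementary `when` expressions.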
Declare merge strategies for state fields during fan-in:
graph.SetReducer("messages", workflow.ReducerAppend)
graph.SetReducer("score", workflow.ReducerAdd)
| Reducer | Behavior |
|---|---|
| ReducerReplace (default) | Last write wins |
| ReducerAppend | Concatenate arrays |
| ReducerAdd | Sum numbers |
| ReducerUnion | Deduplicate and merge arrays |
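The behaviors in the table can be made concrete with a small stand-alone sketch. The `merge` function below is hypothetical and only illustrates the semantics described above. It redeclares the reducer constants locally so it compiles on its own, and it assumes state values are JSON-decoded (`[]any` for arrays, `float64` for numbers).

```go
package main

import (
	"fmt"
	"reflect"
)

// Reducer and its constants are redeclared here so the sketch is self-contained.
type Reducer string

const (
	ReducerReplace Reducer = "replace"
	ReducerAppend  Reducer = "append"
	ReducerAdd     Reducer = "add"
	ReducerUnion   Reducer = "union"
)

// merge folds the value written by one branch (incoming) into the value
// accumulated so far (current), according to the reducer.
func merge(r Reducer, current, incoming any) any {
	switch r {
	case ReducerAppend:
		a, _ := current.([]any)
		b, _ := incoming.([]any)
		return append(append([]any{}, a...), b...)
	case ReducerAdd:
		a, _ := current.(float64)
		b, _ := incoming.(float64)
		return a + b
	case ReducerUnion:
		a, _ := current.([]any)
		b, _ := incoming.([]any)
		out := []any{}
		for _, v := range append(append([]any{}, a...), b...) {
			dup := false
			for _, seen := range out {
				if reflect.DeepEqual(seen, v) {
					dup = true
					break
				}
			}
			if !dup {
				out = append(out, v)
			}
		}
		return out
	default: // ReducerReplace: last write wins
		return incoming
	}
}

func main() {
	fmt.Println(merge(ReducerReplace, "old", "new"))                    // prints new
	fmt.Println(merge(ReducerAppend, []any{"a"}, []any{"a", "b"}))      // prints [a a b]
	fmt.Println(merge(ReducerAdd, 1.5, 2.0))                            // prints 3.5
	fmt.Println(merge(ReducerUnion, []any{"a", "b"}, []any{"b", "c"})) // prints [a b c]
}
```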
Set per-task execution timeouts:
graph.SetTimeBudget(myserviceapi.SlowTask.URL(), 2*time.Minute)
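A time budget amounts to a deadline on a single task execution. How the foreman enforces it is not described here, so the following is only a sketch of the general technique using the standard `context` package. `runWithBudget`, `sleepFor`, `timedOut`, and the 100 ms `defaultBudget` are hypothetical names and values.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// defaultBudget is a hypothetical fallback standing in for an orchestrator default.
const defaultBudget = 100 * time.Millisecond

// runWithBudget runs task under a deadline; a budget of 0 falls back to the default.
func runWithBudget(budget time.Duration, task func(ctx context.Context) error) error {
	if budget <= 0 {
		budget = defaultBudget
	}
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() { done <- task(ctx) }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sleepFor returns a task that takes ms milliseconds unless its context ends first.
func sleepFor(ms int) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(time.Duration(ms) * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// timedOut reports whether err signals a deadline overrun.
func timedOut(err error) bool { return errors.Is(err, context.DeadlineExceeded) }

func main() {
	fmt.Println(runWithBudget(0, sleepFor(1)))                                // prints <nil>
	fmt.Println(timedOut(runWithBudget(20*time.Millisecond, sleepFor(2000)))) // prints true
}
```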
Validate a graph's structure or render it for inspection:
err := graph.Validate() // check for structural errors
mermaid := graph.Mermaid() // generate a Mermaid flowchart
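For intuition about what rendering a graph as Mermaid involves, the sketch below turns a list of edges into flowchart text. The exact output of `graph.Mermaid()` is not shown in this document, so the layout here (generated node IDs, quoted labels, `when` expressions as edge labels) is an assumption, and `edge` and `toMermaid` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a minimal stand-in for a graph transition.
type edge struct {
	From, To string
	When     string
}

// toMermaid renders edges as a top-down Mermaid flowchart. Nodes get generated
// IDs (n0, n1, ...) because task URLs contain characters Mermaid IDs do not allow.
func toMermaid(edges []edge) string {
	ids := map[string]string{}
	var b strings.Builder
	b.WriteString("flowchart TD\n")
	// id returns the node ID for name, declaring the node on first use.
	id := func(name string) string {
		if v, ok := ids[name]; ok {
			return v
		}
		v := fmt.Sprintf("n%d", len(ids))
		ids[name] = v
		fmt.Fprintf(&b, "    %s[%q]\n", v, name)
		return v
	}
	for _, e := range edges {
		from, to := id(e.From), id(e.To)
		if e.When != "" {
			fmt.Fprintf(&b, "    %s -->|%q| %s\n", from, e.When, to)
		} else {
			fmt.Fprintf(&b, "    %s --> %s\n", from, to)
		}
	}
	return b.String()
}

func main() {
	fmt.Print(toMermaid([]edge{
		{From: "validate", To: "charge"},
		{From: "validate", To: "reject", When: "valid != true"},
	}))
}
```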
Transition describes a single edge in the graph:
type Transition struct {
    From     string // source task URL
    To       string // target task URL (or workflow.END)
    When     string // boolean expression evaluated against state
    WithGoto bool   // only taken on explicit flow.Goto()
    ForEach  string // state field to iterate over (dynamic fan-out)
    As       string // alias for current element during forEach
}
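The `ForEach` and `As` fields describe dynamic fan-out: one execution of the target per element of a state array. A minimal sketch of that expansion follows. `fanOut` is a hypothetical helper, and giving each branch a copy of the parent state with the element stored under the alias is an assumption, not documented behavior.

```go
package main

import "fmt"

// fanOut builds one child state per element of state[forEach], exposing the
// current element under the alias as. The parent state is left untouched.
func fanOut(state map[string]any, forEach, as string) []map[string]any {
	items, _ := state[forEach].([]any) // a missing or non-array field yields no branches
	children := make([]map[string]any, 0, len(items))
	for _, item := range items {
		child := make(map[string]any, len(state)+1)
		for k, v := range state {
			child[k] = v
		}
		child[as] = item
		children = append(children, child)
	}
	return children
}

func main() {
	state := map[string]any{"user": "u1", "orders": []any{"o-1", "o-2"}}
	for _, child := range fanOut(state, "orders", "order") {
		fmt.Println(child["user"], child["order"])
	}
	// prints:
	// u1 o-1
	// u1 o-2
}
```

Each branch would then write its own output, and the reducers declared on the graph decide how those outputs are merged at fan-in.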
Node describes a task or subgraph node registered in a graph:
type Node struct {
    Name       string        // task or subgraph URL
    TimeBudget time.Duration // execution timeout (0 = use foreman default)
    Subgraph   bool          // true if this node is a child workflow
}
Reducer is a string constant defining a merge strategy:
const (
    ReducerReplace Reducer = "replace"
    ReducerAppend  Reducer = "append"
    ReducerAdd     Reducer = "add"
    ReducerUnion   Reducer = "union"
)
RawFlow wraps Flow with additional methods used by the foreman orchestrator. Task authors do not interact with RawFlow directly. It exposes raw state/config/changes access, control signal readers, and mutation methods needed for orchestration bookkeeping.
coreservices/foreman - Foreman configuration and endpoints