GoFlowGoFlow
Guide

Workflows

Workflows define complex multi-step processes with conditionals, loops, and human approvals.

Basic Workflow

import "github.com/nuulab/goflow/pkg/workflow"
 
wf := workflow.New("order-process").
    Step("validate", validateOrder).Then().
    Step("charge", chargeCard).Then().
    Step("fulfill", fulfillOrder).Then().
    Step("notify", sendNotification).
    Build()

Running Workflows

engine := workflow.NewEngine(nil)
engine.Register(wf)
 
// Start workflow
stateID, err := engine.Start(ctx, "order-process", map[string]any{
    "order_id": 12345,
})
 
// Check status
state, _ := engine.Status(stateID)
fmt.Println(state.Status) // running, completed, failed

Conditionals

workflow.New("approval").
    Step("validate", validateRequest).Then().
    If("needs_approval", func(s *State) bool {
        return s.Data["amount"].(float64) > 1000
    }).
        Then(workflow.Step("approve", getApproval)).
        Else(workflow.Step("auto_approve", autoApprove)).
    End().
    Step("process", processRequest).
    Build()

Loops

ForEach Loop

workflow.New("batch-process").
    Loop("process_items").
        ForEach("items").
        Do(workflow.Step("process", processItem)).
    End().
    Build()

While Loop

workflow.New("retry-until").
    Loop("retry").
        While(func(s *State) bool {
            return s.Data["retries"].(int) < 3
        }).
        Do(workflow.Step("attempt", attemptOperation)).
    End().
    Build()

Parallel Execution

workflow.New("fetch-all").
    Parallel("gather",
        workflow.Step("users", fetchUsers),
        workflow.Step("orders", fetchOrders),
        workflow.Step("inventory", fetchInventory),
    ).WaitFor(workflow.WaitAll).
    Step("combine", combineResults).
    Build()

Wait strategies:

  • WaitAll - Wait for all parallel steps
  • WaitAny - Continue when any step completes
  • WaitCount(n) - Wait for n steps to complete

Human-in-the-Loop

workflow.New("expense-approval").
    Step("submit", submitExpense).Then().
    AwaitApproval("manager_approval", []string{"manager@company.com"}).
        Timeout(24 * time.Hour).
        OnTimeout("auto_reject").
    Then().
    Step("process", processExpense).
    Build()

Compensation (Saga Pattern)

workflow.New("booking").
    Step("charge", chargeCard).
        Compensate(refundCard).
    Then().
    Step("reserve", reserveRoom).
        Compensate(cancelReservation).
    Then().
    Step("confirm", sendConfirmation).
    Build()
 
// If reserveRoom fails, refundCard is automatically called

Signals & Events

// Wait for external signal
workflow.New("async-process").
    Step("start", startProcess).Then().
    AwaitSignal("payment_received").
        Timeout(1 * time.Hour).
    Then().
    Step("complete", completeProcess).
    Build()
 
// Send signal from outside
engine.Signal(stateID, "payment_received", paymentData)

Sub-Workflows

workflow.New("main").
    Step("init", initialize).Then().
    SubWorkflow("process", "sub-workflow-name", subInput).Then().
    Step("finalize", finalize).
    Build()

Retry Policies

workflow.New("resilient").
    Step("external_call", callAPI).
        Retry(workflow.NewRetryPolicy().
            Attempts(5).
            Backoff(time.Second, time.Minute).
            RetryOn(isRetryableError),
        ).
    Build()

Cron Scheduling

cron := workflow.NewCron(engine)
 
// Standard cron
cron.Add("daily-report", "report_workflow", "0 9 * * *", nil)
 
// Special expressions
cron.Add("hourly-sync", "sync_workflow", "@hourly", nil)
cron.Add("every-5m", "health_check", "@every 5m", nil)
 
cron.Start(ctx)

On this page