Workflows as code

Flows are not the only way to write distributed programs that execute distinct jobs. Workflows as code let you define orchestration logic directly in TypeScript or Python, using familiar language constructs like functions, conditionals, and loops, while Windmill handles checkpointing, parallelism, and fault tolerance.

Each task() call runs as a separate job with its own logs, resources, and timeline entry. The workflow suspends between tasks (releasing its worker slot entirely) and resumes from a checkpoint when child jobs complete. This means a single worker can run workflows with any number of parallel tasks without deadlocking.

Workflows as code can be synced with git and the CLI like any other script.

Quickstart

Wrap your orchestration function with workflow() and annotate task functions with task():

import { task, step, workflow, sleep } from 'windmill-client';

async function fetchData(url: string) {
  const resp = await fetch(url);
  return resp.json();
}

async function transform(data: any) {
  return { count: data.length, summary: data.slice(0, 5) };
}

async function notify(message: string) {
  console.log(`Sending notification: ${message}`);
  return 'sent';
}

export const main = workflow(async (url: string) => {
  // step() persists lightweight inline results (timestamps, IDs, config)
  const startedAt = await step('started_at', () => new Date().toISOString());

  // task() runs each function as a separate Windmill job
  const data = await task(fetchData)(url);
  const result = await task(transform)(data);

  // sleep() suspends the workflow server-side without holding a worker
  await sleep(5);

  await task(notify)(`Processed ${result.count} items since ${startedAt}`);
  return result;
});

How it works

Workflows as code use a checkpoint/replay model that ensures zero worker waste:

  1. The workflow script runs until it hits a task(), step(), sleep(), or waitForApproval() call that isn't cached yet
  2. The script exits, and Windmill saves a checkpoint (all completed step results) to the database
  3. The parent workflow fully suspends, releasing its worker slot back to the pool. No worker is held while waiting.
  4. For task(): child jobs are created and dispatched. Each child runs independently on any available worker. When all children complete, the parent is automatically re-queued
  5. For step(): the inline result is persisted immediately, and the workflow is re-picked up on the next available worker
  6. For sleep(): the workflow suspends for the given duration. No worker is occupied during the sleep; the job is re-queued when the timer expires
  7. For waitForApproval(): the workflow suspends indefinitely (up to the timeout). The worker is released, and the job resumes only when a human approves or rejects
  8. On replay, all previously completed steps return their cached results instantly, and execution continues from where it left off

This design means the parent workflow process is never alive while child jobs run, during sleeps, or while waiting for approvals. It only occupies a worker during the brief moments between checkpoints. A workflow that sleeps for 24 hours or waits a week for approval consumes zero worker time during those waits.
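The suspend/replay cycle described above can be modeled in a few lines of plain TypeScript. This is an illustrative toy, not Windmill's actual implementation: `drive` re-runs the orchestration from the top after every checkpoint, and the `task` helper returns cached results for completed steps while suspending (by throwing) at the first uncached one.

```typescript
// Toy model of the checkpoint/replay loop (names like `drive` and `Suspend`
// are illustrative, not part of any Windmill API).
class Suspend extends Error {}

type Checkpoint = Map<string, unknown>;
type TaskFn = <R>(key: string, run: () => R) => R;

function drive<T>(orchestrate: (task: TaskFn) => T): { result: T; passes: number } {
  const checkpoint: Checkpoint = new Map();
  let passes = 0;
  for (;;) {
    passes++;
    const pending = new Map<string, unknown>();
    const task: TaskFn = (key, run) => {
      if (checkpoint.has(key)) return checkpoint.get(key) as any; // replay from cache
      pending.set(key, run()); // "dispatch" the child job
      throw new Suspend();     // exit: this is where the worker slot is released
    };
    try {
      return { result: orchestrate(task), passes };
    } catch (e) {
      if (!(e instanceof Suspend)) throw e;
      // "Child job completed": persist its result and re-queue the parent
      for (const [k, v] of pending) checkpoint.set(k, v);
    }
  }
}
```

A two-task workflow driven this way runs in three passes: two that each dispatch one task and suspend, and a final pass that replays both cached results and returns.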

Zero worker waste

Unlike traditional workflow engines that hold a thread or process alive while waiting, Windmill workflows fully suspend between steps. The parent job is marked as suspended in the database and becomes invisible to the worker pull query. When a child completes (or a sleep/approval timer fires), the database counter is decremented atomically. Only when all pending children reach zero does the parent become eligible for pickup again. This means:

  • A workflow with 100 parallel tasks uses 100 worker slots for the tasks, but 0 slots while orchestrating
  • A workflow sleeping for 1 hour uses 0 worker slots during the entire sleep
  • A workflow waiting for approval uses 0 worker slots until the human responds
  • Workers are free to process other jobs while your workflow waits
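The bookkeeping behind this can be sketched as a simple counter. This is a toy model, not the real scheduler (which does this atomically in the database): the parent suspends with a pending-children count, each child completion decrements it, and the parent becomes eligible for pickup only at zero.

```typescript
// Toy model of parent suspension (class name is illustrative, not an API).
class SuspendedWorkflow {
  private pending: number;
  requeued = false; // becomes true when the parent is eligible for pickup again

  constructor(childCount: number) {
    this.pending = childCount;
  }

  // Invoked once per completed child job (atomic in the real database)
  onChildComplete(): void {
    if (this.pending === 0) throw new Error('no pending children');
    this.pending -= 1;
    if (this.pending === 0) this.requeued = true;
  }
}
```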

Core primitives

workflow()

Marks a function as a workflow entry point. Required for using task(), step(), sleep(), waitForApproval(), and parallel().

import { workflow } from 'windmill-client';

export const main = workflow(async (x: number, y: number) => {
  // orchestration logic here
  return x + y;
});

task()

Wraps a function so that each call runs as a separate child job. The child gets its own logs, timeline entry, and can run on a different worker.

import { task, workflow } from 'windmill-client';

// Basic usage: task wraps the function, then you call it with arguments
async function double(n: number) {
  return n * 2;
}

async function add(a: number, b: number) {
  return a + b;
}

export const main = workflow(async () => {
  const a = await task(double)(5); // runs as child job, returns 10
  const b = await task(double)(3); // runs as child job, returns 6
  return await task(add)(a, b); // runs as child job, returns 16
});

With options:

// With explicit path and options
const result = await task('f/my_folder/heavy_script', double, {
  timeout: 600,
  tag: 'gpu',
  cache_ttl: 3600,
  priority: 10,
  concurrency_limit: 5,
  concurrency_key: 'my_key',
  concurrency_time_window_s: 60
})(42);

// With options only (no path override)
const result2 = await task(double, { timeout: 300, tag: 'highmem' })(42);

step()

Executes a lightweight function inline (in the parent process) and persists the result to the database. On replay, the cached value is returned without re-executing.

Use step() for non-deterministic operations that must return the same value across replays: timestamps, random IDs, config reads, or any cheap computation whose result must be stable.

import { step, task, workflow } from 'windmill-client';
import { randomUUID } from 'crypto';

async function processOrder(orderId: string, ts: string) {
  console.log(`Processing order ${orderId} created at ${ts}`);
  return { orderId, ts, status: 'processed' };
}

export const main = workflow(async () => {
  // These values are computed once and cached across replays
  const orderId = await step('order_id', () => randomUUID());
  const timestamp = await step('timestamp', () => new Date().toISOString());

  return await task(processOrder)(orderId, timestamp);
});

When to use step() vs task()
  • task(): heavy computation, external API calls, anything that benefits from being a separate job (own logs, own worker, parallelizable)
  • step(): lightweight operations whose results must be stable across replays, such as generating IDs, reading timestamps, or config lookups. No separate job overhead, but the result is persisted so replays are consistent

task() vs step() comparison

              task()                               step()
Execution     Separate child job (own process)     Inline in parent process
Overhead      Full job lifecycle                   Function call + one DB write
Parallelism   Yes (Promise.all / asyncio.gather)   No, sequential only
Use case      Heavy compute, external APIs         Timestamps, random IDs, config reads
Logs          Own log stream                       Part of parent logs
Timeline      Separate timeline entry              Not a separate entry

Determinism requirement

Because workflows replay from checkpoints, the orchestration logic must be deterministic: given the same inputs, the workflow must call tasks in the same order on every replay. This ensures step keys are consistent and cached results map to the correct steps.

Safe patterns:

  • Branching on task results (results are replayed from checkpoint)
  • Loops over static or task-derived data
  • Any logic that depends only on inputs and previous task results

Patterns that break determinism (wrap these in step() instead):

  • Date.now() / datetime.now() — use await step('ts', () => Date.now())
  • Math.random() / random.random() — use await step('id', () => randomUUID())
  • Reading environment variables or config that may change between replays
  • External API calls whose results affect control flow
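To see concretely why the step() wrapper matters, here is a self-contained simulation in plain TypeScript (a mock step cache, not the windmill-client API): a cached random value is stable across replays, while a raw Math.random() call in orchestration code yields a new value every time the script is re-run.

```typescript
// Mock step cache: first run computes and persists, replays return the cache.
type StepCache = Map<string, unknown>;

function step<T>(cache: StepCache, key: string, fn: () => T): T {
  if (cache.has(key)) return cache.get(key) as T; // replay: cached value
  const value = fn();
  cache.set(key, value); // first run: persist
  return value;
}

function orchestrate(cache: StepCache) {
  const stableId = step(cache, 'run_id', () => Math.random()); // stable across replays
  const unstableId = Math.random(); // re-evaluated on every replay: breaks determinism
  return { stableId, unstableId };
}
```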

If a workflow's code changes between replays (e.g. a deployment mid-execution), Windmill detects the source hash mismatch and re-runs from scratch to avoid stale checkpoint data.

Parallel execution

Using Promise.all / asyncio.gather

Tasks wrapped with task() return promises/awaitables, so you can run them in parallel with standard language constructs:

import { task, workflow } from 'windmill-client';

async function fetchUser(id: number) {
  return { id, name: `User ${id}` };
}

async function enrichUser(user: any) {
  return { ...user, enriched: true };
}

export const main = workflow(async () => {
  // All three fetches dispatch as child jobs simultaneously
  const [user1, user2, user3] = await Promise.all([
    task(fetchUser)(1),
    task(fetchUser)(2),
    task(fetchUser)(3)
  ]);

  // Enrich in parallel too
  const [e1, e2, e3] = await Promise.all([
    task(enrichUser)(user1),
    task(enrichUser)(user2),
    task(enrichUser)(user3)
  ]);

  return [e1, e2, e3];
});

Using parallel()

The parallel() utility provides a higher-level API for processing a list of items with optional concurrency control:

import { task, parallel, workflow } from 'windmill-client';

async function processItem(item: number) {
  return item * 2;
}

export const main = workflow(async () => {
  const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

  // Process all items in parallel (default: all at once)
  const results = await parallel(items, (item) => task(processItem)(item));

  // With concurrency control: process at most 3 at a time
  const batched = await parallel(items, (item) => task(processItem)(item), { concurrency: 3 });

  return { results, batched };
});
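The core idea behind a concurrency-limited parallel map can be sketched in plain TypeScript (this is an illustrative sketch, not the windmill-client implementation; `parallelMap` is a hypothetical name): a fixed number of workers drain a shared index, so at most `concurrency` calls are in flight and every result lands at its input position.

```typescript
// Sketch: concurrency-limited parallel map with results in input order.
async function parallelMap<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  concurrency: number = items.length
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;

  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++;                // claim the next unprocessed index
      results[i] = await fn(items[i]); // result lands at its input position
    }
  }

  // Spawn up to `concurrency` workers over the shared index
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Because JavaScript is single-threaded, the `next++` claim needs no locking; the only suspension points are the `await`s inside each worker.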

Sleep

sleep() suspends the workflow server-side for a given number of seconds. The workflow releases its worker slot during the sleep, unlike a regular time.sleep() or setTimeout() which would block a worker.

import { task, sleep, workflow } from 'windmill-client';

async function sendReminder(email: string) {
  console.log(`Reminder sent to ${email}`);
}

export const main = workflow(async (email: string) => {
  await task(sendReminder)(email);

  // Suspend for 1 hour without holding a worker
  await sleep(3600);

  await task(sendReminder)(email);
  return 'done';
});

The minimum sleep duration is 1 second. Values are rounded to the nearest integer.

Approval / human-in-the-loop

waitForApproval() suspends the workflow and waits for a human to approve or reject it from the Windmill UI, similar to approval steps in flows.

import { task, waitForApproval, workflow } from 'windmill-client';

async function deploy(env: string) {
  console.log(`Deploying to ${env}`);
  return `deployed to ${env}`;
}

export const main = workflow(async (env: string) => {
  // Wait for human approval (default timeout: 1800s)
  const approval = await waitForApproval({ timeout: 3600 });
  // approval contains: { value: any, approver: string, approved: boolean }

  if (!approval.approved) {
    return `Deployment rejected by ${approval.approver}`;
  }

  return await task(deploy)(env);
});

With a form:

const approval = await waitForApproval({
  timeout: 7200,
  form: {
    type: 'object',
    properties: {
      reason: { type: 'string', description: 'Approval reason' },
      environment: { type: 'string', enum: ['staging', 'production'] }
    }
  }
});

The workflow appears as suspended in the job runs page. Navigate to the job detail to find the approve/reject buttons. The timeline does not display anything while the workflow is waiting — it only shows the full execution once the workflow resumes after approval.

getResumeUrls()

Use getResumeUrls() (wrapped in a step() for replay safety) to obtain URLs you can send to approvers via email, Slack, or any notification channel:

import { step, waitForApproval, workflow, getResumeUrls } from 'windmill-client';

export const main = workflow(async () => {
  const urls = await step('get_urls', () => getResumeUrls());
  // urls.approvalPage — link to the Windmill approval UI
  // urls.resume — POST endpoint to approve programmatically
  // urls.cancel — POST endpoint to reject programmatically

  // Send the approval page link via your notification system
  await step('notify', () => sendSlackMessage(urls.approvalPage));

  const result = await waitForApproval({ timeout: 86400 });
  return result;
});

Referencing external scripts and flows

Use taskScript() and taskFlow() to dispatch to existing Windmill scripts and flows by path, rather than defining the logic inline.

import { taskScript, taskFlow, workflow } from 'windmill-client';

export const main = workflow(async () => {
  // Run an existing script by path
  const data = await taskScript('f/data_team/extract_users')({ limit: 100 });

  // Run an existing flow by path
  const result = await taskFlow('f/data_team/etl_pipeline')({ data });

  // With options
  const heavy = await taskScript('f/ml/train_model', {
    timeout: 3600,
    tag: 'gpu'
  })({ dataset: data });

  return { result, heavy };
});

Script modules

Workflow-as-code scripts support companion module files stored in a __mod/ folder alongside the main script. This lets you split task functions into separate files while keeping them part of the same script.

Two layout options:

Flat layout:

f/my_folder/
├── my_workflow.ts          # main script
└── my_workflow__mod/
    ├── extract.ts          # module
    └── transform.ts        # module

Folder layout:

f/my_folder/
└── my_workflow__mod/
    ├── script.ts           # main script (entry point)
    ├── extract.ts          # module
    └── transform.ts        # module

Modules can be imported with relative paths and referenced via taskScript():

// my_workflow.ts (main script)
import { taskScript, workflow } from 'windmill-client';

export const main = workflow(async (url: string) => {
  const data = await taskScript('./extract.ts')({ url });
  const result = await taskScript('./transform.ts')({ data });
  return result;
});

// my_workflow__mod/extract.ts
export async function main(url: string) {
  const resp = await fetch(url);
  return resp.json();
}

// my_workflow__mod/transform.ts
export async function main(data: any) {
  return { count: data.length, items: data };
}

Modules are synced with the CLI and included in the script hash for proper versioning. Each module can have its own dependencies, and locks are tracked per-module.

Task options

All task types (task(), taskScript(), taskFlow()) accept the following options:

Option                     Type    Description
timeout                    number  Maximum execution time in seconds
tag                        string  Worker tag to route the task to specific workers
cache_ttl                  number  Cache the result for this many seconds
priority                   number  Priority in the job queue (higher = picked up sooner)
concurrency_limit          number  Max concurrent executions for this task
concurrency_key            string  Key for grouping concurrency limits
concurrency_time_window_s  number  Time window in seconds for the concurrency limit

Error handling

When a child task fails, the error is propagated to the parent workflow. You can catch errors with standard try/catch or try/except:

import { task, workflow } from 'windmill-client';

async function riskyOperation(data: string) {
  if (data === 'bad') throw new Error('Invalid data');
  return `processed: ${data}`;
}

async function fallback(error: string) {
  return `fallback result for: ${error}`;
}

export const main = workflow(async (data: string) => {
  try {
    return await task(riskyOperation)(data);
  } catch (e) {
    console.log(`Task failed: ${e.message}`);
    return await task(fallback)(e.message);
  }
});

In Python, task errors raise TaskError which includes step_key, child_job_id, and result attributes for detailed inspection.

Complete example

Here is a full example combining multiple primitives:

import {
  task,
  step,
  workflow,
  sleep,
  parallel,
  waitForApproval
} from 'windmill-client';

async function fetchPage(url: string) {
  const resp = await fetch(url);
  return resp.json();
}

async function processChunk(items: any[]) {
  return items.map((item: any) => ({ ...item, processed: true }));
}

async function saveResults(results: any[]) {
  console.log(`Saved ${results.length} results`);
  return { saved: results.length };
}

export const main = workflow(async (baseUrl: string, pageCount: number) => {
  // Persist run metadata
  const runId = await step('run_id', () => crypto.randomUUID());
  const startedAt = await step('started_at', () => new Date().toISOString());

  // Fetch all pages in parallel
  const pages = await parallel(
    Array.from({ length: pageCount }, (_, i) => i + 1),
    (page) => task(fetchPage)(`${baseUrl}?page=${page}`),
    { concurrency: 5 }
  );

  // Process each page's data in parallel
  const processed = await parallel(pages, (pageData) => task(processChunk)(pageData), {
    concurrency: 3
  });

  // Request approval before saving
  const approval = await waitForApproval({ timeout: 3600 });
  if (!approval.approved) {
    return { runId, status: 'rejected', approver: approval.approver };
  }

  // Save the combined results
  const flat = processed.flat();
  const saveResult = await task(saveResults)(flat);

  // Brief delay before cleanup
  await sleep(5);

  return {
    runId,
    startedAt,
    ...saveResult,
    approvedBy: approval.approver
  };
});

Examples

Parallel ETL pipeline

import { task, step, workflow, parallel } from 'windmill-client';

async function extractFromApi(endpoint: string) {
  const resp = await fetch(endpoint);
  if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
  return resp.json();
}

async function transformRecord(record: any) {
  return { ...record, transformed: true, processedAt: new Date().toISOString() };
}

async function loadBatch(records: any[]) {
  console.log(`Loading ${records.length} records`);
  return { loaded: records.length };
}

export const main = workflow(async (endpoints: string[]) => {
  // Extract from multiple APIs in parallel
  const rawData = await parallel(endpoints, (ep) => task(extractFromApi)(ep), { concurrency: 10 });

  // Transform all records in parallel
  const allRecords = rawData.flat();
  const transformed = await parallel(allRecords, (record) => task(transformRecord)(record), {
    concurrency: 20
  });

  // Load in a single batch
  return await task(loadBatch)(transformed);
});

Deployment with approval gate

import {
  task,
  step,
  workflow,
  waitForApproval,
  getResumeUrls
} from 'windmill-client';

async function runTests(branch: string) {
  console.log(`Running tests for ${branch}`);
  return { passed: 42, failed: 0 };
}

async function deployToStaging(branch: string) {
  console.log(`Deploying ${branch} to staging`);
  return { url: `https://staging.example.com` };
}

async function deployToProduction(branch: string) {
  console.log(`Deploying ${branch} to production`);
  return { url: `https://example.com` };
}

export const main = workflow(async (branch: string) => {
  // Run tests
  const testResults = await task(runTests)(branch);
  if (testResults.failed > 0) {
    return { status: 'failed', tests: testResults };
  }

  // Deploy to staging
  const staging = await task(deployToStaging)(branch);

  // Get approval URLs and notify
  const urls = await step('approval_urls', () => getResumeUrls());
  await step('notify_team', () => {
    // send Slack/email with urls.approvalPage
    console.log(`Approve production deploy: ${urls.approvalPage}`);
  });

  // Wait up to 24 hours for approval — zero worker usage during the wait
  const approval = await waitForApproval({ timeout: 86400 });
  if (!approval.approved) {
    return { status: 'rejected', approver: approval.approver, staging };
  }

  // Deploy to production
  const production = await task(deployToProduction)(branch);
  return { status: 'deployed', staging, production, approvedBy: approval.approver };
});

Scheduled retry with backoff

import { task, step, workflow, sleep } from 'windmill-client';

async function callExternalApi(payload: any) {
  const resp = await fetch('https://api.example.com/submit', {
    method: 'POST',
    body: JSON.stringify(payload)
  });
  if (!resp.ok) throw new Error(`API error: ${resp.status}`);
  return resp.json();
}

export const main = workflow(async (payload: any, maxRetries: number = 3) => {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await task(callExternalApi)(payload);
    } catch (e) {
      if (attempt === maxRetries) throw e;
      // Exponential backoff: 30s, 60s, ... — worker is free during each sleep
      const delay = 30 * Math.pow(2, attempt - 1);
      await sleep(delay);
    }
  }
});

CLI sync

Workflow-as-code scripts and their modules are fully supported by the Windmill CLI:

# Push a WAC script with its modules
wmill script push f/my_folder/my_workflow

# Pull WAC scripts (modules are written to __mod/ folders)
wmill sync pull

# Preview a WAC script locally (modules are automatically included)
wmill script preview f/my_folder/my_workflow

The CLI tracks per-module hashes in wmill-lock.yaml and only regenerates locks for modules that changed.

API reference

TypeScript (windmill-client)

workflow(fn)

Marks an async function as a workflow entry point. Required for all other WAC primitives.

import { workflow } from 'windmill-client';

export const main = workflow(async (x: number) => {
  // orchestration logic
  return x;
});

task(fn) / task(fn, options) / task(path, fn) / task(path, fn, options)

Wraps a function so each call runs as a separate child job.

import { task } from 'windmill-client';

// Inline task
const result = await task(myFunction)(arg1, arg2);

// With options
const result = await task(myFunction, { timeout: 600, tag: 'gpu' })(arg1);

// With explicit script path
const result = await task('f/folder/script', myFunction)(arg1);

// With path and options
const result = await task('f/folder/script', myFunction, { timeout: 600 })(arg1);

Returns a wrapper function with _is_task: true and _task_path properties.

taskScript(path, options?)

Creates a task that dispatches to an existing Windmill script by path.

import { taskScript } from 'windmill-client';

const extract = taskScript('f/data/extract', { timeout: 3600 });
const data = await extract({ url: 'https://...' });

taskFlow(path, options?)

Creates a task that dispatches to an existing Windmill flow by path.

import { taskFlow } from 'windmill-client';

const pipeline = taskFlow('f/etl/pipeline', { concurrency_limit: 5 });
const result = await pipeline({ input: data });

step(name, fn)

Executes fn inline and persists the result. On replay, returns the cached value without re-executing. Use for non-deterministic values that must be stable across replays.

import { step } from 'windmill-client';

const ts = await step('timestamp', () => Date.now());
const id = await step('run_id', () => crypto.randomUUID());

sleep(seconds)

Suspends the workflow server-side for the given duration. The worker is released during the sleep. Minimum 1 second.

import { sleep } from 'windmill-client';

await sleep(3600); // 1 hour, zero worker usage

waitForApproval(options?)

Suspends the workflow waiting for human approval. Returns { value, approver, approved }.

import { waitForApproval } from 'windmill-client';

const result = await waitForApproval({
  timeout: 3600, // optional, default 1800s
  form: {
    // optional JSON Schema for approval form
    type: 'object',
    properties: {
      reason: { type: 'string' }
    }
  }
});

parallel(items, fn, options?)

Processes items in parallel with optional concurrency control. Returns results in input order.

import { parallel, task } from 'windmill-client';

const results = await parallel(
  items,
  (item) => task(process)(item),
  { concurrency: 5 } // optional, default: all at once
);

getResumeUrls(approver?)

Returns URLs for programmatic workflow control: { approvalPage, resume, cancel }.

import { getResumeUrls, step } from 'windmill-client';

const urls = await step('urls', () => getResumeUrls());

TaskOptions

All task types accept these options:

interface TaskOptions {
  timeout?: number; // max execution time (seconds)
  tag?: string; // worker tag
  cache_ttl?: number; // cache result (seconds)
  priority?: number; // queue priority (higher = sooner)
  concurrency_limit?: number; // max concurrent executions
  concurrency_key?: string; // concurrency grouping key
  concurrency_time_window_s?: number; // concurrency time window (seconds)
}

Python (wmill)

@workflow

Marks an async function as a workflow entry point. Required for all other WAC primitives.

from wmill import workflow

@workflow
async def main(x: int):
    # orchestration logic
    return x

@task / @task(path=, timeout=, tag=, ...)

Wraps a function so each call runs as a separate child job.

from wmill import task

# Basic decorator
@task
async def double(n: int):
    return n * 2

# With options
@task(path="f/folder/script", timeout=600, tag="gpu")
async def heavy_compute(data: list):
    return sum(data)

Options: path, timeout, tag, cache_ttl, priority, concurrency_limit, concurrency_key, concurrency_time_window_s.

task_script(path, **options)

Creates a task that dispatches to an existing Windmill script by path.

from wmill import task_script

extract = task_script("f/data/extract", timeout=3600)
data = await extract(url="https://...")

task_flow(path, **options)

Creates a task that dispatches to an existing Windmill flow by path.

from wmill import task_flow

pipeline = task_flow("f/etl/pipeline", concurrency_limit=5)
result = await pipeline(input=data)

step(name, fn)

Executes fn inline and persists the result. On replay, returns the cached value. Supports both sync and async callables.

from wmill import step

ts = await step("timestamp", lambda: time.time())
config = await step("config", get_config) # async fn also works

sleep(seconds)

Suspends the workflow server-side. The worker is released during the sleep. Minimum 1 second.

from wmill import sleep

await sleep(3600) # 1 hour, zero worker usage

wait_for_approval(timeout=1800, form=None)

Suspends the workflow waiting for human approval. Returns {"value": ..., "approver": "...", "approved": True/False}.

from wmill import wait_for_approval

result = await wait_for_approval(
    timeout=3600,
    form={"type": "object", "properties": {"reason": {"type": "string"}}}
)

parallel(items, fn, concurrency=None)

Processes items in parallel with optional concurrency control. Returns results in input order.

from wmill import parallel, task

results = await parallel(
    items,
    lambda item: process(item),
    concurrency=5  # optional, default: all at once
)

get_resume_urls(approver=None)

Returns URLs for programmatic workflow control: {"approvalPage": ..., "resume": ..., "cancel": ...}.

from wmill import get_resume_urls, step

urls = await step("urls", lambda: get_resume_urls())

TaskError

Exception raised when a child task fails. Available for try/except error handling.

from wmill.client import TaskError

try:
    result = await risky_task(data)
except TaskError as e:
    print(e.message)       # error description
    print(e.step_key)      # checkpoint key of the failed step
    print(e.child_job_id)  # UUID of the failed child job
    print(e.result)        # error result from the child