stem 0.2.1
stem: ^0.2.1 copied to clipboard
Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling.
Stem #
Stem is a Dart-first background job and workflow platform: enqueue work, run workers, and orchestrate durable workflows.
For full docs, API references, and in-depth guides, visit https://kingwill101.github.io/stem.
Features #
- Task pipeline - enqueue with delays, priorities, idempotency helpers, and retries.
- Workers - isolate pools with soft/hard time limits, autoscaling, and remote control (
stem worker ping|revoke|shutdown). - Scheduling - Beat-style scheduler with interval/cron/solar/clocked entries and drift tracking.
- Workflows - Durable
Flowruntime with pluggable stores (in-memory, Redis, Postgres, SQLite) and CLI introspection viastem wf. - Observability - Dartastic OpenTelemetry metrics/traces, heartbeats, CLI inspection (
stem observe,stem dlq). - Security - Payload signing (HMAC or Ed25519), TLS automation scripts, revocation persistence.
- Adapters - In-memory drivers included here; Redis Streams and Postgres adapters ship via the
stem_redisandstem_postgrespackages. - Specs & tooling - OpenSpec change workflow, quality gates (see
example/quality_gates), chaos/regression suites.
Install #
dart pub add stem
# Optional adapters
dart pub add stem_redis # Redis broker/backend
dart pub add stem_postgres # Postgres broker/backend
dart pub add stem_sqlite # SQLite broker/backend
dart pub add -d stem_builder # for annotations/codegen (optional)
dart pub add -d stem_cli # for CLI tooling
Examples #
StemApp and StemWorkflowApp shortcut helpers lazily start their managed
worker by default. Pass allowWorkerAutoStart: false when you want producer
or orchestration shortcuts without starting that worker in the background,
then call start() explicitly when you're ready. StemWorkflowApp also
exposes startRuntime() and startWorker() when you want those lifecycles
split.
Minimal in-memory task + worker #
import "dart:async";
import "package:stem/stem.dart";
class HelloTask extends TaskHandler<void> {
@override
String get name => "demo.hello";
@override
Future<void> call(TaskContext context, Map<String, Object?> args) async {
final name = args.valueOr<String>("name", "world");
print("Hello $name");
}
}
Future<void> main() async {
final client = await StemClient.inMemory(tasks: [HelloTask()]);
final worker = await client.createWorker();
unawaited(worker.start());
await client.enqueueValue("demo.hello", const {"name": "Stem"});
await Future<void>.delayed(const Duration(seconds: 1));
await worker.shutdown();
await client.close();
}
Reusable stack from URL (Redis) #
import "package:stem/stem.dart";
import "package:stem_redis/stem_redis.dart";
Future<void> main() async {
final client = await StemClient.fromUrl(
"redis://localhost:6379",
adapters: const [StemRedisAdapter()],
overrides: const StemStoreOverrides(
backend: "redis://localhost:6379/1",
),
tasks: [HelloTask()],
);
final worker = await client.createWorker();
unawaited(worker.start());
await client.enqueueValue("demo.hello", const {"name": "Redis"});
await Future<void>.delayed(const Duration(seconds: 1));
await worker.shutdown();
await client.close();
}
Typed task definition and waiting for result #
class HelloArgs {
const HelloArgs({required this.name});
final String name;
Map<String, dynamic> toJson() => {"name": name};
factory HelloArgs.fromJson(Map<String, dynamic> json) =>
HelloArgs(name: json["name"] as String);
}
class HelloTask2 extends TaskHandler<String> {
static final definition = TaskDefinition<HelloArgs, String>.json(
name: "demo.hello2",
metadata: const TaskMetadata(description: "typed hello task"),
);
@override
String get name => definition.name;
@override
Future<String> call(TaskContext context, Map<String, Object?> args) async {
final payload = HelloArgs.fromJson(args.cast<String, dynamic>());
return "Hello ${payload.name}";
}
}
Future<void> main() async {
final client = await StemClient.inMemory(tasks: [HelloTask2()]);
final worker = await client.createWorker();
unawaited(worker.start());
final result = await HelloTask2.definition.enqueueAndWait(
client,
const HelloArgs(name: "Typed"),
);
print(result?.value);
await worker.shutdown();
await client.close();
}
Workflow quick-start (Flow) #
import "package:stem/stem.dart";
final onboardingFlow = Flow<String>(
name: "demo.onboarding",
build: (flow) {
flow.step("welcome", (ctx) async {
return "Welcome ${ctx.requiredParam<String>("name")}";
});
flow.step("done", (ctx) async => "Done");
},
);
Future<void> main() async {
final appClient = await StemClient.inMemory();
final app = await appClient.createWorkflowApp(
flows: [onboardingFlow],
allowWorkerAutoStart: false,
);
await app.start();
final ref = onboardingFlow.refJson(HelloArgs.fromJson);
final runId = await ref.start(app, params: const HelloArgs(name: "Stem"));
final result = await ref.waitFor(app, runId);
print(result?.value);
await app.shutdown();
await appClient.close();
}
Annotated workflow + task with stem_builder #
import "package:stem/stem.dart";
import "package:stem_builder/stem_builder.dart";
part "definitions.stem.g.dart";
@WorkflowDefn(name: "builder.signup", kind: WorkflowKind.script)
class BuilderSignupWorkflow {
Future<String> run(String email) async {
final userId = await createUser(email);
await finalizeSignup(userId: userId);
return userId;
}
@WorkflowStep(name: "create-user")
Future<String> createUser(String email) async {
return "user-$email";
}
@WorkflowStep(name: "finalize")
Future<void> finalizeSignup({required String userId}) async {}
}
@TaskDefn(name: "builder.send_welcome")
Future<void> sendWelcomeEmail(
String email, {
TaskExecutionContext? context,
}) async {
// optional: use context for logger/meta/retry helpers
}
dart run build_runner build
# After generation, use module + generated defs
// example usage after codegen
final client = await StemClient.inMemory(module: stemModule);
final app = await client.createWorkflowApp(allowWorkerAutoStart: false);
await app.start();
final runId = await StemWorkflowDefinitions.builderSignup.startAndWait(
app,
"alice@example.com",
);
final result = await StemWorkflowDefinitions.builderSignup.waitFor(app, runId);
print(result?.value); // {user: alice@example.com}
Workflow with multiple worker queues #
import "package:stem/stem.dart";
final onboardingFlow = Flow<Map<String, String>>(
name: "workflow.multi_workers",
build: (flow) {
flow.step("dispatch", (ctx) async {
final notifyTaskId = await ctx.enqueue(
"notify.send",
args: {"email": "alex@example.com"},
enqueueOptions: const TaskEnqueueOptions(queue: "notifications"),
);
final analyticsTaskId = await ctx.enqueue(
"analytics.track",
args: {"userId": "alex", "event": "account.created"},
enqueueOptions: const TaskEnqueueOptions(queue: "analytics"),
);
return {"notifyTaskId": notifyTaskId, "trackTaskId": analyticsTaskId};
});
},
);
class NotifyTask extends TaskHandler<String> {
@override
String get name => "notify.send";
@override
TaskOptions get options => const TaskOptions(queue: "notifications");
@override
Future<String> call(TaskContext context, Map<String, Object?> args) async =>
"notified:${args['email']}";
}
class AnalyticsTask extends TaskHandler<String> {
@override
String get name => "analytics.track";
@override
TaskOptions get options => const TaskOptions(queue: "analytics");
@override
Future<String> call(TaskContext context, Map<String, Object?> args) async =>
"tracked:${args['event']}";
}
Future<void> main() async {
final client = await StemClient.inMemory();
final app = await client.createWorkflowApp(
flows: [onboardingFlow],
workerConfig: const StemWorkerConfig(queue: "workflow"),
);
await app.start();
final notifications = await client.createWorker(
workerConfig: StemWorkerConfig(
queue: "notifications-worker",
consumerName: "notifications-worker",
subscription: RoutingSubscription.singleQueue("notifications"),
),
tasks: [NotifyTask()],
);
final analytics = await client.createWorker(
workerConfig: StemWorkerConfig(
queue: "analytics-worker",
consumerName: "analytics-worker",
subscription: RoutingSubscription.singleQueue("analytics"),
),
tasks: [AnalyticsTask()],
);
await notifications.start();
await analytics.start();
final result = await onboardingFlow.startAndWait(app);
final taskIds = result?.value ?? const <String, String>{};
print(await app.waitForTask<String>(taskIds['notifyTaskId']!));
print(await app.waitForTask<String>(taskIds['trackTaskId']!));
await notifications.shutdown();
await analytics.shutdown();
await app.close();
await client.close();
}
5) CLI at a glance #
# Start a worker or run built-in introspection commands
stem --help
stem worker start --help
stem wf --help
General worker management (multi-worker setup) #
import "package:stem/stem.dart";
class EmailTask extends TaskHandler<void> {
@override
String get name => "notify.send";
@override
TaskOptions get options => const TaskOptions(queue: "notify");
@override
Future<void> call(TaskContext context, Map<String, Object?> args) async {
print("notify queue: ${args['to']}");
}
}
class ReportTask extends TaskHandler<void> {
@override
String get name => "reports.aggregate";
@override
TaskOptions get options => const TaskOptions(queue: "reports");
@override
Future<void> call(TaskContext context, Map<String, Object?> args) async {
print("reports queue: ${args['reportId']}");
}
}
Future<void> main() async {
final client = await StemClient.inMemory();
final notifyWorker = await client.createWorker(
workerConfig: StemWorkerConfig(
queue: "notify-worker",
consumerName: "notify-worker",
subscription: RoutingSubscription.singleQueue("notify"),
),
tasks: [EmailTask()],
);
final reportsWorker = await client.createWorker(
workerConfig: StemWorkerConfig(
queue: "reports-worker",
consumerName: "reports-worker",
subscription: RoutingSubscription.singleQueue("reports"),
),
tasks: [ReportTask()],
);
await notifyWorker.start();
await reportsWorker.start();
await client.enqueue(
"notify.send",
args: {"to": "ops@example.com"},
);
await client.enqueue(
"reports.aggregate",
args: {"reportId": "r-2026-q1"},
);
await Future<void>.delayed(const Duration(milliseconds: 400));
await notifyWorker.shutdown();
await reportsWorker.shutdown();
await client.close();
}
- Full example that combines a workflow dispatching to dedicated workers: multiple_workers.dart
Want depth? #
This README is intentionally example-focused. For implementation details, runtime semantics, adapter tuning, and operational playbooks, see the full docs at https://kingwill101.github.io/stem.
Documentation & Examples #
- Guided onboarding: Guided onboarding (install → infra → ops → production).
- Examples (each has its own README):
- workflows - end-to-end workflow samples (in-memory, sleep/event, SQLite, Redis). See
versioned_rewind.dartfor auto-versioned step rewinds. - multiple_workers.dart - workflow dispatching tasks to
notificationsandanalyticsworkers. - cancellation_policy - demonstrates auto-cancelling long workflows using
WorkflowCancellationPolicy. - rate_limit_delay - delayed enqueue, priority clamping, Redis rate limiter.
- dlq_sandbox - dead-letter inspection and replay via CLI.
- autoscaling_demo - autoscaling worker concurrency under queue backlog.
- scheduler_observability - Beat drift metrics, schedule signals, and CLI checks.
- microservice, monolith_service, mixed_cluster - production-style topologies.
- progress_heartbeat - task progress + heartbeat reporting.
- task_context_mixed - TaskContext + TaskInvocationContext enqueue patterns, plus Celery-style apply_async options.
- task_usage_patterns.dart - in-memory TaskContext enqueue, TaskInvocationContext builder, and typed enqueue calls.
- worker_control_lab - worker ping/stats/revoke/shutdown drills.
- unique_tasks - enables
TaskOptions.uniquewith a shared lock store. - signing_key_rotation - rotate HMAC signing keys with overlap.
- ops_health_suite - CLI health checks + queue/worker snapshots.
- quality_gates - justfile-driven quality gate runner.
- security examples - payload signing + TLS profiles.
- postgres_tls - Redis broker + Postgres backend secured via the shared
STEM_TLS_*settings. - otel_metrics - OTLP collectors + Grafana dashboards.