stem 0.2.1 copy "stem: ^0.2.1" to clipboard
stem: ^0.2.1 copied to clipboard

Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling.

Stem Logo

pub package Dart License Buy Me A Coffee

Stem #

Stem is a Dart-first background job and workflow platform: enqueue work, run workers, and orchestrate durable workflows.

For full docs, API references, and in-depth guides, visit https://kingwill101.github.io/stem.

Features #

  • Task pipeline - enqueue with delays, priorities, idempotency helpers, and retries.
  • Workers - isolate pools with soft/hard time limits, autoscaling, and remote control (stem worker ping|revoke|shutdown).
  • Scheduling - Beat-style scheduler with interval/cron/solar/clocked entries and drift tracking.
  • Workflows - Durable Flow runtime with pluggable stores (in-memory, Redis, Postgres, SQLite) and CLI introspection via stem wf.
  • Observability - Dartastic OpenTelemetry metrics/traces, heartbeats, CLI inspection (stem observe, stem dlq).
  • Security - Payload signing (HMAC or Ed25519), TLS automation scripts, revocation persistence.
  • Adapters - In-memory drivers included here; Redis Streams and Postgres adapters ship via the stem_redis and stem_postgres packages.
  • Specs & tooling - OpenSpec change workflow, quality gates (see example/quality_gates), chaos/regression suites.

Install #

# Core package
dart pub add stem
# Optional adapters
dart pub add stem_redis     # Redis broker/backend
dart pub add stem_postgres  # Postgres broker/backend
dart pub add stem_sqlite    # SQLite broker/backend
# Optional tooling, added as dev dependencies (-d)
dart pub add -d stem_builder # for annotations/codegen (optional)
dart pub add -d stem_cli      # for CLI tooling

Examples #

StemApp and StemWorkflowApp shortcut helpers lazily start their managed worker by default. Pass allowWorkerAutoStart: false when you want producer or orchestration shortcuts without starting that worker in the background, then call start() explicitly when you're ready. StemWorkflowApp also exposes startRuntime() and startWorker() when you want those lifecycles split.

Minimal in-memory task + worker #

import "dart:async";
import "package:stem/stem.dart";

/// Task handler registered under the name `demo.hello`.
class HelloTask extends TaskHandler<void> {
  @override
  String get name {
    return "demo.hello";
  }

  /// Prints `Hello <name>`, reading the `name` entry of [args] through
  /// `valueOr` with `world` passed as the fallback argument.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    print("Hello ${args.valueOr<String>("name", "world")}");
  }
}

/// Runs [HelloTask] once against the in-memory client, then tears down.
Future<void> main() async {
  final stem = await StemClient.inMemory(tasks: [HelloTask()]);
  final helloWorker = await stem.createWorker();
  // The start() future is deliberately not awaited.
  unawaited(helloWorker.start());

  await stem.enqueueValue("demo.hello", const {"name": "Stem"});
  // Pause for one second before shutting down; presumably this gives the
  // worker time to pick up the task.
  const settleTime = Duration(seconds: 1);
  await Future<void>.delayed(settleTime);

  await helloWorker.shutdown();
  await stem.close();
}

Reusable stack from URL (Redis) #

import "dart:async";

import "package:stem/stem.dart";
import "package:stem_redis/stem_redis.dart";

/// Builds a client from a Redis URL, runs one `demo.hello` task, and shuts
/// everything down.
///
/// `HelloTask` is the handler defined in the minimal in-memory example; it is
/// not redeclared in this snippet.
Future<void> main() async {
  final client = await StemClient.fromUrl(
    "redis://localhost:6379",
    adapters: const [StemRedisAdapter()],
    // The backend override points at database 1 of the same Redis server.
    overrides: const StemStoreOverrides(
      backend: "redis://localhost:6379/1",
    ),
    tasks: [HelloTask()],
  );

  final worker = await client.createWorker();
  // `unawaited` comes from dart:async, hence the explicit import above.
  unawaited(worker.start());

  await client.enqueueValue("demo.hello", const {"name": "Redis"});
  // Pause for one second before shutting down; presumably this gives the
  // worker time to pick up the task.
  await Future<void>.delayed(const Duration(seconds: 1));

  await worker.shutdown();
  await client.close();
}

Typed task definition and waiting for result #

/// Argument payload carrying a single [name], convertible to and from JSON
/// maps.
class HelloArgs {
  const HelloArgs({required this.name});

  /// Builds the payload from [json], reading its `name` entry as a [String].
  factory HelloArgs.fromJson(Map<String, dynamic> json) {
    final decodedName = json["name"] as String;
    return HelloArgs(name: decodedName);
  }

  final String name;

  /// Encodes this payload as a map with a single `name` entry.
  Map<String, dynamic> toJson() {
    return {"name": name};
  }
}

/// Task handler whose name is taken from its typed [definition].
class HelloTask2 extends TaskHandler<String> {
  /// JSON-based task definition named `demo.hello2`, typed with [HelloArgs]
  /// arguments and a [String] result.
  static final definition = TaskDefinition<HelloArgs, String>.json(
    name: "demo.hello2",
    metadata: const TaskMetadata(description: "typed hello task"),
  );

  @override
  String get name {
    return definition.name;
  }

  /// Decodes [args] into [HelloArgs] and returns `Hello <name>`.
  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    final jsonArgs = args.cast<String, dynamic>();
    final typedArgs = HelloArgs.fromJson(jsonArgs);
    return "Hello ${typedArgs.name}";
  }
}

/// Enqueues a typed call through `HelloTask2.definition`, waits for it, and
/// prints the value of the result.
Future<void> main() async {
  final stem = await StemClient.inMemory(tasks: [HelloTask2()]);
  final typedWorker = await stem.createWorker();
  // The start() future is deliberately not awaited.
  unawaited(typedWorker.start());

  const typedArgs = HelloArgs(name: "Typed");
  final outcome = await HelloTask2.definition.enqueueAndWait(stem, typedArgs);
  print(outcome?.value);

  await typedWorker.shutdown();
  await stem.close();
}

Workflow quick-start (Flow) #

import "package:stem/stem.dart";

/// Two-step flow registered as `demo.onboarding`.
final onboardingFlow = Flow<String>(
  name: "demo.onboarding",
  build: (flow) {
    // Step "welcome": greets using the required `name` parameter.
    flow.step(
      "welcome",
      (ctx) async => "Welcome ${ctx.requiredParam<String>("name")}",
    );
    // Step "done": returns a fixed value.
    flow.step("done", (ctx) async {
      return "Done";
    });
  },
);

/// Starts a workflow app for `onboardingFlow`, runs the flow once with typed
/// parameters, and prints the value of the result.
Future<void> main() async {
  final stemClient = await StemClient.inMemory();
  final workflowApp = await stemClient.createWorkflowApp(
    flows: [onboardingFlow],
    allowWorkerAutoStart: false,
  );
  // Worker auto-start is disabled above, so the app is started explicitly.
  await workflowApp.start();

  final flowRef = onboardingFlow.refJson(HelloArgs.fromJson);
  const startParams = HelloArgs(name: "Stem");
  final runId = await flowRef.start(workflowApp, params: startParams);
  final outcome = await flowRef.waitFor(workflowApp, runId);
  print(outcome?.value);

  await workflowApp.shutdown();
  await stemClient.close();
}

Annotated workflow + task with stem_builder #

import "package:stem/stem.dart";
import "package:stem_builder/stem_builder.dart";

part "definitions.stem.g.dart";

/// Script-kind workflow declared under the name `builder.signup`.
///
/// NOTE(review): presumably the stem_builder generator reads these
/// annotations to produce `definitions.stem.g.dart` — confirm against the
/// builder documentation.
@WorkflowDefn(name: "builder.signup", kind: WorkflowKind.script)
class BuilderSignupWorkflow {
  /// Creates a user for [email], finalizes the signup, and returns the user
  /// id produced by [createUser].
  Future<String> run(String email) async {
    final userId = await createUser(email);
    await finalizeSignup(userId: userId);
    return userId;
  }

  /// Step `create-user`: returns the id `user-<email>` for [email].
  @WorkflowStep(name: "create-user")
  Future<String> createUser(String email) async {
    return "user-$email";
  }

  /// Step `finalize`: placeholder with an empty body.
  @WorkflowStep(name: "finalize")
  Future<void> finalizeSignup({required String userId}) async {}
}

/// Task declared under the name `builder.send_welcome`; the body is a
/// placeholder that does nothing with [email].
///
/// [context] is optional.
@TaskDefn(name: "builder.send_welcome")
Future<void> sendWelcomeEmail(
  String email, {
  TaskExecutionContext? context,
}) async {
  // optional: use context for logger/meta/retry helpers
}
# Shell: run code generation for the annotated definitions
# (the library declares `part "definitions.stem.g.dart"`).
dart run build_runner build

# After generation, use module + generated defs
// Example usage after codegen. `stemModule` and `StemWorkflowDefinitions`
// are not declared in this snippet; presumably they come from the generated
// part file.
final client = await StemClient.inMemory(module: stemModule);
final app = await client.createWorkflowApp(allowWorkerAutoStart: false);
// Worker auto-start is disabled above, so the app is started explicitly.
await app.start();

// startAndWait starts the run and waits for its result, so the returned
// value is the result itself and no separate waitFor call is needed.
final result = await StemWorkflowDefinitions.builderSignup.startAndWait(
  app,
  "alice@example.com",
);
// The workflow returns the id built by createUser, i.e. "user-$email".
print(result?.value); // user-alice@example.com

Workflow with multiple worker queues #

import "package:stem/stem.dart";

/// Single-step flow registered as `workflow.multi_workers`.
///
/// The `dispatch` step enqueues one task on the `notifications` queue and one
/// on the `analytics` queue, then returns both task ids keyed by
/// `notifyTaskId` and `trackTaskId`.
final onboardingFlow = Flow<Map<String, String>>(
  name: "workflow.multi_workers",
  build: (flow) {
    flow.step("dispatch", (ctx) async {
      const notificationsRoute = TaskEnqueueOptions(queue: "notifications");
      const analyticsRoute = TaskEnqueueOptions(queue: "analytics");

      final notifyId = await ctx.enqueue(
        "notify.send",
        args: {"email": "alex@example.com"},
        enqueueOptions: notificationsRoute,
      );
      final trackId = await ctx.enqueue(
        "analytics.track",
        args: {"userId": "alex", "event": "account.created"},
        enqueueOptions: analyticsRoute,
      );

      return {"notifyTaskId": notifyId, "trackTaskId": trackId};
    });
  },
);

/// Handler for `notify.send`, configured with the `notifications` queue.
class NotifyTask extends TaskHandler<String> {
  @override
  String get name {
    return "notify.send";
  }

  @override
  TaskOptions get options {
    return const TaskOptions(queue: "notifications");
  }

  /// Returns `notified:` followed by the `email` entry of [args].
  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    return "notified:${args['email']}";
  }
}

/// Handler for `analytics.track`, configured with the `analytics` queue.
class AnalyticsTask extends TaskHandler<String> {
  @override
  String get name {
    return "analytics.track";
  }

  @override
  TaskOptions get options {
    return const TaskOptions(queue: "analytics");
  }

  /// Returns `tracked:` followed by the `event` entry of [args].
  @override
  Future<String> call(TaskContext context, Map<String, Object?> args) async {
    return "tracked:${args['event']}";
  }
}

/// Runs `onboardingFlow` with a workflow app plus two extra workers, one
/// subscribed to the `notifications` queue and one to `analytics`, then
/// prints the outcome of both dispatched tasks.
Future<void> main() async {
  final stem = await StemClient.inMemory();
  final workflowApp = await stem.createWorkflowApp(
    flows: [onboardingFlow],
    workerConfig: const StemWorkerConfig(queue: "workflow"),
  );
  await workflowApp.start();

  // One worker per task queue, each with a single-queue subscription.
  final notificationsWorker = await stem.createWorker(
    workerConfig: StemWorkerConfig(
      queue: "notifications-worker",
      consumerName: "notifications-worker",
      subscription: RoutingSubscription.singleQueue("notifications"),
    ),
    tasks: [NotifyTask()],
  );
  final analyticsWorker = await stem.createWorker(
    workerConfig: StemWorkerConfig(
      queue: "analytics-worker",
      consumerName: "analytics-worker",
      subscription: RoutingSubscription.singleQueue("analytics"),
    ),
    tasks: [AnalyticsTask()],
  );

  await notificationsWorker.start();
  await analyticsWorker.start();

  // The flow result maps "notifyTaskId" / "trackTaskId" to the task ids.
  final outcome = await onboardingFlow.startAndWait(workflowApp);
  final taskIds = outcome?.value ?? const <String, String>{};

  final notifyOutcome =
      await workflowApp.waitForTask<String>(taskIds['notifyTaskId']!);
  print(notifyOutcome);
  final trackOutcome =
      await workflowApp.waitForTask<String>(taskIds['trackTaskId']!);
  print(trackOutcome);

  await notificationsWorker.shutdown();
  await analyticsWorker.shutdown();
  await workflowApp.close();
  await stem.close();
}

CLI at a glance #

# Start a worker or run built-in introspection commands.
# Each command below only prints help text: for the CLI as a whole, for
# `stem worker start`, and for the `stem wf` workflow commands.
stem --help
stem worker start --help
stem wf --help

General worker management (multi-worker setup) #

import "package:stem/stem.dart";

/// Handler for `notify.send`, configured with the `notify` queue.
class EmailTask extends TaskHandler<void> {
  @override
  String get name {
    return "notify.send";
  }

  @override
  TaskOptions get options {
    return const TaskOptions(queue: "notify");
  }

  /// Prints the `to` entry of [args], prefixed with `notify queue: `.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final line = "notify queue: ${args['to']}";
    print(line);
  }
}

/// Handler for `reports.aggregate`, configured with the `reports` queue.
class ReportTask extends TaskHandler<void> {
  @override
  String get name {
    return "reports.aggregate";
  }

  @override
  TaskOptions get options {
    return const TaskOptions(queue: "reports");
  }

  /// Prints the `reportId` entry of [args], prefixed with `reports queue: `.
  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final line = "reports queue: ${args['reportId']}";
    print(line);
  }
}

/// Starts two workers, one subscribed to the `notify` queue and one to
/// `reports`, enqueues one task for each, then shuts everything down.
Future<void> main() async {
  final stem = await StemClient.inMemory();

  final emailWorker = await stem.createWorker(
    workerConfig: StemWorkerConfig(
      queue: "notify-worker",
      consumerName: "notify-worker",
      subscription: RoutingSubscription.singleQueue("notify"),
    ),
    tasks: [EmailTask()],
  );
  final reportWorker = await stem.createWorker(
    workerConfig: StemWorkerConfig(
      queue: "reports-worker",
      consumerName: "reports-worker",
      subscription: RoutingSubscription.singleQueue("reports"),
    ),
    tasks: [ReportTask()],
  );

  await emailWorker.start();
  await reportWorker.start();

  await stem.enqueue("notify.send", args: {"to": "ops@example.com"});
  await stem.enqueue("reports.aggregate", args: {"reportId": "r-2026-q1"});

  // Pause for 400 ms before shutting down; presumably this gives the workers
  // time to pick up both tasks.
  const settleTime = Duration(milliseconds: 400);
  await Future<void>.delayed(settleTime);

  await emailWorker.shutdown();
  await reportWorker.shutdown();
  await stem.close();
}

Want depth? #

This README is intentionally example-focused. For implementation details, runtime semantics, adapter tuning, and operational playbooks, see the full docs at https://kingwill101.github.io/stem.

Documentation & Examples #

0
likes
150
points
143
downloads

Documentation

API reference

Publisher

verified publisher glenfordwilliams.com

Weekly Downloads

Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling.

Repository (GitHub)
View/report issues

Topics

#background-processing #message-queue #observability

License

MIT (license)

Dependencies

ansicolor, collection, contextual, crypto, cryptography, dartastic_opentelemetry, dartastic_opentelemetry_api, glob, meta, stem_memory, timezone, uuid, yaml

More

Packages that depend on stem