Chapter 07: Event-Driven Architecture

The 1Engage backend uses Apache Kafka as the central event bus for asynchronous, loosely coupled communication between microservices. This chapter covers the event envelope design, producer/consumer abstractions, idempotency guarantees, retry strategies with dead-letter queues, worker pool patterns, per-tenant rate limiting, and distributed tracing propagation through events.

   HTTP Request                Kafka Cluster                Workers
 ┌───────────────┐ publish ┌───────────────────┐ consume ┌────────────┐
 │  API Service  │────────>│ Topic Partitions  │────────>│  Consumer  │
 │  (Producer)   │         │ [P0] [P1] [P2]    │         │  Group     │
 └───────────────┘         └─────────┬─────────┘         └─────┬──────┘
                                     │                         │ retries
                           ┌─────────┴─────────┐               │ exhausted
                           │    DLQ Topics     │<──────────────┘
                           │    {topic}.dlq    │
                           └───────────────────┘

7.1 Event Envelope

Every event flowing through the system uses a standardized Event envelope defined in pkg/eventbus/event.go. The envelope carries metadata (routing, tracing, tenant context) alongside the business payload.

type Event struct {
    ID         string            `json:"id"`
    Type       string            `json:"type"`
    Version    int               `json:"version"`
    TenantID   string            `json:"tenant_id"`
    OccurredAt time.Time         `json:"occurred_at"`
    Source     string            `json:"source"`
    TraceID    string            `json:"trace_id,omitempty"`
    SpanID     string            `json:"span_id,omitempty"`
    Key        string            `json:"key,omitempty"`
    Data       json.RawMessage   `json:"data"`
    Headers    map[string]string `json:"headers,omitempty"`
}

Field      | Type              | Description
-----------|-------------------|------------
ID         | string            | UUID v4. Unique event identifier, used as the idempotency key for deduplication.
Type       | string            | Event category and action, format: {domain}.{action}.v{version} (e.g. notification.requested.v1).
Version    | int               | Schema version of the Data payload. Enables evolving event schemas without breaking consumers.
TenantID   | string            | Tenant this event belongs to. Default partition key for per-tenant ordering.
OccurredAt | time.Time         | UTC timestamp when the event was created. Automatically set by NewEvent().
Source     | string            | Service that produced the event (e.g. "broadcast-service", "auth-service").
TraceID    | string            | OpenTelemetry trace ID for distributed tracing. Injected by the producer.
SpanID     | string            | OpenTelemetry span ID. Together with TraceID, enables linking producer and consumer spans.
Key        | string            | Explicit partition key override. If empty, PartitionKey() falls back to TenantID.
Data       | json.RawMessage   | Business payload as raw JSON. Decoded lazily by the consumer via UnmarshalData().
Headers    | map[string]string | Optional metadata. Used for OTel trace propagation and DLQ retry metadata headers.

NewEvent() Factory

The NewEvent() factory generates the ID, sets OccurredAt, marshals the data payload, sets Version to 1, and initializes Headers to an empty map, so callers never need to construct the envelope manually.

func NewEvent(eventType, source, tenantID string, data any) (*Event, error) {
    rawData, err := json.Marshal(data)
    if err != nil {
        return nil, err
    }

    return &Event{
        ID:         uuid.New().String(),
        Type:       eventType,
        Version:    1,
        TenantID:   tenantID,
        OccurredAt: time.Now().UTC(),
        Source:     source,
        Data:       rawData,
        Headers:    make(map[string]string),
    }, nil
}
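
As a usage sketch, the round trip below builds an event and decodes its payload on the consumer side. To stay self-contained it trims the envelope to a few fields, replaces uuid.New() with a hypothetical newID() helper based on crypto/rand, and assumes UnmarshalData() is a thin wrapper around json.Unmarshal (the chapter names the method but does not show its body). NotificationRequested is an invented payload type.

```go
package main

import (
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "time"
)

// Event is a trimmed copy of the envelope, enough for the round trip.
type Event struct {
    ID         string          `json:"id"`
    Type       string          `json:"type"`
    Version    int             `json:"version"`
    TenantID   string          `json:"tenant_id"`
    OccurredAt time.Time       `json:"occurred_at"`
    Source     string          `json:"source"`
    Data       json.RawMessage `json:"data"`
}

// newID is a stand-in for uuid.New().String() so the sketch needs no
// third-party module. It returns 16 random bytes encoded as hex.
func newID() (string, error) {
    b := make([]byte, 16)
    if _, err := rand.Read(b); err != nil {
        return "", err
    }
    return hex.EncodeToString(b), nil
}

func NewEvent(eventType, source, tenantID string, data any) (*Event, error) {
    raw, err := json.Marshal(data)
    if err != nil {
        return nil, err
    }
    id, err := newID()
    if err != nil {
        return nil, err
    }
    return &Event{
        ID: id, Type: eventType, Version: 1, TenantID: tenantID,
        OccurredAt: time.Now().UTC(), Source: source, Data: raw,
    }, nil
}

// UnmarshalData is assumed to decode Data into v with encoding/json.
func (e *Event) UnmarshalData(v any) error {
    return json.Unmarshal(e.Data, v)
}

// NotificationRequested is a hypothetical payload type.
type NotificationRequested struct {
    Recipient string `json:"recipient"`
    Template  string `json:"template"`
}

func main() {
    evt, err := NewEvent("notification.requested.v1", "broadcast-service", "tenant-42",
        NotificationRequested{Recipient: "+15550100", Template: "welcome"})
    if err != nil {
        panic(err)
    }

    // Consumer side: the payload stays raw JSON until it is decoded here.
    var payload NotificationRequested
    if err := evt.UnmarshalData(&payload); err != nil {
        panic(err)
    }
    fmt.Println(evt.Type, evt.TenantID, payload.Template)
}
```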

PartitionKey() — Per-Tenant Ordering

func (e *Event) PartitionKey() string {
    if e.Key != "" {
        return e.Key
    }
    return e.TenantID
}

By defaulting the partition key to TenantID, all events for the same tenant land on the same Kafka partition. This guarantees ordering within a tenant while allowing parallelism across tenants. Set Key explicitly only when you need a different partitioning strategy (e.g. per-recipient).
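
To make the ordering argument concrete, here is a simplified model of key-based partitioning. partitionFor is a hypothetical helper that hashes the key with FNV-1a and reduces it modulo the partition count; it illustrates the idea behind a hash balancer and is not claimed to reproduce kafka-go's exact partition assignment.

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// Event is trimmed to the two fields PartitionKey reads.
type Event struct {
    TenantID string
    Key      string
}

func (e *Event) PartitionKey() string {
    if e.Key != "" {
        return e.Key
    }
    return e.TenantID
}

// partitionFor is an illustrative model: equal keys always map to the
// same partition as long as the partition count does not change.
func partitionFor(key string, partitions int) int {
    h := fnv.New32a()
    h.Write([]byte(key)) // Write on a hash.Hash never returns an error
    return int(h.Sum32() % uint32(partitions))
}

func main() {
    const partitions = 3
    events := []*Event{
        {TenantID: "tenant-a"},
        {TenantID: "tenant-a"},
        {TenantID: "tenant-b"},
        {TenantID: "tenant-a", Key: "recipient-1001"}, // explicit override
    }
    for _, e := range events {
        key := e.PartitionKey()
        fmt.Printf("key=%-15s partition=%d\n", key, partitionFor(key, partitions))
    }
}
```

The first two events share a key and therefore a partition. Because the result depends on the partition count, adding partitions to a topic can move a key to a different partition, which is worth keeping in mind when relying on per-tenant ordering.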

7.2 Producer/Consumer Interfaces

pkg/eventbus/interfaces.go defines broker-agnostic interfaces. All event publishing and consuming goes through these abstractions.

type Producer interface {
    Publish(ctx context.Context, topic string, event *Event) error
    PublishBatch(ctx context.Context, topic string, events []*Event) error
    Close() error
}

type Handler func(ctx context.Context, event *Event) error

type Consumer interface {
    Subscribe(topic, groupID string, handler Handler) error
    Run(ctx context.Context) error
    Close() error
}

type ProducerConsumer interface {
    Producer
    Consumer
}

Why interfaces? The codebase currently uses Kafka via segmentio/kafka-go, but these interfaces mean the broker implementation can be swapped (e.g. to NATS, RabbitMQ, or an in-memory bus for testing) without changing any business logic. The Handler function type is the fundamental unit of event processing: every middleware (retry, idempotency) takes a Handler and returns a new Handler that wraps it.
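
As an illustration of the in-memory option, the sketch below implements the Producer interface with a map guarded by a mutex. MemoryProducer, Published, notifyUser, and run are hypothetical names invented for this example, and Event is trimmed to two fields.

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

type Event struct {
    ID   string
    Type string
}

type Producer interface {
    Publish(ctx context.Context, topic string, event *Event) error
    PublishBatch(ctx context.Context, topic string, events []*Event) error
    Close() error
}

// MemoryProducer is a hypothetical test double that records events per topic.
type MemoryProducer struct {
    mu     sync.Mutex
    events map[string][]*Event
}

func NewMemoryProducer() *MemoryProducer {
    return &MemoryProducer{events: make(map[string][]*Event)}
}

func (m *MemoryProducer) Publish(ctx context.Context, topic string, event *Event) error {
    return m.PublishBatch(ctx, topic, []*Event{event})
}

func (m *MemoryProducer) PublishBatch(ctx context.Context, topic string, events []*Event) error {
    if err := ctx.Err(); err != nil {
        return err // honor cancellation, as a real broker client would
    }
    m.mu.Lock()
    defer m.mu.Unlock()
    m.events[topic] = append(m.events[topic], events...)
    return nil
}

func (m *MemoryProducer) Close() error { return nil }

// Published returns a copy of the events recorded for a topic.
func (m *MemoryProducer) Published(topic string) []*Event {
    m.mu.Lock()
    defer m.mu.Unlock()
    return append([]*Event(nil), m.events[topic]...)
}

// Compile-time check that the double satisfies the interface.
var _ Producer = (*MemoryProducer)(nil)

// notifyUser stands in for business logic that depends only on Producer.
func notifyUser(ctx context.Context, p Producer, userID string) error {
    return p.Publish(ctx, "notifications.v1",
        &Event{ID: userID, Type: "notification.requested.v1"})
}

// run exercises the business logic against the in-memory double.
func run() (int, error) {
    bus := NewMemoryProducer()
    if err := notifyUser(context.Background(), bus, "user-1"); err != nil {
        return 0, err
    }
    return len(bus.Published("notifications.v1")), nil
}

func main() {
    n, err := run()
    if err != nil {
        panic(err)
    }
    fmt.Println(n) // 1
}
```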

pkg/eventbus/interfaces.go also defines additional supporting types.

7.3 Kafka Producer Implementation

pkg/eventbus/kafka/producer.go implements eventbus.Producer using segmentio/kafka-go. Key design decisions:

Lazy Writer Creation (Double-Check Locking)

Writers are created per-topic on first use, allowing different topics to have different configurations. The getWriter() method uses RWMutex double-check locking to avoid creating duplicate writers under concurrency:

type Producer struct {
    writers map[string]*kafka.Writer
    config  *Config
    mu      sync.RWMutex
}

func (p *Producer) getWriter(topic string) *kafka.Writer {
    // Fast path: read lock to check if writer exists
    p.mu.RLock()
    w, ok := p.writers[topic]
    p.mu.RUnlock()
    if ok {
        return w
    }

    // Slow path: write lock to create writer
    p.mu.Lock()
    defer p.mu.Unlock()

    // Double-check after acquiring write lock
    if w, ok := p.writers[topic]; ok {
        return w
    }

    w = &kafka.Writer{
        Addr:         kafka.TCP(p.config.Brokers...),
        Topic:        topic,
        Balancer:     &kafka.Hash{}, // Partition by key (tenant_id)
        RequiredAcks: kafka.RequiredAcks(p.config.RequiredAcks),
        BatchSize:    p.config.BatchSize,
        BatchTimeout: p.config.BatchTimeout,
        WriteTimeout: p.config.WriteTimeout,
        MaxAttempts:  p.config.MaxRetries,
        Async:        false, // Synchronous for at-least-once semantics
    }

    p.writers[topic] = w
    return w
}

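Pattern: Take the read lock first (fast path for existing writers) and take the write lock only when the writer is missing. sync.RWMutex cannot upgrade a read lock in place, so the read lock is released before the write lock is acquired; the second check inside the write lock covers that gap and prevents two goroutines from both creating a writer for the same topic.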
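
The effect of the second check can be observed without a broker. The sketch below applies the same locking sequence to a hypothetical lazyRegistry (a stand-in for the writers map) and counts how often the slow path actually builds a value when many goroutines request the same key at once.

```go
package main

import (
    "fmt"
    "sync"
)

// lazyRegistry is a hypothetical, broker-free analogue of the writers map.
type lazyRegistry struct {
    mu      sync.RWMutex
    items   map[string]*int
    created int // number of values built on the slow path
}

func (r *lazyRegistry) get(key string) *int {
    // Fast path: read lock only.
    r.mu.RLock()
    v, ok := r.items[key]
    r.mu.RUnlock()
    if ok {
        return v
    }

    // Slow path: exclusive lock, then check again.
    r.mu.Lock()
    defer r.mu.Unlock()
    if v, ok := r.items[key]; ok {
        return v // another goroutine won the race between RUnlock and Lock
    }
    v = new(int)
    r.items[key] = v
    r.created++
    return v
}

// createdAfter requests one key from n goroutines and reports how many
// values were built in total.
func createdAfter(n int) int {
    r := &lazyRegistry{items: make(map[string]*int)}
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            r.get("broadcasts.v1")
        }()
    }
    wg.Wait()
    return r.created
}

func main() {
    fmt.Println(createdAfter(100)) // 1
}
```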

Publish Flow

The Publish() method follows this sequence:

  1. Start producer span: StartProducerSpan() creates an OTel span and injects trace context into the event headers
  2. Marshal event: serialize the entire event envelope to JSON
  3. Set partition key: event.PartitionKey() is used as the Kafka message key
  4. Attach headers: event ID, type, tenant ID, source, plus trace context headers
  5. Write to Kafka: synchronous write via the per-topic writer

func (p *Producer) Publish(ctx context.Context, topic string, event *eventbus.Event) error {
    ctx, span := eventbus.StartProducerSpan(ctx, topic, event)
    defer span.End()

    value, err := event.Marshal()
    if err != nil {
        return err
    }

    msg := kafka.Message{
        Key:   []byte(event.PartitionKey()),
        Value: value,
        Headers: []kafka.Header{
            {Key: eventbus.HeaderEventID,   Value: []byte(event.ID)},
            {Key: eventbus.HeaderEventType, Value: []byte(event.Type)},
            {Key: eventbus.HeaderTenantID,  Value: []byte(event.TenantID)},
            {Key: eventbus.HeaderSource,    Value: []byte(event.Source)},
        },
    }

    writer := p.getWriter(topic)
    return writer.WriteMessages(ctx, msg)
}

7.4 Kafka Consumer Implementation

pkg/eventbus/kafka/consumer.go implements eventbus.Consumer. The consumer lifecycle has three phases:

Phase 1: Subscribe (before Run)

Subscribe() registers topic/groupID/handler tuples. It must be called before Run() — subscribing while running returns an error.

func (c *Consumer) Subscribe(topic, groupID string, handler eventbus.Handler) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.running {
        return errors.New("cannot subscribe while consumer is running")
    }
    // ... register subscription
    c.subscriptions = append(c.subscriptions, sub)
    return nil
}

Phase 2: Run (blocking)

Run() creates a kafka.Reader per subscription, launches a goroutine for each, and blocks until the context is cancelled or a fatal error occurs.

func (c *Consumer) Run(ctx context.Context) error {
    // Create readers for all subscriptions
    for _, sub := range c.subscriptions {
        sub.reader = kafka.NewReader(kafka.ReaderConfig{
            Brokers: c.config.Brokers,
            GroupID: sub.groupID,
            Topic:   sub.topic,
            // ... additional config
        })
    }

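    // Buffered so a failing consume loop never blocks while reporting its error
    errCh := make(chan error, len(c.subscriptions))

    // Start a goroutine for each subscription
    for _, sub := range c.subscriptions {
        c.wg.Add(1)
        go func(s *subscription) {
            defer c.wg.Done()
            // Report only real failures; a cancelled context is a normal shutdown
            if err := c.consumeLoop(ctx, s); err != nil && ctx.Err() == nil {
                errCh <- err
            }
        }(sub)
    }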

    // Block until context cancelled or error
    select {
    case <-ctx.Done():
        // graceful shutdown
    case err := <-errCh:
        return err
    }
    c.wg.Wait()
    return ctx.Err()
}

Phase 3: consumeLoop (at-least-once delivery)

Each subscription runs its own consumeLoop. The loop implements at-least-once delivery: offsets are committed only after successful processing.

func (c *Consumer) consumeLoop(ctx context.Context, sub *subscription) error {
    for {
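        msg, err := sub.reader.FetchMessage(ctx) // blocks until message or cancel
        if err != nil {
            if ctx.Err() != nil {
                return ctx.Err() // shutdown requested
            }
            time.Sleep(c.config.RetryBackoff) // back off, then fetch again
            continue
        }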

        if err := c.processMessage(ctx, sub, msg); err != nil {
            if helpers.IsPermanent(err) {
                // PERMANENT ERROR: commit offset and skip
                sub.reader.CommitMessages(ctx, msg)
                continue
            }
            // TRANSIENT ERROR: do NOT commit → Kafka will redeliver
            time.Sleep(c.config.RetryBackoff)
            continue
        }

        // SUCCESS: commit offset
        sub.reader.CommitMessages(ctx, msg)
    }
}

PermanentError handling: When a handler returns a PermanentError (e.g. malformed data that can never succeed), the consumer commits the offset and moves on. This prevents a poison pill message from blocking the entire partition forever.

  FetchMessage
       │
       ▼
  processMessage
       │
  ┌───┴───┐
  │       │
success  error?
  │       │
  │    permanent? ── yes ──> CommitMessages (skip)
  │       │
  │    transient
  │       │
  │    DON'T commit ──> backoff ──> retry
  │
  ▼
CommitMessages
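
The chapter does not show how helpers.IsPermanent is implemented. One plausible shape, offered purely as an assumption, is a wrapper error type detected with errors.As, so that a permanent error stays recognizable after callers wrap it again with %w. PermanentError, Permanent, IsPermanent, and decode below are illustrative names, not the codebase's definitions.

```go
package main

import (
    "encoding/json"
    "errors"
    "fmt"
)

// PermanentError marks a failure that retrying can never fix.
type PermanentError struct{ Err error }

func (e *PermanentError) Error() string { return "permanent: " + e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }

// Permanent wraps err so the consumer can recognize it.
func Permanent(err error) error { return &PermanentError{Err: err} }

// IsPermanent reports whether any error in err's chain is a PermanentError.
func IsPermanent(err error) bool {
    var pe *PermanentError
    return errors.As(err, &pe)
}

// decode is a hypothetical handler step. Malformed JSON will not become
// valid on redelivery, so the failure is marked permanent.
func decode(raw []byte) error {
    var payload map[string]any
    if err := json.Unmarshal(raw, &payload); err != nil {
        return Permanent(fmt.Errorf("decode payload: %w", err))
    }
    return nil
}

func main() {
    // The marker survives another layer of wrapping.
    bad := fmt.Errorf("handler: %w", decode([]byte("{not json")))
    transient := errors.New("connection reset")
    fmt.Println(IsPermanent(bad), IsPermanent(transient)) // true false
}
```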

7.5 Kafka Client (Combined)

pkg/eventbus/kafka/client.go combines Producer and Consumer via struct embedding:

type Client struct {
    *Producer
    *Consumer
    config *Config
}

// Compile-time interface check
var _ eventbus.ProducerConsumer = (*Client)(nil)

Struct embedding promotes the methods of both Producer and Consumer to Client without delegation boilerplate, so Client satisfies the ProducerConsumer interface. Close() is the exception: both embedded types declare it, which makes the promoted selector ambiguous, so Client defines its own Close() that shuts down both embedded types.
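
A minimal sketch of such a Close() follows. The Producer and Consumer here are stubs invented for the example (the stub consumer always fails, to show error aggregation), and combining the two results with errors.Join (Go 1.20+) is one reasonable choice rather than a description of the actual method.

```go
package main

import (
    "errors"
    "fmt"
)

// Stub producer: closing always succeeds.
type Producer struct{ closed bool }

func (p *Producer) Close() error {
    p.closed = true
    return nil
}

// Stub consumer: closing always fails, to exercise the error path.
type Consumer struct{ closed bool }

func (c *Consumer) Close() error {
    c.closed = true
    return errors.New("consumer: reader close failed")
}

type Client struct {
    *Producer
    *Consumer
}

// Close is defined on Client itself. Without it, c.Close() would not
// compile, because both embedded types promote a Close method at the
// same depth. Both sides are always closed; errors.Join returns nil
// when every argument is nil.
func (c *Client) Close() error {
    return errors.Join(c.Producer.Close(), c.Consumer.Close())
}

func main() {
    c := &Client{Producer: &Producer{}, Consumer: &Consumer{}}
    err := c.Close()
    fmt.Println(c.Producer.closed, c.Consumer.closed, err != nil)
}
```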

The client also provides utility functions for topic management.

7.6 Idempotent Event Processing

pkg/eventbus/idempotency.go provides a handler middleware that prevents duplicate event processing.

func IdempotentHandler(store IdempotencyStore, ttl time.Duration, handler Handler) Handler {
    return func(ctx context.Context, event *Event) error {
        key := IdempotencyKey(event)  // = event.ID

        processed, err := store.IsProcessed(ctx, key)
        if err != nil {
            return handler(ctx, event)  // store error → process anyway
        }

        if processed {
            return nil  // skip duplicate
        }

        if err := handler(ctx, event); err != nil {
            return err  // don't mark as processed on failure
        }

        _ = store.MarkProcessed(ctx, key, ttl)  // best-effort mark
        return nil
    }
}

Why idempotency? Kafka provides at-least-once delivery. If a consumer crashes after processing a message but before committing the offset, Kafka will redeliver the same message. Without idempotency checking, the business handler would execute twice. The IdempotentHandler wrapper checks the event ID against a store before processing, and marks it as processed afterward.

IdempotencyStore Interface

type IdempotencyStore interface {
    IsProcessed(ctx context.Context, key string) (bool, error)
    MarkProcessed(ctx context.Context, key string, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
}

The codebase includes InMemoryIdempotencyStore with sync.RWMutex for development. It runs a background cleanup goroutine every minute to expire old entries.
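
For orientation, a store of this kind can be as small as a map from key to expiry time. The sketch below is not the codebase's InMemoryIdempotencyStore: MemoryStore, its injectable now clock, and the seenAfter helper are assumptions made for illustration, and the background cleanup goroutine is left out (expired keys are simply treated as absent on lookup).

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// MemoryStore is a hypothetical in-memory idempotency store.
type MemoryStore struct {
    mu      sync.RWMutex
    expires map[string]time.Time
    now     func() time.Time // injectable clock, handy in tests
}

func NewMemoryStore() *MemoryStore {
    return &MemoryStore{expires: make(map[string]time.Time), now: time.Now}
}

// IsProcessed reports true only while the key's TTL has not elapsed.
func (s *MemoryStore) IsProcessed(_ context.Context, key string) (bool, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    exp, ok := s.expires[key]
    return ok && s.now().Before(exp), nil
}

func (s *MemoryStore) MarkProcessed(_ context.Context, key string, ttl time.Duration) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.expires[key] = s.now().Add(ttl)
    return nil
}

func (s *MemoryStore) Delete(_ context.Context, key string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.expires, key)
    return nil
}

// seenAfter marks a key with a one-hour TTL, advances a fake clock by
// the given number of minutes, and reports whether the key still counts
// as processed.
func seenAfter(minutes int) bool {
    clock := time.Unix(0, 0)
    s := NewMemoryStore()
    s.now = func() time.Time { return clock }

    ctx := context.Background()
    _ = s.MarkProcessed(ctx, "evt-1", time.Hour)
    clock = clock.Add(time.Duration(minutes) * time.Minute)

    ok, _ := s.IsProcessed(ctx, "evt-1")
    return ok
}

func main() {
    fmt.Println(seenAfter(30), seenAfter(120)) // true false
}
```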

Production recommendation: Use Redis with TTL for distributed, high-throughput deployments. The in-memory store does not persist across restarts and does not work in multi-instance deployments.

TTL Constant          | Duration | Use Case
----------------------|----------|---------
DefaultIdempotencyTTL | 24 hours | Most cases: events older than 24h are unlikely to be redelivered
ShortIdempotencyTTL   | 1 hour   | High-throughput scenarios with quick retries
LongIdempotencyTTL    | 7 days   | Critical events that must never be duplicated

7.7 Retry with Exponential Backoff & DLQ

pkg/eventbus/retry.go implements a retry middleware with exponential backoff and dead-letter queue (DLQ) fallback.

func RetryableHandler(cfg *RetryConfig, dlqProducer Producer, handler Handler) Handler {
    return func(ctx context.Context, event *Event) error {
        var lastErr error
        backoff := cfg.InitialBackoff

        for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
            err := handler(ctx, event)
            if err == nil {
                return nil  // success!
            }
            lastErr = err

            if attempt < cfg.MaxRetries {
                // Context-aware backoff
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(backoff):
                }
                backoff = time.Duration(float64(backoff) * cfg.BackoffMultiplier)
                if backoff > cfg.MaxBackoff {
                    backoff = cfg.MaxBackoff
                }
            }
        }

        // All retries exhausted → send to DLQ
        if cfg.SendToDLQ && dlqProducer != nil {
            sendToDLQ(ctx, dlqProducer, event, lastErr)
        }
        return lastErr
    }
}

Default Retry Configuration

Parameter         | Default Value | Description
------------------|---------------|------------
MaxRetries        | 3             | Maximum retry attempts (4 total attempts including the original)
InitialBackoff    | 100ms         | Delay before the first retry
BackoffMultiplier | 2.0           | Backoff increases exponentially: 100ms → 200ms → 400ms → ...
MaxBackoff        | 30s           | Backoff is capped at this value
SendToDLQ         | true          | Send to dead-letter queue after retries exhausted
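
The delays these defaults produce can be computed with the same update rule that RetryableHandler applies. backoffSchedule is a hypothetical helper written for this illustration, and the RetryConfig below is trimmed to the fields it needs.

```go
package main

import (
    "fmt"
    "time"
)

type RetryConfig struct {
    MaxRetries        int
    InitialBackoff    time.Duration
    BackoffMultiplier float64
    MaxBackoff        time.Duration
}

// backoffSchedule returns the wait before each retry: multiply after
// every attempt, then clamp to MaxBackoff.
func backoffSchedule(cfg RetryConfig) []time.Duration {
    delays := make([]time.Duration, 0, cfg.MaxRetries)
    backoff := cfg.InitialBackoff
    for attempt := 0; attempt < cfg.MaxRetries; attempt++ {
        delays = append(delays, backoff)
        backoff = time.Duration(float64(backoff) * cfg.BackoffMultiplier)
        if backoff > cfg.MaxBackoff {
            backoff = cfg.MaxBackoff
        }
    }
    return delays
}

func main() {
    defaults := RetryConfig{
        MaxRetries:        3,
        InitialBackoff:    100 * time.Millisecond,
        BackoffMultiplier: 2.0,
        MaxBackoff:        30 * time.Second,
    }
    fmt.Println(backoffSchedule(defaults)) // [100ms 200ms 400ms]

    // With more retries the cap becomes visible in the last entry.
    longer := defaults
    longer.MaxRetries = 10
    fmt.Println(backoffSchedule(longer))
}
```

With the defaults, a handler that keeps failing spends 700ms waiting in total before the event goes to the DLQ, not counting the time each attempt itself takes.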

Dead-Letter Queue (DLQ) Design

When all retries are exhausted, the event is published to a DLQ topic. The DLQ topic name follows the convention {original-topic}.dlq.

func DLQTopicName(topic string) string {
    return topic + ".dlq"
}

Before publishing to the DLQ, metadata headers are attached to the event:

Header Key        | Value
------------------|------
x-last-error      | The error message from the final failed attempt
x-first-failed-at | RFC3339 timestamp of when the DLQ send was triggered
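
The body of sendToDLQ is not shown in this chapter. As a sketch of the header step only, the hypothetical prepareForDLQ below derives the DLQ topic and returns a copy of the event with the two headers from the table attached. Copying instead of mutating the original, the constant names, and the example helper are choices made for this illustration.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

type Event struct {
    ID      string
    Headers map[string]string
}

// Header keys as listed in the table; the constant names are assumptions.
const (
    headerLastError     = "x-last-error"
    headerFirstFailedAt = "x-first-failed-at"
)

func DLQTopicName(topic string) string {
    return topic + ".dlq"
}

// prepareForDLQ returns the DLQ topic and a copy of the event carrying
// the failure metadata. The caller's event is left untouched.
func prepareForDLQ(topic string, event *Event, lastErr error, now time.Time) (string, *Event) {
    headers := make(map[string]string, len(event.Headers)+2)
    for k, v := range event.Headers {
        headers[k] = v
    }
    headers[headerLastError] = lastErr.Error()
    headers[headerFirstFailedAt] = now.UTC().Format(time.RFC3339)

    dead := *event
    dead.Headers = headers
    return DLQTopicName(topic), &dead
}

// example runs the helper with a fixed timestamp so the output is stable.
func example() (topic string, original, dead *Event) {
    original = &Event{ID: "evt-1", Headers: map[string]string{"trace-id": "abc123"}}
    failedAt := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
    topic, dead = prepareForDLQ("broadcasts.v1", original, errors.New("meta api: 503"), failedAt)
    return topic, original, dead
}

func main() {
    topic, original, dead := example()
    fmt.Println(topic)                             // broadcasts.v1.dlq
    fmt.Println(dead.Headers[headerLastError])     // meta api: 503
    fmt.Println(dead.Headers[headerFirstFailedAt]) // 2024-01-02T03:04:05Z
    fmt.Println(len(original.Headers))             // 1
}
```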

7.8 Handler Composition (Decorator Pattern)

The Handler type (func(ctx, event) error) enables decorator-style composition. Each middleware wraps the next handler, forming a processing pipeline:

handler := eventbus.RetryableHandler(
    retryCfg,
    kafkaProducer,
    eventbus.IdempotentHandler(
        idempotencyStore,
        eventbus.DefaultIdempotencyTTL,
        myBusinessHandler,  // the actual business logic
    ),
)

  Incoming Event
        │
        ▼
┌──────────────────────────┐
│ RetryableHandler         │  Retries up to N times with exponential backoff
│ ┌──────────────────────┐ │
│ │ IdempotentHandler    │ │  Checks if event already processed
│ │ ┌──────────────────┐ │ │
│ │ │ Business Handler │ │ │  Actual event processing logic
│ │ └──────────────────┘ │ │
│ └──────────────────────┘ │
└──────────────────────────┘
        │
        ▼
 success / error / DLQ

Execution flow:

  1. Retry layer receives the event, calls the inner handler
  2. Idempotency layer checks if event ID was already processed — if yes, returns nil immediately
  3. Business handler executes the actual logic
  4. On success: idempotency layer marks as processed, retry layer returns nil
  5. On error: idempotency layer propagates the error, retry layer waits and retries
  6. After all retries exhausted: retry layer sends to DLQ

This is the same pattern used in HTTP middleware (e.g. Chi middleware chain). Because Handler is just a function type, any function that takes a Handler and returns a Handler can be composed in the chain.
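
To show that composition in isolation, the sketch below defines a Middleware type, a Chain helper, and a Trace middleware that records when it is entered and left. All three are hypothetical additions for this example; the two Trace layers are merely labelled after the retry and idempotency layers to make the nesting order visible.

```go
package main

import (
    "context"
    "fmt"
)

type Event struct{ ID string }

type Handler func(ctx context.Context, event *Event) error

// Middleware is any function that wraps one Handler in another.
type Middleware func(Handler) Handler

// Chain wraps h so that the first middleware listed is the outermost layer.
func Chain(h Handler, mws ...Middleware) Handler {
    for i := len(mws) - 1; i >= 0; i-- {
        h = mws[i](h)
    }
    return h
}

// Trace appends an entry to log before and after calling the next handler.
func Trace(name string, log *[]string) Middleware {
    return func(next Handler) Handler {
        return func(ctx context.Context, event *Event) error {
            *log = append(*log, name+":in")
            err := next(ctx, event)
            *log = append(*log, name+":out")
            return err
        }
    }
}

// run builds a two-layer chain around a business handler and returns
// the order in which the layers executed.
func run() []string {
    var log []string
    business := func(ctx context.Context, event *Event) error {
        log = append(log, "business:"+event.ID)
        return nil
    }
    h := Chain(business, Trace("retry", &log), Trace("idempotency", &log))
    _ = h(context.Background(), &Event{ID: "evt-1"})
    return log
}

func main() {
    fmt.Println(run())
    // [retry:in idempotency:in business:evt-1 idempotency:out retry:out]
}
```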

7.9 Worker Pool Pattern

The broadcast service uses a Dispatcher/Worker pattern for concurrent job processing, defined in apps/broadcast-service/internal/worker/.

Dispatcher

type Dispatcher struct {
    jobQueue chan job.Job
    workers  []*Worker
}

func NewDispatcher(workerCount int, queueSize int) *Dispatcher {
    jobQueue := make(chan job.Job, queueSize)
    d := &Dispatcher{jobQueue: jobQueue}
    for i := 1; i <= workerCount; i++ {
        d.workers = append(d.workers, NewWorker(i, jobQueue))
    }
    return d
}

// TryDispatch attempts a non-blocking send; false signals backpressure
func (d *Dispatcher) TryDispatch(j job.Job) bool {
    select {
    case d.jobQueue <- j:
        return true    // dispatched
    default:
        return false   // queue full: apply backpressure
    }
}

TryDispatch() uses a select with default to attempt a non-blocking send. If the job queue is full, it returns false immediately instead of blocking, giving the caller a chance to apply backpressure (e.g. reject the request, delay, or log a warning).
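
The backpressure signal is easy to see with no workers draining the queue. In the sketch below, Job is simplified to a plain function type and enqueueAll is a hypothetical helper that counts how many sends were accepted and how many were rejected.

```go
package main

import "fmt"

// Job is simplified to a function for this sketch.
type Job func()

type Dispatcher struct {
    jobQueue chan Job
}

func NewDispatcher(queueSize int) *Dispatcher {
    return &Dispatcher{jobQueue: make(chan Job, queueSize)}
}

// TryDispatch never blocks: it reports false when the buffer is full.
func (d *Dispatcher) TryDispatch(j Job) bool {
    select {
    case d.jobQueue <- j:
        return true
    default:
        return false
    }
}

// enqueueAll offers n jobs to a dispatcher that has no workers running,
// so only queueSize of them fit.
func enqueueAll(queueSize, n int) (accepted, rejected int) {
    d := NewDispatcher(queueSize)
    for i := 0; i < n; i++ {
        if d.TryDispatch(func() {}) {
            accepted++
        } else {
            rejected++ // a caller could reject the request, delay, or log here
        }
    }
    return accepted, rejected
}

func main() {
    fmt.Println(enqueueAll(2, 5)) // 2 3
}
```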

Worker

type Worker struct {
    id   int
    jobs <-chan job.Job  // receive-only channel
}

func (w *Worker) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return  // graceful shutdown
            case j := <-w.jobs:
                func() {
                    defer func() {
                        if r := recover(); r != nil {
                            log.Printf("worker %d panic: %v", w.id, r)
                        }
                    }()
                    j.Run(ctx)
                }()
            }
        }
    }()
}

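Key design points:

  1. Workers hold the shared queue as a receive-only channel (<-chan job.Job), so a worker cannot send to it or close it.
  2. Each job runs inside its own closure with a deferred recover(), so a panicking job is logged and the worker goroutine keeps running.
  3. Every worker selects on ctx.Done(), so cancelling the context stops the loop; jobs still buffered in the queue are not guaranteed to run.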

  Dispatcher
 ┌────────────────────────────────────┐
 │                                    │
 │  TryDispatch(job) / Dispatch(job)  │
 │         │                          │
 │         ▼                          │
 │   ┌──────────────┐                 │
 │   │  jobQueue    │  buffered       │
 │   │  (channel)   │  channel        │
 │   └───┬─────┬────┘                 │
 │       │     │                      │
 └───────┼─────┼──────────────────────┘
         │     │
         ▼     ▼
    Worker 1  Worker 2  ...  Worker N
    (goroutine per worker)

7.10 Bounded Worker Pool with Semaphore

The broadcast worker (apps/broadcast-service/cmd/worker/main.go) uses a channel-based semaphore to bound the number of concurrent goroutines processing broadcast messages:

sem := make(chan struct{}, workerPoolSize) // e.g. 20
var wg sync.WaitGroup

// For each broadcast message event:
sem <- struct{}{}   // acquire slot (blocks if pool full)
wg.Add(1)
go func() {
    defer func() {
        <-sem        // release slot
        wg.Done()   // signal completion
    }()
    handler(ctx, event)
}()

How the semaphore works: A buffered channel of size workerPoolSize acts as a counting semaphore. Sending to the channel “acquires a slot” — if the buffer is full, the send blocks until another goroutine finishes and reads from the channel. The WaitGroup is used for graceful shutdown to ensure all in-flight goroutines complete before the process exits.
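
The bound can be checked empirically. The sketch below pushes a batch of short tasks through the same acquire/release sequence and records the highest number of tasks observed running at once; peakConcurrency and the simulated one-millisecond workload are inventions for this example.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// peakConcurrency runs `jobs` tasks through a semaphore of size poolSize
// and returns the highest number of tasks seen in flight simultaneously.
func peakConcurrency(poolSize, jobs int) int64 {
    sem := make(chan struct{}, poolSize)
    var wg sync.WaitGroup
    var running, peak int64

    for i := 0; i < jobs; i++ {
        sem <- struct{}{} // acquire slot (blocks while poolSize tasks are in flight)
        wg.Add(1)
        go func() {
            defer func() {
                <-sem     // release slot
                wg.Done() // signal completion
            }()

            n := atomic.AddInt64(&running, 1)
            // Raise the recorded peak if this task pushed it higher.
            for {
                old := atomic.LoadInt64(&peak)
                if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
                    break
                }
            }
            time.Sleep(time.Millisecond) // simulated work
            atomic.AddInt64(&running, -1)
        }()
    }

    wg.Wait() // graceful shutdown: wait for all in-flight tasks
    return atomic.LoadInt64(&peak)
}

func main() {
    fmt.Println(peakConcurrency(4, 50) <= 4) // true
}
```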

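In the actual worker code, this pattern is applied specifically to EventTypeBroadcastMessageRequested events, while other event types (like upload requests) are processed synchronously in the consumer goroutine. The handler returns nil as soon as the goroutine is started, so the consumer treats the event as handled before the work finishes; an error inside the goroutine is logged but is not returned to the consumer: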

if event.Type == eventbus.EventTypeBroadcastMessageRequested {
    sem <- struct{}{}
    wg.Add(1)
    go func() {
        defer func() { <-sem; wg.Done() }()
        if err := broadcastMessageHandler(ctx, event); err != nil {
            slog.ErrorContext(ctx, "Broadcast message handler error",
                "error", err, "event_id", event.ID)
        }
    }()
    return nil  // return immediately, don't block consumer
}

7.11 Per-Tenant Rate Limiting

apps/broadcast-service/internal/ratelimit/limiter.go implements per-key (per-tenant) rate limiting using Go's golang.org/x/time/rate package.

type PerKeyLimiter struct {
    mu       sync.RWMutex
    limiters map[string]*rate.Limiter
    rps      float64
    burst    int
}

func (t *PerKeyLimiter) Wait(ctx context.Context, key string) error {
    // Fast path: read lock
    t.mu.RLock()
    l, ok := t.limiters[key]
    t.mu.RUnlock()

    if !ok {
        // Slow path: write lock, double-check
        t.mu.Lock()
        l, ok = t.limiters[key]
        if !ok {
            l = rate.NewLimiter(rate.Limit(t.rps), t.burst)
            t.limiters[key] = l
        }
        t.mu.Unlock()
    }

    return l.Wait(ctx)  // blocks until allowed
}

Why per-tenant rate limiting? The Meta/WhatsApp Business API enforces rate limits per WABA (WhatsApp Business Account). Each tenant has their own WABA, so each tenant needs an independent rate limiter. Without this, one tenant's high-volume broadcast could exhaust the rate limit for another tenant's account.

The default configuration (from environment variables):

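Parameter | Env Variable          | Default | Description
----------|-----------------------|---------|------------
RPS       | META_RATE_LIMIT_RPS   | 70      | Steady-state requests per second per tenant
Burst     | META_RATE_LIMIT_BURST | 10      | Token-bucket size: the most requests a tenant can send at once before the RPS rate applies
Pool Size | WORKER_POOL_SIZE      | 20      | Max concurrent broadcast message workers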
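
For a sense of scale: a rate.Limiter is a token bucket that starts full, so the first Burst requests pass immediately and the rest are admitted at RPS per second. estimateDrain below is a hypothetical back-of-the-envelope helper, not part of the codebase; it ignores network latency and the worker pool limit.

```go
package main

import (
    "fmt"
    "time"
)

// estimateDrain approximates when the last of `messages` requests is
// admitted by a token bucket that starts full.
func estimateDrain(messages int, rps float64, burst int) time.Duration {
    waiting := messages - burst
    if waiting <= 0 {
        return 0 // everything fits in the initial burst
    }
    return time.Duration(float64(waiting) / rps * float64(time.Second))
}

func main() {
    fmt.Println(estimateDrain(10, 70, 10))   // 0s
    fmt.Println(estimateDrain(7010, 70, 10)) // 1m40s
}
```

With the defaults, a single tenant's 7,010-message broadcast takes about 100 seconds to clear the limiter, while another tenant's broadcast proceeds against its own bucket in parallel.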

Double-check locking pattern: Same as the Kafka producer's getWriter(). First check with read lock (fast path), then write lock with a second check to prevent race conditions when two goroutines try to create a limiter for the same key simultaneously.

7.12 Distributed Tracing via Events

pkg/eventbus/tracing.go enables trace context propagation through Kafka events, ensuring that a single distributed trace spans from the HTTP request through the event bus to the consumer worker.

Tracing Functions

Function                             | Side     | Description
-------------------------------------|----------|------------
InjectTracing(ctx, event)            | Producer | Uses OTel propagator to inject trace context into event Headers. Also sets TraceID and SpanID fields directly for easy access.
ExtractTracing(ctx, event)           | Consumer | Extracts trace context from event Headers and returns a new context with the parent span.
StartProducerSpan(ctx, topic, event) | Producer | Creates a producer span (SpanKindProducer) with event metadata attributes, then calls InjectTracing.
StartConsumerSpan(ctx, event, name)  | Consumer | Calls ExtractTracing first, then creates a child consumer span (SpanKindConsumer) with event metadata.

Trace Propagation Flow

// Producer side (pkg/eventbus/kafka/producer.go)
ctx, span := eventbus.StartProducerSpan(ctx, topic, event)
defer span.End()
// StartProducerSpan internally calls InjectTracing(ctx, event)
// → trace context is now in event.Headers

// Consumer side (pkg/eventbus/kafka/consumer.go)
ctx, span := eventbus.StartConsumerSpan(ctx, event, "consume "+sub.topic)
defer span.End()
// StartConsumerSpan internally calls ExtractTracing(ctx, event)
// → consumer span is a child of the producer span

    HTTP Handler               Kafka             Consumer Worker
 ┌────────────────┐          ┌───────┐          ┌────────────────┐
 │ Trace: abc123  │          │       │          │ Trace: abc123  │
 │ Span:  span-1  │─────────>│ Event │─────────>│ Span:  span-3  │
 │                │  inject  │ Hdrs  │ extract  │ parent: span-2 │
 │  └─ span-2     │          │       │          │                │
 │   (producer)   │          └───────┘          │  (consumer)    │
 └────────────────┘                             └────────────────┘

Trace Header Keys

const (
    HeaderTraceID    = "trace-id"
    HeaderSpanID     = "span-id"
    HeaderTraceFlags = "trace-flags"
    HeaderTraceState = "trace-state"
)

A headerCarrier type (implementing propagation.TextMapCarrier) adapts the event's map[string]string Headers to work with the OTel propagator API.
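
The adapter itself is small. propagation.TextMapCarrier requires three methods (Get, Set, and Keys), and a named map type can provide all of them. The sketch below is an assumption about what such a carrier might look like, not the codebase's definition; it calls the methods directly instead of going through an OTel propagator so that it needs no third-party import.

```go
package main

import (
    "fmt"
    "sort"
)

// headerCarrier exposes a map[string]string through the method set
// that propagation.TextMapCarrier requires.
type headerCarrier map[string]string

func (c headerCarrier) Get(key string) string { return c[key] }

func (c headerCarrier) Set(key, value string) { c[key] = value }

func (c headerCarrier) Keys() []string {
    keys := make([]string, 0, len(c))
    for k := range c {
        keys = append(keys, k)
    }
    sort.Strings(keys) // sorted only to make the example output stable
    return keys
}

func main() {
    headers := map[string]string{} // stands in for event.Headers

    // Converting the map shares its storage, so writes made through
    // the carrier are visible in the original headers map.
    carrier := headerCarrier(headers)
    carrier.Set("trace-id", "abc123")
    carrier.Set("span-id", "span-2")

    fmt.Println(headers["trace-id"], carrier.Keys()) // abc123 [span-id trace-id]
}
```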

7.13 Topic Naming & Event Types

Topics and event types are defined as constants in pkg/eventbus/helpers.go. The naming follows consistent conventions.

Topic Naming Convention

Topics use lowercase with dots for versioning: {domain}.v{version} or {domain}.{subdomain}.{action}.v{version}

Topic                        | Constant                      | Description
-----------------------------|-------------------------------|------------
notifications.v1             | TopicNotifications            | Notification delivery requests
broadcasts.v1                | TopicBroadcasts               | Broadcast campaign events (upload, send, status)
user-events.v1               | TopicUserEvents               | User lifecycle events (created, updated, deleted)
auth-events.v1               | TopicAuthEvents               | Authentication events
admin-events.v1              | TopicAdminEvents              | Admin/tenant management events
whatsapp.template.status.v1  | TopicWhatsAppTemplateStatus   | WhatsApp template approval/rejection status from Meta
whatsapp.template.quality.v1 | TopicWhatsAppTemplateQuality  | WhatsApp template quality score changes from Meta
whatsapp.message.incoming.v1 | TopicWhatsAppMessageIncoming  | Incoming WhatsApp messages from Meta webhook
whatsapp.message.status.v1   | TopicWhatsAppMessageStatus    | Message delivery status updates (sent, delivered, read, failed)

Event Type Convention

Event types follow the format {domain}.{action}.v{version}, built using the BuildEventType() helper:

func BuildEventType(domain, action string, version int) string {
    return fmt.Sprintf("%s.%s.v%d", domain, action, version)
}

Event Type                               | Constant                           | Domain
-----------------------------------------|------------------------------------|-------------
notification.requested.v1                | EventTypeNotificationRequested     | notification
notification.sent.v1                     | EventTypeNotificationSent          | notification
notification.failed.v1                   | EventTypeNotificationFailed        | notification
broadcast.requested.v1                   | EventTypeBroadcastRequested        | broadcast
broadcast.sent.v1                        | EventTypeBroadcastSent             | broadcast
broadcast.upload_requested.v1            | EventTypeRecipientUploadRequested  | broadcast
broadcast.broadcast_message_requested.v1 | EventTypeBroadcastMessageRequested | broadcast
admin.businessnumber_requested.v1        | EventTypeBusinessNumberRequested   | admin
user.created.v1                          | EventTypeUserCreated               | user
user.updated.v1                          | EventTypeUserUpdated               | user
user.deleted.v1                          | EventTypeUserDeleted               | user
whatsapp.template_status_update.v1       | EventTypeWhatsAppTemplateStatus    | whatsapp
whatsapp.template_quality_update.v1      | EventTypeWhatsAppTemplateQuality   | whatsapp
whatsapp.message_incoming.v1             | EventTypeWhatsAppMessageIncoming   | whatsapp
whatsapp.message_status.v1               | EventTypeWhatsAppMessageStatus     | whatsapp
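
A quick check of the format: the sketch below pairs BuildEventType() with a hypothetical inverse, ParseEventType, which the chapter does not mention and which is included only to show that the three-segment convention can be split back into its parts.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

func BuildEventType(domain, action string, version int) string {
    return fmt.Sprintf("%s.%s.v%d", domain, action, version)
}

// ParseEventType is a hypothetical inverse of BuildEventType. It expects
// exactly three dot-separated segments, the last one being "v<number>".
func ParseEventType(eventType string) (domain, action string, version int, err error) {
    parts := strings.Split(eventType, ".")
    if len(parts) != 3 || !strings.HasPrefix(parts[2], "v") {
        return "", "", 0, fmt.Errorf("malformed event type %q", eventType)
    }
    version, err = strconv.Atoi(strings.TrimPrefix(parts[2], "v"))
    if err != nil {
        return "", "", 0, fmt.Errorf("malformed version in %q: %w", eventType, err)
    }
    return parts[0], parts[1], version, nil
}

func main() {
    t := BuildEventType("broadcast", "upload_requested", 1)
    fmt.Println(t)                 // broadcast.upload_requested.v1
    fmt.Println(ParseEventType(t)) // broadcast upload_requested 1 <nil>
}
```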

DLQ Topic Mapping

Primary Topic    | DLQ Topic            | Constant
-----------------|----------------------|----------------------
notifications.v1 | notifications.v1.dlq | DLQTopicNotifications
broadcasts.v1    | broadcasts.v1.dlq    | DLQTopicBroadcasts
user-events.v1   | user-events.v1.dlq   | DLQTopicUserEvents
auth-events.v1   | auth-events.v1.dlq   | DLQTopicAuthEvents

Event Domains & Actions

// Domains
const (
    DomainNotification = "notification"
    DomainBroadcast    = "broadcast"
    DomainUser         = "user"
    DomainAuth         = "auth"
    DomainAdmin        = "admin"
)

// Standard actions
const (
    ActionRequested = "requested"
    ActionCreated   = "created"
    ActionUpdated   = "updated"
    ActionDeleted   = "deleted"
    ActionSent      = "sent"
    ActionFailed    = "failed"
)

1Engage Multitenant Backend — Codebase Guide — Chapter 07: Event-Driven Architecture