Chapter 07: Event-Driven Architecture
The 1Engage backend uses Apache Kafka as the central event bus for asynchronous, loosely coupled communication between microservices. This chapter covers the event envelope design, producer/consumer abstractions, idempotency guarantees, retry strategies with dead-letter queues, worker pool patterns, per-tenant rate limiting, and distributed tracing propagation through events.
  HTTP Request               Kafka Cluster              Workers
┌──────────────┐ publish ┌──────────────────┐ consume ┌──────────────┐
│ API Service  │────────>│ Topic Partitions │────────>│  Consumer    │
│ (Producer)   │         │  [P0] [P1] [P2]  │         │   Group      │
└──────────────┘         └──────────────────┘         └──────┬───────┘
                                                             │
                         ┌────────────────┐                  │ retries
                         │   DLQ Topics   │                  │ exhausted
                         │  {topic}.dlq   │<─────────────────┘
                         └────────────────┘
7.1 Event Envelope
Every event flowing through the system uses a standardized Event envelope defined in pkg/eventbus/event.go. The envelope carries metadata (routing, tracing, tenant context) alongside the business payload.
type Event struct {
    ID         string            `json:"id"`
    Type       string            `json:"type"`
    Version    int               `json:"version"`
    TenantID   string            `json:"tenant_id"`
    OccurredAt time.Time         `json:"occurred_at"`
    Source     string            `json:"source"`
    TraceID    string            `json:"trace_id,omitempty"`
    SpanID     string            `json:"span_id,omitempty"`
    Key        string            `json:"key,omitempty"`
    Data       json.RawMessage   `json:"data"`
    Headers    map[string]string `json:"headers,omitempty"`
}
| Field | Type | Description |
|---|---|---|
| `ID` | `string` | UUID v4. Unique event identifier, used as the idempotency key for deduplication. |
| `Type` | `string` | Event category and action, format: `{domain}.{action}.v{version}` (e.g. `notification.requested.v1`). |
| `Version` | `int` | Schema version of the `Data` payload. Enables evolving event schemas without breaking consumers. |
| `TenantID` | `string` | Tenant this event belongs to. Default partition key for per-tenant ordering. |
| `OccurredAt` | `time.Time` | UTC timestamp when the event was created. Automatically set by `NewEvent()`. |
| `Source` | `string` | Service that produced the event (e.g. `"broadcast-service"`, `"auth-service"`). |
| `TraceID` | `string` | OpenTelemetry trace ID for distributed tracing. Injected by the producer. |
| `SpanID` | `string` | OpenTelemetry span ID. Together with `TraceID`, enables linking producer and consumer spans. |
| `Key` | `string` | Explicit partition key override. If empty, `PartitionKey()` falls back to `TenantID`. |
| `Data` | `json.RawMessage` | Business payload as raw JSON. Decoded lazily by the consumer via `UnmarshalData()`. |
| `Headers` | `map[string]string` | Optional metadata. Used for OTel trace propagation and DLQ retry metadata headers. |
NewEvent() Factory
The NewEvent() factory automatically generates an ID, sets the timestamp, and
marshals the data payload. Callers never need to construct the envelope manually.
func NewEvent(eventType, source, tenantID string, data any) (*Event, error) {
    rawData, err := json.Marshal(data)
    if err != nil {
        return nil, err
    }
    return &Event{
        ID:         uuid.New().String(),
        Type:       eventType,
        Version:    1,
        TenantID:   tenantID,
        OccurredAt: time.Now().UTC(),
        Source:     source,
        Data:       rawData,
        Headers:    make(map[string]string),
    }, nil
}
PartitionKey() — Per-Tenant Ordering
func (e *Event) PartitionKey() string {
    if e.Key != "" {
        return e.Key
    }
    return e.TenantID
}
By defaulting the partition key to TenantID, all events for the same tenant
land on the same Kafka partition. This guarantees ordering within a tenant
while allowing parallelism across tenants. Set Key explicitly only when you
need a different partitioning strategy (e.g. per-recipient).
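A simplified model of key-based partitioning shows why this works: the balancer assigns a message to `hash(key) mod partitions`, so identical keys always land on the same partition. The FNV hash here is illustrative; the exact hash function the balancer uses is an implementation detail:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor is a simplified model of key-based partitioning: hash the
// key and take it modulo the partition count. Hash-based balancers work
// the same way in spirit, though the hash function may differ.
func partitionFor(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	// All events for tenant-a land on one partition, every time.
	fmt.Println(partitionFor("tenant-a", 3) == partitionFor("tenant-a", 3)) // true
}
```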
7.2 Producer/Consumer Interfaces
pkg/eventbus/interfaces.go defines broker-agnostic interfaces. All event publishing and consuming goes through these abstractions.
type Producer interface {
    Publish(ctx context.Context, topic string, event *Event) error
    PublishBatch(ctx context.Context, topic string, events []*Event) error
    Close() error
}

type Handler func(ctx context.Context, event *Event) error

type Consumer interface {
    Subscribe(topic, groupID string, handler Handler) error
    Run(ctx context.Context) error
    Close() error
}

type ProducerConsumer interface {
    Producer
    Consumer
}
Why interfaces? The codebase currently uses Kafka via segmentio/kafka-go,
but these interfaces mean the broker implementation can be swapped (e.g. to NATS, RabbitMQ,
or an in-memory bus for testing) without changing any business logic. The Handler
function type is the fundamental unit of event processing — every middleware (retry,
idempotency) is a Handler that wraps another Handler.
Additional supporting types in pkg/eventbus/interfaces.go:
- `PublishResult` — metadata about a published message (topic, partition, offset)
- `ConsumeMessage` — a message received from the broker, with its Kafka metadata
- `HealthChecker` — `Ping(ctx)` for broker connectivity checks
7.3 Kafka Producer Implementation
pkg/eventbus/kafka/producer.go implements eventbus.Producer
using segmentio/kafka-go. Key design decisions:
Lazy Writer Creation (Double-Check Locking)
Writers are created per-topic on first use, allowing different topics to have different
configurations. The getWriter() method uses RWMutex double-check
locking to avoid creating duplicate writers under concurrency:
type Producer struct {
    writers map[string]*kafka.Writer
    config  *Config
    mu      sync.RWMutex
}

func (p *Producer) getWriter(topic string) *kafka.Writer {
    // Fast path: read lock to check if writer exists
    p.mu.RLock()
    w, ok := p.writers[topic]
    p.mu.RUnlock()
    if ok {
        return w
    }

    // Slow path: write lock to create writer
    p.mu.Lock()
    defer p.mu.Unlock()

    // Double-check after acquiring write lock
    if w, ok := p.writers[topic]; ok {
        return w
    }

    w = &kafka.Writer{
        Addr:         kafka.TCP(p.config.Brokers...),
        Topic:        topic,
        Balancer:     &kafka.Hash{}, // Partition by key (tenant_id)
        RequiredAcks: kafka.RequiredAcks(p.config.RequiredAcks),
        BatchSize:    p.config.BatchSize,
        BatchTimeout: p.config.BatchTimeout,
        WriteTimeout: p.config.WriteTimeout,
        MaxAttempts:  p.config.MaxRetries,
        Async:        false, // Synchronous for at-least-once semantics
    }
    p.writers[topic] = w
    return w
}
Pattern: Read lock first (fast path for existing writers), then upgrade to write lock only when needed. The second check inside the write lock prevents two goroutines from both creating a writer for the same topic simultaneously.
Publish Flow
The `Publish()` method follows this sequence:

- Start producer span — `StartProducerSpan()` creates an OTel span and injects trace context into event headers
- Marshal event — serialize the entire event envelope to JSON
- Set partition key — `event.PartitionKey()` is used as the Kafka message key
- Attach headers — event ID, type, tenant ID, source, plus trace context headers
- Write to Kafka — synchronous write via the per-topic writer
func (p *Producer) Publish(ctx context.Context, topic string, event *eventbus.Event) error {
    ctx, span := eventbus.StartProducerSpan(ctx, topic, event)
    defer span.End()

    value, err := event.Marshal()
    if err != nil {
        return err
    }

    msg := kafka.Message{
        Key:   []byte(event.PartitionKey()),
        Value: value,
        Headers: []kafka.Header{
            {Key: eventbus.HeaderEventID, Value: []byte(event.ID)},
            {Key: eventbus.HeaderEventType, Value: []byte(event.Type)},
            {Key: eventbus.HeaderTenantID, Value: []byte(event.TenantID)},
            {Key: eventbus.HeaderSource, Value: []byte(event.Source)},
        },
    }

    writer := p.getWriter(topic)
    return writer.WriteMessages(ctx, msg)
}
7.4 Kafka Consumer Implementation
pkg/eventbus/kafka/consumer.go implements eventbus.Consumer.
The consumer lifecycle has three phases:
Phase 1: Subscribe (before Run)
Subscribe() registers topic/groupID/handler tuples. It must be called
before Run() — subscribing while running returns an error.
func (c *Consumer) Subscribe(topic, groupID string, handler eventbus.Handler) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.running {
        return errors.New("cannot subscribe while consumer is running")
    }
    // ... register subscription
    c.subscriptions = append(c.subscriptions, sub)
    return nil
}
Phase 2: Run (blocking)
Run() creates a kafka.Reader per subscription, launches a goroutine
for each, and blocks until the context is cancelled or a fatal error occurs.
func (c *Consumer) Run(ctx context.Context) error {
    // Create readers for all subscriptions
    for _, sub := range c.subscriptions {
        sub.reader = kafka.NewReader(kafka.ReaderConfig{
            Brokers: c.config.Brokers,
            GroupID: sub.groupID,
            Topic:   sub.topic,
            // ... additional config
        })
    }

    // Start a goroutine for each subscription; errCh (setup elided)
    // carries fatal errors out of the consume loops
    for _, sub := range c.subscriptions {
        c.wg.Add(1)
        go func(s *subscription) {
            defer c.wg.Done()
            c.consumeLoop(ctx, s)
        }(sub)
    }

    // Block until context cancelled or error
    select {
    case <-ctx.Done():
        // graceful shutdown
    case err := <-errCh:
        return err
    }
    c.wg.Wait()
    return ctx.Err()
}
Phase 3: consumeLoop (at-least-once delivery)
Each subscription runs its own consumeLoop. The loop implements
at-least-once delivery: offsets are committed only after successful processing.
func (c *Consumer) consumeLoop(ctx context.Context, sub *subscription) error {
    for {
        msg, err := sub.reader.FetchMessage(ctx) // blocks until message or cancel
        if err != nil {
            if ctx.Err() != nil {
                return ctx.Err() // context cancelled → shut down
            }
            time.Sleep(c.config.RetryBackoff) // transient fetch error: backoff, retry
            continue
        }
        if err := c.processMessage(ctx, sub, msg); err != nil {
            if helpers.IsPermanent(err) {
                // PERMANENT ERROR: commit offset and skip
                sub.reader.CommitMessages(ctx, msg)
                continue
            }
            // TRANSIENT ERROR: do NOT commit → Kafka will redeliver
            time.Sleep(c.config.RetryBackoff)
            continue
        }
        // SUCCESS: commit offset
        sub.reader.CommitMessages(ctx, msg)
    }
}
PermanentError handling: When a handler returns a PermanentError
(e.g. malformed data that can never succeed), the consumer commits the offset and moves on.
This prevents a poison pill message from blocking the entire partition forever.
        FetchMessage
             │
             ▼
       processMessage
             │
         ┌───┴───┐
         │       │
      success  error?
         │       │
         │   permanent? ── yes ──> CommitMessages (skip)
         │       │
         │   transient
         │       │
         │   DON'T commit ──> backoff ──> retry
         │
         ▼
   CommitMessages
7.5 Kafka Client (Combined)
pkg/eventbus/kafka/client.go combines Producer and Consumer via struct embedding:
type Client struct {
    *Producer
    *Consumer
    config *Config
}

// Compile-time interface check
var _ eventbus.ProducerConsumer = (*Client)(nil)
Struct embedding gives Client all methods from both
Producer and Consumer without delegation boilerplate.
The Client satisfies the ProducerConsumer interface automatically.
Its Close() method is explicitly defined to shut down both embedded types.
The client also provides utility functions for topic management:
- `CreateTopics()` — creates topics (local dev, default 3 partitions, replication factor 1)
- `EnsureTopics()` — idempotent topic creation (ignores "already exists" errors)
7.6 Idempotent Event Processing
pkg/eventbus/idempotency.go provides a handler middleware that prevents duplicate event processing.
func IdempotentHandler(store IdempotencyStore, ttl time.Duration, handler Handler) Handler {
    return func(ctx context.Context, event *Event) error {
        key := IdempotencyKey(event) // = event.ID
        processed, err := store.IsProcessed(ctx, key)
        if err != nil {
            return handler(ctx, event) // store error → process anyway
        }
        if processed {
            return nil // skip duplicate
        }
        if err := handler(ctx, event); err != nil {
            return err // don't mark as processed on failure
        }
        _ = store.MarkProcessed(ctx, key, ttl) // best-effort mark
        return nil
    }
}
Why idempotency? Kafka provides at-least-once delivery. If a consumer
crashes after processing a message but before committing the offset, Kafka will redeliver
the same message. Without idempotency checking, the business handler would execute twice.
The IdempotentHandler wrapper checks the event ID against a store before
processing, and marks it as processed afterward.
IdempotencyStore Interface
type IdempotencyStore interface {
    IsProcessed(ctx context.Context, key string) (bool, error)
    MarkProcessed(ctx context.Context, key string, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
}
The codebase includes InMemoryIdempotencyStore with sync.RWMutex
for development. It runs a background cleanup goroutine every minute to expire old entries.
Production recommendation: Use Redis with TTL for distributed, high-throughput deployments. The in-memory store does not persist across restarts and does not work in multi-instance deployments.
| TTL Constant | Duration | Use Case |
|---|---|---|
| `DefaultIdempotencyTTL` | 24 hours | Most cases — events older than 24h are unlikely to be redelivered |
| `ShortIdempotencyTTL` | 1 hour | High-throughput scenarios with quick retries |
| `LongIdempotencyTTL` | 7 days | Critical events that must never be duplicated |
7.7 Retry with Exponential Backoff & DLQ
pkg/eventbus/retry.go implements a retry middleware with exponential backoff and dead-letter queue (DLQ) fallback.
func RetryableHandler(cfg *RetryConfig, dlqProducer Producer, handler Handler) Handler {
    return func(ctx context.Context, event *Event) error {
        var lastErr error
        backoff := cfg.InitialBackoff

        for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
            err := handler(ctx, event)
            if err == nil {
                return nil // success!
            }
            lastErr = err

            if attempt < cfg.MaxRetries {
                // Context-aware backoff
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(backoff):
                }
                backoff = time.Duration(float64(backoff) * cfg.BackoffMultiplier)
                if backoff > cfg.MaxBackoff {
                    backoff = cfg.MaxBackoff
                }
            }
        }

        // All retries exhausted → send to DLQ
        if cfg.SendToDLQ && dlqProducer != nil {
            sendToDLQ(ctx, dlqProducer, event, lastErr)
        }
        return lastErr
    }
}
Default Retry Configuration
| Parameter | Default Value | Description |
|---|---|---|
| `MaxRetries` | `3` | Maximum retry attempts (4 total attempts including the original) |
| `InitialBackoff` | `100ms` | Delay before the first retry |
| `BackoffMultiplier` | `2.0` | Backoff increases exponentially: 100ms → 200ms → 400ms → ... |
| `MaxBackoff` | `30s` | Backoff is capped at this value |
| `SendToDLQ` | `true` | Send to the dead-letter queue after retries are exhausted |
Dead-Letter Queue (DLQ) Design
When all retries are exhausted, the event is published to a DLQ topic. The DLQ topic
name follows the convention {original-topic}.dlq.
func DLQTopicName(topic string) string {
    return topic + ".dlq"
}
Before publishing to the DLQ, metadata headers are attached to the event:
| Header Key | Value |
|---|---|
| `x-last-error` | The error message from the final failed attempt |
| `x-first-failed-at` | RFC3339 timestamp of when the DLQ send was triggered |
7.8 Handler Composition (Decorator Pattern)
The Handler type (func(ctx, event) error) enables
decorator-style composition. Each middleware wraps the next handler,
forming a processing pipeline:
handler := eventbus.RetryableHandler(
    retryCfg,
    kafkaProducer,
    eventbus.IdempotentHandler(
        idempotencyStore,
        eventbus.DefaultIdempotencyTTL,
        myBusinessHandler, // the actual business logic
    ),
)
     Incoming Event
           │
           ▼
┌───────────────────────┐
│ RetryableHandler      │  Retries up to N times with exponential backoff
│ ┌───────────────────┐ │
│ │ IdempotentHandler │ │  Checks if event already processed
│ │ ┌───────────────┐ │ │
│ │ │   Business    │ │ │  Actual event processing logic
│ │ │   Handler     │ │ │
│ │ └───────────────┘ │ │
│ └───────────────────┘ │
└───────────────────────┘
           │
  success / error / DLQ
Execution flow:

- Retry layer receives the event and calls the inner handler
- Idempotency layer checks if the event ID was already processed — if yes, returns `nil` immediately
- Business handler executes the actual logic
- On success: the idempotency layer marks the event as processed, and the retry layer returns `nil`
- On error: the idempotency layer propagates the error, and the retry layer waits and retries
- After all retries are exhausted: the retry layer sends the event to the DLQ
This is the same pattern used in HTTP middleware (e.g. Chi middleware chain). Because
Handler is just a function type, any function that takes a Handler
and returns a Handler can be composed in the chain.
7.9 Worker Pool Pattern
The broadcast service uses a Dispatcher/Worker pattern for concurrent job processing. Defined in apps/broadcast-service/internal/worker/.
Dispatcher
type Dispatcher struct {
    jobQueue chan job.Job
    workers  []*Worker
}

func NewDispatcher(workerCount int, queueSize int) *Dispatcher {
    jobQueue := make(chan job.Job, queueSize)
    d := &Dispatcher{jobQueue: jobQueue}
    for i := 1; i <= workerCount; i++ {
        d.workers = append(d.workers, NewWorker(i, jobQueue))
    }
    return d
}

// TryDispatch: non-blocking send with backpressure
func (d *Dispatcher) TryDispatch(job job.Job) bool {
    select {
    case d.jobQueue <- job:
        return true // dispatched
    default:
        return false // queue full — backpressure
    }
}
TryDispatch() uses a select with default to
attempt a non-blocking send. If the job queue is full, it returns false
immediately instead of blocking, giving the caller a chance to apply backpressure
(e.g. reject the request, delay, or log a warning).
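A runnable sketch of that backpressure behavior, with the worker side stripped out so the queue never drains (the `Job` struct here is a stand-in for the `job.Job` interface):

```go
package main

import "fmt"

// Job is a stand-in for the job.Job interface.
type Job struct{ Name string }

type Dispatcher struct{ jobQueue chan Job }

// TryDispatch mirrors the non-blocking send shown above.
func (d *Dispatcher) TryDispatch(j Job) bool {
	select {
	case d.jobQueue <- j:
		return true
	default:
		return false // queue full — caller must apply backpressure
	}
}

func main() {
	d := &Dispatcher{jobQueue: make(chan Job, 2)} // capacity 2, no workers draining
	fmt.Println(d.TryDispatch(Job{"a"}), d.TryDispatch(Job{"b"}), d.TryDispatch(Job{"c"}))
	// true true false — the third job is rejected instead of blocking
}
```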
Worker
type Worker struct {
    id   int
    jobs <-chan job.Job // receive-only channel
}

func (w *Worker) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case <-ctx.Done():
                return // graceful shutdown
            case j := <-w.jobs:
                func() {
                    defer func() {
                        if r := recover(); r != nil {
                            log.Printf("worker %d panic: %v", w.id, r)
                        }
                    }()
                    j.Run(ctx)
                }()
            }
        }
    }()
}
Key design points:

- Panic recovery: each job runs in a closure with `defer recover()` — a panic in one job doesn't crash the worker goroutine
- Context cancellation: workers shut down gracefully when the context is cancelled
- Receive-only channel: workers only read from the job channel (`<-chan`), enforcing a single producer (the dispatcher)
- Job interface: `job.Job` requires `Name() string` and `Run(ctx) error`
              Dispatcher
┌──────────────────────────────────┐
│                                  │
│ TryDispatch(job) / Dispatch(job) │
│               │                  │
│               ▼                  │
│       ┌───────────────┐          │
│       │   jobQueue    │ buffered │
│       │   (channel)   │ channel  │
│       └───┬─────┬─────┘          │
│           │     │                │
└───────────┴─────┴────────────────┘
            │     │
            ▼     ▼
       Worker 1  Worker 2  ...  Worker N
         (goroutine per worker)
7.10 Bounded Worker Pool with Semaphore
The broadcast worker (apps/broadcast-service/cmd/worker/main.go) uses a channel-based semaphore to bound the number of concurrent goroutines processing broadcast messages:
sem := make(chan struct{}, workerPoolSize) // e.g. 20
var wg sync.WaitGroup

// For each broadcast message event:
sem <- struct{}{} // acquire slot (blocks if pool full)
wg.Add(1)
go func() {
    defer func() {
        <-sem     // release slot
        wg.Done() // signal completion
    }()
    handler(ctx, event)
}()
How the semaphore works: A buffered channel of size workerPoolSize
acts as a counting semaphore. Sending to the channel “acquires a slot” — if
the buffer is full, the send blocks until another goroutine finishes and reads from the channel.
The WaitGroup is used for graceful shutdown to ensure all in-flight goroutines
complete before the process exits.
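The bound can be verified with a self-contained sketch that tracks a high-water mark of concurrently running goroutines; the helper name is illustrative:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxObservedConcurrency pushes n tasks through a semaphore of the given
// size and reports the peak number running at once. With a correct
// semaphore, the peak never exceeds size.
func maxObservedConcurrency(n, size int) int64 {
	sem := make(chan struct{}, size)
	var wg sync.WaitGroup
	var cur, peak int64
	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire slot (blocks if pool full)
		wg.Add(1)
		go func() {
			defer func() { <-sem; wg.Done() }()
			c := atomic.AddInt64(&cur, 1)
			// Record the high-water mark with a CAS loop.
			for {
				p := atomic.LoadInt64(&peak)
				if c <= p || atomic.CompareAndSwapInt64(&peak, p, c) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulate work
			atomic.AddInt64(&cur, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(maxObservedConcurrency(100, 20) <= 20) // true
}
```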
In the actual worker code, this pattern is applied specifically to
EventTypeBroadcastMessageRequested events, while other event types
(like upload requests) are processed synchronously in the consumer goroutine:
if event.Type == eventbus.EventTypeBroadcastMessageRequested {
    sem <- struct{}{}
    wg.Add(1)
    go func() {
        defer func() { <-sem; wg.Done() }()
        if err := broadcastMessageHandler(ctx, event); err != nil {
            slog.ErrorContext(ctx, "Broadcast message handler error",
                "error", err, "event_id", event.ID)
        }
    }()
    return nil // return immediately, don't block consumer
}
7.11 Per-Tenant Rate Limiting
apps/broadcast-service/internal/ratelimit/limiter.go implements
per-key (per-tenant) rate limiting using Go's golang.org/x/time/rate package.
type PerKeyLimiter struct {
    mu       sync.RWMutex
    limiters map[string]*rate.Limiter
    rps      float64
    burst    int
}

func (t *PerKeyLimiter) Wait(ctx context.Context, key string) error {
    // Fast path: read lock
    t.mu.RLock()
    l, ok := t.limiters[key]
    t.mu.RUnlock()

    if !ok {
        // Slow path: write lock, double-check
        t.mu.Lock()
        l, ok = t.limiters[key]
        if !ok {
            l = rate.NewLimiter(rate.Limit(t.rps), t.burst)
            t.limiters[key] = l
        }
        t.mu.Unlock()
    }
    return l.Wait(ctx) // blocks until allowed
}
Why per-tenant rate limiting? The Meta/WhatsApp Business API enforces rate limits per WABA (WhatsApp Business Account). Each tenant has their own WABA, so each tenant needs an independent rate limiter. Without this, one tenant's high-volume broadcast could exhaust the rate limit for another tenant's account.
The default configuration (from environment variables):
| Parameter | Env Variable | Default | Description |
|---|---|---|---|
| RPS | `META_RATE_LIMIT_RPS` | `70` | Requests per second per tenant |
| Burst | `META_RATE_LIMIT_BURST` | `10` | Burst allowance above the steady-state rate |
| Pool Size | `WORKER_POOL_SIZE` | `20` | Max concurrent broadcast message workers |
Double-check locking pattern: Same as the Kafka producer's
getWriter(). First check with read lock (fast path), then write lock
with a second check to prevent race conditions when two goroutines try to create
a limiter for the same key simultaneously.
7.12 Distributed Tracing via Events
pkg/eventbus/tracing.go enables trace context propagation through Kafka events, ensuring that a single distributed trace spans from the HTTP request through the event bus to the consumer worker.
Tracing Functions
| Function | Side | Description |
|---|---|---|
| `InjectTracing(ctx, event)` | Producer | Uses the OTel propagator to inject trace context into event `Headers`. Also sets the `TraceID` and `SpanID` fields directly for easy access. |
| `ExtractTracing(ctx, event)` | Consumer | Extracts trace context from event `Headers` and returns a new context with the parent span. |
| `StartProducerSpan(ctx, topic, event)` | Producer | Creates a producer span (`SpanKindProducer`) with event metadata attributes, then calls `InjectTracing`. |
| `StartConsumerSpan(ctx, event, name)` | Consumer | Calls `ExtractTracing` first, then creates a child consumer span (`SpanKindConsumer`) with event metadata. |
Trace Propagation Flow
// Producer side (pkg/eventbus/kafka/producer.go)
ctx, span := eventbus.StartProducerSpan(ctx, topic, event)
defer span.End()
// StartProducerSpan internally calls InjectTracing(ctx, event)
// → trace context is now in event.Headers
// Consumer side (pkg/eventbus/kafka/consumer.go)
ctx, span := eventbus.StartConsumerSpan(ctx, event, "consume "+sub.topic)
defer span.End()
// StartConsumerSpan internally calls ExtractTracing(ctx, event)
// → consumer span is a child of the producer span
 HTTP Handler              Kafka              Consumer Worker
┌───────────────┐         ┌───────┐         ┌─────────────────┐
│ Trace: abc123 │         │       │         │ Trace: abc123   │
│ Span: span-1  │────────>│ Event │────────>│ Span: span-3    │
│  └─ span-2    │ inject  │ Hdrs  │ extract │ parent: span-2  │
│   (producer)  │         │       │         │ (consumer)      │
└───────────────┘         └───────┘         └─────────────────┘
Trace Header Keys
const (
    HeaderTraceID    = "trace-id"
    HeaderSpanID     = "span-id"
    HeaderTraceFlags = "trace-flags"
    HeaderTraceState = "trace-state"
)
A headerCarrier type (implementing propagation.TextMapCarrier)
adapts the event's map[string]string Headers to work with the OTel propagator API.
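Such a carrier only needs the three methods `TextMapCarrier` requires: `Get`, `Set`, and `Keys`. A self-contained sketch (without the OTel import, so the shapes here are illustrative) might look like:

```go
package main

import "fmt"

// headerCarrier adapts an event's Headers map to the three methods that
// OTel's propagation.TextMapCarrier interface requires. Shown without the
// otel dependency so the sketch is self-contained.
type headerCarrier map[string]string

func (c headerCarrier) Get(key string) string { return c[key] }
func (c headerCarrier) Set(key, value string) { c[key] = value }
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	headers := map[string]string{}
	c := headerCarrier(headers)
	c.Set("traceparent", "00-abc123-span1-01") // what a propagator would inject
	fmt.Println(c.Get("traceparent"), len(c.Keys()))
}
```

Because `headerCarrier` is a named map type, writes through the carrier land directly in the event's `Headers` map with no copying.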
7.13 Topic Naming & Event Types
Topics and event types are defined as constants in pkg/eventbus/helpers.go. The naming follows consistent conventions.
Topic Naming Convention
Topics use lowercase with dots for versioning: {domain}.v{version} or
{domain}.{subdomain}.{action}.v{version}
| Topic | Constant | Description |
|---|---|---|
| `notifications.v1` | `TopicNotifications` | Notification delivery requests |
| `broadcasts.v1` | `TopicBroadcasts` | Broadcast campaign events (upload, send, status) |
| `user-events.v1` | `TopicUserEvents` | User lifecycle events (created, updated, deleted) |
| `auth-events.v1` | `TopicAuthEvents` | Authentication events |
| `admin-events.v1` | `TopicAdminEvents` | Admin/tenant management events |
| `whatsapp.template.status.v1` | `TopicWhatsAppTemplateStatus` | WhatsApp template approval/rejection status from Meta |
| `whatsapp.template.quality.v1` | `TopicWhatsAppTemplateQuality` | WhatsApp template quality score changes from Meta |
| `whatsapp.message.incoming.v1` | `TopicWhatsAppMessageIncoming` | Incoming WhatsApp messages from the Meta webhook |
| `whatsapp.message.status.v1` | `TopicWhatsAppMessageStatus` | Message delivery status updates (sent, delivered, read, failed) |
Event Type Convention
Event types follow the format {domain}.{action}.v{version}, built using
the BuildEventType() helper:
func BuildEventType(domain, action string, version int) string {
    return fmt.Sprintf("%s.%s.v%d", domain, action, version)
}
| Event Type | Constant | Domain |
|---|---|---|
| `notification.requested.v1` | `EventTypeNotificationRequested` | notification |
| `notification.sent.v1` | `EventTypeNotificationSent` | notification |
| `notification.failed.v1` | `EventTypeNotificationFailed` | notification |
| `broadcast.requested.v1` | `EventTypeBroadcastRequested` | broadcast |
| `broadcast.sent.v1` | `EventTypeBroadcastSent` | broadcast |
| `broadcast.upload_requested.v1` | `EventTypeRecipientUploadRequested` | broadcast |
| `broadcast.broadcast_message_requested.v1` | `EventTypeBroadcastMessageRequested` | broadcast |
| `admin.businessnumber_requested.v1` | `EventTypeBusinessNumberRequested` | admin |
| `user.created.v1` | `EventTypeUserCreated` | user |
| `user.updated.v1` | `EventTypeUserUpdated` | user |
| `user.deleted.v1` | `EventTypeUserDeleted` | user |
| `whatsapp.template_status_update.v1` | `EventTypeWhatsAppTemplateStatus` | whatsapp |
| `whatsapp.template_quality_update.v1` | `EventTypeWhatsAppTemplateQuality` | whatsapp |
| `whatsapp.message_incoming.v1` | `EventTypeWhatsAppMessageIncoming` | whatsapp |
| `whatsapp.message_status.v1` | `EventTypeWhatsAppMessageStatus` | whatsapp |
DLQ Topic Mapping
| Primary Topic | DLQ Topic | Constant |
|---|---|---|
| `notifications.v1` | `notifications.v1.dlq` | `DLQTopicNotifications` |
| `broadcasts.v1` | `broadcasts.v1.dlq` | `DLQTopicBroadcasts` |
| `user-events.v1` | `user-events.v1.dlq` | `DLQTopicUserEvents` |
| `auth-events.v1` | `auth-events.v1.dlq` | `DLQTopicAuthEvents` |
Event Domains & Actions
// Domains
const (
    DomainNotification = "notification"
    DomainBroadcast    = "broadcast"
    DomainUser         = "user"
    DomainAuth         = "auth"
    DomainAdmin        = "admin"
)

// Standard actions
const (
    ActionRequested = "requested"
    ActionCreated   = "created"
    ActionUpdated   = "updated"
    ActionDeleted   = "deleted"
    ActionSent      = "sent"
    ActionFailed    = "failed"
)
1Engage Multitenant Backend — Codebase Guide — Chapter 07: Event-Driven Architecture