Chapter 09: Template & Broadcasting

The template and broadcasting system powers 1Engage's high-volume messaging capabilities. This guide covers WhatsApp and email templates, the broadcast campaign flow, and best practices for handling millions of messages with proper chunking, rate limiting, and fault tolerance.

On This Page

  1. System Overview
  2. Template Systems
  3. Broadcast Flow
  4. Large-Scale Best Practices
  5. Key Files Reference

1 System Overview

The broadcasting system handles high-volume message delivery through multiple channels (WhatsApp, Email) using a template-based approach. Templates are pre-approved message formats with variable placeholders. Broadcasts distribute templated messages to thousands or millions of recipients through an event-driven, horizontally scalable architecture.

┌─────────────────────────────────────────────────────────────────────────────┐
│                           BROADCAST ARCHITECTURE                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────┐    ┌──────────────┐    ┌──────────────┐                 │
│   │   API/CLI    │───▶│   Campaign   │───▶│   Message    │                 │
│   │   Request    │    │   Service    │    │   Service    │                 │
│   └──────────────┘    └──────────────┘    └──────┬───────┘                 │
│                                                  │                          │
│                          ┌───────────────────────┼──────────────────┐      │
│                          │                       │                  │      │
│                          ▼                       ▼                  ▼      │
│                    ┌──────────┐          ┌──────────┐        ┌──────────┐ │
│                    │  Kafka   │          │   DB     │        │   S3     │ │
│                    │  Queue   │          │ (GORM)   │        │ (Media)  │ │
│                    └────┬─────┘          └──────────┘        └──────────┘ │
│                         │                                                   │
│                         ▼                                                   │
│               ┌─────────────────┐                                          │
│               │ Consumer Worker │  ◄── Worker Pool (N goroutines)         │
│               │     Service     │                                          │
│               └────────┬────────┘                                          │
│                        │                                                    │
│           ┌────────────┼────────────┐                                      │
│           ▼            ▼            ▼                                      │
│      ┌────────┐  ┌────────┐  ┌────────┐                                   │
│      │  Meta  │  │ Email  │  │ Rate   │                                   │
│      │   API  │  │ SMTP   │  │Limiter │                                   │
│      └────────┘  └────────┘  └────────┘                                   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
        

This architecture separates concerns for scalability and reliability: the campaign service handles business logic, message service manages data persistence and event publishing, and consumer workers handle actual delivery with rate limiting and retry logic. Kafka provides buffering during traffic spikes.

2 Template Systems

2.1 WhatsApp Templates

Provider: WhatsApp Business API (Meta)

WhatsApp templates require Meta approval before use. They support multiple formats including text, images, videos, documents, and carousel layouts.

Template Types

Type        Use Case                           Category
customize   Marketing campaigns, promotions    marketing
retention   Utility messages, notifications    utility
carousel    Multi-card product showcases       marketing

Template Structure

apps/broadcast-service/internal/domain/whatsapp_template.go
type TemplateComponent struct {
    Type    string      // "header", "body", "footer", "button"
    Format  string      // "text", "image", "video", "document"
    Text    string      // Content with {{1}} variables
    Example string      // Example values for approval
}

type WhatsAppTemplate struct {
    Name       string              // Unique template name
    Language   string              // e.g., "en", "id"
    Category   string              // "marketing", "utility"
    Status     string              // "draft", "pending", "approved", "rejected"
    Components []TemplateComponent
}

Variable Syntax

Templates use {{1}}, {{2}}, etc. as placeholders for dynamic content. These are replaced at send-time with actual values from the recipient data.

// Template with variables
"Hello {{1}}, your appointment is scheduled for {{2}} at {{3}}."

// Becomes at send-time
"Hello John, your appointment is scheduled for Jan 15 at 2:30 PM."
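Sketched as code, positional substitution amounts to a sequence of string replacements. renderVars below is a hypothetical illustration, not the production sender, which would also validate that every placeholder has a matching value:

```go
package main

import (
	"fmt"
	"strings"
)

// renderVars replaces positional placeholders {{1}}, {{2}}, ...
// with the supplied values, in order. Illustrative sketch only.
func renderVars(template string, values []string) string {
	out := template
	for i, v := range values {
		out = strings.ReplaceAll(out, fmt.Sprintf("{{%d}}", i+1), v)
	}
	return out
}

func main() {
	msg := renderVars(
		"Hello {{1}}, your appointment is scheduled for {{2}} at {{3}}.",
		[]string{"John", "Jan 15", "2:30 PM"},
	)
	fmt.Println(msg)
	// Hello John, your appointment is scheduled for Jan 15 at 2:30 PM.
}
```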

Template Lifecycle

draft ──▶ submit ──▶ in_review ──▶ approved
                       │
                       └──────────▶ rejected
        
Important: Templates must be approved by Meta before they can be used in broadcasts. The approval process typically takes 24-48 hours.

apps/broadcast-service/internal/service/whatsapp_template_service.go

2.2 Email Templates

pkg/shared/mail/mailtemplate/

Email templates are simpler HTML templates using Go's standard html/template package. They don't require external approval and are embedded in the binary.

//go:embed *.html
var templateFS embed.FS

func Render(templateName string, data interface{}) (string, error) {
    tmpl, err := template.ParseFS(templateFS, templateName)
    if err != nil {
        return "", err
    }
    
    var buf bytes.Buffer
    if err := tmpl.Execute(&buf, data); err != nil {
        return "", err
    }
    return buf.String(), nil
}
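For a self-contained illustration of the same pattern, the snippet below parses an inline template string instead of the embedded FS. Note that html/template escapes user-supplied values automatically, which matters when recipient data flows into email bodies:

```go
package main

import (
	"bytes"
	"fmt"
	"html/template"
)

// renderString mirrors the Render helper above, but parses an inline
// string so the example runs on its own.
func renderString(src string, data interface{}) (string, error) {
	tmpl, err := template.New("mail").Parse(src)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	// html/template escapes the angle brackets in the injected value.
	out, err := renderString("<p>Hello {{.Name}}</p>", map[string]string{"Name": "<Jane>"})
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // <p>Hello &lt;Jane&gt;</p>
}
```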

3 Broadcast Flow

3.1 Campaign Creation

┌─────────────────────────────────────────────────────────────────────────────┐
│                        BROADCAST CAMPAIGN LIFECYCLE                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  CREATE                    APPROVE                  EXECUTE                │
│                                                                             │
│  ┌────────┐              ┌────────┐               ┌────────┐              │
│  │ Draft  │─────────────▶│Pending │──────────────▶│Sending │              │
│  │        │   (if req)   │        │   (scheduled)  │        │              │
│  └────────┘              └────────┘               └───┬────┘              │
│                                                       │                     │
│                                                       ▼                     │
│                                               ┌──────────────┐            │
│                                               │   Sent/      │            │
│                                               │   Completed  │            │
│                                               └──────────────┘            │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
        

3.2 Message Processing Pipeline

The message service transforms a broadcast campaign into individual messages through a multi-stage pipeline with memory-efficient cursor-based pagination and chunked batch processing.

apps/broadcast-service/internal/service/broadcast_message_service.go
const (
    recipientChunkSize = 1000  // Recipients per DB query
    dbInsertChunkSize  = 500   // Messages per DB batch insert
    kafkaBatchSize     = 500   // Events per Kafka batch publish
)

func (s *BroadcastMessageService) ProcessBroadcast(ctx context.Context, campaignID string) error {
    campaign, err := s.campaignRepo.GetByID(ctx, campaignID)
    if err != nil {
        return fmt.Errorf("get campaign: %w", err)
    }

    // Cursor-based pagination for memory efficiency
    isValid := true // only broadcast to validated recipients
    var afterID *string
    
    for {
        // Fetch recipients in chunks
        recipients, err := s.rcRepo.ListByCursor(
            campaign.TenantID,
            campaign.GroupID,
            &isValid,
            afterID,
            recipientChunkSize,
        )
        if err != nil {
            return fmt.Errorf("list recipients: %w", err)
        }

        if len(recipients) == 0 {
            break
        }

        // Create messages in batches
        messages := make([]*Message, 0, len(recipients))
        for _, r := range recipients {
            msg := &Message{
                CampaignID:  campaign.ID,
                RecipientID: r.ID,
                Status:      StatusPending,
            }
            messages = append(messages, msg)
        }

        // Bulk insert with chunking
        inserted, err := s.bmRepo.BulkCreateChunked(messages, dbInsertChunkSize)
        if err != nil {
            return fmt.Errorf("bulk insert: %w", err)
        }

        // Publish events in batches
        if err := s.publishBatch(ctx, inserted); err != nil {
            return fmt.Errorf("publish events: %w", err)
        }

        // Update cursor for next iteration
        lastID := recipients[len(recipients)-1].ID
        afterID = &lastID
    }

    return nil
}
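The cursor loop can be exercised against an in-memory list to see the pagination behavior. listByCursor below is a hypothetical stand-in for rcRepo.ListByCursor:

```go
package main

import "fmt"

type Recipient struct{ ID string }

// listByCursor stands in for the repository call: it returns up to
// limit recipients positioned after the cursor ID.
func listByCursor(all []Recipient, afterID *string, limit int) []Recipient {
	start := 0
	if afterID != nil {
		for i, r := range all {
			if r.ID == *afterID {
				start = i + 1
				break
			}
		}
	}
	end := start + limit
	if end > len(all) {
		end = len(all)
	}
	return all[start:end]
}

func main() {
	all := []Recipient{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	var afterID *string
	pages := 0
	for {
		page := listByCursor(all, afterID, 2)
		if len(page) == 0 {
			break
		}
		pages++
		last := page[len(page)-1].ID
		afterID = &last // advance the cursor, as in ProcessBroadcast
	}
	fmt.Println(pages) // 3 pages: 2, 2, 1 recipients
}
```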

3.3 Consumer Worker Processing

┌─────────────────────────────────────────────────────────────────────────────┐
│                     CONSUMER WORKER PROCESSING                              │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Kafka Consumer                                                            │
│        │                                                                    │
│        ▼                                                                    │
│   ┌─────────────┐                                                           │
│   │ Job Queue   │ ◄── Buffered channel (size: 100)                         │
│   │  (chan)     │                                                           │
│   └──────┬──────┘                                                           │
│          │                                                                  │
│          ▼                                                                  │
│   ┌──────────────────────────────────────────────────────────┐             │
│   │                    Worker Pool (N workers)               │             │
│   │  ┌────────┐  ┌────────┐  ┌────────┐  ┌────────┐         │             │
│   │  │Worker 1│  │Worker 2│  │Worker 3│  │Worker N│  ...     │             │
│   │  └───┬────┘  └───┬────┘  └───┬────┘  └───┬────┘         │             │
│   └──────┼───────────┼───────────┼───────────┼──────────────┘             │
│          │           │           │           │                              │
│          ▼           ▼           ▼           ▼                              │
│   ┌──────────────────────────────────────────────────────────┐             │
│   │  Process: 1. Rate Limit Check                            │             │
│   │           2. Template Variable Replacement               │             │
│   │           3. Media URL Signing                           │             │
│   │           4. Send to Meta API                            │             │
│   │           5. Update Status                               │             │
│   └──────────────────────────────────────────────────────────┘             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
        
apps/broadcast-service/internal/worker/dispatcher.go
type Dispatcher struct {
    workerPool  int
    jobQueue    chan Job
    workers     []*Worker
    wg          sync.WaitGroup
}

func NewDispatcher(workerPool int, queueSize int) *Dispatcher {
    return &Dispatcher{
        workerPool: workerPool,
        jobQueue:   make(chan Job, queueSize),
    }
}

func (d *Dispatcher) Start(ctx context.Context) {
    d.workers = make([]*Worker, d.workerPool)
    for i := 0; i < d.workerPool; i++ {
        d.workers[i] = NewWorker(d.jobQueue, &d.wg)
        d.workers[i].Start(ctx)
    }
}

func (d *Dispatcher) Dispatch(job Job) {
    d.wg.Add(1)
    d.jobQueue <- job
}

3.4 Rate Limiting

Meta's WhatsApp API has strict rate limits per phone number ID. We implement per-key rate limiting to avoid 429 errors and ensure reliable delivery.

apps/broadcast-service/internal/ratelimit/limiter.go
type PerKeyLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.RWMutex
    rps      float64
    burst    int
}

func (p *PerKeyLimiter) Wait(ctx context.Context, key string) error {
    p.mu.RLock()
    limiter, exists := p.limiters[key]
    p.mu.RUnlock()

    if !exists {
        p.mu.Lock()
        // Double-check after acquiring write lock
        if limiter, exists = p.limiters[key]; !exists {
            limiter = rate.NewLimiter(rate.Limit(p.rps), p.burst)
            p.limiters[key] = limiter
        }
        p.mu.Unlock()
    }

    return limiter.Wait(ctx)
}
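The double-checked locking here generalizes to any lazily created per-key resource. A stdlib-only sketch of the same pattern (hypothetical perKey type, so the example runs without golang.org/x/time/rate):

```go
package main

import (
	"fmt"
	"sync"
)

// perKey lazily creates one value per key using the same pattern as
// PerKeyLimiter.Wait: a cheap read-lock fast path for existing keys,
// and a write lock with a double-check so each key is created once.
type perKey[T any] struct {
	mu      sync.RWMutex
	entries map[string]T
	create  func() T
}

func (p *perKey[T]) get(key string) T {
	p.mu.RLock()
	v, ok := p.entries[key]
	p.mu.RUnlock()
	if ok {
		return v
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	if v, ok = p.entries[key]; ok { // double-check after write lock
		return v
	}
	v = p.create()
	p.entries[key] = v
	return v
}

func main() {
	created := 0
	p := &perKey[int]{
		entries: map[string]int{},
		create:  func() int { created++; return created },
	}
	p.get("phone-1")
	p.get("phone-1") // fast path, no second creation
	p.get("phone-2")
	fmt.Println(created) // each key created exactly once → 2
}
```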

4 Large-Scale Best Practices

4.1 Memory Management with Chunking

Processing millions of recipients requires streaming processing rather than loading everything into memory. We use cursor-based pagination and process in fixed-size chunks.

Chunk Sizes

Constant            Value   Purpose
recipientChunkSize  1000    Recipients fetched per DB query
dbInsertChunkSize   500     Messages inserted per DB transaction
kafkaBatchSize      500     Events published per Kafka batch

Why these sizes? Larger chunks improve throughput but increase memory usage and transaction duration. These values balance performance with resource constraints. Monitor and tune based on your infrastructure.
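The chunking itself can be captured in a small generic helper; the sketch below illustrates the pattern that BulkCreateChunked relies on (illustrative, not the repository implementation):

```go
package main

import "fmt"

// chunk splits a slice into fixed-size pieces; the final piece holds
// whatever remains. Each piece can then be one DB transaction or one
// Kafka batch.
func chunk[T any](items []T, size int) [][]T {
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	ids := make([]int, 1200)
	batches := chunk(ids, 500)
	fmt.Println(len(batches)) // 3 batches: 500, 500, 200
}
```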

4.2 Idempotency

Kafka provides at-least-once delivery semantics. Without idempotency, duplicate messages could be sent to the same recipient. We use an in-memory idempotency store to prevent this.

pkg/eventbus/idempotency.go
type IdempotencyStore interface {
    IsProcessed(ctx context.Context, key string) (bool, error)
    MarkProcessed(ctx context.Context, key string, ttl time.Duration) error
}

// Handler wrapper for automatic idempotency
func IdempotentHandler(
    store IdempotencyStore,
    ttl time.Duration,
    handler HandlerFunc,
) HandlerFunc {
    return func(ctx context.Context, event *Event) error {
        processed, err := store.IsProcessed(ctx, event.ID)
        if err != nil {
            return fmt.Errorf("check idempotency: %w", err)
        }
        if processed {
            return nil // Already processed, skip
        }

        if err := handler(ctx, event); err != nil {
            return err
        }

        return store.MarkProcessed(ctx, event.ID, ttl)
    }
}

4.3 Retry Strategy

Transient failures (network timeouts, rate limits) should be retried with exponential backoff. Permanent failures (invalid phone number, template rejected) should be logged and skipped.

pkg/eventbus/retry.go
type RetryConfig struct {
    MaxRetries        int
    InitialBackoff    time.Duration
    MaxBackoff        time.Duration
    BackoffMultiplier float64
    SendToDLQ         bool
}

var DefaultRetryConfig = RetryConfig{
    MaxRetries:        3,
    InitialBackoff:    100 * time.Millisecond,
    MaxBackoff:        30 * time.Second,
    BackoffMultiplier: 2.0,
    SendToDLQ:         true,
}

func IsPermanent(err error) bool {
    // Check for permanent errors that shouldn't be retried
    var apiErr *APIError
    if errors.As(err, &apiErr) {
        return apiErr.Code == 400 || apiErr.Code == 404
    }
    return false
}

4.4 Event Publishing Best Practices

pkg/eventbus/kafka/producer.go
func (p *Producer) PublishBatch(ctx context.Context, topic string, events []*Event) error {
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        data, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event: %w", err)
        }

        messages[i] = kafka.Message{
            Key:   []byte(event.TenantID), // Partition by tenant for ordering
            Value: data,
            Headers: []kafka.Header{
                {Key: "event-type", Value: []byte(event.Type)},
                {Key: "trace-id", Value: []byte(event.TraceID)},
            },
        }
    }

    // Synchronous write for at-least-once semantics
    return p.writer.WriteMessages(ctx, messages...)
}

4.5 Monitoring & Observability

Key Metrics to Track:
  • Messages sent per second (throughput)
  • Delivery success/failure rates
  • Queue depth (Lag)
  • Rate limit hits
  • Retry counts
  • DLQ message counts

4.6 Presigned URL Caching

Media files (images, videos) in broadcasts are served via S3 presigned URLs. Generating these URLs for every message is expensive. Use a cache to reduce S3 API calls.

type presignedURLCache struct {
    mu    sync.RWMutex
    cache map[string]cacheEntry
}

type cacheEntry struct {
    url       string
    expiresAt time.Time
}

func (c *presignedURLCache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    
    entry, exists := c.cache[key]
    if !exists || time.Now().After(entry.expiresAt) {
        return "", false
    }
    return entry.url, true
}

5 Key Files Reference

Template System

File Purpose
apps/broadcast-service/internal/service/whatsapp_template_service.go WhatsApp template CRUD and Meta API integration
apps/broadcast-service/internal/domain/whatsapp_template.go Template domain models and interfaces
pkg/shared/mail/mailtemplate/ Email template rendering

Broadcast Service

File Purpose
apps/broadcast-service/internal/service/broadcast_service.go Campaign management (create, list, update, delete)
apps/broadcast-service/internal/service/broadcast_message_service.go Message processing with chunking and batching
apps/broadcast-service/internal/service/consumer_message_service.go Kafka consumer for message delivery

Worker Pool & Rate Limiting

File Purpose
apps/broadcast-service/internal/worker/dispatcher.go Worker pool dispatcher
apps/broadcast-service/internal/worker/worker.go Individual worker implementation
apps/broadcast-service/internal/ratelimit/limiter.go Per-key rate limiting

Event Bus

File Purpose
pkg/eventbus/event.go Standardized event envelope structure
pkg/eventbus/kafka/producer.go Kafka producer with batch publishing
pkg/eventbus/kafka/consumer.go Kafka consumer with retry logic
pkg/eventbus/idempotency.go In-memory idempotency store
pkg/eventbus/retry.go Retry logic with exponential backoff
pkg/eventbus/helpers.go Event type constants and builders

Last updated: 2025. Code references are from the 1Engage multitenant backend codebase.