Chapter 09: Template & Broadcasting
The template and broadcasting system powers 1Engage's high-volume messaging capabilities. This guide covers WhatsApp and email templates, the broadcast campaign flow, and best practices for handling millions of messages with proper chunking, rate limiting, and fault tolerance.
1 System Overview
The broadcasting system handles high-volume message delivery through multiple channels (WhatsApp, Email) using a template-based approach. Templates are pre-approved message formats with variable placeholders. Broadcasts distribute templated messages to thousands or millions of recipients through an event-driven, horizontally scalable architecture.
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROADCAST ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ API/CLI │───▶│ Campaign │───▶│ Message │ │
│ │ Request │ │ Service │ │ Service │ │
│ └──────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ┌───────────────────────┼──────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Kafka │ │ DB │ │ S3 │ │
│ │ Queue │ │ (GORM) │ │ (Media) │ │
│ └────┬─────┘ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Consumer Worker │ ◄── Worker Pool (N goroutines) │
│ │ Service │ │
│ └────────┬────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Meta │ │ Email │ │ Rate │ │
│ │ API │ │ SMTP │ │Limiter │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
This architecture separates concerns for scalability and reliability: the campaign service handles business logic, the message service manages data persistence and event publishing, and consumer workers handle delivery with rate limiting and retry logic. Kafka buffers messages during traffic spikes.
2 Template Systems
2.1 WhatsApp Templates
WhatsApp templates require Meta approval before use. They support multiple formats, including text, images, videos, documents, and carousel layouts.
Template Types
| Type | Use Case | Category |
|---|---|---|
| customize | Marketing campaigns, promotions | marketing |
| retention | Utility messages, notifications | utility |
| carousel | Multi-card product showcases | marketing |
Template Structure
apps/broadcast-service/internal/domain/whatsapp_template.go
type TemplateComponent struct {
	Type    string // "header", "body", "footer", "button"
	Format  string // "text", "image", "video", "document"
	Text    string // Content with {{1}} variables
	Example string // Example values for approval
}

type WhatsAppTemplate struct {
	Name       string // Unique template name
	Language   string // e.g., "en", "id"
	Category   string // "marketing", "utility"
	Status     string // "draft", "pending", "approved", "rejected"
	Components []TemplateComponent
}
Variable Syntax
Templates use {{1}}, {{2}}, etc. as placeholders for dynamic content.
These are replaced at send-time with actual values from the recipient data.
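The substitution step can be sketched in a few lines of Go. This is only an illustration under stated assumptions: `renderTemplate` and `placeholderPattern` are hypothetical names, not identifiers from the 1Engage codebase, and leaving unmatched placeholders untouched is one possible policy among several.

```go
package main

import (
	"regexp"
	"strconv"
)

// placeholderPattern matches positional placeholders such as {{1}} or {{12}}.
var placeholderPattern = regexp.MustCompile(`\{\{(\d+)\}\}`)

// renderTemplate replaces each {{n}} with values[n-1]. A placeholder without
// a matching value is left as-is so the gap stays visible to the caller.
// (Hypothetical helper, not taken from the codebase.)
func renderTemplate(text string, values []string) string {
	return placeholderPattern.ReplaceAllStringFunc(text, func(match string) string {
		sub := placeholderPattern.FindStringSubmatch(match)
		n, err := strconv.Atoi(sub[1])
		if err != nil || n < 1 || n > len(values) {
			return match
		}
		return values[n-1]
	})
}
```

Placeholders are 1-based, so `{{1}}` maps to the first element of `values`.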
// Template with variables
"Hello {{1}}, your appointment is scheduled for {{2}} at {{3}}."
// Becomes at send-time
"Hello John, your appointment is scheduled for Jan 15 at 2:30 PM."
Template Lifecycle
draft ──▶ submit ──▶ in_review ──▶ approved
│
└──────────▶ rejected
2.2 Email Templates
pkg/shared/mail/mailtemplate/
Email templates are simpler HTML templates using Go's standard html/template package.
They don't require external approval and are embedded in the binary.
//go:embed *.html
var templateFS embed.FS

func Render(templateName string, data interface{}) (string, error) {
	tmpl, err := template.ParseFS(templateFS, templateName)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}
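One practical reason to render email bodies with html/template rather than plain string formatting is that it escapes recipient-supplied values for the HTML context. The sketch below shows this with an inline template instead of an embedded file; `welcomeTmpl`, `welcomeData`, and `renderWelcome` are hypothetical names used only for illustration.

```go
package main

import (
	"bytes"
	"html/template"
)

// welcomeTmpl is parsed once at start-up. (Hypothetical inline template;
// the real templates are embedded *.html files.)
var welcomeTmpl = template.Must(template.New("welcome").Parse(`<p>Hello {{.Name}}</p>`))

// welcomeData is the data passed to the template.
type welcomeData struct {
	Name string
}

// renderWelcome executes the template into a buffer and returns the HTML.
func renderWelcome(data welcomeData) (string, error) {
	var buf bytes.Buffer
	if err := welcomeTmpl.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}
```

A name containing markup, such as `<Ana>`, is emitted as `&lt;Ana&gt;` rather than as a raw tag. Parsing once and reusing the parsed template is a design choice of this sketch; it avoids re-parsing on every render.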
3 Broadcast Flow
3.1 Campaign Creation
┌─────────────────────────────────────────────────────────────────────────────┐
│ BROADCAST CAMPAIGN LIFECYCLE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ CREATE APPROVE EXECUTE │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Draft │─────────────▶│Pending │──────────────▶│Sending │ │
│ │ │ (if req) │ │ (scheduled) │ │ │
│ └────────┘ └────────┘ └───┬────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Sent/ │ │
│ │ Completed │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
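The lifecycle above can be enforced with a small transition table. The following is a minimal sketch, not the service's actual implementation: the type and constant names are hypothetical, and the direct Draft to Sending transition is an assumption read from the "(if req)" label, meaning approval is skipped when it is not required.

```go
package main

// CampaignStatus is a hypothetical type; the real status type is not shown here.
type CampaignStatus string

const (
	CampaignDraft     CampaignStatus = "draft"
	CampaignPending   CampaignStatus = "pending"
	CampaignSending   CampaignStatus = "sending"
	CampaignCompleted CampaignStatus = "completed"
)

// allowedTransitions lists the states reachable from each state.
// Draft -> Sending is an assumption for campaigns that need no approval.
var allowedTransitions = map[CampaignStatus][]CampaignStatus{
	CampaignDraft:   {CampaignPending, CampaignSending},
	CampaignPending: {CampaignSending},
	CampaignSending: {CampaignCompleted},
}

// canTransition reports whether a campaign may move from one state to another.
func canTransition(from, to CampaignStatus) bool {
	for _, next := range allowedTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```

Checking `canTransition` before every status update keeps a campaign from moving backwards, for example from sending to draft.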
3.2 Message Processing Pipeline
The message service transforms a broadcast campaign into individual messages through a multi-stage pipeline with memory-efficient cursor-based pagination and chunked batch processing.
const (
	recipientChunkSize = 1000 // Recipients per DB query
	dbInsertChunkSize  = 500  // Messages per DB batch insert
	kafkaBatchSize     = 500  // Events per Kafka batch publish
)

func (s *BroadcastMessageService) ProcessBroadcast(ctx context.Context, campaignID string) error {
	campaign, err := s.campaignRepo.GetByID(ctx, campaignID)
	if err != nil {
		return fmt.Errorf("get campaign: %w", err)
	}

	// Filter flag passed to ListByCursor (assumed: only valid recipients are targeted)
	isValid := true

	// Cursor-based pagination for memory efficiency
	var afterID *string
	for {
		// Fetch recipients in chunks
		recipients, err := s.rcRepo.ListByCursor(
			campaign.TenantID,
			campaign.GroupID,
			&isValid,
			afterID,
			recipientChunkSize,
		)
		if err != nil {
			return fmt.Errorf("list recipients: %w", err)
		}
		if len(recipients) == 0 {
			break
		}

		// Create messages in batches
		messages := make([]*Message, 0, len(recipients))
		for _, r := range recipients {
			msg := &Message{
				CampaignID:  campaign.ID,
				RecipientID: r.ID,
				Status:      StatusPending,
			}
			messages = append(messages, msg)
		}

		// Bulk insert with chunking
		inserted, err := s.bmRepo.BulkCreateChunked(messages, dbInsertChunkSize)
		if err != nil {
			return fmt.Errorf("bulk insert: %w", err)
		}

		// Publish events in batches
		if err := s.publishBatch(ctx, inserted); err != nil {
			return fmt.Errorf("publish events: %w", err)
		}

		// Update cursor for next iteration
		lastID := recipients[len(recipients)-1].ID
		afterID = &lastID
	}
	return nil
}
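The bodies of `BulkCreateChunked` and `publishBatch` are not shown in this guide. Both presumably split their input into fixed-size slices before writing; a generic helper along the following lines is one way to do that. `chunk` is a hypothetical name, and the sketch assumes Go 1.18 or later for generics.

```go
package main

// chunk splits items into consecutive slices of at most size elements.
// The chunks share the backing array of items, so no elements are copied.
// (Hypothetical helper, not taken from the codebase.)
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 || len(items) == 0 {
		return nil
	}
	chunks := make([][]T, 0, (len(items)+size-1)/size)
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		// The three-index slice caps capacity at end, so appending to one
		// chunk cannot overwrite the elements of the next chunk.
		chunks = append(chunks, items[start:end:end])
	}
	return chunks
}
```

With `recipientChunkSize = 1000` and `dbInsertChunkSize = 500`, each page of recipients yields at most two insert batches, which keeps memory use bounded by the page size rather than by the campaign size.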
3.3 Consumer Worker Processing
┌─────────────────────────────────────────────────────────────────────────────┐
│ CONSUMER WORKER PROCESSING │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Kafka Consumer │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Job Queue │ ◄── Buffered channel (size: 100) │
│ │ (chan) │ │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Worker Pool (N workers) │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Worker 1│ │Worker 2│ │Worker 3│ │Worker N│ ... │ │
│ │ └───┬────┘ └───┬────┘ └───┬────┘ └───┬────┘ │ │
│ └──────┼───────────┼───────────┼───────────┼──────────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Process: 1. Rate Limit Check │ │
│ │ 2. Template Variable Replacement │ │
│ │ 3. Media URL Signing │ │
│ │ 4. Send to Meta API │ │
│ │ 5. Update Status │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
type Dispatcher struct {
	workerPool int
	jobQueue   chan Job
	workers    []*Worker
	wg         sync.WaitGroup
}

func NewDispatcher(workerPool int, queueSize int) *Dispatcher {
	return &Dispatcher{
		workerPool: workerPool,
		jobQueue:   make(chan Job, queueSize),
	}
}

func (d *Dispatcher) Start(ctx context.Context) {
	d.workers = make([]*Worker, d.workerPool)
	for i := 0; i < d.workerPool; i++ {
		d.workers[i] = NewWorker(d.jobQueue, &d.wg)
		d.workers[i].Start(ctx)
	}
}

func (d *Dispatcher) Dispatch(job Job) {
	d.wg.Add(1)
	d.jobQueue <- job
}
3.4 Rate Limiting
Meta's WhatsApp API has strict rate limits per phone number ID. We implement per-key rate limiting to avoid 429 errors and ensure reliable delivery.
type PerKeyLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.RWMutex
	rps      float64
	burst    int
}

func (p *PerKeyLimiter) Wait(ctx context.Context, key string) error {
	p.mu.RLock()
	limiter, exists := p.limiters[key]
	p.mu.RUnlock()
	if !exists {
		p.mu.Lock()
		// Double-check after acquiring write lock
		if limiter, exists = p.limiters[key]; !exists {
			limiter = rate.NewLimiter(rate.Limit(p.rps), p.burst)
			p.limiters[key] = limiter
		}
		p.mu.Unlock()
	}
	return limiter.Wait(ctx)
}
4 Large-Scale Best Practices
4.1 Memory Management with Chunking
Processing millions of recipients requires streaming processing rather than loading everything into memory. We use cursor-based pagination and process in fixed-size chunks.
Chunk Sizes
| Constant | Value | Purpose |
|---|---|---|
| recipientChunkSize | 1000 | Recipients fetched per DB query |
| dbInsertChunkSize | 500 | Messages inserted per DB transaction |
| kafkaBatchSize | 500 | Events published per Kafka batch |
4.2 Idempotency
Kafka provides at-least-once delivery semantics. Without idempotency, duplicate messages could be sent to the same recipient. We use an in-memory idempotency store to prevent this.
type IdempotencyStore interface {
	IsProcessed(ctx context.Context, key string) (bool, error)
	MarkProcessed(ctx context.Context, key string, ttl time.Duration) error
}

// Handler wrapper for automatic idempotency
func IdempotentHandler(
	store IdempotencyStore,
	ttl time.Duration,
	handler HandlerFunc,
) HandlerFunc {
	return func(ctx context.Context, event *Event) error {
		processed, err := store.IsProcessed(ctx, event.ID)
		if err != nil {
			return fmt.Errorf("check idempotency: %w", err)
		}
		if processed {
			return nil // Already processed, skip
		}
		if err := handler(ctx, event); err != nil {
			return err
		}
		return store.MarkProcessed(ctx, event.ID, ttl)
	}
}
4.3 Retry Strategy
Transient failures (network timeouts, rate limits) should be retried with exponential backoff. Permanent failures (invalid phone number, template rejected) should be logged and skipped.
type RetryConfig struct {
	MaxRetries        int
	InitialBackoff    time.Duration
	MaxBackoff        time.Duration
	BackoffMultiplier float64
	SendToDLQ         bool
}

var DefaultRetryConfig = RetryConfig{
	MaxRetries:        3,
	InitialBackoff:    100 * time.Millisecond,
	MaxBackoff:        30 * time.Second,
	BackoffMultiplier: 2.0,
	SendToDLQ:         true,
}

func IsPermanent(err error) bool {
	// Check for permanent errors that shouldn't be retried
	var apiErr *APIError
	if errors.As(err, &apiErr) {
		return apiErr.Code == 400 || apiErr.Code == 404
	}
	return false
}
4.4 Event Publishing Best Practices
pkg/eventbus/kafka/producer.go
func (p *Producer) PublishBatch(ctx context.Context, topic string, events []*Event) error {
	messages := make([]kafka.Message, len(events))
	for i, event := range events {
		data, err := json.Marshal(event)
		if err != nil {
			return fmt.Errorf("marshal event: %w", err)
		}
		messages[i] = kafka.Message{
			Key:   []byte(event.TenantID), // Partition by tenant for ordering
			Value: data,
			Headers: []kafka.Header{
				{Key: "event-type", Value: []byte(event.Type)},
				{Key: "trace-id", Value: []byte(event.TraceID)},
			},
		}
	}
	// Synchronous write for at-least-once semantics
	return p.writer.WriteMessages(ctx, messages...)
}
4.5 Monitoring & Observability
Track at least the following metrics:
- Messages sent per second (throughput)
- Delivery success/failure rates
- Queue depth (consumer lag)
- Rate limit hits
- Retry counts
- DLQ message counts
4.6 Presigned URL Caching
Media files (images, videos) in broadcasts are served via S3 presigned URLs. Generating a fresh URL for every message is expensive at broadcast volume, so cache each URL and reuse it across messages until it expires.
type presignedURLCache struct {
	mu    sync.RWMutex
	cache map[string]cacheEntry
}

type cacheEntry struct {
	url       string
	expiresAt time.Time
}

func (c *presignedURLCache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	entry, exists := c.cache[key]
	if !exists || time.Now().After(entry.expiresAt) {
		return "", false
	}
	return entry.url, true
}
5 Key Files Reference
Template System
| File | Purpose |
|---|---|
| apps/broadcast-service/internal/service/whatsapp_template_service.go | WhatsApp template CRUD and Meta API integration |
| apps/broadcast-service/internal/domain/whatsapp_template.go | Template domain models and interfaces |
| pkg/shared/mail/mailtemplate/ | Email template rendering |
Broadcast Service
| File | Purpose |
|---|---|
| apps/broadcast-service/internal/service/broadcast_service.go | Campaign management (create, list, update, delete) |
| apps/broadcast-service/internal/service/broadcast_message_service.go | Message processing with chunking and batching |
| apps/broadcast-service/internal/service/consumer_message_service.go | Kafka consumer for message delivery |
Worker Pool & Rate Limiting
| File | Purpose |
|---|---|
| apps/broadcast-service/internal/worker/dispatcher.go | Worker pool dispatcher |
| apps/broadcast-service/internal/worker/worker.go | Individual worker implementation |
| apps/broadcast-service/internal/ratelimit/limiter.go | Per-key rate limiting |
Event Bus
| File | Purpose |
|---|---|
| pkg/eventbus/event.go | Standardized event envelope structure |
| pkg/eventbus/kafka/producer.go | Kafka producer with batch publishing |
| pkg/eventbus/kafka/consumer.go | Kafka consumer with retry logic |
| pkg/eventbus/idempotency.go | In-memory idempotency store |
| pkg/eventbus/retry.go | Retry logic with exponential backoff |
| pkg/eventbus/helpers.go | Event type constants and builders |
Last updated: 2025. Code references are from the 1Engage multitenant backend codebase.