Chapter 1: Concurrency Patterns

Every Go concurrency primitive used across the 1Engage backend — from sync.RWMutex double-check locking to select-based shutdown orchestration — explained with real code, file paths, and rationale.

On This Page

  1.1 sync.RWMutex & Double-Check Locking
  1.2 sync.WaitGroup
  1.3 Channels
  1.4 Goroutines
  1.5 Select Statements

1.1 sync.RWMutex & Double-Check Locking


WHAT

sync.RWMutex provides shared-exclusive locking: any number of goroutines can hold a read lock simultaneously, but a write lock is exclusive. The double-check locking pattern takes a read lock first for the fast path, then escalates to a write lock only when needed — and re-checks after acquiring the write lock to avoid races.

WHY

Multiple goroutines (Kafka consumer loops, HTTP handlers, worker pool goroutines) access shared state concurrently. A regular sync.Mutex would serialize all access. RWMutex maximises read throughput by allowing concurrent reads while still protecting writes. Double-check locking prevents duplicate creation of expensive resources (Kafka writers, rate limiters) when multiple goroutines race through the fast-path miss simultaneously.

HOW

InMemoryIdempotencyStore — Read/Write Separation

pkg/eventbus/idempotency.go
type InMemoryIdempotencyStore struct {
    mu      sync.RWMutex
    entries map[string]time.Time
}

// IsProcessed — read path (many goroutines can check concurrently)
func (s *InMemoryIdempotencyStore) IsProcessed(ctx context.Context, key string) (bool, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    expiry, exists := s.entries[key]
    if !exists {
        return false, nil
    }
    if time.Now().After(expiry) {
        return false, nil
    }
    return true, nil
}

// MarkProcessed — write path (exclusive access)
func (s *InMemoryIdempotencyStore) MarkProcessed(ctx context.Context, key string, ttl time.Duration) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.entries[key] = time.Now().Add(ttl)
    return nil
}
Pattern: Classic RWMutex read/write split. IsProcessed is called for every incoming Kafka message (hot path), while MarkProcessed is called only on successful processing. RLock keeps the hot path non-blocking under concurrent consumer load.

Kafka Producer — Double-Check Locking for Lazy Writer Creation

pkg/eventbus/kafka/producer.go
type Producer struct {
    writers map[string]*kafka.Writer
    config  *Config
    mu      sync.RWMutex
}

func (p *Producer) getWriter(topic string) *kafka.Writer {
    // Fast path: read lock only
    p.mu.RLock()
    w, ok := p.writers[topic]
    p.mu.RUnlock()
    if ok {
        return w
    }

    // Slow path: escalate to write lock
    p.mu.Lock()
    defer p.mu.Unlock()

    // Double-check: another goroutine may have created it
    if w, ok := p.writers[topic]; ok {
        return w
    }

    // Create the writer (expensive)
    w = &kafka.Writer{
        Addr:         kafka.TCP(p.config.Brokers...),
        Topic:        topic,
        Balancer:     &kafka.Hash{},
        RequiredAcks: kafka.RequiredAcks(p.config.RequiredAcks),
        BatchSize:    p.config.BatchSize,
        BatchTimeout: p.config.BatchTimeout,
        WriteTimeout: p.config.WriteTimeout,
        MaxAttempts:  p.config.MaxRetries,
        Async:        false,
    }
    p.writers[topic] = w
    return w
}
Race timeline: goroutine A RLocks (miss), acquires the write Lock, re-checks, and creates the writer. Goroutine B RLocks (miss), blocks on Lock, then re-checks, finds the writer A created, and returns it without creating a duplicate.
Why double-check? Between releasing RLock and acquiring Lock, another goroutine may have already created the writer for this topic. Without the second check, you'd create duplicate writers — wasting connections and potentially corrupting batches.
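To see why the re-check matters, here is a minimal, self-contained sketch of the same pattern with a creation counter (lazyMap and raceOnce are hypothetical names, not from the codebase). Because the "expensive" creation runs only under the exclusive lock, after a second lookup, concurrent misses still produce exactly one creation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyMap mirrors the producer's writers map: values are created
// lazily under double-check locking. Names here are hypothetical.
type lazyMap struct {
	mu      sync.RWMutex
	values  map[string]int
	created atomic.Int64 // counts how many times the "expensive" create ran
}

func (m *lazyMap) get(key string) int {
	// Fast path: read lock only.
	m.mu.RLock()
	v, ok := m.values[key]
	m.mu.RUnlock()
	if ok {
		return v
	}

	// Slow path: escalate to the write lock, then re-check before creating.
	m.mu.Lock()
	defer m.mu.Unlock()
	if v, ok := m.values[key]; ok {
		return v
	}
	m.created.Add(1) // stands in for the expensive writer construction
	v = len(m.values) + 1
	m.values[key] = v
	return v
}

// raceOnce hammers a single key from n goroutines and reports how many
// creations happened; with the re-check in place it is always exactly 1.
func raceOnce(n int) int64 {
	m := &lazyMap{values: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.get("topic-a")
		}()
	}
	wg.Wait()
	return m.created.Load()
}

func main() {
	fmt.Println(raceOnce(100)) // 1: the double-check prevents duplicates
}
```

Deleting the second `if v, ok := m.values[key]` check reintroduces the race: several goroutines that missed under RLock would each create a value once they obtain the write lock in turn.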

PerKeyLimiter — Per-Tenant Rate Limiter with Double-Check

apps/broadcast-service/internal/ratelimit/limiter.go
type PerKeyLimiter struct {
    mu       sync.RWMutex
    limiters map[string]*rate.Limiter
    rps      float64
    burst    int
}

func (t *PerKeyLimiter) Wait(ctx context.Context, key string) error {
    // Read lock: fast path
    t.mu.RLock()
    l, ok := t.limiters[key]
    t.mu.RUnlock()

    if !ok {
        // Write lock + double-check
        t.mu.Lock()
        l, ok = t.limiters[key]
        if !ok {
            l = rate.NewLimiter(rate.Limit(t.rps), t.burst)
            t.limiters[key] = l
        }
        t.mu.Unlock()
    }

    return l.Wait(ctx)
}
Same pattern, different domain. Each tenant gets its own rate.Limiter. The first message for a tenant creates the limiter; all subsequent messages for that tenant hit the RLock fast path. This prevents the Meta WhatsApp API from being overwhelmed per-tenant.

presignedURLCache — S3 Presigned URL Caching

apps/broadcast-service/internal/service/broadcast_message_service.go
type presignedURLCache struct {
    mu    sync.RWMutex
    cache map[string]string
}

func (c *presignedURLCache) Get(key string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    v, ok := c.cache[key]
    return v, ok
}

func (c *presignedURLCache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.cache[key] = value
}
Why cache S3 URLs? Presigning S3 URLs requires an API call. When a broadcast campaign sends 100k messages, the same template media URL is reused for each recipient. The RWMutex cache ensures only the first recipient triggers the S3 API call; all others get the cached URL via the fast RLock path.
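A hedged sketch of how a caller presumably combines Get and Set (getOrPresign and the presign stub are hypothetical, not from the codebase). Note that two concurrent misses can both presign; the last Set wins, which is harmless because presigned URLs for the same object are interchangeable:

```go
package main

import (
	"fmt"
	"sync"
)

type presignedURLCache struct {
	mu    sync.RWMutex
	cache map[string]string
}

func (c *presignedURLCache) Get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.cache[key]
	return v, ok
}

func (c *presignedURLCache) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cache[key] = value
}

// getOrPresign is a hypothetical caller: check the cache first, and only
// hit the (stubbed) S3 API on a miss.
func getOrPresign(c *presignedURLCache, key string, presign func(string) string) string {
	if url, ok := c.Get(key); ok {
		return url // fast path: RLock only
	}
	url := presign(key) // slow path: one S3 round-trip
	c.Set(key, url)
	return url
}

func main() {
	c := &presignedURLCache{cache: make(map[string]string)}
	calls := 0
	presignStub := func(key string) string {
		calls++
		return "https://s3.example/" + key + "?sig=abc"
	}
	getOrPresign(c, "media/banner.png", presignStub)
	getOrPresign(c, "media/banner.png", presignStub) // served from cache
	fmt.Println(calls)                               // 1: only the first call presigned
}
```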

1.2 sync.WaitGroup


WHAT

sync.WaitGroup is a counter that blocks until all registered goroutines call Done(). It provides a structured way to wait for fan-out work to complete before proceeding.

WHY

During graceful shutdown, you cannot simply kill goroutines — they may be mid-processing a Kafka message or mid-API-call to Meta. WaitGroup ensures the main process waits for all in-flight work to drain before exiting.

HOW

Kafka Consumer — Tracking Consumer Loop Goroutines

pkg/eventbus/kafka/consumer.go
type Consumer struct {
    config        *Config
    subscriptions []*subscription
    mu            sync.RWMutex
    running       bool
    wg            sync.WaitGroup  // tracks all consumer goroutines
}

func (c *Consumer) Run(ctx context.Context) error {
    // ... setup omitted ...

    // Start a goroutine for each subscription
    errCh := make(chan error, len(c.subscriptions))

    for _, sub := range c.subscriptions {
        c.wg.Add(1)                       // register before launch
        go func(s *subscription) {
            defer c.wg.Done()              // always decrement on exit
            err := c.consumeLoop(ctx, s)
            if err != nil && !errors.Is(err, context.Canceled) {
                errCh <- err
            }
        }(sub)
    }

    // Wait for context cancellation or error
    select {
    case <-ctx.Done():
        slog.Info("Consumer context cancelled, shutting down...")
    case err := <-errCh:
        slog.Error("Consumer error", "error", err)
        return err
    }

    // Wait for ALL goroutines to finish draining
    c.wg.Wait()

    return ctx.Err()
}
Critical rule: Add(1) is called before the go statement, never inside the goroutine. If Add(1) ran inside the goroutine, the scheduler could reach Wait() before any launched goroutine had executed its Add(1), observe a zero counter, and return while work was still pending.
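The rule can be demonstrated in a few lines (fanOut is a hypothetical name, not from the codebase). Because every Add(1) happens before its go statement, Wait() can never observe the counter at zero while launches are still pending:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fanOut launches n goroutines and waits for all of them. Add(1) runs
// before each `go` statement, so by the time Wait() is reached the
// counter already accounts for every launched goroutine.
func fanOut(n int) int64 {
	var (
		wg   sync.WaitGroup
		done atomic.Int64
	)
	for i := 0; i < n; i++ {
		wg.Add(1) // register BEFORE launch, never inside the goroutine
		go func() {
			defer wg.Done()
			done.Add(1)
		}()
	}
	wg.Wait() // cannot return early: the count was bumped up front
	return done.Load()
}

func main() {
	fmt.Println(fanOut(100)) // 100: every goroutine completed
}
```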

Broadcast Worker Pool — WaitGroup + Semaphore Channel

apps/broadcast-service/cmd/worker/main.go
sem := make(chan struct{}, workerPoolSize)  // semaphore: max N goroutines
var wg sync.WaitGroup

// Inside the event handler:
if event.Type == eventbus.EventTypeBroadcastMessageRequested {
    sem <- struct{}{}     // acquire semaphore (blocks if pool is full)
    wg.Add(1)              // register with WaitGroup
    go func() {
        defer func() {
            <-sem            // release semaphore slot
            wg.Done()        // signal completion
        }()
        if err := broadcastMessageHandler(ctx, event); err != nil {
            slog.ErrorContext(ctx, "Broadcast message handler error",
                "error", err, "event_id", event.ID)
        }
    }()
    return nil
}
Per-message flow: Kafka msg → sem <- struct{}{} (acquire) → wg.Add(1) → go process() → on exit, <-sem and wg.Done().
Two primitives, two concerns. The semaphore channel bounds concurrency (max N in-flight API calls to Meta). The WaitGroup ensures graceful shutdown waits for every in-flight call to complete. Neither alone is sufficient.

1.3 Channels


WHAT

Channels are Go's primary mechanism for communication between goroutines. They can be buffered or unbuffered, typed, and restricted to send-only or receive-only.

WHY

The codebase uses channels for six distinct patterns: OS signal handling, job distribution, type enforcement, bounded concurrency, error aggregation, and async queuing. Each buffer size and direction choice is intentional.

HOW

Signal Channels — Graceful Shutdown

apps/admin-service/cmd/server/main.go (and all main.go files)
// Buffer size 1: ensures the signal is not lost if the goroutine
// hasn't reached the receive yet when the signal fires.
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit  // blocks until SIGINT or SIGTERM

slog.Info("Shutting down server...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := server.Shutdown(shutdownCtx); err != nil {
    slog.Error("Server forced to shutdown", "error", err)
}
Why buffer size 1? The signal.Notify documentation states: "Package signal will not block sending to c." If the channel is unbuffered and the receiver hasn't reached <-quit yet, the signal is silently dropped. A buffer of 1 prevents this race.

Job Distribution — Buffered Fan-Out Channel

apps/broadcast-service/internal/worker/dispatcher.go
type Dispatcher struct {
    jobQueue chan job.Job
    workers  []*Worker
}

func NewDispatcher(workerCount int, queueSize int) *Dispatcher {
    jobQueue := make(chan job.Job, queueSize)  // buffered: absorbs bursts

    d := &Dispatcher{jobQueue: jobQueue}
    for i := 1; i <= workerCount; i++ {
        d.workers = append(d.workers, NewWorker(i, jobQueue))
    }
    return d
}

func (d *Dispatcher) Dispatch(job job.Job) {
    d.jobQueue <- job  // blocking send: backpressure if queue is full
}
Flow: HTTP handler → jobQueue (buffer=100) → workers.
Fan-out: One channel, N workers. All workers read from the same channel. Go's channel semantics guarantee each job is delivered to exactly one worker. The buffer size (100 in production) decouples producers from consumers, absorbing bursts without blocking HTTP handlers.

Directional Channels — Type-Enforced Receive-Only

apps/broadcast-service/internal/worker/worker.go
type Worker struct {
    id   int
    jobs <-chan job.Job  // receive-only: compiler prevents accidental sends
}

func NewWorker(id int, jobs chan job.Job) *Worker {
    // Bidirectional channel is implicitly narrowed to receive-only
    return &Worker{id: id, jobs: jobs}
}
Compile-time safety. The worker can only read from the job channel. If someone accidentally writes w.jobs <- someJob, the compiler rejects it. This makes the data flow direction (dispatcher → worker) explicit and prevents subtle bugs.

Semaphore Pattern — Bounded Concurrency

apps/broadcast-service/cmd/worker/main.go
sem := make(chan struct{}, workerPoolSize)
// ...
sem <- struct{}{}  // acquire: blocks when pool is full
go func() {
    defer func() { <-sem }()  // release: free the slot
    // ... process message ...
}()
Why struct{} not bool? struct{} is zero bytes. The channel carries no data — only presence. This is the idiomatic Go semaphore. The buffer size equals the max concurrent goroutines.

Error Aggregation — First-Error-Wins

pkg/eventbus/kafka/consumer.go
// Buffer = number of subscriptions: no goroutine blocks on send
errCh := make(chan error, len(c.subscriptions))

for _, sub := range c.subscriptions {
    c.wg.Add(1)
    go func(s *subscription) {
        defer c.wg.Done()
        err := c.consumeLoop(ctx, s)
        if err != nil && !errors.Is(err, context.Canceled) {
            errCh <- err  // never blocks (buffered)
        }
    }(sub)
}

select {
case <-ctx.Done():
    // normal shutdown
case err := <-errCh:
    // first fatal error from any consumer
    return err
}
Buffer sizing matters. The buffer equals len(subscriptions) so that if all goroutines fail simultaneously, none blocks on the send. The select reads only the first error — this is the "first-error-wins" pattern that triggers an immediate shutdown.

Email Worker — Buffered Channel as Async Queue

pkg/shared/mail/worker.go
type EmailWorker struct {
    cfg   EmailConfig
    queue chan EmailJob
}

func NewEmailWorker(cfg EmailConfig, buffer int) *EmailWorker {
    w := &EmailWorker{
        cfg:   cfg,
        queue: make(chan EmailJob, buffer),  // buffered: async queue
    }
    go w.start()  // single consumer goroutine
    return w
}

func (w *EmailWorker) start() {
    for job := range w.queue {  // receives jobs until the channel is closed
        body, err := mailtemplate.Render(job.Template, job.Data)
        if err != nil {
            log.Println("render email failed:", err)
            continue
        }
        SendHTML(w.cfg, job.To, job.Subject, body)
    }
}

func (w *EmailWorker) Send(job EmailJob) {
    w.queue <- job  // non-blocking as long as buffer isn't full
}
Pattern: in-process message queue. The buffered channel decouples email sending from the request path. HTTP handlers call Send() and return immediately. The background goroutine drains the queue sequentially, preventing SMTP connection storms.
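As quoted, the worker has no shutdown path: jobs still queued at exit are dropped when the process dies. A hedged sketch of one possible extension (Close and done are assumptions, not shown in the source): closing the queue ends the range loop after it drains, and a done channel lets the caller wait for that drain.

```go
package main

import (
	"fmt"
	"sync"
)

type EmailJob struct{ To, Subject string }

// EmailWorker sketch with a hypothetical Close: closing the queue makes
// the `for range` in start() drain remaining jobs and then exit, and
// `done` lets the caller block until that drain has finished.
type EmailWorker struct {
	queue chan EmailJob
	done  chan struct{}
	mu    sync.Mutex
	sent  []string
}

func NewEmailWorker(buffer int) *EmailWorker {
	w := &EmailWorker{
		queue: make(chan EmailJob, buffer),
		done:  make(chan struct{}),
	}
	go w.start() // single consumer goroutine, as in the original
	return w
}

func (w *EmailWorker) start() {
	defer close(w.done)
	for job := range w.queue { // ends once the channel is closed and empty
		w.mu.Lock()
		w.sent = append(w.sent, job.To) // stands in for SendHTML(...)
		w.mu.Unlock()
	}
}

func (w *EmailWorker) Send(job EmailJob) { w.queue <- job }

// Close stops accepting jobs and blocks until the queue is drained.
func (w *EmailWorker) Close() {
	close(w.queue)
	<-w.done
}

func sendAndClose(n int) int {
	w := NewEmailWorker(n)
	for i := 0; i < n; i++ {
		w.Send(EmailJob{To: "user@example.com", Subject: "hi"})
	}
	w.Close() // all n jobs are processed before this returns
	return len(w.sent)
}

func main() {
	fmt.Println(sendAndClose(5)) // 5
}
```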

1.4 Goroutines


WHAT

Goroutines are lightweight threads multiplexed onto OS threads by the Go runtime. Each starts with a stack of roughly 2 KB that grows on demand, making goroutines the fundamental, cheap unit of concurrency in Go.

WHY

The codebase uses goroutines for four distinct lifecycle patterns: background servers, fire-and-forget side effects, background cleanup loops, and concurrent event consumers. Each pattern has different shutdown and error-handling requirements.

HOW

Background Server — Non-Blocking ListenAndServe

apps/admin-service/cmd/server/main.go
server := &http.Server{
    Addr:    ":" + cfg.Port,
    Handler: tracedHandler,
}

// Launch server in background goroutine
go func() {
    slog.Info("Starting admin-service", "port", cfg.Port)
    if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        slog.Error("Server error", "error", err)
        os.Exit(1)
    }
}()

// Main goroutine blocks on signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
Why not block on ListenAndServe in main? Because the main goroutine needs to continue setting up the Kafka consumer and signal handler. The HTTP server runs in a background goroutine while main orchestrates shutdown. ErrServerClosed is expected during graceful shutdown and must be filtered out.

Fire-and-Forget — Async Email Notifications

apps/admin-service/internal/service/alert_service.go
func (s *AlertService) CreateAlert(req model.CreateAlertRequest) (*model.PlatformAlert, error) {
    alert := &model.PlatformAlert{ /* ... */ }

    if err := s.db.Create(alert).Error; err != nil {
        return nil, err
    }

    // Send email notification for critical alerts
    if req.Severity == model.AlertSeverityCritical || req.Severity == model.AlertSeverityWarning {
        go s.sendEmailNotification(alert)  // fire-and-forget
    }

    return alert, nil  // return immediately, don't wait for email
}
Trade-off: Fire-and-forget means email failures are only logged, never propagated to the caller. This is acceptable here because the alert is already persisted to the database — email is a best-effort notification. The caller should not block on SMTP round-trips.

Background Cleanup — Periodic Goroutine with Ticker

pkg/eventbus/idempotency.go
func NewInMemoryIdempotencyStore() *InMemoryIdempotencyStore {
    store := &InMemoryIdempotencyStore{
        entries: make(map[string]time.Time),
    }
    go store.cleanupLoop()  // background goroutine for GC
    return store
}

func (s *InMemoryIdempotencyStore) cleanupLoop() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        s.cleanup()
    }
}

func (s *InMemoryIdempotencyStore) cleanup() {
    s.mu.Lock()
    defer s.mu.Unlock()

    now := time.Now()
    for key, expiry := range s.entries {
        if now.After(expiry) {
            delete(s.entries, key)
        }
    }
}
Preventing memory leaks. Without cleanup, the entries map grows unbounded as events are processed. The background goroutine runs every minute, holding a write lock briefly to purge expired entries. This keeps memory usage proportional to the active TTL window, not the total event count.

Concurrent Consumers — Goroutine-Per-Subscription

pkg/eventbus/kafka/consumer.go
// Each subscription gets its own goroutine for independent consumption
for _, sub := range c.subscriptions {
    c.wg.Add(1)
    go func(s *subscription) {
        defer c.wg.Done()
        err := c.consumeLoop(ctx, s)
        if err != nil && !errors.Is(err, context.Canceled) {
            errCh <- err
        }
    }(sub)
}
Why goroutine-per-subscription? Each subscription consumes from a different Kafka topic (broadcasts, template-status, message-status, template-quality). They must run independently — a slow consumer on one topic must not block consumption on another. The closure captures sub via the function parameter to avoid the classic loop variable capture bug.
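The capture fix can be shown in isolation (capturedValues is a hypothetical name, not from the codebase). Since Go 1.22 the loop variable is scoped per iteration, so a direct closure is also safe on current toolchains; the explicit parameter remains the defensive, version-independent form the consumer uses:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// capturedValues launches one goroutine per element, passing the element
// as a parameter, the same fix the consumer applies with `sub`. On Go
// toolchains before 1.22, a closure over the loop variable itself could
// observe the final value in every goroutine; passing it as an argument
// pins each goroutine to its own copy.
func capturedValues(items []string) []string {
	var (
		mu   sync.Mutex
		seen []string
		wg   sync.WaitGroup
	)
	for _, it := range items {
		wg.Add(1)
		go func(s string) { // s is this iteration's value, by construction
			defer wg.Done()
			mu.Lock()
			seen = append(seen, s)
			mu.Unlock()
		}(it)
	}
	wg.Wait()
	sort.Strings(seen) // goroutines finish in arbitrary order
	return seen
}

func main() {
	fmt.Println(capturedValues([]string{"broadcasts", "message-status", "template-status"}))
	// [broadcasts message-status template-status]: each goroutine saw its own value
}
```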

1.5 Select Statements


WHAT

select multiplexes across multiple channel operations. It blocks until one case is ready, or falls through to default for non-blocking behaviour. It's the core building block for timeout handling, cancellation propagation, and multi-channel coordination.

WHY

Every long-running goroutine in the codebase must be interruptible. select with ctx.Done() is the standard Go pattern for cooperative cancellation. Combined with default, it also implements backpressure and non-blocking dispatch.

HOW

Non-Blocking Send — TryDispatch with Backpressure

apps/broadcast-service/internal/worker/dispatcher.go
// Dispatch blocks until a worker picks up the job
func (d *Dispatcher) Dispatch(job job.Job) {
    d.jobQueue <- job
}

// TryDispatch returns immediately if the queue is full
func (d *Dispatcher) TryDispatch(job job.Job) bool {
    select {
    case d.jobQueue <- job:
        return true   // sent successfully
    default:
        return false  // queue full, apply backpressure
    }
}
Two APIs, two guarantees. Dispatch is blocking — use it when you must not lose the job. TryDispatch is non-blocking — use it when the caller can handle rejection (e.g., return HTTP 429 to the client). The default case makes the channel send non-blocking.

Context-Aware Backoff — Interruptible Retry Sleep

pkg/eventbus/retry.go
func RetryableHandler(cfg *RetryConfig, dlqProducer Producer, handler Handler) Handler {
    return func(ctx context.Context, event *Event) error {
        var lastErr error
        backoff := cfg.InitialBackoff

        for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
            err := handler(ctx, event)
            if err == nil {
                return nil
            }
            lastErr = err

            if attempt < cfg.MaxRetries {
                select {
                case <-ctx.Done():
                    return ctx.Err()       // shutdown interrupts the backoff
                case <-time.After(backoff):
                    // backoff elapsed, retry
                }

                // Exponential backoff with cap
                backoff = time.Duration(float64(backoff) * cfg.BackoffMultiplier)
                if backoff > cfg.MaxBackoff {
                    backoff = cfg.MaxBackoff
                }
            }
        }

        // All retries exhausted — send to DLQ
        if cfg.SendToDLQ && dlqProducer != nil {
            sendToDLQ(ctx, dlqProducer, event, lastErr)
        }
        return lastErr
    }
}
Why not time.Sleep? time.Sleep is un-interruptible. If the process receives SIGTERM during a 30-second backoff, Sleep would block for the full duration. The select with ctx.Done() allows immediate cancellation, enabling the shutdown sequence to complete within the 10-second timeout window.

Worker Job Loop — Select with Panic Recovery

apps/broadcast-service/internal/worker/worker.go
func (w *Worker) Start(ctx context.Context) {
    go func() {
        log.Printf("worker %d started\n", w.id)

        for {
            select {
            case <-ctx.Done():
                log.Printf("worker %d stopped\n", w.id)
                return  // clean exit on context cancellation

            case j := <-w.jobs:
                func() {
                    defer func() {
                        if r := recover(); r != nil {
                            log.Printf("worker %d panic recovered: %v\n", w.id, r)
                        }
                    }()
                    // panic in one job doesn't kill the worker
                    if err := j.Run(ctx); err != nil {
                        log.Printf("job %s failed: %v\n", j.Name(), err)
                    }
                }()
            }
        }
    }()
}
Resilience by design. The inner func() with recover() creates a panic boundary. If a job panics (e.g., nil pointer in template rendering), only that job fails — the worker goroutine continues processing the next job. Without this, one panicking job would crash the entire worker pool.

Ticker + Shutdown — Periodic Job with Clean Exit

apps/admin-service/cmd/worker/main.go
tokenRefreshInterval := 6 * time.Hour
ticker := time.NewTicker(tokenRefreshInterval)
defer ticker.Stop()

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

slog.Info("Token refresh worker started.", "interval", tokenRefreshInterval)

for {
    select {
    case <-ticker.C:
        slog.Info("Running token refresh job...")
        if err := tokenRefreshService.RefreshExpiringTokens(); err != nil {
            slog.Error("Token refresh job failed", "error", err)
        }

    case <-quit:
        slog.Info("Shutting down worker...")
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        _ = ctx
        slog.Info("Worker shutdown complete")
        return
    }
}
Cron-style worker. This is the admin-service token refresh worker. It runs a job every 6 hours, but remains responsive to shutdown signals at all times. The select ensures the worker can exit cleanly even if it's between tick intervals. ticker.Stop() in defer prevents the ticker goroutine from leaking.

Consumer Shutdown — First-Error-Wins Multiplexing

pkg/eventbus/kafka/consumer.go
// After launching all consumer goroutines:
select {
case <-ctx.Done():
    slog.Info("Consumer context cancelled, shutting down...")
case err := <-errCh:
    slog.Error("Consumer error", "error", err)
    return err
}

// This line only runs after the select unblocks
c.wg.Wait()
return ctx.Err()
Two exit paths, one select. The consumer exits either when the parent context is cancelled (normal shutdown via SIGTERM) or when any consumer goroutine reports a fatal error. In both cases, wg.Wait() ensures all goroutines have finished before the function returns. This prevents resource leaks (open Kafka readers, in-flight messages).

Quick Reference

| Primitive | Pattern | Where | Purpose |
|---|---|---|---|
| sync.RWMutex | Read/Write Split | idempotency.go, broadcast_message_service.go | Concurrent reads, exclusive writes |
| sync.RWMutex | Double-Check Locking | producer.go, limiter.go | Lazy singleton creation |
| sync.WaitGroup | Fork-Join | consumer.go, worker/main.go | Wait for goroutine completion |
| chan os.Signal | Signal Channel | all main.go | Graceful shutdown |
| chan Job | Fan-Out | dispatcher.go | Job distribution to workers |
| <-chan Job | Directional | worker.go | Compile-time send prevention |
| chan struct{} | Semaphore | worker/main.go | Bounded concurrency |
| chan error | Error Aggregation | consumer.go | First-error-wins |
| chan EmailJob | Async Queue | mail/worker.go | Decouple email from request |
| select | Non-Blocking Send | dispatcher.go | Backpressure |
| select | Context-Aware Sleep | retry.go | Interruptible backoff |
| select | Job + Cancel | worker.go | Graceful worker loop |
| select | Ticker + Signal | admin worker/main.go | Periodic job with clean exit |
| select | Multi-Channel Mux | consumer.go | First-error-wins shutdown |