Chapter 01: Concurrency Patterns
Every Go concurrency primitive used across the 1Engage backend —
from sync.RWMutex double-check locking to select-based
shutdown orchestration — explained with real code, file paths, and rationale.
1.1 sync.RWMutex & Double-Check Locking
WHAT
sync.RWMutex provides shared-exclusive locking: any number of goroutines
can hold a read lock simultaneously, but a write lock
is exclusive. The double-check locking pattern takes a read lock first
for the fast path, then escalates to a write lock only when needed — and
re-checks after acquiring the write lock to avoid races.
WHY
Multiple goroutines (Kafka consumer loops, HTTP handlers, worker pool goroutines)
access shared state concurrently. A regular sync.Mutex would serialize all
access. RWMutex maximises read throughput by allowing concurrent reads while
still protecting writes. Double-check locking prevents duplicate creation of expensive
resources (Kafka writers, rate limiters) when multiple goroutines race through the
fast-path miss simultaneously.
HOW
InMemoryIdempotencyStore — Read/Write Separation
pkg/eventbus/idempotency.go
type InMemoryIdempotencyStore struct {
mu sync.RWMutex
entries map[string]time.Time
}
// IsProcessed — read path (many goroutines can check concurrently)
func (s *InMemoryIdempotencyStore) IsProcessed(ctx context.Context, key string) (bool, error) {
s.mu.RLock()
defer s.mu.RUnlock()
expiry, exists := s.entries[key]
if !exists {
return false, nil
}
if time.Now().After(expiry) {
return false, nil
}
return true, nil
}
// MarkProcessed — write path (exclusive access)
func (s *InMemoryIdempotencyStore) MarkProcessed(ctx context.Context, key string, ttl time.Duration) error {
s.mu.Lock()
defer s.mu.Unlock()
s.entries[key] = time.Now().Add(ttl)
return nil
}
IsProcessed is called
for every incoming Kafka message (hot path), while MarkProcessed is
called only on successful processing. RLock lets any number of consumers check
concurrently, so the hot path never serializes on reads.
Kafka Producer — Double-Check Locking for Lazy Writer Creation
pkg/eventbus/kafka/producer.go
type Producer struct {
writers map[string]*kafka.Writer
config *Config
mu sync.RWMutex
}
func (p *Producer) getWriter(topic string) *kafka.Writer {
// Fast path: read lock only
p.mu.RLock()
w, ok := p.writers[topic]
p.mu.RUnlock()
if ok {
return w
}
// Slow path: escalate to write lock
p.mu.Lock()
defer p.mu.Unlock()
// Double-check: another goroutine may have created it
if w, ok := p.writers[topic]; ok {
return w
}
// Create the writer (expensive)
w = &kafka.Writer{
Addr: kafka.TCP(p.config.Brokers...),
Topic: topic,
Balancer: &kafka.Hash{},
RequiredAcks: kafka.RequiredAcks(p.config.RequiredAcks),
BatchSize: p.config.BatchSize,
BatchTimeout: p.config.BatchTimeout,
WriteTimeout: p.config.WriteTimeout,
MaxAttempts: p.config.MaxRetries,
Async: false,
}
p.writers[topic] = w
return w
}
Between releasing the RLock and acquiring the
Lock, another goroutine may have already created the writer for this topic.
Without the second check, you'd create duplicate writers, wasting connections and
potentially corrupting batches.
PerKeyLimiter — Per-Tenant Rate Limiter with Double-Check
apps/broadcast-service/internal/ratelimit/limiter.go
type PerKeyLimiter struct {
mu sync.RWMutex
limiters map[string]*rate.Limiter
rps float64
burst int
}
func (t *PerKeyLimiter) Wait(ctx context.Context, key string) error {
// Read lock: fast path
t.mu.RLock()
l, ok := t.limiters[key]
t.mu.RUnlock()
if !ok {
// Write lock + double-check
t.mu.Lock()
l, ok = t.limiters[key]
if !ok {
l = rate.NewLimiter(rate.Limit(t.rps), t.burst)
t.limiters[key] = l
}
t.mu.Unlock()
}
return l.Wait(ctx)
}
Each tenant key gets its own rate.Limiter. The first message for a tenant creates the limiter; all
subsequent messages for that tenant hit the RLock fast path. This prevents the Meta
WhatsApp API from being overwhelmed on a per-tenant basis.
presignedURLCache — S3 Presigned URL Caching
apps/broadcast-service/internal/service/broadcast_message_service.go
type presignedURLCache struct {
mu sync.RWMutex
cache map[string]string
}
func (c *presignedURLCache) Get(key string) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
v, ok := c.cache[key]
return v, ok
}
func (c *presignedURLCache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.cache[key] = value
}
A presigned URL is generated and stored once via Set; every later Get for the same key is served from the concurrent RLock path.
1.2 sync.WaitGroup
WHAT
sync.WaitGroup is a counter that blocks until all registered goroutines
call Done(). It provides a structured way to wait for fan-out work to
complete before proceeding.
WHY
During graceful shutdown, you cannot simply kill goroutines — they may be
mid-processing a Kafka message or mid-API-call to Meta. WaitGroup ensures
the main process waits for all in-flight work to drain before exiting.
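The drain semantics can be shown with a minimal, runnable sketch (illustrative names, not codebase code): Wait returns only after every registered goroutine has called Done, so no in-flight work is abandoned:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainExample fans out n jobs and blocks until every one has finished,
// mirroring how the consumer waits for in-flight handlers on shutdown.
func drainExample(n int) int64 {
	var wg sync.WaitGroup
	var done atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1) // register BEFORE launching the goroutine
		go func() {
			defer wg.Done() // always signal completion on exit
			done.Add(1)     // stand-in for "process one Kafka message"
		}()
	}
	wg.Wait() // blocks until every registered goroutine has called Done
	return done.Load()
}

func main() {
	fmt.Println("drained:", drainExample(8)) // prints "drained: 8"
}
```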
HOW
Kafka Consumer — Tracking Consumer Loop Goroutines
pkg/eventbus/kafka/consumer.go
type Consumer struct {
config *Config
subscriptions []*subscription
mu sync.RWMutex
running bool
wg sync.WaitGroup // tracks all consumer goroutines
}
func (c *Consumer) Run(ctx context.Context) error {
// ... setup omitted ...
// Start a goroutine for each subscription
errCh := make(chan error, len(c.subscriptions))
for _, sub := range c.subscriptions {
c.wg.Add(1) // register before launch
go func(s *subscription) {
defer c.wg.Done() // always decrement on exit
err := c.consumeLoop(ctx, s)
if err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}(sub)
}
// Wait for context cancellation or error
select {
case <-ctx.Done():
slog.Info("Consumer context cancelled, shutting down...")
case err := <-errCh:
slog.Error("Consumer error", "error", err)
return err
}
// Wait for ALL goroutines to finish draining
c.wg.Wait()
return ctx.Err()
}
Add(1) is called before the
go statement, never inside the goroutine. If Wait() were
called between the go launch and the goroutine's Add(1),
the counter could be zero prematurely.
Broadcast Worker Pool — WaitGroup + Semaphore Channel
apps/broadcast-service/cmd/worker/main.go
sem := make(chan struct{}, workerPoolSize) // semaphore: max N goroutines
var wg sync.WaitGroup
// Inside the event handler:
if event.Type == eventbus.EventTypeBroadcastMessageRequested {
sem <- struct{}{} // acquire semaphore (blocks if pool is full)
wg.Add(1) // register with WaitGroup
go func() {
defer func() {
<-sem // release semaphore slot
wg.Done() // signal completion
}()
if err := broadcastMessageHandler(ctx, event); err != nil {
slog.ErrorContext(ctx, "Broadcast message handler error",
"error", err, "event_id", event.ID)
}
}()
return nil
}
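A self-contained version of this semaphore-plus-WaitGroup combination (runBounded is an illustrative name, not from the codebase) makes the bound observable by tracking a high-water mark of in-flight goroutines:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded executes jobs with at most limit goroutines in flight
// and reports the peak concurrency actually observed.
func runBounded(jobs, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var cur, peak atomic.Int64
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire: blocks while limit goroutines are running
		wg.Add(1)
		go func() {
			defer func() {
				cur.Add(-1)
				<-sem // release the slot
				wg.Done()
			}()
			n := cur.Add(1)
			// Record the high-water mark with a CAS loop.
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", runBounded(100, 4)) // never exceeds 4
}
```

Because the increment happens only between semaphore acquire and release, the peak can never exceed the buffer size, which is the whole point of the pattern.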
1.3 Channels
WHAT
Channels are Go's primary mechanism for communication between goroutines. They can be buffered or unbuffered, typed, and restricted to send-only or receive-only.
WHY
The codebase uses channels for six distinct patterns: OS signal handling, job distribution, type enforcement, bounded concurrency, error aggregation, and async queuing. Each buffer size and direction choice is intentional.
HOW
Signal Channels — Graceful Shutdown
apps/admin-service/cmd/server/main.go (and all main.go files)
// Buffer size 1: ensures the signal is not lost if the goroutine
// hasn't reached the receive yet when the signal fires.
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit // blocks until SIGINT or SIGTERM
slog.Info("Shutting down server...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
slog.Error("Server forced to shutdown", "error", err)
}
The signal.Notify documentation states:
"Package signal will not block sending to c." If the channel is unbuffered and the
receiver hasn't reached <-quit yet, the signal is silently dropped. A
buffer of 1 prevents this race.
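The drop-versus-buffer behaviour is easy to reproduce with a plain channel. In this sketch, notify is an illustrative stand-in for the package's internal non-blocking send, not the real implementation:

```go
package main

import "fmt"

// notify mimics the signal package's delivery: a non-blocking send,
// so the value is dropped if the channel cannot accept it right now.
func notify(c chan string, sig string) bool {
	select {
	case c <- sig:
		return true // delivered
	default:
		return false // dropped: no receiver waiting and no buffer space
	}
}

func main() {
	unbuffered := make(chan string)  // no receiver waiting yet
	buffered := make(chan string, 1) // one slot absorbs the race

	fmt.Println(notify(unbuffered, "SIGTERM")) // false: signal lost
	fmt.Println(notify(buffered, "SIGTERM"))   // true: parked in the buffer
	fmt.Println(<-buffered)                    // SIGTERM
}
```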
Job Distribution — Buffered Fan-Out Channel
apps/broadcast-service/internal/worker/dispatcher.go
type Dispatcher struct {
jobQueue chan job.Job
workers []*Worker
}
func NewDispatcher(workerCount int, queueSize int) *Dispatcher {
jobQueue := make(chan job.Job, queueSize) // buffered: absorbs bursts
d := &Dispatcher{jobQueue: jobQueue}
for i := 1; i <= workerCount; i++ {
d.workers = append(d.workers, NewWorker(i, jobQueue))
}
return d
}
func (d *Dispatcher) Dispatch(job job.Job) {
d.jobQueue <- job // blocking send: backpressure if queue is full
}
Directional Channels — Type-Enforced Receive-Only
apps/broadcast-service/internal/worker/worker.go
type Worker struct {
id int
jobs <-chan job.Job // receive-only: compiler prevents accidental sends
}
func NewWorker(id int, jobs chan job.Job) *Worker {
// Bidirectional channel is implicitly narrowed to receive-only
return &Worker{id: id, jobs: jobs}
}
If any code attempts w.jobs <- someJob, the compiler
rejects it. This makes the data flow direction (dispatcher → worker) explicit and
prevents subtle bugs.
Semaphore Pattern — Bounded Concurrency
apps/broadcast-service/cmd/worker/main.go
sem := make(chan struct{}, workerPoolSize)
// ...
sem <- struct{}{} // acquire: blocks when pool is full
go func() {
defer func() { <-sem }() // release: free the slot
// ... process message ...
}()
Why struct{} and not bool?
struct{} is zero bytes. The channel carries no data, only presence.
This is the idiomatic Go semaphore. The buffer size equals the maximum number of concurrent goroutines.
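The zero-size claim is checkable directly (a tiny illustrative program, using unsafe only to inspect sizes):

```go
package main

import (
	"fmt"
	"unsafe"
)

// structSize reports the memory footprint of an empty struct value.
func structSize() uintptr { return unsafe.Sizeof(struct{}{}) }

func main() {
	fmt.Println("sizeof struct{}:", structSize())       // 0
	fmt.Println("sizeof bool:", unsafe.Sizeof(true))    // 1
}
```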
Error Aggregation — First-Error-Wins
pkg/eventbus/kafka/consumer.go
// Buffer = number of subscriptions: no goroutine blocks on send
errCh := make(chan error, len(c.subscriptions))
for _, sub := range c.subscriptions {
c.wg.Add(1)
go func(s *subscription) {
defer c.wg.Done()
err := c.consumeLoop(ctx, s)
if err != nil && !errors.Is(err, context.Canceled) {
errCh <- err // never blocks (buffered)
}
}(sub)
}
select {
case <-ctx.Done():
// normal shutdown
case err := <-errCh:
// first fatal error from any consumer
return err
}
The buffer is sized to len(subscriptions) so that if all goroutines fail simultaneously, none
blocks on the send. The select reads only the first error; this is
the "first-error-wins" pattern that triggers an immediate shutdown.
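A compact, runnable rendition of the pattern (firstError is an illustrative name, not codebase code): with the buffer sized to the worker count, every failing worker completes its send, and the caller consumes only the first error:

```go
package main

import (
	"fmt"
	"sync"
)

// firstError launches n workers that all fail; the channel buffer is
// sized to n so no sender ever blocks, and the caller reads one error.
func firstError(n int) error {
	errCh := make(chan error, n) // buffer = worker count: sends never block
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			errCh <- fmt.Errorf("worker %d failed", id)
		}(i)
	}
	wg.Wait()      // every send has completed; none was blocked
	return <-errCh // first-error-wins: the rest stay in the buffer
}

func main() {
	fmt.Println(firstError(5))
}
```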
Email Worker — Buffered Channel as Async Queue
pkg/shared/mail/worker.go
type EmailWorker struct {
cfg EmailConfig
queue chan EmailJob
}
func NewEmailWorker(cfg EmailConfig, buffer int) *EmailWorker {
w := &EmailWorker{
cfg: cfg,
queue: make(chan EmailJob, buffer), // buffered: async queue
}
go w.start() // single consumer goroutine
return w
}
func (w *EmailWorker) start() {
for job := range w.queue { // blocks until channel is closed
body, err := mailtemplate.Render(job.Template, job.Data)
if err != nil {
log.Println("render email failed:", err)
continue
}
SendHTML(w.cfg, job.To, job.Subject, body)
}
}
func (w *EmailWorker) Send(job EmailJob) {
w.queue <- job // non-blocking as long as buffer isn't full
}
Callers enqueue via Send() and return
immediately. The background goroutine drains the queue sequentially, preventing SMTP
connection storms.
1.4 Goroutines
WHAT
Goroutines are lightweight threads multiplexed onto OS threads by the Go runtime. They start with roughly 2 KB of stack that grows on demand, and they are the fundamental unit of concurrency in Go.
WHY
The codebase uses goroutines for four distinct lifecycle patterns: background servers, fire-and-forget side effects, background cleanup loops, and concurrent event consumers. Each pattern has different shutdown and error-handling requirements.
HOW
Background Server — Non-Blocking ListenAndServe
apps/admin-service/cmd/server/main.go
server := &http.Server{
Addr: ":" + cfg.Port,
Handler: tracedHandler,
}
// Launch server in background goroutine
go func() {
slog.Info("Starting admin-service", "port", cfg.Port)
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("Server error", "error", err)
os.Exit(1)
}
}()
// Main goroutine blocks on signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
http.ErrServerClosed is the expected return from ListenAndServe after a graceful
Shutdown, so it must be filtered out; any other error is fatal.
Fire-and-Forget — Async Email Notifications
apps/admin-service/internal/service/alert_service.go
func (s *AlertService) CreateAlert(req model.CreateAlertRequest) (*model.PlatformAlert, error) {
alert := &model.PlatformAlert{ /* ... */ }
if err := s.db.Create(alert).Error; err != nil {
return nil, err
}
// Send email notification for critical alerts
if req.Severity == model.AlertSeverityCritical || req.Severity == model.AlertSeverityWarning {
go s.sendEmailNotification(alert) // fire-and-forget
}
return alert, nil // return immediately, don't wait for email
}
Background Cleanup — Periodic Goroutine with Ticker
pkg/eventbus/idempotency.go
func NewInMemoryIdempotencyStore() *InMemoryIdempotencyStore {
store := &InMemoryIdempotencyStore{
entries: make(map[string]time.Time),
}
go store.cleanupLoop() // background goroutine for GC
return store
}
func (s *InMemoryIdempotencyStore) cleanupLoop() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
s.cleanup()
}
}
func (s *InMemoryIdempotencyStore) cleanup() {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
for key, expiry := range s.entries {
if now.After(expiry) {
delete(s.entries, key)
}
}
}
Concurrent Consumers — Goroutine-Per-Subscription
pkg/eventbus/kafka/consumer.go
// Each subscription gets its own goroutine for independent consumption
for _, sub := range c.subscriptions {
c.wg.Add(1)
go func(s *subscription) {
defer c.wg.Done()
err := c.consumeLoop(ctx, s)
if err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}(sub)
}
Each closure receives sub via the function parameter
to avoid the classic loop-variable capture bug. (Go 1.22 made loop variables
per-iteration, but passing the value explicitly remains the clearest form.)
1.5 Select Statements
WHAT
select multiplexes across multiple channel operations. It blocks until one
case is ready, or falls through to default for non-blocking behaviour.
It's the core building block for timeout handling, cancellation propagation, and
multi-channel coordination.
WHY
Every long-running goroutine in the codebase must be interruptible. select
with ctx.Done() is the standard Go pattern for cooperative cancellation.
Combined with default, it also implements backpressure and non-blocking
dispatch.
HOW
Non-Blocking Send — TryDispatch with Backpressure
apps/broadcast-service/internal/worker/dispatcher.go
// Dispatch blocks until a worker picks up the job
func (d *Dispatcher) Dispatch(job job.Job) {
d.jobQueue <- job
}
// TryDispatch returns immediately if the queue is full
func (d *Dispatcher) TryDispatch(job job.Job) bool {
select {
case d.jobQueue <- job:
return true // sent successfully
default:
return false // queue full, apply backpressure
}
}
Dispatch is blocking —
use it when you must not lose the job. TryDispatch is non-blocking —
use it when the caller can handle rejection (e.g., return HTTP 429 to the client). The
default case makes the channel send non-blocking.
Context-Aware Backoff — Interruptible Retry Sleep
pkg/eventbus/retry.go
func RetryableHandler(cfg *RetryConfig, dlqProducer Producer, handler Handler) Handler {
return func(ctx context.Context, event *Event) error {
var lastErr error
backoff := cfg.InitialBackoff
for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
err := handler(ctx, event)
if err == nil {
return nil
}
lastErr = err
if attempt < cfg.MaxRetries {
select {
case <-ctx.Done():
return ctx.Err() // shutdown interrupts the backoff
case <-time.After(backoff):
// backoff elapsed, retry
}
// Exponential backoff with cap
backoff = time.Duration(float64(backoff) * cfg.BackoffMultiplier)
if backoff > cfg.MaxBackoff {
backoff = cfg.MaxBackoff
}
}
}
// All retries exhausted — send to DLQ
if cfg.SendToDLQ && dlqProducer != nil {
sendToDLQ(ctx, dlqProducer, event, lastErr)
}
return lastErr
}
}
Why not time.Sleep? time.Sleep is
un-interruptible: if the process receives SIGTERM during a 30-second backoff,
Sleep blocks for the full duration. The select with
ctx.Done() allows immediate cancellation, letting the shutdown sequence
complete within the 10-second timeout window.
Worker Job Loop — Select with Panic Recovery
apps/broadcast-service/internal/worker/worker.go
func (w *Worker) Start(ctx context.Context) {
go func() {
log.Printf("worker %d started\n", w.id)
for {
select {
case <-ctx.Done():
log.Printf("worker %d stopped\n", w.id)
return // clean exit on context cancellation
case j := <-w.jobs:
func() {
defer func() {
if r := recover(); r != nil {
log.Printf("worker %d panic recovered: %v\n", w.id, r)
}
}()
// panic in one job doesn't kill the worker
if err := j.Run(ctx); err != nil {
log.Printf("job %s failed: %v\n", j.Name(), err)
}
}()
}
}
}()
}
The immediately invoked func() with
recover() creates a panic boundary. If a job panics (e.g., a nil pointer in
template rendering), only that job fails; the worker goroutine continues
processing the next job. Without this, one panicking job would crash the entire worker
pool.
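The boundary can be demonstrated in isolation with a hedged sketch (runJob is an illustrative name, not the codebase's helper):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// runJob executes fn behind a panic boundary, so one bad job
// cannot take down the worker loop that called it.
func runJob(fn func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	fn()
	return false
}

func main() {
	var completed atomic.Int64
	jobs := []func(){
		func() { completed.Add(1) },
		func() { panic("nil pointer in template rendering") }, // the bad job
		func() { completed.Add(1) },
	}
	for _, j := range jobs {
		if runJob(j) {
			fmt.Println("recovered from a job panic; worker keeps going")
		}
	}
	fmt.Println("completed jobs:", completed.Load()) // prints "completed jobs: 2"
}
```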
Ticker + Shutdown — Periodic Job with Clean Exit
apps/admin-service/cmd/worker/main.go
tokenRefreshInterval := 6 * time.Hour
ticker := time.NewTicker(tokenRefreshInterval)
defer ticker.Stop()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
slog.Info("Token refresh worker started.", "interval", tokenRefreshInterval)
for {
select {
case <-ticker.C:
slog.Info("Running token refresh job...")
if err := tokenRefreshService.RefreshExpiringTokens(); err != nil {
slog.Error("Token refresh job failed", "error", err)
}
case <-quit:
slog.Info("Shutting down worker...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = ctx
slog.Info("Worker shutdown complete")
return
}
}
The select ensures the worker can exit cleanly even when a signal arrives between
tick intervals. ticker.Stop() in a defer releases the ticker's
resources so it doesn't leak.
Consumer Shutdown — First-Error-Wins Multiplexing
pkg/eventbus/kafka/consumer.go
// After launching all consumer goroutines:
select {
case <-ctx.Done():
slog.Info("Consumer context cancelled, shutting down...")
case err := <-errCh:
slog.Error("Consumer error", "error", err)
return err
}
// This line only runs after the select unblocks
c.wg.Wait()
return ctx.Err()
wg.Wait() ensures all goroutines have
finished before the function returns. This prevents resource leaks (open Kafka readers,
in-flight messages).
Quick Reference
| Primitive | Pattern | Where | Purpose |
|---|---|---|---|
| sync.RWMutex | Read/Write Split | idempotency.go, broadcast_message_service.go | Concurrent reads, exclusive writes |
| sync.RWMutex | Double-Check Locking | producer.go, limiter.go | Lazy singleton creation |
| sync.WaitGroup | Fork-Join | consumer.go, worker/main.go | Wait for goroutine completion |
| chan os.Signal | Signal Channel | all main.go | Graceful shutdown |
| chan Job | Fan-Out | dispatcher.go | Job distribution to workers |
| <-chan Job | Directional | worker.go | Compile-time send prevention |
| chan struct{} | Semaphore | worker/main.go | Bounded concurrency |
| chan error | Error Aggregation | consumer.go | First-error-wins |
| chan EmailJob | Async Queue | mail/worker.go | Decouple email from request |
| select | Non-Blocking Send | dispatcher.go | Backpressure |
| select | Context-Aware Sleep | retry.go | Interruptible backoff |
| select | Job + Cancel | worker.go | Graceful worker loop |
| select | Ticker + Signal | admin worker/main.go | Periodic job with clean exit |
| select | Multi-Channel Mux | consumer.go | First-error-wins shutdown |