Go Microservices Architecture Patterns: Production-Ready Strategies for 2025
Wang Yinneng
15 min read
go-microservices-architecture-patterns-production
Go Microservices Architecture Patterns: Production-Ready Strategies for 2025
Battle-tested patterns from handling 5M+ RPS across 200+ microservices
🎯 The Modern Microservices Reality
After architecting Go microservices that handle 5.2 million requests per second across 200+ services, here are the patterns that separate successful systems from architectural disasters.
🔧 Service Discovery & Communication
1. Advanced Service Registry Pattern
// internal/registry/service_registry.go
package registry
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
)
// ServiceInstance is the unit stored in etcd under /services/<Name>/<ID>;
// it is what Register writes and what Discover returns.
type ServiceInstance struct {
ID string `json:"id"`
Name string `json:"name"`
Address string `json:"address"`
Port int `json:"port"`
Metadata map[string]string `json:"metadata"`
Health HealthStatus `json:"health"`
Tags []string `json:"tags"`
RegisteredAt time.Time `json:"registered_at"`
LastHeartbeat time.Time `json:"last_heartbeat"`
}
// HealthStatus is the coarse health state of an instance. Discover only
// returns instances whose Health equals HealthyStatus.
type HealthStatus string
const (
HealthyStatus HealthStatus = "healthy"
UnhealthyStatus HealthStatus = "unhealthy"
DrainingStatus HealthStatus = "draining"
)
// ServiceRegistry maintains an etcd-backed, in-memory view of registered
// services and fans change notifications out to subscribed watchers.
type ServiceRegistry struct {
etcdClient *clientv3.Client
services map[string][]*ServiceInstance // local cache keyed by service name; guarded by mu
watchers map[string][]ServiceWatcher // change subscribers keyed by service name; guarded by mu
mu sync.RWMutex
leaseID clientv3.LeaseID // lease of the most recent Register call; NOTE(review): a second Register clobbers the first lease ID — confirm single-registration usage
stopCh chan struct{} // closed to stop background goroutines
}
// ServiceWatcher receives callbacks when instances of a watched service
// change. Callbacks run on fresh goroutines (see notifyWatchers).
type ServiceWatcher interface {
OnServiceAdded(service *ServiceInstance)
OnServiceRemoved(service *ServiceInstance)
OnServiceUpdated(service *ServiceInstance)
}
// NewServiceRegistry connects to etcd at the given endpoints and starts a
// background goroutine that mirrors /services/ changes into the local cache.
func NewServiceRegistry(etcdEndpoints []string) (*ServiceRegistry, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   etcdEndpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	sr := &ServiceRegistry{
		etcdClient: cli,
		services:   map[string][]*ServiceInstance{},
		watchers:   map[string][]ServiceWatcher{},
		stopCh:     make(chan struct{}),
	}

	// Mirror remote changes into the in-memory cache until stopCh closes.
	go sr.watchServices()
	return sr, nil
}
// Register writes the service instance to etcd under a 30-second lease and
// spawns a goroutine that keeps the lease alive until ctx is cancelled.
//
// Fix over the original: sr.leaseID is written while holding sr.mu, so
// concurrent readers no longer race on it.
// NOTE(review): a second Register call still overwrites the previous lease
// ID — confirm each registry instance registers exactly one service.
func (sr *ServiceRegistry) Register(ctx context.Context, service *ServiceInstance) error {
	// Lease-backed keys expire automatically if renewal stops (e.g. crash).
	leaseResp, err := sr.etcdClient.Grant(ctx, 30) // 30 second TTL
	if err != nil {
		return fmt.Errorf("failed to create lease: %w", err)
	}
	sr.mu.Lock()
	sr.leaseID = leaseResp.ID
	sr.mu.Unlock()

	data, err := json.Marshal(service)
	if err != nil {
		return fmt.Errorf("failed to serialize service: %w", err)
	}

	// Key layout: /services/<name>/<id> — mirrored back by watchServices.
	key := fmt.Sprintf("/services/%s/%s", service.Name, service.ID)
	_, err = sr.etcdClient.Put(ctx, key, string(data), clientv3.WithLease(leaseResp.ID))
	if err != nil {
		return fmt.Errorf("failed to register service: %w", err)
	}

	// Keep the lease alive in the background for the lifetime of ctx.
	go sr.renewLease(ctx)
	return nil
}
// renewLease keeps the registration lease alive until ctx is cancelled, the
// registry is stopped, or etcd closes the keep-alive channel.
//
// Fix over the original: the lease ID is snapshotted under the read lock
// instead of being read from the struct without synchronization.
func (sr *ServiceRegistry) renewLease(ctx context.Context) {
	sr.mu.RLock()
	leaseID := sr.leaseID
	sr.mu.RUnlock()

	ch, kaerr := sr.etcdClient.KeepAlive(ctx, leaseID)
	if kaerr != nil {
		slog.Error("Failed to setup lease renewal", "error", kaerr)
		return
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-sr.stopCh:
			return
		case ka := <-ch:
			if ka == nil {
				// Channel closed: lease expired or the client shut down.
				slog.Warn("Lease renewal channel closed")
				return
			}
			// Lease renewed successfully; nothing else to do.
		}
	}
}
// Discover returns the currently-known healthy instances of serviceName.
// It errors when the service has never been seen; an empty (non-nil) slice
// means the service exists but has no healthy instances right now.
func (sr *ServiceRegistry) Discover(serviceName string) ([]*ServiceInstance, error) {
	sr.mu.RLock()
	defer sr.mu.RUnlock()

	all, ok := sr.services[serviceName]
	if !ok {
		return nil, fmt.Errorf("service %s not found", serviceName)
	}

	healthy := make([]*ServiceInstance, 0, len(all))
	for _, inst := range all {
		if inst.Health != HealthyStatus {
			continue
		}
		healthy = append(healthy, inst)
	}
	return healthy, nil
}
// watchServices mirrors every key under /services/ into the local cache and
// dispatches change events until the registry is stopped.
//
// Fix over the original: a closed or errored watch channel is now detected.
// Previously a closed channel made this loop spin forever on zero-value
// responses.
func (sr *ServiceRegistry) watchServices() {
	watchCh := sr.etcdClient.Watch(context.Background(), "/services/", clientv3.WithPrefix())
	for {
		select {
		case <-sr.stopCh:
			return
		case watchResp, ok := <-watchCh:
			if !ok {
				slog.Warn("service watch channel closed")
				return
			}
			if err := watchResp.Err(); err != nil {
				slog.Error("service watch error", "error", err)
				continue
			}
			for _, event := range watchResp.Events {
				sr.handleServiceEvent(event)
			}
		}
	}
}
// handleServiceEvent translates one raw etcd event on a
// /services/<name>/<id> key into a cache add/update or removal.
func (sr *ServiceRegistry) handleServiceEvent(event *clientv3.Event) {
	// Key shape after splitting on "/": "", "services", <name>, <id>.
	segments := strings.Split(string(event.Kv.Key), "/")
	if len(segments) < 4 {
		return // not a service key; ignore
	}
	name := segments[2]

	switch event.Type {
	case clientv3.EventTypeDelete:
		sr.removeService(name, segments[3])
	case clientv3.EventTypePut:
		var instance ServiceInstance
		if err := json.Unmarshal(event.Kv.Value, &instance); err != nil {
			slog.Error("Failed to unmarshal service", "error", err)
			return
		}
		sr.addService(name, &instance)
	}
}
// addService inserts or replaces an instance in the cache and notifies
// watchers with the corresponding "added"/"updated" event.
func (sr *ServiceRegistry) addService(serviceName string, service *ServiceInstance) {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	list := sr.services[serviceName]
	if list == nil {
		list = make([]*ServiceInstance, 0)
		sr.services[serviceName] = list
	}

	// Replace in place when an instance with the same ID is already cached.
	for idx := range list {
		if list[idx].ID == service.ID {
			list[idx] = service
			sr.notifyWatchers(serviceName, service, "updated")
			return
		}
	}

	sr.services[serviceName] = append(list, service)
	sr.notifyWatchers(serviceName, service, "added")
}
// removeService drops the instance with serviceID from the cache (if
// present) and notifies watchers with a "removed" event.
func (sr *ServiceRegistry) removeService(serviceName, serviceID string) {
	sr.mu.Lock()
	defer sr.mu.Unlock()

	list := sr.services[serviceName]
	for idx, inst := range list {
		if inst.ID != serviceID {
			continue
		}
		// Splice the match out of the slice (order preserved).
		sr.services[serviceName] = append(list[:idx], list[idx+1:]...)
		sr.notifyWatchers(serviceName, inst, "removed")
		return
	}
}
// notifyWatchers dispatches one callback per registered watcher on its own
// goroutine so a slow watcher cannot block the registry (callers hold sr.mu).
func (sr *ServiceRegistry) notifyWatchers(serviceName string, service *ServiceInstance, eventType string) {
	for _, watcher := range sr.watchers[serviceName] {
		go func(w ServiceWatcher) {
			switch eventType {
			case "added":
				w.OnServiceAdded(service)
			case "removed":
				w.OnServiceRemoved(service)
			case "updated":
				w.OnServiceUpdated(service)
			}
		}(watcher)
	}
}
2. Smart Load Balancer with Circuit Breaker
// internal/loadbalancer/smart_balancer.go
package loadbalancer
import (
	"context"
	"errors"
	"log/slog"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)
// LoadBalancingStrategy selects the algorithm SelectEndpoint uses to pick
// among healthy endpoints.
type LoadBalancingStrategy int
const (
RoundRobin LoadBalancingStrategy = iota // rotate through endpoints in order
WeightedRoundRobin // random draw proportional to endpoint Weight
LeastConnections // fewest in-flight requests wins
ConsistentHash // stable mapping from a context "hash_key"
PowerOfTwoChoices // best (fewest connections) of two random samples
)
// ServiceEndpoint is one backend a SmartLoadBalancer can route to.
type ServiceEndpoint struct {
Address string
Weight int // relative weight for WeightedRoundRobin
Connections int64 // in-flight request count; accessed atomically
Latency time.Duration // last observed request latency
ErrorRate float64 // NOTE(review): never written in this file — confirm intended use
CircuitBreaker *CircuitBreaker
}
// CircuitBreaker is a simple consecutive-failure breaker with a timed
// open -> half-open probe transition. Do not copy (contains a mutex).
type CircuitBreaker struct {
maxFailures int // failures before the circuit opens
resetTimeout time.Duration // how long to stay open before probing
state atomic.Value // CircuitState
failures int64 // accessed atomically
lastFailureTime atomic.Value // time.Time
mu sync.RWMutex // serializes state-transition bookkeeping
}
// CircuitState is the breaker's position: closed (normal traffic), open
// (shedding), or half-open (probing with trial requests).
type CircuitState int
const (
CircuitClosed CircuitState = iota
CircuitOpen
CircuitHalfOpen
)
// SmartLoadBalancer picks among registered endpoints using the configured
// strategy, skipping endpoints whose circuit breaker is open.
type SmartLoadBalancer struct {
strategy LoadBalancingStrategy
endpoints []*ServiceEndpoint // guarded by mu
mu sync.RWMutex
current int64 // round-robin cursor, advanced atomically
random *rand.Rand // NOTE(review): *rand.Rand is not goroutine-safe; concurrent SelectEndpoint calls race on it — confirm
}
// NewSmartLoadBalancer returns an empty balancer that will pick endpoints
// with the given strategy; populate it with AddEndpoint.
func NewSmartLoadBalancer(strategy LoadBalancingStrategy) *SmartLoadBalancer {
	src := rand.NewSource(time.Now().UnixNano())
	lb := &SmartLoadBalancer{
		strategy: strategy,
		random:   rand.New(src),
	}
	lb.endpoints = make([]*ServiceEndpoint, 0)
	return lb
}
// AddEndpoint registers an address with the given weight; the endpoint
// starts with a closed circuit breaker (5 failures / 30s reset).
func (lb *SmartLoadBalancer) AddEndpoint(address string, weight int) {
	cb := &CircuitBreaker{
		maxFailures:  5,
		resetTimeout: 30 * time.Second,
	}
	cb.state.Store(CircuitClosed)

	lb.mu.Lock()
	defer lb.mu.Unlock()
	lb.endpoints = append(lb.endpoints, &ServiceEndpoint{
		Address:        address,
		Weight:         weight,
		CircuitBreaker: cb,
	})
}
// SelectEndpoint picks a healthy endpoint according to the configured
// strategy.
//
// Fix over the original: the chosen endpoint's in-flight connection count
// is incremented here. OnRequestComplete decrements it, but nothing ever
// incremented it before, so the counter drifted negative and corrupted the
// least-connections and power-of-two strategies.
func (lb *SmartLoadBalancer) SelectEndpoint(ctx context.Context) (*ServiceEndpoint, error) {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	if len(lb.endpoints) == 0 {
		return nil, errors.New("no endpoints available")
	}

	// Consider only endpoints whose circuit breaker admits traffic.
	healthy := make([]*ServiceEndpoint, 0, len(lb.endpoints))
	for _, endpoint := range lb.endpoints {
		if lb.isEndpointHealthy(endpoint) {
			healthy = append(healthy, endpoint)
		}
	}
	if len(healthy) == 0 {
		return nil, errors.New("no healthy endpoints available")
	}

	var selected *ServiceEndpoint
	switch lb.strategy {
	case WeightedRoundRobin:
		selected = lb.weightedRoundRobin(healthy)
	case LeastConnections:
		selected = lb.leastConnections(healthy)
	case PowerOfTwoChoices:
		selected = lb.powerOfTwoChoices(healthy)
	case ConsistentHash:
		selected = lb.consistentHash(healthy, ctx)
	default: // RoundRobin and any unknown strategy
		selected = lb.roundRobin(healthy)
	}

	// Track the in-flight request; paired with the decrement in
	// OnRequestComplete.
	atomic.AddInt64(&selected.Connections, 1)
	return selected, nil
}
// isEndpointHealthy reports whether the endpoint's circuit breaker
// currently admits traffic, transitioning open -> half-open once the reset
// timeout has elapsed.
//
// Fix over the original: the open -> half-open transition uses
// CompareAndSwap, so a concurrent state change made between the Load and
// the Store can no longer be stomped.
func (lb *SmartLoadBalancer) isEndpointHealthy(endpoint *ServiceEndpoint) bool {
	cb := endpoint.CircuitBreaker
	switch cb.state.Load().(CircuitState) {
	case CircuitClosed, CircuitHalfOpen:
		return true
	case CircuitOpen:
		// After resetTimeout, allow trial traffic through half-open.
		if t, ok := cb.lastFailureTime.Load().(time.Time); ok && time.Since(t) > cb.resetTimeout {
			cb.state.CompareAndSwap(CircuitOpen, CircuitHalfOpen)
			return true
		}
		return false
	default:
		return false
	}
}
// roundRobin cycles through endpoints using an atomically advanced cursor.
func (lb *SmartLoadBalancer) roundRobin(endpoints []*ServiceEndpoint) *ServiceEndpoint {
	next := atomic.AddInt64(&lb.current, 1)
	return endpoints[next%int64(len(endpoints))]
}
// weightedRoundRobin picks an endpoint with probability proportional to its
// Weight, falling back to plain round-robin when all weights are zero.
//
// Fix over the original: uses the package-level rand functions (safe for
// concurrent use) instead of the balancer's private *rand.Rand, which was
// shared by concurrent SelectEndpoint callers without synchronization.
func (lb *SmartLoadBalancer) weightedRoundRobin(endpoints []*ServiceEndpoint) *ServiceEndpoint {
	totalWeight := 0
	for _, endpoint := range endpoints {
		totalWeight += endpoint.Weight
	}
	if totalWeight == 0 {
		return lb.roundRobin(endpoints)
	}

	// Walk the cumulative weight distribution to find the drawn ticket.
	ticket := rand.Intn(totalWeight)
	acc := 0
	for _, endpoint := range endpoints {
		acc += endpoint.Weight
		if ticket < acc {
			return endpoint
		}
	}
	return endpoints[0] // unreachable when weights are non-negative
}
// leastConnections returns the endpoint with the fewest in-flight requests.
func (lb *SmartLoadBalancer) leastConnections(endpoints []*ServiceEndpoint) *ServiceEndpoint {
	best := endpoints[0]
	bestConns := atomic.LoadInt64(&best.Connections)
	for _, candidate := range endpoints[1:] {
		if c := atomic.LoadInt64(&candidate.Connections); c < bestConns {
			best, bestConns = candidate, c
		}
	}
	return best
}
// powerOfTwoChoices samples two random endpoints and returns the one with
// fewer in-flight connections — near-optimal balance at O(1) cost.
//
// Fix over the original: uses the goroutine-safe package-level rand instead
// of the balancer's unsynchronized *rand.Rand. The two samples may still
// coincide; that merely degrades a single call to a one-choice pick.
func (lb *SmartLoadBalancer) powerOfTwoChoices(endpoints []*ServiceEndpoint) *ServiceEndpoint {
	if len(endpoints) <= 2 {
		return lb.leastConnections(endpoints)
	}
	first := endpoints[rand.Intn(len(endpoints))]
	second := endpoints[rand.Intn(len(endpoints))]
	if atomic.LoadInt64(&first.Connections) <= atomic.LoadInt64(&second.Connections) {
		return first
	}
	return second
}
// consistentHash maps a request-affinity key from the context onto a fixed
// endpoint, falling back to round-robin when no usable key is present.
//
// Fix over the original: the context value is extracted with a checked type
// assertion; a non-string "hash_key" value previously panicked.
// NOTE(review): a plain string context key is collision-prone, and modulo
// hashing reshuffles most keys whenever the endpoint set changes — confirm
// whether a true hash ring is needed.
func (lb *SmartLoadBalancer) consistentHash(endpoints []*ServiceEndpoint, ctx context.Context) *ServiceEndpoint {
	key, ok := ctx.Value("hash_key").(string)
	if !ok {
		return lb.roundRobin(endpoints)
	}
	return endpoints[lb.hash(key)%uint32(len(endpoints))]
}
// hash computes a Java-style 31-multiplier rolling hash over the runes of s.
func (lb *SmartLoadBalancer) hash(s string) uint32 {
	var h uint32
	for _, r := range s {
		h = h*31 + uint32(r)
	}
	return h
}
// OnRequestComplete records the outcome of a finished request: latency,
// in-flight count, and circuit-breaker bookkeeping.
//
// Fix over the original: Latency is stored atomically (time.Duration's
// underlying type is int64) instead of with a plain racy write.
func (lb *SmartLoadBalancer) OnRequestComplete(endpoint *ServiceEndpoint, err error, latency time.Duration) {
	atomic.StoreInt64((*int64)(&endpoint.Latency), int64(latency))

	// One request has left this endpoint.
	atomic.AddInt64(&endpoint.Connections, -1)

	if err != nil {
		lb.recordFailure(endpoint)
	} else {
		lb.recordSuccess(endpoint)
	}
}
// recordFailure bumps the failure counter and opens the circuit when the
// threshold is reached (or immediately on a failed half-open probe).
func (lb *SmartLoadBalancer) recordFailure(endpoint *ServiceEndpoint) {
	cb := endpoint.CircuitBreaker
	cb.mu.Lock()
	defer cb.mu.Unlock()

	atomic.AddInt64(&cb.failures, 1)
	cb.lastFailureTime.Store(time.Now())

	state := cb.state.Load().(CircuitState)
	switch {
	case state == CircuitClosed && atomic.LoadInt64(&cb.failures) >= int64(cb.maxFailures):
		cb.state.Store(CircuitOpen)
		slog.Warn("Circuit breaker opened", "endpoint", endpoint.Address)
	case state == CircuitHalfOpen:
		// A failed probe sends the breaker straight back to open.
		cb.state.Store(CircuitOpen)
		slog.Warn("Circuit breaker reopened", "endpoint", endpoint.Address)
	}
}
// recordSuccess updates circuit-breaker bookkeeping after a successful
// request: a successful half-open probe closes the circuit, and a success
// while closed clears the failure streak.
//
// Fix over the original: successes in the closed state now reset the
// failure counter. Previously failures accumulated forever, so maxFailures
// errors spread over days would still trip the breaker.
func (lb *SmartLoadBalancer) recordSuccess(endpoint *ServiceEndpoint) {
	cb := endpoint.CircuitBreaker
	switch cb.state.Load().(CircuitState) {
	case CircuitHalfOpen:
		cb.mu.Lock()
		atomic.StoreInt64(&cb.failures, 0)
		cb.state.Store(CircuitClosed)
		cb.mu.Unlock()
		slog.Info("Circuit breaker closed", "endpoint", endpoint.Address)
	case CircuitClosed:
		// Treat maxFailures as a consecutive-failure threshold.
		atomic.StoreInt64(&cb.failures, 0)
	}
}
📊 Advanced Observability Patterns
1. Distributed Tracing with Custom Spans
// internal/tracing/tracer.go
package tracing
import (
"context"
"fmt"
"net/http"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
otelTrace "go.opentelemetry.io/otel/trace"
)
// TracingConfig carries everything NewTracingService needs to wire up
// OpenTelemetry with a Jaeger exporter.
type TracingConfig struct {
ServiceName string
ServiceVersion string
Environment string
JaegerEndpoint string // Jaeger collector endpoint URL
SampleRate float64 // fraction of traces to sample (TraceIDRatioBased)
}
// TracingService wraps a named otel tracer and provides HTTP middleware
// plus helpers for DB, external-call, and business-operation spans.
type TracingService struct {
tracer otelTrace.Tracer
}
// NewTracingService configures a global OpenTelemetry pipeline (Jaeger
// exporter, batching, ratio sampling, W3C propagation) and returns a
// service wrapping a tracer named after the service.
func NewTracingService(config TracingConfig) (*TracingService, error) {
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
		jaeger.WithEndpoint(config.JaegerEndpoint),
	))
	if err != nil {
		return nil, fmt.Errorf("failed to create jaeger exporter: %w", err)
	}

	// Identify this process in every exported span. Named res to avoid
	// shadowing the resource package.
	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceNameKey.String(config.ServiceName),
		semconv.ServiceVersionKey.String(config.ServiceVersion),
		semconv.DeploymentEnvironmentKey.String(config.Environment),
	)

	provider := trace.NewTracerProvider(
		trace.WithBatcher(exporter),
		trace.WithResource(res),
		trace.WithSampler(trace.TraceIDRatioBased(config.SampleRate)),
	)

	// Install globally so instrumented libraries pick up the pipeline.
	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))

	return &TracingService{tracer: provider.Tracer(config.ServiceName)}, nil
}
// HTTPMiddleware returns middleware that continues (or starts) a trace for
// each request, records standard HTTP attributes, and marks spans for
// responses with status >= 400 as errors.
func (ts *TracingService) HTTPMiddleware() func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		fn := func(w http.ResponseWriter, r *http.Request) {
			// Continue any incoming trace context from the request headers.
			propagator := otel.GetTextMapPropagator()
			ctx := propagator.Extract(r.Context(), propagation.HeaderCarrier(r.Header))

			spanName := fmt.Sprintf("%s %s", r.Method, r.URL.Path)
			ctx, span := ts.tracer.Start(ctx, spanName)
			defer span.End()

			span.SetAttributes(
				semconv.HTTPMethodKey.String(r.Method),
				semconv.HTTPURLKey.String(r.URL.String()),
				semconv.HTTPUserAgentKey.String(r.UserAgent()),
				semconv.HTTPClientIPKey.String(r.RemoteAddr),
			)

			// Intercept the response to learn status code and body size.
			recorder := &responseWriter{ResponseWriter: w, statusCode: 200}
			next.ServeHTTP(recorder, r.WithContext(ctx))

			span.SetAttributes(
				semconv.HTTPStatusCodeKey.Int(recorder.statusCode),
				semconv.HTTPResponseSizeKey.Int64(recorder.bytesWritten),
			)
			if recorder.statusCode >= 400 {
				span.SetStatus(codes.Error, http.StatusText(recorder.statusCode))
			}
		}
		return http.HandlerFunc(fn)
	}
}
// responseWriter wraps http.ResponseWriter to capture the status code and
// the number of body bytes written, for span attributes.
// NOTE(review): the wrapper does not forward Flusher/Hijacker/Pusher —
// confirm no handler in the chain depends on those interfaces.
type responseWriter struct {
http.ResponseWriter
statusCode int
bytesWritten int64
}
// WriteHeader records the status code before delegating.
func (rw *responseWriter) WriteHeader(statusCode int) {
rw.statusCode = statusCode
rw.ResponseWriter.WriteHeader(statusCode)
}
// Write delegates, then accumulates the count of bytes actually written.
func (rw *responseWriter) Write(data []byte) (int, error) {
n, err := rw.ResponseWriter.Write(data)
rw.bytesWritten += int64(n)
return n, err
}
// TraceDBOperation opens a "db.<operation>" span annotated with the SQL
// statement and operation name; the caller must End the returned span.
func (ts *TracingService) TraceDBOperation(ctx context.Context, operation, query string) (context.Context, otelTrace.Span) {
	attrs := otelTrace.WithAttributes(
		semconv.DBStatementKey.String(query),
		semconv.DBOperationKey.String(operation),
	)
	return ts.tracer.Start(ctx, fmt.Sprintf("db.%s", operation), attrs)
}
// TraceExternalCall opens an "external.<service>.<method>" span for an
// outbound dependency call; the caller must End the returned span.
func (ts *TracingService) TraceExternalCall(ctx context.Context, service, method string) (context.Context, otelTrace.Span) {
	name := fmt.Sprintf("external.%s.%s", service, method)
	return ts.tracer.Start(ctx, name,
		otelTrace.WithAttributes(
			attribute.String("external.service", service),
			attribute.String("external.method", method),
		),
	)
}
// TraceBusinessOperation opens a span for a domain-level operation, copying
// the supplied key/value pairs onto it as typed attributes. The caller must
// End the returned span.
//
// Fix over the original: values of types other than string/int/int64/
// float64/bool were silently dropped; they are now recorded via %v
// formatting so no attribute is lost.
func (ts *TracingService) TraceBusinessOperation(ctx context.Context, operation string, attributes map[string]interface{}) (context.Context, otelTrace.Span) {
	attrs := make([]attribute.KeyValue, 0, len(attributes))
	for k, v := range attributes {
		switch val := v.(type) {
		case string:
			attrs = append(attrs, attribute.String(k, val))
		case int:
			attrs = append(attrs, attribute.Int(k, val))
		case int64:
			attrs = append(attrs, attribute.Int64(k, val))
		case float64:
			attrs = append(attrs, attribute.Float64(k, val))
		case bool:
			attrs = append(attrs, attribute.Bool(k, val))
		default:
			// Fallback: record a string rendering rather than drop the value.
			attrs = append(attrs, attribute.String(k, fmt.Sprintf("%v", val)))
		}
	}
	return ts.tracer.Start(ctx, operation, otelTrace.WithAttributes(attrs...))
}
2. Custom Metrics Collection
// internal/metrics/collector.go
package metrics
import (
	"context"
	"net/http"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
// MetricsCollector bundles every Prometheus collector this service exports.
// All collectors are registered on the default registry by
// NewMetricsCollector.
type MetricsCollector struct {
// HTTP metrics
httpRequestsTotal *prometheus.CounterVec // by method, endpoint, status
httpRequestDuration *prometheus.HistogramVec // by method, endpoint
httpRequestsInFlight prometheus.Gauge
// Business metrics
businessOperationsTotal *prometheus.CounterVec // by operation, status
businessOperationDuration *prometheus.HistogramVec // by operation
// Database metrics
dbConnectionsActive prometheus.Gauge
dbQueriesTotal *prometheus.CounterVec // by operation, status
dbQueryDuration *prometheus.HistogramVec // by operation
// Circuit breaker metrics
circuitBreakerState *prometheus.GaugeVec // by service, endpoint
circuitBreakerTrips *prometheus.CounterVec // by service, endpoint
}
// NewMetricsCollector builds all collectors and registers them on the
// Prometheus default registry via promauto.
//
// NOTE(review): serviceName is currently unused — presumably intended as a
// metric namespace or const label; confirm and wire it in or drop it.
// NOTE(review): promauto registers against the global default registry, so
// calling this constructor twice in one process panics with a duplicate
// registration error.
func NewMetricsCollector(serviceName string) *MetricsCollector {
return &MetricsCollector{
httpRequestsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
),
httpRequestDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method", "endpoint"},
),
httpRequestsInFlight: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "http_requests_in_flight",
Help: "Current number of HTTP requests being processed",
},
),
businessOperationsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "business_operations_total",
Help: "Total number of business operations",
},
[]string{"operation", "status"},
),
// Finer-grained sub-second buckets than DefBuckets, up to 10s.
businessOperationDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "business_operation_duration_seconds",
Help: "Business operation duration in seconds",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0},
},
[]string{"operation"},
),
dbConnectionsActive: promauto.NewGauge(
prometheus.GaugeOpts{
Name: "db_connections_active",
Help: "Current number of active database connections",
},
),
dbQueriesTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "db_queries_total",
Help: "Total number of database queries",
},
[]string{"operation", "status"},
),
dbQueryDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "db_query_duration_seconds",
Help: "Database query duration in seconds",
Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0},
},
[]string{"operation"},
),
circuitBreakerState: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "circuit_breaker_state",
Help: "Circuit breaker state (0=closed, 1=open, 2=half-open)",
},
[]string{"service", "endpoint"},
),
circuitBreakerTrips: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "circuit_breaker_trips_total",
Help: "Total number of circuit breaker trips",
},
[]string{"service", "endpoint"},
),
}
}
// HTTPMetricsMiddleware returns middleware that tracks request counts,
// latencies, and the in-flight gauge for every request.
//
// Fix over the original: the "status" label is now the numeric status code
// (e.g. "404") instead of http.StatusText's phrase ("Not Found"), which
// matches Prometheus convention and avoids an empty label for non-standard
// codes.
// NOTE(review): labeling by raw r.URL.Path can explode cardinality on
// parameterized routes — confirm paths are bounded or switch to route
// templates.
func (mc *MetricsCollector) HTTPMetricsMiddleware() func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			start := time.Now()
			mc.httpRequestsInFlight.Inc()
			defer mc.httpRequestsInFlight.Dec()

			// Capture the status code the handler writes.
			wrapped := &metricsResponseWriter{ResponseWriter: w, statusCode: 200}
			next.ServeHTTP(wrapped, r)

			elapsed := time.Since(start)
			status := strconv.Itoa(wrapped.statusCode)
			mc.httpRequestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(elapsed.Seconds())
			mc.httpRequestsTotal.WithLabelValues(r.Method, r.URL.Path, status).Inc()
		})
	}
}
// metricsResponseWriter wraps http.ResponseWriter to capture the status
// code for metric labels.
// NOTE(review): does not forward Flusher/Hijacker/Pusher — confirm no
// handler in the chain depends on those interfaces.
type metricsResponseWriter struct {
http.ResponseWriter
statusCode int
}
// WriteHeader records the status code before delegating.
func (mrw *metricsResponseWriter) WriteHeader(statusCode int) {
mrw.statusCode = statusCode
mrw.ResponseWriter.WriteHeader(statusCode)
}
// RecordBusinessOperation counts one domain operation and observes its
// duration, labeling the count with "success" or "error".
func (mc *MetricsCollector) RecordBusinessOperation(operation string, duration time.Duration, success bool) {
	outcome := "error"
	if success {
		outcome = "success"
	}
	mc.businessOperationsTotal.WithLabelValues(operation, outcome).Inc()
	mc.businessOperationDuration.WithLabelValues(operation).Observe(duration.Seconds())
}
// RecordDBQuery counts one database query and observes its duration; a
// non-nil err labels the count as "error", otherwise "success".
func (mc *MetricsCollector) RecordDBQuery(operation string, duration time.Duration, err error) {
	outcome := "success"
	if err != nil {
		outcome = "error"
	}
	mc.dbQueriesTotal.WithLabelValues(operation, outcome).Inc()
	mc.dbQueryDuration.WithLabelValues(operation).Observe(duration.Seconds())
}
// CircuitState mirrors the load balancer's circuit-breaker enum
// (0=closed, 1=open, 2=half-open).
//
// Fix over the original: CircuitState was referenced but never declared in
// this package, so the file did not compile. It is declared locally rather
// than imported; keep the values in sync with the balancer's enum.
type CircuitState int

// RecordCircuitBreakerState exports the breaker's current state as a gauge.
func (mc *MetricsCollector) RecordCircuitBreakerState(service, endpoint string, state CircuitState) {
	mc.circuitBreakerState.WithLabelValues(service, endpoint).Set(float64(state))
}

// RecordCircuitBreakerTrip counts one closed->open transition.
func (mc *MetricsCollector) RecordCircuitBreakerTrip(service, endpoint string) {
	mc.circuitBreakerTrips.WithLabelValues(service, endpoint).Inc()
}
// Handler returns the HTTP handler that serves the Prometheus text
// exposition format from the default registry (mount at /metrics).
func (mc *MetricsCollector) Handler() http.Handler {
return promhttp.Handler()
}
🚀 Event-Driven Architecture
1. High-Performance Message Bus
// internal/messaging/event_bus.go
package messaging
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"math/rand"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)
// Event is the envelope published on the bus. ID, Source, Timestamp, and
// Version are stamped by Publish; callers fill Type, Subject, Data, and
// Metadata.
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
Subject string `json:"subject"`
Data interface{} `json:"data"`
Metadata map[string]string `json:"metadata"`
Timestamp time.Time `json:"timestamp"`
Version string `json:"version"`
}
// EventHandler processes one delivered event; a non-nil error causes the
// underlying message to be Nak'ed for redelivery.
type EventHandler func(ctx context.Context, event *Event) error
// EventBus is a NATS JetStream-backed pub/sub bus with one durable
// consumer per event type and handler fan-out.
type EventBus struct {
nc *nats.Conn
js jetstream.JetStream
handlers map[string][]EventHandler // keyed by event type; guarded by mu
mu sync.RWMutex
serviceName string
}
// NewEventBus connects to NATS with aggressive reconnection settings and
// initializes a JetStream context for durable publish/subscribe.
func NewEventBus(natsURL, serviceName string) (*EventBus, error) {
	conn, err := nats.Connect(natsURL,
		nats.ReconnectWait(2*time.Second),
		nats.MaxReconnects(-1), // retry forever
		nats.PingInterval(10*time.Second),
		nats.MaxPingsOutstanding(3),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to NATS: %w", err)
	}

	stream, err := jetstream.New(conn)
	if err != nil {
		return nil, fmt.Errorf("failed to create JetStream context: %w", err)
	}

	bus := &EventBus{
		nc:          conn,
		js:          stream,
		serviceName: serviceName,
		handlers:    map[string][]EventHandler{},
	}
	return bus, nil
}
// Publish stamps the event (ID, source, timestamp, version) and publishes
// it to JetStream on "events.<type>.<subject>" for at-least-once delivery.
// Any caller-supplied values for the stamped fields are overwritten.
func (eb *EventBus) Publish(ctx context.Context, event *Event) error {
	event.ID = generateEventID()
	event.Source = eb.serviceName
	event.Timestamp = time.Now()
	event.Version = "1.0"

	payload, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	subject := fmt.Sprintf("events.%s.%s", event.Type, event.Subject)
	if _, err = eb.js.Publish(ctx, subject, payload); err != nil {
		return fmt.Errorf("failed to publish event: %w", err)
	}
	return nil
}
// Subscribe registers a handler for eventType, lazily creating the durable
// consumer the first time the type is subscribed to.
//
// Fix over the original: the consumer is created before the handler is
// registered. Previously a createConsumer failure left the handler in the
// map with no consumer, and because the "first handler" check then failed,
// no later Subscribe would ever retry consumer creation.
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) error {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	if len(eb.handlers[eventType]) == 0 {
		if err := eb.createConsumer(eventType); err != nil {
			return err
		}
	}
	eb.handlers[eventType] = append(eb.handlers[eventType], handler)
	return nil
}
// createConsumer ensures the stream for eventType exists, creates a durable
// consumer for this service, and starts a goroutine delivering messages to
// the registered handlers. Called with eb.mu held.
//
// Fix over the original: the original referenced an undefined ctx and did
// not compile; stream/consumer creation now runs under a bounded context.
func (eb *EventBus) createConsumer(eventType string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	subject := fmt.Sprintf("events.%s.>", eventType)
	consumerName := fmt.Sprintf("%s_%s_consumer", eb.serviceName, eventType)
	streamName := fmt.Sprintf("%s_stream", eventType)

	// Idempotently ensure the stream exists.
	_, err := eb.js.CreateStream(ctx, jetstream.StreamConfig{
		Name:      streamName,
		Subjects:  []string{subject},
		Storage:   jetstream.FileStorage,
		MaxAge:    24 * time.Hour,
		Retention: jetstream.LimitsPolicy,
	})
	if err != nil && !errors.Is(err, jetstream.ErrStreamNameAlreadyInUse) {
		return fmt.Errorf("failed to create stream: %w", err)
	}

	// Durable consumer: explicit acks, up to 3 delivery attempts with
	// increasing backoff.
	consumer, err := eb.js.CreateConsumer(ctx, streamName, jetstream.ConsumerConfig{
		Name:          consumerName,
		Durable:       consumerName,
		AckPolicy:     jetstream.AckExplicitPolicy,
		MaxDeliver:    3,
		AckWait:       30 * time.Second,
		MaxAckPending: 100,
		BackOff:       []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second},
	})
	if err != nil {
		return fmt.Errorf("failed to create consumer: %w", err)
	}

	go eb.consumeEvents(consumer, eventType)
	return nil
}
// consumeEvents polls the consumer and dispatches each message to the
// registered handlers.
//
// Fixes over the original: (1) the jetstream Consumer.FetchNoWait API takes
// only a batch size — the original passed a context and did not compile;
// (2) errors surfaced by the fetched batch (MessageBatch.Error) are now
// logged instead of being silently dropped.
// NOTE(review): this loop has no shutdown signal and runs for the process
// lifetime — confirm whether the bus needs a Close that stops consumers.
func (eb *EventBus) consumeEvents(consumer jetstream.Consumer, eventType string) {
	for {
		batch, err := consumer.FetchNoWait(10)
		if err != nil {
			slog.Error("Failed to fetch messages", "error", err)
			time.Sleep(1 * time.Second)
			continue
		}
		for msg := range batch.Messages() {
			eb.processMessage(msg, eventType)
		}
		if err := batch.Error(); err != nil {
			slog.Error("Message batch error", "error", err)
		}
		// Small delay to prevent a busy loop when the stream is idle.
		time.Sleep(100 * time.Millisecond)
	}
}
// processMessage decodes one JetStream message into an Event and runs every
// handler registered for eventType. The message is Ack'ed only if all
// handlers succeed; any failure Nak's it for redelivery (up to MaxDeliver).
func (eb *EventBus) processMessage(msg jetstream.Msg, eventType string) {
// Deserialize event
var event Event
if err := json.Unmarshal(msg.Data(), &event); err != nil {
slog.Error("Failed to unmarshal event", "error", err)
// NOTE(review): a permanently malformed payload is Nak'ed and will be
// redelivered until MaxDeliver is exhausted — confirm a dead-letter
// path exists for poison messages.
msg.Nak()
return
}
// Snapshot handlers under the read lock; handlers run outside the lock.
eb.mu.RLock()
handlers := eb.handlers[eventType]
eb.mu.RUnlock()
// Process with all handlers. NOTE(review): ctx has no deadline, and on a
// redelivery, handlers that already succeeded run again — handlers must
// be idempotent.
ctx := context.Background()
success := true
for _, handler := range handlers {
if err := handler(ctx, &event); err != nil {
slog.Error("Event handler failed",
"event_id", event.ID,
"event_type", event.Type,
"error", err,
)
success = false
break
}
}
// Ack on full success, otherwise Nak to request redelivery. Ack/Nak
// errors are ignored here.
if success {
msg.Ack()
} else {
msg.Nak()
}
}
// generateEventID builds a process-local identifier of the form
// "<unix-nanos>_<random-int63>".
// NOTE(review): uniqueness is probabilistic and process-local only —
// confirm whether globally unique IDs (UUIDs) are required.
func generateEventID() string {
	nanos := time.Now().UnixNano()
	entropy := rand.Int63()
	return fmt.Sprintf("%d_%d", nanos, entropy)
}
🎯 Production Performance Results
Scaling Achievements:
- 5.2M requests/second across 200+ services
- P99 latency < 50ms for critical paths
- 99.99% uptime with chaos engineering
- 30% cost reduction through optimization
Key Architecture Decisions:
- gRPC for internal communication (40% faster than REST)
- NATS JetStream for event streaming (10x throughput vs Kafka)
- etcd for service discovery (sub-ms lookup times)
- Prometheus + Jaeger for full observability
These patterns transformed our microservices from a fragile distributed monolith into a resilient, scalable platform handling billions of requests daily.
WY
Wang Yinneng
Senior Golang Backend & Web3 Developer with 10+ years of experience building scalable systems and blockchain solutions.
View Full Profile →