Back to Blog
Go Backend

Go Concurrency Deep Dive: Channels, Goroutines, and Select Statements

Cap
10 min read
golangconcurrencygoroutineschannels

Go Concurrency Deep Dive: Channels, Goroutines, and Select Statements

"Don't communicate by sharing memory; share memory by communicating." - Rob Pike

If you're coming from languages like Java or Python, Go's approach to concurrency might feel like entering a parallel universe (pun intended). Today, we're going to demystify Go's concurrency model by building real-world examples, breaking common misconceptions, and yes, probably causing a few deadlocks along the way.

🎯 What We're Building Today

By the end of this article, you'll have built:

  1. A concurrent web scraper that respects rate limits
  2. A worker pool pattern for processing tasks
  3. A timeout-aware service orchestrator
  4. A fan-in/fan-out pipeline for data processing

But first, let's understand the building blocks.

🧩 The Trinity of Go Concurrency

Goroutines: Your Lightweight Threads

Think of goroutines as super-lightweight threads managed by the Go runtime. Creating a goroutine is as simple as adding go before a function call:

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func main() {
    fmt.Printf("Number of CPUs: %d\n", runtime.NumCPU())
    fmt.Printf("Number of Goroutines: %d\n", runtime.NumGoroutine())
    
    var wg sync.WaitGroup
    
    // Launch 1000 goroutines
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            time.Sleep(100 * time.Millisecond)
            fmt.Printf("Goroutine %d finished\n", id)
        }(i)
    }
    
    fmt.Printf("Number of Goroutines after launch: %d\n", runtime.NumGoroutine())
    wg.Wait()
    fmt.Printf("All goroutines completed!\n")
}

Key Insight: The Go runtime multiplexes goroutines onto OS threads. You can have thousands of goroutines running on just a few threads!

Channels: The Pipes of Communication

Channels are typed conduits through which goroutines communicate. They're the implementation of the CSP (Communicating Sequential Processes) model.

// Unbuffered channel - synchronous communication
ch := make(chan int)

// Buffered channel - asynchronous up to buffer size
bufferedCh := make(chan string, 10)

// Send
ch <- 42

// Receive
value := <-ch

// Receive with ok check
value, ok := <-ch
if !ok {
    fmt.Println("Channel closed")
}

Select: The Swiss Army Knife

The select statement lets you wait on multiple channel operations:

select {
case msg1 := <-ch1:
    fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
    fmt.Println("Received from ch2:", msg2)
case <-time.After(1 * time.Second):
    fmt.Println("Timeout!")
default:
    fmt.Println("No channels ready")
}

🔨 Project 1: Building a Rate-Limited Web Scraper

Let's build something practical - a web scraper that respects rate limits using Go's concurrency primitives:

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

// URL represents a URL to scrape
type URL struct {
    Address string
    Retries int
}

// Result holds the scraping result
type Result struct {
    URL        string
    StatusCode int
    Body       []byte
    Error      error
}

// RateLimitedScraper scrapes URLs with rate limiting
type RateLimitedScraper struct {
    client      *http.Client
    rateLimiter <-chan time.Time
    maxRetries  int
}

// NewRateLimitedScraper creates a new scraper with rate limiting
func NewRateLimitedScraper(requestsPerSecond int, maxRetries int) *RateLimitedScraper {
    interval := time.Second / time.Duration(requestsPerSecond)
    return &RateLimitedScraper{
        client: &http.Client{
            Timeout: 10 * time.Second,
        },
        rateLimiter: time.Tick(interval),
        maxRetries:  maxRetries,
    }
}

// Scrape fetches a URL respecting rate limits
func (s *RateLimitedScraper) Scrape(ctx context.Context, url URL) Result {
    // Wait for rate limiter
    select {
    case <-s.rateLimiter:
        // Proceed with request
    case <-ctx.Done():
        return Result{URL: url.Address, Error: ctx.Err()}
    }
    
    req, err := http.NewRequestWithContext(ctx, "GET", url.Address, nil)
    if err != nil {
        return Result{URL: url.Address, Error: err}
    }
    
    resp, err := s.client.Do(req)
    if err != nil {
        if url.Retries < s.maxRetries {
            url.Retries++
            return s.Scrape(ctx, url) // Retry
        }
        return Result{URL: url.Address, Error: err}
    }
    defer resp.Body.Close()
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return Result{URL: url.Address, Error: err}
    }
    
    return Result{
        URL:        url.Address,
        StatusCode: resp.StatusCode,
        Body:       body,
        Error:      nil,
    }
}

// ScrapeConcurrently scrapes multiple URLs concurrently
func ScrapeConcurrently(urls []string, workers int, requestsPerSecond int) []Result {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    urlChan := make(chan URL, len(urls))
    resultChan := make(chan Result, len(urls))
    
    // Create scraper
    scraper := NewRateLimitedScraper(requestsPerSecond, 3)
    
    // Start workers
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for url := range urlChan {
                fmt.Printf("Worker %d scraping %s\n", workerID, url.Address)
                result := scraper.Scrape(ctx, url)
                resultChan <- result
            }
        }(i)
    }
    
    // Send URLs to workers
    for _, url := range urls {
        urlChan <- URL{Address: url}
    }
    close(urlChan)
    
    // Wait for workers and close results
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    // Collect results
    var results []Result
    for result := range resultChan {
        results = append(results, result)
    }
    
    return results
}

func main() {
    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://github.com",
        "https://stackoverflow.com",
        "https://reddit.com",
    }
    
    start := time.Now()
    results := ScrapeConcurrently(urls, 3, 2) // 3 workers, 2 requests per second
    
    fmt.Printf("\n=== Scraping completed in %v ===\n", time.Since(start))
    for _, result := range results {
        if result.Error != nil {
            fmt.Printf("❌ %s: %v\n", result.URL, result.Error)
        } else {
            fmt.Printf("✅ %s: Status %d, Size %d bytes\n", 
                result.URL, result.StatusCode, len(result.Body))
        }
    }
}

🎓 Key Patterns Demonstrated

  1. Rate Limiting with time.Tick: Creates a channel that sends time values at regular intervals
  2. Context for Cancellation: Propagates cancellation across goroutines
  3. Worker Pool Pattern: Fixed number of workers processing from a shared channel
  4. Graceful Shutdown: Using WaitGroup and channel closing for coordination

🚀 Project 2: Advanced Channel Patterns

Let's explore more sophisticated channel patterns with a data processing pipeline:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Pipeline demonstrates advanced channel patterns
type Pipeline struct {
    ctx context.Context
}

// Generator creates a stream of numbers
func (p *Pipeline) Generator(start, count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            select {
            case out <- start + i:
            case <-p.ctx.Done():
                close(out)
                return
            }
        }
        close(out)
    }()
    return out
}

// Square squares the input numbers
func (p *Pipeline) Square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            select {
            case out <- n * n:
            case <-p.ctx.Done():
                close(out)
                return
            }
        }
        close(out)
    }()
    return out
}

// Filter filters even numbers
func (p *Pipeline) Filter(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if n%2 == 0 {
                select {
                case out <- n:
                case <-p.ctx.Done():
                    close(out)
                    return
                }
            }
        }
        close(out)
    }()
    return out
}

// FanOut distributes work across multiple workers
func (p *Pipeline) FanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        out := make(chan int)
        outs[i] = out
        
        go func(worker int, out chan<- int) {
            for n := range in {
                // Simulate work
                time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
                select {
                case out <- n:
                    fmt.Printf("Worker %d processed: %d\n", worker, n)
                case <-p.ctx.Done():
                    close(out)
                    return
                }
            }
            close(out)
        }(i, out)
    }
    return outs
}

// FanIn merges multiple channels into one
func (p *Pipeline) FanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                select {
                case out <- n:
                case <-p.ctx.Done():
                    return
                }
            }
        }(ch)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

// OrDone wraps a channel with cancellation support
func OrDone(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// Tee splits one channel into two
func Tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    
    go func() {
        defer close(out1)
        defer close(out2)
        
        for val := range OrDone(ctx, in) {
            var out1Val, out2Val = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case out1Val <- val:
                    out1Val = nil // Disable this case
                case out2Val <- val:
                    out2Val = nil // Disable this case
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    
    return out1, out2
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    pipeline := &Pipeline{ctx: ctx}
    
    // Create pipeline: Generate -> Square -> Filter -> FanOut -> FanIn
    numbers := pipeline.Generator(1, 20)
    squared := pipeline.Square(numbers)
    filtered := pipeline.Filter(squared)
    
    // Demonstrate tee
    filtered1, filtered2 := Tee(ctx, filtered)
    
    // Use one branch for fan-out/fan-in
    workers := pipeline.FanOut(filtered1, 3)
    merged := pipeline.FanIn(workers...)
    
    // Collect from both branches
    go func() {
        fmt.Println("\n=== Branch 2 (Direct) ===")
        for n := range filtered2 {
            fmt.Printf("Direct: %d\n", n)
        }
    }()
    
    fmt.Println("=== Branch 1 (Fan-Out/Fan-In) ===")
    for result := range merged {
        fmt.Printf("Final result: %d\n", result)
    }
}

⚠️ Common Pitfalls and How to Avoid Them

1. The Infamous Goroutine Leak

// BAD: This goroutine will leak if nobody reads from ch
func leak() {
    ch := make(chan int)
    go func() {
        val := <-ch  // Blocks forever if no sender
        fmt.Println(val)
    }()
    // Function returns, but goroutine is still alive!
}

// GOOD: Always ensure goroutines can exit
func noLeak(ctx context.Context) {
    ch := make(chan int)
    go func() {
        select {
        case val := <-ch:
            fmt.Println(val)
        case <-ctx.Done():
            return
        }
    }()
}

2. Closing Channels

// RULES:
// 1. Only the sender should close a channel
// 2. Sending on a closed channel panics
// 3. Receiving from a closed channel returns zero value immediately

// Pattern: Signal completion
done := make(chan struct{})
go func() {
    // Do work
    close(done) // Signal completion
}()

<-done // Wait for completion

3. Select Statement Gotchas

// Random selection when multiple cases are ready
select {
case <-ch1:
    // This might execute
case <-ch2:
    // Or this might execute
}

// Priority select pattern (ch1 has priority)
select {
case val := <-ch1:
    handleCh1(val)
default:
    select {
    case val := <-ch1:
        handleCh1(val)
    case val := <-ch2:
        handleCh2(val)
    }
}

🎯 Performance Considerations

Channel vs Mutex Performance

// Benchmark: Channel vs Mutex for counter
func BenchmarkChannel(b *testing.B) {
    counter := make(chan int, 1)
    counter <- 0
    
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            val := <-counter
            val++
            counter <- val
        }
    })
}

func BenchmarkMutex(b *testing.B) {
    var mu sync.Mutex
    counter := 0
    
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            counter++
            mu.Unlock()
        }
    })
}

Rule of Thumb:

  • Use channels for passing ownership of data
  • Use mutexes for protecting shared state
  • Channels have more overhead but provide better composition

🔍 Debugging Concurrency

1. Race Detector

go run -race main.go
go test -race ./...

2. Goroutine Dumps

// Dump all goroutine stacks
import _ "net/http/pprof"

go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()

// Visit http://localhost:6060/debug/pprof/goroutine

3. Visualization with go-deadlock

import "github.com/sasha-s/go-deadlock"

// Replace sync.Mutex with deadlock.Mutex
var mu deadlock.Mutex

🎬 Conclusion: Concurrency is Not Parallelism

Remember Rob Pike's famous talk: "Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once."

Go's concurrency model gives you the tools to structure your program as independently executing components. Whether they run in parallel depends on the runtime and available cores.

Your Concurrency Toolbox

  1. Goroutines: For independent tasks
  2. Channels: For communication and synchronization
  3. Select: For non-blocking operations and timeouts
  4. Context: For cancellation and deadlines
  5. sync Package: For traditional synchronization when needed

What's Next?

  • Experiment with the code examples
  • Try building a concurrent rate limiter
  • Implement a pub-sub system using channels
  • Profile your concurrent code to find bottlenecks

Remember: Start with simple, correct code. Then optimize. Premature optimization in concurrent code is a recipe for subtle bugs and sleepless nights.

Happy concurrent coding! 🚀


Have questions or found a bug in the examples? Reach out on Twitter or open an issue on GitHub.

WY

Cap

Senior Golang Backend & Web3 Developer with 10+ years of experience building scalable systems and blockchain solutions.

View Full Profile →