Go Concurrency Deep Dive: Channels, Goroutines, and Select Statements
"Don't communicate by sharing memory; share memory by communicating." - Rob Pike
If you're coming from languages like Java or Python, Go's approach to concurrency might feel like entering a parallel universe (pun intended). Today, we're going to demystify Go's concurrency model by building real-world examples, breaking common misconceptions, and yes, probably causing a few deadlocks along the way.
🎯 What We're Building Today
By the end of this article, you'll have built:
- A concurrent web scraper that respects rate limits
- A worker pool pattern for processing tasks
- A timeout-aware service orchestrator
- A fan-in/fan-out pipeline for data processing
But first, let's understand the building blocks.
🧩 The Trinity of Go Concurrency
Goroutines: Your Lightweight Threads
Think of goroutines as super-lightweight threads managed by the Go runtime. Creating a goroutine is as simple as adding the go keyword before a function call:
package main
import (
	"fmt"
	"runtime"
	"sync"
	"time"
)
func main() {
	fmt.Printf("Number of CPUs: %d\n", runtime.NumCPU())
	fmt.Printf("Number of Goroutines: %d\n", runtime.NumGoroutine())
	var wg sync.WaitGroup
	// Launch 1000 goroutines
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Goroutine %d finished\n", id)
		}(i)
	}
	fmt.Printf("Number of Goroutines after launch: %d\n", runtime.NumGoroutine())
	wg.Wait()
	fmt.Printf("All goroutines completed!\n")
}
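Key Insight: The Go runtime multiplexes goroutines onto OS threads. GOMAXPROCS caps how many of those threads can execute Go code at the same time, and by default it is based on the number of available CPUs. You can have thousands of goroutines running on just a few threads!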
Channels: The Pipes of Communication
Channels are typed conduits through which goroutines communicate. Their design comes from the CSP (Communicating Sequential Processes) model.
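package main
import "fmt"
func main() {
	// Unbuffered channel - synchronous: a send blocks until a receiver is ready
	ch := make(chan int)
	// Buffered channel - sends block only when all 10 slots are full
	bufferedCh := make(chan string, 10)
	bufferedCh <- "hello" // returns immediately because the buffer has room
	// Send from another goroutine; sending on ch from main with no receiver would deadlock
	go func() {
		ch <- 42
		close(ch)
	}()
	// Receive
	value := <-ch
	fmt.Println(value, <-bufferedCh)
	// Receive with ok check: ok is false once ch is closed and drained
	value, ok := <-ch
	if !ok {
		fmt.Println("Channel closed, got zero value:", value)
	}
}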
Select: The Swiss Army Knife
The select statement lets a goroutine wait on multiple channel operations at once. It blocks until one of its cases can proceed, unless it has a default case:
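// Blocks until ch1 or ch2 delivers a value, or one second passes
select {
case msg1 := <-ch1:
	fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
	fmt.Println("Received from ch2:", msg2)
case <-time.After(1 * time.Second):
	fmt.Println("Timeout!")
}
// Non-blocking variant: default runs immediately when no case is ready,
// so don't combine it with a time.After case (the timeout could never fire)
select {
case msg1 := <-ch1:
	fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
	fmt.Println("Received from ch2:", msg2)
default:
	fmt.Println("No channels ready")
}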
🔨 Project 1: Building a Rate-Limited Web Scraper
Let's build something practical - a web scraper that respects rate limits using Go's concurrency primitives:
package main
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)
// URL represents a URL to scrape
type URL struct {
	Address string
	Retries int
}
// Result holds the scraping result
type Result struct {
	URL        string
	StatusCode int
	Body       []byte
	Error      error
}
// RateLimitedScraper scrapes URLs with rate limiting
type RateLimitedScraper struct {
	client      *http.Client
	rateLimiter <-chan time.Time
	maxRetries  int
}
// NewRateLimitedScraper creates a new scraper with rate limiting
func NewRateLimitedScraper(requestsPerSecond int, maxRetries int) *RateLimitedScraper {
	interval := time.Second / time.Duration(requestsPerSecond)
	return &RateLimitedScraper{
		client: &http.Client{
			Timeout: 10 * time.Second,
		},
		// One tick per interval, shared by all workers. Before Go 1.23 the ticker
		// behind time.Tick could never be garbage collected; time.NewTicker + Stop avoids that.
		rateLimiter: time.Tick(interval),
		maxRetries:  maxRetries,
	}
}
// Scrape fetches a URL respecting rate limits
func (s *RateLimitedScraper) Scrape(ctx context.Context, url URL) Result {
	// Wait for rate limiter
	select {
	case <-s.rateLimiter:
		// Proceed with request
	case <-ctx.Done():
		return Result{URL: url.Address, Error: ctx.Err()}
	}
	req, err := http.NewRequestWithContext(ctx, "GET", url.Address, nil)
	if err != nil {
		return Result{URL: url.Address, Error: err}
	}
	resp, err := s.client.Do(req)
	if err != nil {
		if url.Retries < s.maxRetries {
			url.Retries++
			return s.Scrape(ctx, url) // Retry (waits for the next rate-limiter tick)
		}
		return Result{URL: url.Address, Error: err}
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return Result{URL: url.Address, Error: err}
	}
	return Result{
		URL:        url.Address,
		StatusCode: resp.StatusCode,
		Body:       body,
		Error:      nil,
	}
}
// ScrapeConcurrently scrapes multiple URLs concurrently
func ScrapeConcurrently(urls []string, workers int, requestsPerSecond int) []Result {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Both channels hold len(urls) items, so neither queuing URLs nor sending results blocks
	urlChan := make(chan URL, len(urls))
	resultChan := make(chan Result, len(urls))
	// Create scraper
	scraper := NewRateLimitedScraper(requestsPerSecond, 3)
	// Start workers
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for url := range urlChan {
				fmt.Printf("Worker %d scraping %s\n", workerID, url.Address)
				result := scraper.Scrape(ctx, url)
				resultChan <- result
			}
		}(i)
	}
	// Send URLs to workers
	for _, url := range urls {
		urlChan <- URL{Address: url}
	}
	close(urlChan)
	// Wait for workers and close results
	go func() {
		wg.Wait()
		close(resultChan)
	}()
	// Collect results
	var results []Result
	for result := range resultChan {
		results = append(results, result)
	}
	return results
}
func main() {
	urls := []string{
		"https://example.com",
		"https://golang.org",
		"https://github.com",
		"https://stackoverflow.com",
		"https://reddit.com",
	}
	start := time.Now()
	results := ScrapeConcurrently(urls, 3, 2) // 3 workers, 2 requests per second
	fmt.Printf("\n=== Scraping completed in %v ===\n", time.Since(start))
	for _, result := range results {
		if result.Error != nil {
			fmt.Printf("❌ %s: %v\n", result.URL, result.Error)
		} else {
			fmt.Printf("✅ %s: Status %d, Size %d bytes\n",
				result.URL, result.StatusCode, len(result.Body))
		}
	}
}
🎓 Key Patterns Demonstrated
- Rate Limiting with time.Tick: Creates a channel that sends time values at regular intervals
- Context for Cancellation: Propagates cancellation across goroutines
- Worker Pool Pattern: Fixed number of workers processing from a shared channel
- Graceful Shutdown: Using WaitGroup and channel closing for coordination
🚀 Project 2: Advanced Channel Patterns
Let's explore more sophisticated channel patterns with a data processing pipeline:
package main
import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)
// Pipeline demonstrates advanced channel patterns.
// (The context package docs advise passing a Context explicitly instead of
// storing it in a struct; it is stored here only to keep the stage signatures short.)
type Pipeline struct {
	ctx context.Context
}
// Generator creates a stream of numbers
func (p *Pipeline) Generator(start, count int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < count; i++ {
			select {
			case out <- start + i:
			case <-p.ctx.Done():
				close(out)
				return
			}
		}
		close(out)
	}()
	return out
}
// Square squares the input numbers
func (p *Pipeline) Square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			select {
			case out <- n * n:
			case <-p.ctx.Done():
				close(out)
				return
			}
		}
		close(out)
	}()
	return out
}
// Filter keeps only even numbers
func (p *Pipeline) Filter(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if n%2 == 0 {
				select {
				case out <- n:
				case <-p.ctx.Done():
					close(out)
					return
				}
			}
		}
		close(out)
	}()
	return out
}
// FanOut distributes work across multiple workers
func (p *Pipeline) FanOut(in <-chan int, workers int) []<-chan int {
	outs := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		out := make(chan int)
		outs[i] = out
		go func(worker int, out chan<- int) {
			for n := range in {
				// Simulate work
				time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
				select {
				case out <- n:
					fmt.Printf("Worker %d processed: %d\n", worker, n)
				case <-p.ctx.Done():
					close(out)
					return
				}
			}
			close(out)
		}(i, out)
	}
	return outs
}
// FanIn merges multiple channels into one
func (p *Pipeline) FanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				select {
				case out <- n:
				case <-p.ctx.Done():
					return
				}
			}
		}(ch)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
// OrDone wraps a channel with cancellation support
func OrDone(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}
// Tee splits one channel into two
func Tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range OrDone(ctx, in) {
			var out1Val, out2Val = out1, out2
			for i := 0; i < 2; i++ {
				select {
				case out1Val <- val:
					out1Val = nil // Disable this case
				case out2Val <- val:
					out2Val = nil // Disable this case
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out1, out2
}
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	pipeline := &Pipeline{ctx: ctx}
	// Create pipeline: Generate -> Square -> Filter -> Tee -> (FanOut -> FanIn | direct)
	numbers := pipeline.Generator(1, 20)
	squared := pipeline.Square(numbers)
	filtered := pipeline.Filter(squared)
	// Demonstrate tee
	filtered1, filtered2 := Tee(ctx, filtered)
	// Use one branch for fan-out/fan-in
	workers := pipeline.FanOut(filtered1, 3)
	merged := pipeline.FanIn(workers...)
	// Collect from both branches; Tee waits until both branches take each value
	branch2Done := make(chan struct{})
	go func() {
		defer close(branch2Done)
		fmt.Println("\n=== Branch 2 (Direct) ===")
		for n := range filtered2 {
			fmt.Printf("Direct: %d\n", n)
		}
	}()
	fmt.Println("=== Branch 1 (Fan-Out/Fan-In) ===")
	for result := range merged {
		fmt.Printf("Final result: %d\n", result)
	}
	// Wait for branch 2 as well; otherwise main can exit before it prints its last values
	<-branch2Done
}
⚠️ Common Pitfalls and How to Avoid Them
1. The Infamous Goroutine Leak
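package main
import (
	"context"
	"fmt"
	"time"
)
// BAD: This goroutine will leak if nobody ever sends on ch
func leak() {
	ch := make(chan int)
	go func() {
		val := <-ch // Blocks forever if no sender
		fmt.Println(val)
	}()
	// Function returns, but goroutine is still alive!
}
// GOOD: Always ensure goroutines can exit
func noLeak(ctx context.Context) {
	ch := make(chan int)
	go func() {
		select {
		case val := <-ch:
			fmt.Println(val)
		case <-ctx.Done():
			return // exits as soon as the caller cancels
		}
	}()
}
func main() {
	leak()
	ctx, cancel := context.WithCancel(context.Background())
	noLeak(ctx)
	cancel()                          // the noLeak goroutine can now exit
	time.Sleep(10 * time.Millisecond) // give it a moment to do so
}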
2. Closing Channels
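// RULES:
// 1. Only the sender should close a channel (a receiver can't know whether more sends are coming)
// 2. Sending on a closed channel panics, and so does closing a channel twice
// 3. Receiving from a closed channel first drains any buffered values,
//    then returns the zero value immediately (with ok == false)
package main
import "fmt"
func main() {
	// Pattern: Signal completion
	done := make(chan struct{})
	go func() {
		fmt.Println("working...") // Do work
		close(done)               // Signal completion
	}()
	<-done // Wait for completion
}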
3. Select Statement Gotchas
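// Random selection: when several cases are ready, Go picks one
// uniformly at random, so neither case is favored
select {
case <-ch1:
	// This might execute
case <-ch2:
	// Or this might execute
}
// Priority select pattern (ch1 has priority)
select {
case val := <-ch1:
	handleCh1(val) // ch1 was ready at the moment of the check: take it first
default:
	// ch1 wasn't ready, so wait for either. If both become ready at once,
	// this inner select is random again, so the priority is best-effort
	select {
	case val := <-ch1:
		handleCh1(val)
	case val := <-ch2:
		handleCh2(val)
	}
}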
🎯 Performance Considerations
Channel vs Mutex Performance
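// counter_test.go - run with: go test -bench=.
// Benchmark: Channel vs Mutex for counter
package counter
import (
	"sync"
	"testing"
)
func BenchmarkChannel(b *testing.B) {
	// A buffered channel holding one value acts as a lock: whoever has
	// received the value owns the counter until it sends the value back.
	counter := make(chan int, 1)
	counter <- 0
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			val := <-counter
			val++
			counter <- val
		}
	})
}
func BenchmarkMutex(b *testing.B) {
	var mu sync.Mutex
	counter := 0
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.Lock()
			counter++
			mu.Unlock()
		}
	})
}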
Rule of Thumb:
- Use channels for passing ownership of data
- Use mutexes for protecting shared state
- Channels have more overhead but provide better composition
🔍 Debugging Concurrency
1. Race Detector
go run -race main.go
go test -race ./...
2. Goroutine Dumps
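package main
import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/ handlers on http.DefaultServeMux
)
func main() {
	go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }()
	select {} // stand-in for the rest of your program
}
// Visit http://localhost:6060/debug/pprof/goroutine?debug=2 to dump all goroutine stacks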
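When a program can't expose an HTTP port, the standard library's runtime.Stack can capture the same per-goroutine traces in-process. Here is a minimal sketch; allStacks is a hypothetical helper, and the stuck goroutine exists only so the dump has something to show.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// allStacks returns the stack traces of every goroutine, growing the buffer
// until runtime.Stack no longer fills it completely (i.e. nothing was truncated).
func allStacks() string {
	buf := make([]byte, 1<<16)
	for {
		n := runtime.Stack(buf, true) // true = include all goroutines
		if n < len(buf) {
			return string(buf[:n])
		}
		buf = make([]byte, 2*len(buf))
	}
}

func main() {
	go func() { select {} }()         // a deliberately stuck goroutine
	time.Sleep(10 * time.Millisecond) // give it time to start and block
	fmt.Print(allStacks())
}
```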
3. Deadlock Detection with go-deadlock
import "github.com/sasha-s/go-deadlock"
// Replace sync.Mutex with deadlock.Mutex
var mu deadlock.Mutex
🎬 Conclusion: Concurrency is Not Parallelism
Remember Rob Pike's famous talk: "Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once."
Go's concurrency model gives you the tools to structure your program as independently executing components. Whether they run in parallel depends on the runtime and available cores.
Your Concurrency Toolbox
- Goroutines: For independent tasks
- Channels: For communication and synchronization
- Select: For non-blocking operations and timeouts
- Context: For cancellation and deadlines
- sync Package: For traditional synchronization when needed
What's Next?
- Experiment with the code examples
- Try building a concurrent rate limiter
- Implement a pub-sub system using channels
- Profile your concurrent code to find bottlenecks
Remember: Start with simple, correct code. Then optimize. Premature optimization in concurrent code is a recipe for subtle bugs and sleepless nights.
Happy concurrent coding! 🚀
Have questions or found a bug in the examples? Reach out on Twitter or open an issue on GitHub.
Cap
Senior Golang Backend & Web3 Developer with 10+ years of experience building scalable systems and blockchain solutions.