Web3 Go Backend Architecture: Building Scalable Blockchain Infrastructure in 2025

Cap · 16 min read · go · web3 · blockchain · architecture · ethereum

Advanced patterns for building high-performance Web3 backends that handle millions of transactions

๐ŸŒ The Web3 Backend Evolution in 2025

The Web3 infrastructure landscape has matured significantly. Modern Web3 backends must handle:

  • High-frequency trading (100k+ TPS)
  • Multi-chain operations (Ethereum, Polygon, Arbitrum, Base)
  • MEV protection and fair transaction ordering
  • Real-time data synchronization across multiple networks
  • Regulatory compliance and audit trails

Let's build a production-grade Web3 backend that addresses these challenges.

๐Ÿ—๏ธ Architecture Overview

Our system will implement a microservices architecture with:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   RPC Gateway   │────│  Event Indexer  │────│  State Manager  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Transaction     │    │   Data Lake     │    │   Cache Layer   │
│ Orchestrator    │    │  (TimescaleDB)  │    │     (Redis)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
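Before diving into each component, here is a minimal sketch of how the pieces might be wired together at startup. The `gateway` and `indexer` packages are the ones implemented in the sections below; the module path `yourapp`, the RPC endpoint, and the Postgres driver choice are placeholders to adjust for your own repository.

// cmd/server/main.go — illustrative wiring only
package main

import (
    "context"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq" // Postgres driver; swap for your own

    "yourapp/internal/gateway"
    "yourapp/internal/indexer"
)

func main() {
    ctx := context.Background()

    // One config per supported chain; values here are illustrative.
    configs := []gateway.ChainConfig{{
        ChainID: 1,
        Name:    "ethereum",
        RPCURLs: []string{"https://eth-rpc.example.com"}, // placeholder endpoint
        MaxRPS:  200,
        Timeout: 10 * time.Second,
        RetryPolicy: gateway.RetryConfig{
            MaxRetries:    3,
            InitialDelay:  200 * time.Millisecond,
            BackoffFactor: 2.0,
            MaxDelay:      5 * time.Second,
        },
    }}

    gw, err := gateway.NewMultiChainGateway(configs)
    if err != nil {
        panic(err)
    }

    db := sqlx.MustConnect("postgres", "postgres://localhost/web3?sslmode=disable")

    // 32 workers bound concurrent event processing.
    idx := indexer.NewEventIndexer(gw, db, 32)
    if err := idx.StartIndexing(ctx); err != nil {
        panic(err)
    }
}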

🔧 Core Components Implementation

1. Multi-Chain RPC Gateway

// internal/gateway/rpc_gateway.go
package gateway

import (
    "context"
    "fmt"
    "log/slog"
    "net/http"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/ethereum/go-ethereum/rpc"
    "golang.org/x/sync/semaphore"
)

// ChainConfig represents blockchain network configuration
type ChainConfig struct {
    ChainID     uint64        `json:"chain_id"`
    Name        string        `json:"name"`
    RPCURLs     []string      `json:"rpc_urls"`
    WSURLs      []string      `json:"ws_urls"`
    MaxRPS      int           `json:"max_rps"`
    Timeout     time.Duration `json:"timeout"`
    RetryPolicy RetryConfig   `json:"retry_policy"`
}

type RetryConfig struct {
    MaxRetries      int           `json:"max_retries"`
    InitialDelay    time.Duration `json:"initial_delay"`
    BackoffFactor   float64       `json:"backoff_factor"`
    MaxDelay        time.Duration `json:"max_delay"`
}

// RPCPool manages a pool of RPC connections with load balancing
type RPCPool struct {
    config     ChainConfig
    clients    []*ethclient.Client
    rpcClients []*rpc.Client
    urls       []string // endpoint URLs, index-aligned with clients and health
    semaphore  *semaphore.Weighted
    mu         sync.RWMutex
    current    int
    health     []bool
}

func NewRPCPool(config ChainConfig) (*RPCPool, error) {
    pool := &RPCPool{
        config:     config,
        clients:    make([]*ethclient.Client, 0, len(config.RPCURLs)),
        rpcClients: make([]*rpc.Client, 0, len(config.RPCURLs)),
        urls:       make([]string, 0, len(config.RPCURLs)),
        semaphore:  semaphore.NewWeighted(int64(config.MaxRPS)),
    }

    // Initialize connections, appending clients, URLs, and health flags
    // together so index i refers to the same endpoint in every slice.
    for _, url := range config.RPCURLs {
        rpcClient, err := rpc.DialHTTPWithClient(url, &http.Client{
            Timeout: config.Timeout,
        })
        if err != nil {
            slog.Error("Failed to connect to RPC", "url", url, "error", err)
            continue
        }

        pool.clients = append(pool.clients, ethclient.NewClient(rpcClient))
        pool.rpcClients = append(pool.rpcClients, rpcClient)
        pool.urls = append(pool.urls, url)
        pool.health = append(pool.health, true)
    }

    if len(pool.clients) == 0 {
        return nil, fmt.Errorf("no healthy RPC endpoints available")
    }

    // Start background health monitoring
    go pool.healthCheck()

    return pool, nil
}

// GetClient returns a healthy client using round-robin load balancing.
// It takes the write lock because it advances the round-robin cursor.
func (p *RPCPool) GetClient() (*ethclient.Client, error) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if len(p.clients) == 0 {
        return nil, fmt.Errorf("no healthy clients available")
    }

    // Find next healthy client
    for attempts := 0; attempts < len(p.clients); attempts++ {
        index := (p.current + attempts) % len(p.clients)
        if p.health[index] {
            p.current = (index + 1) % len(p.clients)
            return p.clients[index], nil
        }
    }

    return nil, fmt.Errorf("no healthy clients available")
}

// ExecuteWithRetry executes a function with retries and bounded concurrency.
// The weighted semaphore caps in-flight requests; for true requests-per-second
// limiting, pair it with a token bucket such as golang.org/x/time/rate.
func (p *RPCPool) ExecuteWithRetry(ctx context.Context, fn func(*ethclient.Client) error) error {
    if err := p.semaphore.Acquire(ctx, 1); err != nil {
        return fmt.Errorf("failed to acquire request slot: %w", err)
    }
    defer p.semaphore.Release(1)

    var lastErr error
    delay := p.config.RetryPolicy.InitialDelay

    // backoff sleeps for the current delay, then grows it, capped at MaxDelay
    backoff := func() {
        time.Sleep(delay)
        delay = time.Duration(float64(delay) * p.config.RetryPolicy.BackoffFactor)
        if delay > p.config.RetryPolicy.MaxDelay {
            delay = p.config.RetryPolicy.MaxDelay
        }
    }

    for attempt := 0; attempt <= p.config.RetryPolicy.MaxRetries; attempt++ {
        client, err := p.GetClient()
        if err != nil {
            lastErr = err
            if attempt < p.config.RetryPolicy.MaxRetries {
                backoff()
            }
            continue
        }

        if err := fn(client); err != nil {
            lastErr = err
            slog.Warn("RPC call failed", "attempt", attempt+1, "error", err)

            if attempt < p.config.RetryPolicy.MaxRetries {
                backoff()
            }
            continue
        }

        return nil
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// healthCheck periodically probes each endpoint and updates its health flag
func (p *RPCPool) healthCheck() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        p.mu.Lock()
        for i, client := range p.clients {
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

            // A cheap call that confirms the endpoint is responsive
            _, err := client.NetworkID(ctx)
            p.health[i] = err == nil

            if err != nil {
                slog.Warn("RPC endpoint unhealthy",
                    "url", p.urls[i],
                    "error", err,
                )
            }
            cancel()
        }
        p.mu.Unlock()
    }
}

// MultiChainGateway manages multiple blockchain networks
type MultiChainGateway struct {
    pools map[uint64]*RPCPool
    mu    sync.RWMutex
}

func NewMultiChainGateway(configs []ChainConfig) (*MultiChainGateway, error) {
    gateway := &MultiChainGateway{
        pools: make(map[uint64]*RPCPool),
    }

    for _, config := range configs {
        pool, err := NewRPCPool(config)
        if err != nil {
            slog.Error("Failed to initialize RPC pool", 
                "chain", config.Name, 
                "error", err,
            )
            continue
        }
        gateway.pools[config.ChainID] = pool
    }

    if len(gateway.pools) == 0 {
        return nil, fmt.Errorf("no RPC pools could be initialized")
    }

    return gateway, nil
}

// ExecuteOnChain routes a call to the pool for the given chain
func (g *MultiChainGateway) ExecuteOnChain(ctx context.Context, chainID uint64, fn func(*ethclient.Client) error) error {
    g.mu.RLock()
    pool, exists := g.pools[chainID]
    g.mu.RUnlock()

    if !exists {
        return fmt.Errorf("chain %d not supported", chainID)
    }

    return pool.ExecuteWithRetry(ctx, fn)
}
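With the gateway in place, calling code never touches individual endpoints; it names a chain and supplies a function. A minimal usage sketch in the same package (chain ID 1 is Ethereum mainnet):

// latestBlock fetches the chain head height through the gateway's
// load-balancing and retry machinery.
func latestBlock(ctx context.Context, gw *MultiChainGateway) (uint64, error) {
    var blockNumber uint64
    err := gw.ExecuteOnChain(ctx, 1, func(client *ethclient.Client) error {
        n, err := client.BlockNumber(ctx)
        if err != nil {
            return err
        }
        blockNumber = n
        return nil
    })
    return blockNumber, err
}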

2. Advanced Event Indexing System

// internal/indexer/event_indexer.go
package indexer

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log/slog"
    "math/big"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
    "github.com/jmoiron/sqlx"

    // Adjust this import to your module path; the gateway package is
    // the one defined in the previous section.
    "yourapp/internal/gateway"
)

// EventFilter represents an event filter configuration
type EventFilter struct {
    ID        string          `json:"id"`
    ChainID   uint64          `json:"chain_id"`
    Address   common.Address  `json:"address"`
    Topics    [][]common.Hash `json:"topics"`
    FromBlock *big.Int        `json:"from_block"`
    ToBlock   *big.Int        `json:"to_block"`
    BatchSize uint64          `json:"batch_size"`
    Handlers  []EventHandler  `json:"-"`
}

// EventHandler processes specific events
type EventHandler interface {
    Handle(ctx context.Context, event *types.Log) error
    EventSignature() common.Hash
}

// IndexedEvent represents a processed blockchain event
type IndexedEvent struct {
    ID              string          `db:"id" json:"id"`
    ChainID         uint64          `db:"chain_id" json:"chain_id"`
    BlockNumber     uint64          `db:"block_number" json:"block_number"`
    BlockHash       string          `db:"block_hash" json:"block_hash"`
    TransactionHash string          `db:"transaction_hash" json:"transaction_hash"`
    LogIndex        uint            `db:"log_index" json:"log_index"`
    Address         string          `db:"address" json:"address"`
    Topics          json.RawMessage `db:"topics" json:"topics"`
    Data            string          `db:"data" json:"data"`
    EventType       string          `db:"event_type" json:"event_type"`
    ProcessedAt     time.Time       `db:"processed_at" json:"processed_at"`
    ProcessingTime  time.Duration   `db:"processing_time" json:"processing_time"`
}

// EventIndexer provides high-performance event indexing
type EventIndexer struct {
    gateway    *gateway.MultiChainGateway
    db         *sqlx.DB
    filters    map[string]*EventFilter
    handlers   map[common.Hash][]EventHandler
    mu         sync.RWMutex
    workerPool chan struct{} // bounds concurrent event processing
}

func NewEventIndexer(gw *gateway.MultiChainGateway, db *sqlx.DB, maxWorkers int) *EventIndexer {
    return &EventIndexer{
        gateway:    gw,
        db:         db,
        filters:    make(map[string]*EventFilter),
        handlers:   make(map[common.Hash][]EventHandler),
        workerPool: make(chan struct{}, maxWorkers),
    }
}

// RegisterFilter adds a new event filter
func (ei *EventIndexer) RegisterFilter(filter *EventFilter) {
    ei.mu.Lock()
    defer ei.mu.Unlock()

    ei.filters[filter.ID] = filter
    
    // Register handlers
    for _, handler := range filter.Handlers {
        signature := handler.EventSignature()
        ei.handlers[signature] = append(ei.handlers[signature], handler)
    }
}

// StartIndexing begins the event indexing process
func (ei *EventIndexer) StartIndexing(ctx context.Context) error {
    ei.mu.RLock()
    filters := make([]*EventFilter, 0, len(ei.filters))
    for _, filter := range ei.filters {
        filters = append(filters, filter)
    }
    ei.mu.RUnlock()

    // Start indexing for each filter
    var wg sync.WaitGroup
    for _, filter := range filters {
        wg.Add(1)
        go func(f *EventFilter) {
            defer wg.Done()
            ei.indexFilterEvents(ctx, f)
        }(filter)
    }

    wg.Wait()
    return nil
}

func (ei *EventIndexer) indexFilterEvents(ctx context.Context, filter *EventFilter) {
    ticker := time.NewTicker(5 * time.Second) // Check for new events every 5 seconds
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := ei.processFilterBatch(ctx, filter); err != nil {
                slog.Error("Failed to process filter batch", 
                    "filter_id", filter.ID,
                    "error", err,
                )
            }
        }
    }
}

func (ei *EventIndexer) processFilterBatch(ctx context.Context, filter *EventFilter) error {
    // Get last processed block
    lastBlock, err := ei.getLastProcessedBlock(filter.ID, filter.ChainID)
    if err != nil {
        return fmt.Errorf("failed to get last processed block: %w", err)
    }

    // Get the current chain head
    var currentBlock uint64
    err = ei.gateway.ExecuteOnChain(ctx, filter.ChainID, func(client *ethclient.Client) error {
        header, err := client.HeaderByNumber(ctx, nil)
        if err != nil {
            return err
        }
        currentBlock = header.Number.Uint64()
        return nil
    })
    if err != nil {
        return fmt.Errorf("failed to get current block: %w", err)
    }

    // Calculate batch range
    fromBlock := lastBlock + 1
    toBlock := fromBlock + filter.BatchSize - 1
    if toBlock > currentBlock {
        toBlock = currentBlock
    }

    if fromBlock > toBlock {
        return nil // No new blocks to process
    }

    // Create filter query
    query := ethereum.FilterQuery{
        FromBlock: big.NewInt(int64(fromBlock)),
        ToBlock:   big.NewInt(int64(toBlock)),
        Addresses: []common.Address{filter.Address},
        Topics:    filter.Topics,
    }

    // Fetch matching logs for the batch range
    var logs []types.Log
    err = ei.gateway.ExecuteOnChain(ctx, filter.ChainID, func(client *ethclient.Client) error {
        var err error
        logs, err = client.FilterLogs(ctx, query)
        return err
    })
    if err != nil {
        return fmt.Errorf("failed to get logs: %w", err)
    }

    // Process logs in parallel, bounded by the indexer's worker pool
    var wg sync.WaitGroup

    for _, log := range logs {
        wg.Add(1)
        go func(l types.Log) {
            defer wg.Done()
            ei.workerPool <- struct{}{}
            defer func() { <-ei.workerPool }()
            
            if err := ei.processEvent(ctx, filter, &l); err != nil {
                slog.Error("Failed to process event", 
                    "filter_id", filter.ID,
                    "tx_hash", l.TxHash.Hex(),
                    "error", err,
                )
            }
        }(log)
    }
    
    wg.Wait()

    // Update last processed block
    if err := ei.updateLastProcessedBlock(filter.ID, filter.ChainID, toBlock); err != nil {
        return fmt.Errorf("failed to update last processed block: %w", err)
    }

    slog.Info("Processed event batch", 
        "filter_id", filter.ID,
        "chain_id", filter.ChainID,
        "from_block", fromBlock,
        "to_block", toBlock,
        "events_count", len(logs),
    )

    return nil
}

func (ei *EventIndexer) processEvent(ctx context.Context, filter *EventFilter, log *types.Log) error {
    startTime := time.Now()

    // Store raw event
    event := &IndexedEvent{
        ID:              fmt.Sprintf("%s-%d", log.TxHash.Hex(), log.Index),
        ChainID:         filter.ChainID,
        BlockNumber:     log.BlockNumber,
        BlockHash:       log.BlockHash.Hex(),
        TransactionHash: log.TxHash.Hex(),
        LogIndex:        log.Index,
        Address:         log.Address.Hex(),
        Data:            common.Bytes2Hex(log.Data),
        ProcessedAt:     time.Now(),
    }

    // Convert topics to JSON
    topics := make([]string, len(log.Topics))
    for i, topic := range log.Topics {
        topics[i] = topic.Hex()
    }
    topicsJSON, _ := json.Marshal(topics)
    event.Topics = topicsJSON

    // Determine the event type from the first topic (the event signature).
    // Anonymous events carry no topics, so guard the access.
    var handlers []EventHandler
    if len(log.Topics) > 0 {
        event.EventType = log.Topics[0].Hex()

        // Look up handlers registered for this signature
        ei.mu.RLock()
        handlers = ei.handlers[log.Topics[0]]
        ei.mu.RUnlock()
    }

    for _, handler := range handlers {
        if err := handler.Handle(ctx, log); err != nil {
            slog.Error("Event handler failed", 
                "handler", fmt.Sprintf("%T", handler),
                "error", err,
            )
        }
    }

    event.ProcessingTime = time.Since(startTime)

    // Store in database
    return ei.storeEvent(event)
}

func (ei *EventIndexer) storeEvent(event *IndexedEvent) error {
    query := `
        INSERT INTO indexed_events (
            id, chain_id, block_number, block_hash, transaction_hash,
            log_index, address, topics, data, event_type, processed_at, processing_time
        ) VALUES (
            :id, :chain_id, :block_number, :block_hash, :transaction_hash,
            :log_index, :address, :topics, :data, :event_type, :processed_at, :processing_time
        ) ON CONFLICT (id) DO NOTHING
    `
    
    _, err := ei.db.NamedExec(query, event)
    return err
}

func (ei *EventIndexer) getLastProcessedBlock(filterID string, chainID uint64) (uint64, error) {
    var lastBlock uint64
    err := ei.db.Get(&lastBlock, `
        SELECT COALESCE(MAX(last_block), 0)
        FROM indexer_checkpoints
        WHERE filter_id = $1 AND chain_id = $2
    `, filterID, chainID)

    if err == sql.ErrNoRows {
        return 0, nil
    }
    return lastBlock, err
}

func (ei *EventIndexer) updateLastProcessedBlock(filterID string, chainID uint64, blockNumber uint64) error {
    _, err := ei.db.Exec(`
        INSERT INTO indexer_checkpoints (filter_id, chain_id, last_block, updated_at)
        VALUES ($1, $2, $3, NOW())
        ON CONFLICT (filter_id, chain_id)
        DO UPDATE SET last_block = $3, updated_at = NOW()
    `, filterID, chainID, blockNumber)
    return err
}

3. MEV Protection and Transaction Ordering

// internal/mev/protection.go
package mev

import (
    "context"
    "crypto/rand"
    "fmt"
    "log/slog"
    "math/big"
    "sort"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
)

// TransactionOrder represents different ordering strategies
type TransactionOrder int

const (
    FIFO TransactionOrder = iota
    PriorityFee
    TimeWeightedFairQueuing
    CommitReveal
)

// PendingTransaction represents a transaction in the mempool
type PendingTransaction struct {
    Hash         common.Hash    `json:"hash"`
    From         common.Address `json:"from"`
    To           *common.Address `json:"to"`
    Value        *big.Int       `json:"value"`
    GasPrice     *big.Int       `json:"gas_price"`
    PriorityFee  *big.Int       `json:"priority_fee"`
    Nonce        uint64         `json:"nonce"`
    Data         []byte         `json:"data"`
    ReceivedAt   time.Time      `json:"received_at"`
    CommitHash   *common.Hash   `json:"commit_hash,omitempty"`
    RevealedAt   *time.Time     `json:"revealed_at,omitempty"`
    Weight       float64        `json:"weight"`
}

// MEVProtector implements various MEV protection mechanisms
type MEVProtector struct {
    strategy       TransactionOrder
    commitDuration time.Duration
    revealDuration time.Duration
    pendingTxs     map[common.Hash]*PendingTransaction
    committedTxs   map[common.Hash]*PendingTransaction
    mu             sync.RWMutex
    batchInterval  time.Duration
    maxBatchSize   int
}

func NewMEVProtector(strategy TransactionOrder) *MEVProtector {
    return &MEVProtector{
        strategy:       strategy,
        commitDuration: 30 * time.Second,
        revealDuration: 10 * time.Second,
        pendingTxs:     make(map[common.Hash]*PendingTransaction),
        committedTxs:   make(map[common.Hash]*PendingTransaction),
        batchInterval:  12 * time.Second, // Ethereum block time
        maxBatchSize:   100,
    }
}

// SubmitTransaction adds a transaction to the protected mempool
func (mp *MEVProtector) SubmitTransaction(tx *PendingTransaction) error {
    mp.mu.Lock()
    defer mp.mu.Unlock()

    switch mp.strategy {
    case CommitReveal:
        return mp.handleCommitReveal(tx)
    default:
        tx.ReceivedAt = time.Now()
        mp.pendingTxs[tx.Hash] = tx
        return nil
    }
}

func (mp *MEVProtector) handleCommitReveal(tx *PendingTransaction) error {
    if tx.CommitHash == nil {
        // Commit phase
        commitHash := mp.generateCommitHash(tx)
        tx.CommitHash = &commitHash
        tx.ReceivedAt = time.Now()
        mp.committedTxs[tx.Hash] = tx

        // Schedule the reveal deadline
        go func() {
            time.Sleep(mp.commitDuration)
            mp.mu.Lock()
            if committedTx, exists := mp.committedTxs[tx.Hash]; exists && committedTx.RevealedAt == nil {
                // Transaction was never revealed; drop it
                delete(mp.committedTxs, tx.Hash)
                slog.Warn("Transaction not revealed in time", "hash", tx.Hash.Hex())
            }
            mp.mu.Unlock()
        }()

        return nil
    }

    // Reveal phase
    committedTx, exists := mp.committedTxs[tx.Hash]
    if !exists {
        return fmt.Errorf("no commit found for transaction %s", tx.Hash.Hex())
    }
    if !mp.validateReveal(committedTx, tx) {
        return fmt.Errorf("invalid reveal for transaction %s", tx.Hash.Hex())
    }

    now := time.Now()
    committedTx.RevealedAt = &now
    mp.pendingTxs[tx.Hash] = committedTx
    delete(mp.committedTxs, tx.Hash)
    return nil
}

func (mp *MEVProtector) generateCommitHash(tx *PendingTransaction) common.Hash {
    // Generate a random nonce for the commit; crypto/rand.Read only fails
    // if the OS entropy source is unavailable.
    nonce := make([]byte, 32)
    if _, err := rand.Read(nonce); err != nil {
        slog.Error("Failed to generate commit nonce", "error", err)
    }

    // Commit hash: keccak256(txHash || nonce || timestamp)
    data := append(tx.Hash.Bytes(), nonce...)
    data = append(data, big.NewInt(time.Now().Unix()).Bytes()...)

    return common.BytesToHash(crypto.Keccak256(data))
}

func (mp *MEVProtector) validateReveal(committed, revealed *PendingTransaction) bool {
    // Simplified field comparison; a production scheme would require the
    // revealer to supply the nonce so the commit hash can be recomputed.
    return committed.From == revealed.From &&
        committed.Nonce == revealed.Nonce &&
        committed.Value.Cmp(revealed.Value) == 0
}

// GetOrderedBatch returns transactions ordered according to the protection strategy
func (mp *MEVProtector) GetOrderedBatch() []*PendingTransaction {
    mp.mu.Lock() // write lock: the time-weighted strategy mutates per-tx weights
    defer mp.mu.Unlock()

    // Collect eligible transactions
    var eligible []*PendingTransaction
    cutoffTime := time.Now().Add(-mp.revealDuration)

    for _, tx := range mp.pendingTxs {
        if mp.strategy == CommitReveal {
            if tx.RevealedAt != nil && tx.RevealedAt.Before(cutoffTime) {
                eligible = append(eligible, tx)
            }
        } else {
            eligible = append(eligible, tx)
        }
    }

    // Apply ordering strategy
    switch mp.strategy {
    case FIFO:
        return mp.orderByFIFO(eligible)
    case PriorityFee:
        return mp.orderByPriorityFee(eligible)
    case TimeWeightedFairQueuing:
        return mp.orderByTimeWeightedFairQueuing(eligible)
    case CommitReveal:
        return mp.orderByFIFO(eligible) // After reveal, use FIFO
    default:
        return eligible
    }
}

func (mp *MEVProtector) orderByFIFO(txs []*PendingTransaction) []*PendingTransaction {
    sort.Slice(txs, func(i, j int) bool {
        return txs[i].ReceivedAt.Before(txs[j].ReceivedAt)
    })
    
    if len(txs) > mp.maxBatchSize {
        return txs[:mp.maxBatchSize]
    }
    return txs
}

func (mp *MEVProtector) orderByPriorityFee(txs []*PendingTransaction) []*PendingTransaction {
    sort.Slice(txs, func(i, j int) bool {
        // Higher priority fee first
        cmp := txs[i].PriorityFee.Cmp(txs[j].PriorityFee)
        if cmp == 0 {
            // If same priority fee, use FIFO
            return txs[i].ReceivedAt.Before(txs[j].ReceivedAt)
        }
        return cmp > 0
    })
    
    if len(txs) > mp.maxBatchSize {
        return txs[:mp.maxBatchSize]
    }
    return txs
}

func (mp *MEVProtector) orderByTimeWeightedFairQueuing(txs []*PendingTransaction) []*PendingTransaction {
    now := time.Now()
    
    // Calculate weights based on waiting time and fee
    for _, tx := range txs {
        waitTime := now.Sub(tx.ReceivedAt).Seconds()
        feeWeight := float64(tx.PriorityFee.Uint64()) / 1e18 // Convert to ETH
        
        // Time-weighted fair queuing: weight = (fee + waiting_bonus) * time_factor
        waitingBonus := waitTime / 60.0 // Bonus increases with waiting time
        timeFactor := 1.0 + (waitTime / 300.0) // 5-minute intervals
        
        tx.Weight = (feeWeight + waitingBonus) * timeFactor
    }
    
    sort.Slice(txs, func(i, j int) bool {
        return txs[i].Weight > txs[j].Weight
    })
    
    if len(txs) > mp.maxBatchSize {
        return txs[:mp.maxBatchSize]
    }
    return txs
}

// StartBatchProcessor begins processing transaction batches
func (mp *MEVProtector) StartBatchProcessor(ctx context.Context, processFn func([]*PendingTransaction) error) {
    ticker := time.NewTicker(mp.batchInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            batch := mp.GetOrderedBatch()
            if len(batch) > 0 {
                if err := processFn(batch); err != nil {
                    slog.Error("Failed to process transaction batch", "error", err)
                } else {
                    // Remove processed transactions
                    mp.mu.Lock()
                    for _, tx := range batch {
                        delete(mp.pendingTxs, tx.Hash)
                    }
                    mp.mu.Unlock()
                    
                    slog.Info("Processed transaction batch", 
                        "count", len(batch),
                        "strategy", mp.strategy,
                    )
                }
            }
        }
    }
}

// MEVDetector identifies potential MEV opportunities and attacks
type MEVDetector struct {
    patterns map[string]MEVPattern
    mu       sync.RWMutex
}

type MEVPattern struct {
    Name        string                                    `json:"name"`
    Description string                                    `json:"description"`
    Detector    func([]*PendingTransaction) []MEVOpportunity `json:"-"`
}

type MEVOpportunity struct {
    Type            string        `json:"type"`
    Severity        string        `json:"severity"`
    Transactions    []common.Hash `json:"transactions"`
    Description     string        `json:"description"`
    PotentialProfit *big.Int      `json:"potential_profit"`
    DetectedAt      time.Time     `json:"detected_at"`
}

func NewMEVDetector() *MEVDetector {
    detector := &MEVDetector{
        patterns: make(map[string]MEVPattern),
    }
    
    // Register common MEV patterns
    detector.RegisterPattern("sandwich_attack", MEVPattern{
        Name:        "Sandwich Attack",
        Description: "Front-run and back-run a large trade",
        Detector:    detector.detectSandwichAttack,
    })
    
    detector.RegisterPattern("arbitrage", MEVPattern{
        Name:        "Arbitrage Opportunity",
        Description: "Price differences across DEXes",
        Detector:    detector.detectArbitrage,
    })
    
    return detector
}

func (md *MEVDetector) RegisterPattern(name string, pattern MEVPattern) {
    md.mu.Lock()
    defer md.mu.Unlock()
    md.patterns[name] = pattern
}

func (md *MEVDetector) DetectMEV(txs []*PendingTransaction) []MEVOpportunity {
    md.mu.RLock()
    defer md.mu.RUnlock()
    
    var opportunities []MEVOpportunity
    
    for _, pattern := range md.patterns {
        detected := pattern.Detector(txs)
        opportunities = append(opportunities, detected...)
    }
    
    return opportunities
}

func (md *MEVDetector) detectSandwichAttack(txs []*PendingTransaction) []MEVOpportunity {
    var opportunities []MEVOpportunity
    
    // Simple sandwich detection: look for high-value DEX transactions
    // that could be profitable to front-run and back-run
    for _, tx := range txs {
        if md.isLargeDEXTrade(tx) {
            opportunities = append(opportunities, MEVOpportunity{
                Type:        "sandwich_attack",
                Severity:    "medium",
                Transactions: []common.Hash{tx.Hash},
                Description: fmt.Sprintf("Large DEX trade of %s ETH could be sandwiched", 
                    new(big.Float).Quo(new(big.Float).SetInt(tx.Value), big.NewFloat(1e18)).String()),
                PotentialProfit: md.calculateSandwichProfit(tx),
                DetectedAt:  time.Now(),
            })
        }
    }
    
    return opportunities
}

func (md *MEVDetector) detectArbitrage(txs []*PendingTransaction) []MEVOpportunity {
    // Placeholder for arbitrage detection logic
    // Would involve checking price feeds and identifying arbitrage opportunities
    return []MEVOpportunity{}
}

func (md *MEVDetector) isLargeDEXTrade(tx *PendingTransaction) bool {
    // Check if transaction is to a known DEX and above threshold
    minValue := big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)) // 10 ETH
    return tx.Value.Cmp(minValue) > 0 && md.isKnownDEX(tx.To)
}

func (md *MEVDetector) isKnownDEX(address *common.Address) bool {
    if address == nil {
        return false
    }
    
    // Known DEX addresses (Uniswap, SushiSwap, etc.)
    knownDEXes := map[common.Address]bool{
        common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"): true, // Uniswap V2 Router
        common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): true, // Uniswap V3 Router
        // Add more DEX addresses
    }
    
    return knownDEXes[*address]
}

func (md *MEVDetector) calculateSandwichProfit(tx *PendingTransaction) *big.Int {
    // Simplified profit calculation
    // In reality, this would involve complex price impact analysis
    slippage := big.NewInt(50) // 0.5% slippage assumption
    profit := new(big.Int).Mul(tx.Value, slippage)
    profit = profit.Div(profit, big.NewInt(10000))
    return profit
}
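Putting the protector to work is a matter of submitting transactions and attaching a batch callback. A minimal sketch, with the actual broadcast replaced by a logging stub; in production the callback would forward the ordered batch to a block builder or private relay:

// runProtectedMempool wires the protector to a batch consumer.
// StartBatchProcessor blocks until the context is cancelled.
func runProtectedMempool(ctx context.Context) {
    protector := NewMEVProtector(TimeWeightedFairQueuing)

    protector.StartBatchProcessor(ctx, func(batch []*PendingTransaction) error {
        for _, tx := range batch {
            slog.Info("Submitting transaction", "hash", tx.Hash.Hex(), "weight", tx.Weight)
        }
        return nil
    })
}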

4. Real-time State Synchronization

// internal/state/synchronizer.go
package state

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/redis/go-redis/v9"
)

// StateChange represents a blockchain state change
type StateChange struct {
    ChainID     uint64         `json:"chain_id"`
    BlockNumber uint64         `json:"block_number"`
    Address     common.Address `json:"address"`
    StorageKey  common.Hash    `json:"storage_key"`
    OldValue    common.Hash    `json:"old_value"`
    NewValue    common.Hash    `json:"new_value"`
    Timestamp   time.Time      `json:"timestamp"`
}

// StateSynchronizer manages real-time state synchronization across chains
type StateSynchronizer struct {
    redis       *redis.Client
    subscribers map[string][]StateSubscriber
    mu          sync.RWMutex
}

type StateSubscriber interface {
    OnStateChange(change *StateChange) error
    GetSubscriptionKey() string
}

func NewStateSynchronizer(redisClient *redis.Client) *StateSynchronizer {
    return &StateSynchronizer{
        redis:       redisClient,
        subscribers: make(map[string][]StateSubscriber),
    }
}

// Subscribe registers a subscriber for state changes
func (ss *StateSynchronizer) Subscribe(subscriber StateSubscriber) {
    ss.mu.Lock()
    defer ss.mu.Unlock()
    
    key := subscriber.GetSubscriptionKey()
    ss.subscribers[key] = append(ss.subscribers[key], subscriber)
}

// PublishStateChange publishes a state change to all subscribers
func (ss *StateSynchronizer) PublishStateChange(ctx context.Context, change *StateChange) error {
    // Publish to Redis for cross-service communication
    data, err := json.Marshal(change)
    if err != nil {
        return fmt.Errorf("failed to marshal state change: %w", err)
    }
    
    channel := fmt.Sprintf("state_changes:%d", change.ChainID)
    if err := ss.redis.Publish(ctx, channel, data).Err(); err != nil {
        return fmt.Errorf("failed to publish to Redis: %w", err)
    }
    
    // Notify local subscribers
    ss.notifySubscribers(change)
    
    return nil
}

func (ss *StateSynchronizer) notifySubscribers(change *StateChange) {
    ss.mu.RLock()
    defer ss.mu.RUnlock()
    
    // Notify all subscribers
    for _, subscriberList := range ss.subscribers {
        for _, subscriber := range subscriberList {
            go func(sub StateSubscriber) {
                if err := sub.OnStateChange(change); err != nil {
                    slog.Error("Subscriber failed to handle state change", 
                        "subscriber", sub.GetSubscriptionKey(),
                        "error", err,
                    )
                }
            }(subscriber)
        }
    }
}

// StartRedisSubscription starts listening for state changes from Redis
func (ss *StateSynchronizer) StartRedisSubscription(ctx context.Context, chainIDs []uint64) error {
    // Subscribe to Redis channels for each chain
    channels := make([]string, len(chainIDs))
    for i, chainID := range chainIDs {
        channels[i] = fmt.Sprintf("state_changes:%d", chainID)
    }
    
    pubsub := ss.redis.Subscribe(ctx, channels...)
    defer pubsub.Close()
    
    ch := pubsub.Channel()
    
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case msg, ok := <-ch:
            if !ok {
                return nil // subscription closed
            }
            var change StateChange
            if err := json.Unmarshal([]byte(msg.Payload), &change); err != nil {
                slog.Error("Failed to unmarshal state change", "error", err)
                continue
            }

            ss.notifySubscribers(&change)
        }
    }
}
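On the consuming side, any component implements StateSubscriber. A minimal cache-invalidation subscriber, sketched under the assumption that cached state follows a state:{chain_id}:{address} key convention:

// CacheInvalidator drops cached state whenever a change is published.
type CacheInvalidator struct {
    redis *redis.Client
}

func (c *CacheInvalidator) GetSubscriptionKey() string {
    return "cache_invalidator"
}

func (c *CacheInvalidator) OnStateChange(change *StateChange) error {
    // Hypothetical key convention: state:{chainID}:{contract address}
    key := fmt.Sprintf("state:%d:%s", change.ChainID, change.Address.Hex())

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    return c.redis.Del(ctx, key).Err()
}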

🎯 Wrapping Up

This covers the core of a production-grade Web3 Go backend:

  1. Multi-chain RPC gateway – load balancing, retries, and health checks
  2. High-performance event indexing – parallel processing, batched backfills, and error recovery
  3. MEV protection – multiple ordering strategies, a commit-reveal scheme, and MEV detection
  4. Real-time state synchronization – Redis pub/sub and cross-service communication
