Web3 Go Backend Architecture: Building Scalable Blockchain Infrastructure in 2025
Cap
16 min read
goweb3blockchainarchitectureethereum
Web3 Go Backend Architecture: Building Scalable Blockchain Infrastructure in 2025
Advanced patterns for building high-performance Web3 backends that handle millions of transactions
๐ The Web3 Backend Evolution in 2025
The Web3 infrastructure landscape has matured significantly. Modern Web3 backends must handle:
- High-frequency trading (100k+ TPS)
- Multi-chain operations (Ethereum, Polygon, Arbitrum, Base)
- MEV protection and fair transaction ordering
- Real-time data synchronization across multiple networks
- Regulatory compliance and audit trails
Let's build a production-grade Web3 backend that addresses these challenges.
๐๏ธ Architecture Overview
Our system will implement a microservices architecture with:
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ RPC Gateway โโโโโโ Event Indexer โโโโโโ State Manager โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ โ โ
โผ โผ โผ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
โ Transaction โ โ Data Lake โ โ Cache Layer โ
โ Orchestrator โ โ (TimescaleDB) โ โ (Redis) โ
โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ
๐ง Core Components Implementation
1. Multi-Chain RPC Gateway
// internal/gateway/rpc_gateway.go
package gateway
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/sync/semaphore"
)
// ChainConfig represents blockchain network configuration
type ChainConfig struct {
ChainID uint64 `json:"chain_id"`
Name string `json:"name"`
RPCURLs []string `json:"rpc_urls"`
WSURLs []string `json:"ws_urls"`
MaxRPS int `json:"max_rps"`
Timeout time.Duration `json:"timeout"`
RetryPolicy RetryConfig `json:"retry_policy"`
}
type RetryConfig struct {
MaxRetries int `json:"max_retries"`
InitialDelay time.Duration `json:"initial_delay"`
BackoffFactor float64 `json:"backoff_factor"`
MaxDelay time.Duration `json:"max_delay"`
}
// RPCPool manages a pool of RPC connections with load balancing
type RPCPool struct {
config ChainConfig
clients []*ethclient.Client
rpcClients []*rpc.Client
semaphore *semaphore.Weighted
mu sync.RWMutex
current int
health []bool
}
func NewRPCPool(config ChainConfig) (*RPCPool, error) {
pool := &RPCPool{
config: config,
clients: make([]*ethclient.Client, 0, len(config.RPCURLs)),
rpcClients: make([]*rpc.Client, 0, len(config.RPCURLs)),
semaphore: semaphore.NewWeighted(int64(config.MaxRPS)),
health: make([]bool, len(config.RPCURLs)),
}
// Initialize connections
for i, url := range config.RPCURLs {
rpcClient, err := rpc.DialHTTPWithClient(url, &http.Client{
Timeout: config.Timeout,
})
if err != nil {
slog.Error("Failed to connect to RPC", "url", url, "error", err)
continue
}
ethClient := ethclient.NewClient(rpcClient)
pool.clients = append(pool.clients, ethClient)
pool.rpcClients = append(pool.rpcClients, rpcClient)
pool.health[i] = true
}
if len(pool.clients) == 0 {
return nil, fmt.Errorf("no healthy RPC endpoints available")
}
// Start health monitoring
go pool.healthCheck()
return pool, nil
}
// GetClient returns a healthy client using round-robin load balancing
func (p *RPCPool) GetClient() (*ethclient.Client, error) {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.clients) == 0 {
return nil, fmt.Errorf("no healthy clients available")
}
// Find next healthy client
for attempts := 0; attempts < len(p.clients); attempts++ {
index := (p.current + attempts) % len(p.clients)
if p.health[index] {
p.current = (index + 1) % len(p.clients)
return p.clients[index], nil
}
}
return nil, fmt.Errorf("no healthy clients available")
}
// ExecuteWithRetry executes a function with automatic retries and rate limiting
func (p *RPCPool) ExecuteWithRetry(ctx context.Context, fn func(*ethclient.Client) error) error {
// Rate limiting
if err := p.semaphore.Acquire(ctx, 1); err != nil {
return fmt.Errorf("rate limit exceeded: %w", err)
}
defer p.semaphore.Release(1)
var lastErr error
delay := p.config.RetryPolicy.InitialDelay
for attempt := 0; attempt <= p.config.RetryPolicy.MaxRetries; attempt++ {
client, err := p.GetClient()
if err != nil {
lastErr = err
if attempt < p.config.RetryPolicy.MaxRetries {
time.Sleep(delay)
delay = time.Duration(float64(delay) * p.config.RetryPolicy.BackoffFactor)
if delay > p.config.RetryPolicy.MaxDelay {
delay = p.config.RetryPolicy.MaxDelay
}
}
continue
}
if err := fn(client); err != nil {
lastErr = err
slog.Warn("RPC call failed", "attempt", attempt+1, "error", err)
if attempt < p.config.RetryPolicy.MaxRetries {
time.Sleep(delay)
delay = time.Duration(float64(delay) * p.config.RetryPolicy.BackoffFactor)
if delay > p.config.RetryPolicy.MaxDelay {
delay = p.config.RetryPolicy.MaxDelay
}
}
continue
}
return nil
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// Health check for RPC endpoints
func (p *RPCPool) healthCheck() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
p.mu.Lock()
for i, client := range p.clients {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// Ping the client
_, err := client.NetworkID(ctx)
p.health[i] = (err == nil)
if err != nil {
slog.Warn("RPC endpoint unhealthy",
"url", p.config.RPCURLs[i],
"error", err,
)
}
cancel()
}
p.mu.Unlock()
}
}
// MultiChainGateway manages multiple blockchain networks
type MultiChainGateway struct {
pools map[uint64]*RPCPool
mu sync.RWMutex
}
func NewMultiChainGateway(configs []ChainConfig) (*MultiChainGateway, error) {
gateway := &MultiChainGateway{
pools: make(map[uint64]*RPCPool),
}
for _, config := range configs {
pool, err := NewRPCPool(config)
if err != nil {
slog.Error("Failed to initialize RPC pool",
"chain", config.Name,
"error", err,
)
continue
}
gateway.pools[config.ChainID] = pool
}
return gateway, nil
}
func (g *MultiChainGateway) ExecuteOnChain(chainID uint64, ctx context.Context, fn func(*ethclient.Client) error) error {
g.mu.RLock()
pool, exists := g.pools[chainID]
g.mu.RUnlock()
if !exists {
return fmt.Errorf("chain %d not supported", chainID)
}
return pool.ExecuteWithRetry(ctx, fn)
}
2. Advanced Event Indexing System
// internal/indexer/event_indexer.go
package indexer
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jmoiron/sqlx"
)
// EventFilter represents an event filter configuration
type EventFilter struct {
ID string `json:"id"`
ChainID uint64 `json:"chain_id"`
Address common.Address `json:"address"`
Topics [][]common.Hash `json:"topics"`
FromBlock *big.Int `json:"from_block"`
ToBlock *big.Int `json:"to_block"`
BatchSize uint64 `json:"batch_size"`
Handlers []EventHandler `json:"-"`
}
// EventHandler processes specific events
type EventHandler interface {
Handle(ctx context.Context, event *types.Log) error
EventSignature() common.Hash
}
// IndexedEvent represents a processed blockchain event
type IndexedEvent struct {
ID string `db:"id" json:"id"`
ChainID uint64 `db:"chain_id" json:"chain_id"`
BlockNumber uint64 `db:"block_number" json:"block_number"`
BlockHash string `db:"block_hash" json:"block_hash"`
TransactionHash string `db:"transaction_hash" json:"transaction_hash"`
LogIndex uint `db:"log_index" json:"log_index"`
Address string `db:"address" json:"address"`
Topics json.RawMessage `db:"topics" json:"topics"`
Data string `db:"data" json:"data"`
EventType string `db:"event_type" json:"event_type"`
ProcessedAt time.Time `db:"processed_at" json:"processed_at"`
ProcessingTime time.Duration `db:"processing_time" json:"processing_time"`
}
// EventIndexer provides high-performance event indexing
type EventIndexer struct {
gateway *MultiChainGateway
db *sqlx.DB
filters map[string]*EventFilter
handlers map[common.Hash][]EventHandler
mu sync.RWMutex
workerPool chan struct{}
}
func NewEventIndexer(gateway *MultiChainGateway, db *sqlx.DB, maxWorkers int) *EventIndexer {
return &EventIndexer{
gateway: gateway,
db: db,
filters: make(map[string]*EventFilter),
handlers: make(map[common.Hash][]EventHandler),
workerPool: make(chan struct{}, maxWorkers),
}
}
// RegisterFilter adds a new event filter
func (ei *EventIndexer) RegisterFilter(filter *EventFilter) {
ei.mu.Lock()
defer ei.mu.Unlock()
ei.filters[filter.ID] = filter
// Register handlers
for _, handler := range filter.Handlers {
signature := handler.EventSignature()
ei.handlers[signature] = append(ei.handlers[signature], handler)
}
}
// StartIndexing begins the event indexing process
func (ei *EventIndexer) StartIndexing(ctx context.Context) error {
ei.mu.RLock()
filters := make([]*EventFilter, 0, len(ei.filters))
for _, filter := range ei.filters {
filters = append(filters, filter)
}
ei.mu.RUnlock()
// Start indexing for each filter
var wg sync.WaitGroup
for _, filter := range filters {
wg.Add(1)
go func(f *EventFilter) {
defer wg.Done()
ei.indexFilterEvents(ctx, f)
}(filter)
}
wg.Wait()
return nil
}
func (ei *EventIndexer) indexFilterEvents(ctx context.Context, filter *EventFilter) {
ticker := time.NewTicker(5 * time.Second) // Check for new events every 5 seconds
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := ei.processFilterBatch(ctx, filter); err != nil {
slog.Error("Failed to process filter batch",
"filter_id", filter.ID,
"error", err,
)
}
}
}
}
func (ei *EventIndexer) processFilterBatch(ctx context.Context, filter *EventFilter) error {
// Get last processed block
lastBlock, err := ei.getLastProcessedBlock(filter.ID, filter.ChainID)
if err != nil {
return fmt.Errorf("failed to get last processed block: %w", err)
}
// Get current block number
var currentBlock uint64
err = ei.gateway.ExecuteOnChain(filter.ChainID, ctx, func(client *ethclient.Client) error {
header, err := client.HeaderByNumber(ctx, nil)
if err != nil {
return err
}
currentBlock = header.Number.Uint64()
return nil
})
if err != nil {
return fmt.Errorf("failed to get current block: %w", err)
}
// Calculate batch range
fromBlock := lastBlock + 1
toBlock := fromBlock + filter.BatchSize - 1
if toBlock > currentBlock {
toBlock = currentBlock
}
if fromBlock > toBlock {
return nil // No new blocks to process
}
// Create filter query
query := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(fromBlock)),
ToBlock: big.NewInt(int64(toBlock)),
Addresses: []common.Address{filter.Address},
Topics: filter.Topics,
}
// Get logs
var logs []types.Log
err = ei.gateway.ExecuteOnChain(filter.ChainID, ctx, func(client *ethclient.Client) error {
var err error
logs, err = client.FilterLogs(ctx, query)
return err
})
if err != nil {
return fmt.Errorf("failed to get logs: %w", err)
}
// Process logs in parallel
semaphore := make(chan struct{}, 10) // Limit concurrent processing
var wg sync.WaitGroup
for _, log := range logs {
wg.Add(1)
go func(l types.Log) {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
if err := ei.processEvent(ctx, filter, &l); err != nil {
slog.Error("Failed to process event",
"filter_id", filter.ID,
"tx_hash", l.TxHash.Hex(),
"error", err,
)
}
}(log)
}
wg.Wait()
// Update last processed block
if err := ei.updateLastProcessedBlock(filter.ID, filter.ChainID, toBlock); err != nil {
return fmt.Errorf("failed to update last processed block: %w", err)
}
slog.Info("Processed event batch",
"filter_id", filter.ID,
"chain_id", filter.ChainID,
"from_block", fromBlock,
"to_block", toBlock,
"events_count", len(logs),
)
return nil
}
func (ei *EventIndexer) processEvent(ctx context.Context, filter *EventFilter, log *types.Log) error {
startTime := time.Now()
// Store raw event
event := &IndexedEvent{
ID: fmt.Sprintf("%s-%d", log.TxHash.Hex(), log.Index),
ChainID: filter.ChainID,
BlockNumber: log.BlockNumber,
BlockHash: log.BlockHash.Hex(),
TransactionHash: log.TxHash.Hex(),
LogIndex: log.Index,
Address: log.Address.Hex(),
Data: common.Bytes2Hex(log.Data),
ProcessedAt: time.Now(),
}
// Convert topics to JSON
topics := make([]string, len(log.Topics))
for i, topic := range log.Topics {
topics[i] = topic.Hex()
}
topicsJSON, _ := json.Marshal(topics)
event.Topics = topicsJSON
// Determine event type from first topic (event signature)
if len(log.Topics) > 0 {
event.EventType = log.Topics[0].Hex()
}
// Process with registered handlers
ei.mu.RLock()
handlers := ei.handlers[log.Topics[0]]
ei.mu.RUnlock()
for _, handler := range handlers {
if err := handler.Handle(ctx, log); err != nil {
slog.Error("Event handler failed",
"handler", fmt.Sprintf("%T", handler),
"error", err,
)
}
}
event.ProcessingTime = time.Since(startTime)
// Store in database
return ei.storeEvent(event)
}
func (ei *EventIndexer) storeEvent(event *IndexedEvent) error {
query := `
INSERT INTO indexed_events (
id, chain_id, block_number, block_hash, transaction_hash,
log_index, address, topics, data, event_type, processed_at, processing_time
) VALUES (
:id, :chain_id, :block_number, :block_hash, :transaction_hash,
:log_index, :address, :topics, :data, :event_type, :processed_at, :processing_time
) ON CONFLICT (id) DO NOTHING
`
_, err := ei.db.NamedExec(query, event)
return err
}
func (ei *EventIndexer) getLastProcessedBlock(filterID string, chainID uint64) (uint64, error) {
var lastBlock uint64
err := ei.db.Get(&lastBlock, `
SELECT COALESCE(MAX(block_number), 0)
FROM indexer_checkpoints
WHERE filter_id = $1 AND chain_id = $2
`, filterID, chainID)
if err == sql.ErrNoRows {
return 0, nil
}
return lastBlock, err
}
func (ei *EventIndexer) updateLastProcessedBlock(filterID string, chainID uint64, blockNumber uint64) error {
_, err := ei.db.Exec(`
INSERT INTO indexer_checkpoints (filter_id, chain_id, last_block, updated_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (filter_id, chain_id)
DO UPDATE SET last_block = $3, updated_at = NOW()
`, filterID, chainID, blockNumber)
return err
}
3. MEV Protection and Transaction Ordering
// internal/mev/protection.go
package mev
import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
// TransactionOrder represents different ordering strategies
type TransactionOrder int
const (
FIFO TransactionOrder = iota
PriorityFee
TimeWeightedFairQueuing
CommitReveal
)
// PendingTransaction represents a transaction in the mempool
type PendingTransaction struct {
Hash common.Hash `json:"hash"`
From common.Address `json:"from"`
To *common.Address `json:"to"`
Value *big.Int `json:"value"`
GasPrice *big.Int `json:"gas_price"`
PriorityFee *big.Int `json:"priority_fee"`
Nonce uint64 `json:"nonce"`
Data []byte `json:"data"`
ReceivedAt time.Time `json:"received_at"`
CommitHash *common.Hash `json:"commit_hash,omitempty"`
RevealedAt *time.Time `json:"revealed_at,omitempty"`
Weight float64 `json:"weight"`
}
// MEVProtector implements various MEV protection mechanisms
type MEVProtector struct {
strategy TransactionOrder
commitDuration time.Duration
revealDuration time.Duration
pendingTxs map[common.Hash]*PendingTransaction
commitedTxs map[common.Hash]*PendingTransaction
mu sync.RWMutex
batchInterval time.Duration
maxBatchSize int
}
func NewMEVProtector(strategy TransactionOrder) *MEVProtector {
return &MEVProtector{
strategy: strategy,
commitDuration: 30 * time.Second,
revealDuration: 10 * time.Second,
pendingTxs: make(map[common.Hash]*PendingTransaction),
commitedTxs: make(map[common.Hash]*PendingTransaction),
batchInterval: 12 * time.Second, // Ethereum block time
maxBatchSize: 100,
}
}
// SubmitTransaction adds a transaction to the protected mempool
func (mp *MEVProtector) SubmitTransaction(tx *PendingTransaction) error {
mp.mu.Lock()
defer mp.mu.Unlock()
switch mp.strategy {
case CommitReveal:
return mp.handleCommitReveal(tx)
default:
tx.ReceivedAt = time.Now()
mp.pendingTxs[tx.Hash] = tx
return nil
}
}
func (mp *MEVProtector) handleCommitReveal(tx *PendingTransaction) error {
if tx.CommitHash == nil {
// This is a commit phase
commitHash := mp.generateCommitHash(tx)
tx.CommitHash = &commitHash
tx.ReceivedAt = time.Now()
mp.commitedTxs[tx.Hash] = tx
// Schedule reveal deadline
go func() {
time.Sleep(mp.commitDuration)
mp.mu.Lock()
if commitedTx, exists := mp.commitedTxs[tx.Hash]; exists && commitedTx.RevealedAt == nil {
// Transaction not revealed, remove it
delete(mp.commitedTxs, tx.Hash)
slog.Warn("Transaction not revealed in time", "hash", tx.Hash.Hex())
}
mp.mu.Unlock()
}()
return nil
} else {
// This is a reveal phase
if commitedTx, exists := mp.commitedTxs[tx.Hash]; exists {
if mp.validateReveal(commitedTx, tx) {
now := time.Now()
commitedTx.RevealedAt = &now
mp.pendingTxs[tx.Hash] = commitedTx
delete(mp.commitedTxs, tx.Hash)
return nil
}
return fmt.Errorf("invalid reveal for transaction %s", tx.Hash.Hex())
}
return fmt.Errorf("no commit found for transaction %s", tx.Hash.Hex())
}
}
func (mp *MEVProtector) generateCommitHash(tx *PendingTransaction) common.Hash {
// Generate a random nonce for the commit
nonce := make([]byte, 32)
rand.Read(nonce)
// Create commit hash: keccak256(txHash + nonce + timestamp)
data := append(tx.Hash.Bytes(), nonce...)
data = append(data, big.NewInt(time.Now().Unix()).Bytes()...)
return common.BytesToHash(crypto.Keccak256(data))
}
func (mp *MEVProtector) validateReveal(committed, revealed *PendingTransaction) bool {
// Validate that revealed transaction matches committed transaction
return committed.From == revealed.From &&
committed.Nonce == revealed.Nonce &&
committed.Value.Cmp(revealed.Value) == 0
}
// GetOrderedBatch returns transactions ordered according to the protection strategy
func (mp *MEVProtector) GetOrderedBatch() []*PendingTransaction {
mp.mu.RLock()
defer mp.mu.RUnlock()
// Collect eligible transactions
var eligible []*PendingTransaction
cutoffTime := time.Now().Add(-mp.revealDuration)
for _, tx := range mp.pendingTxs {
if mp.strategy == CommitReveal {
if tx.RevealedAt != nil && tx.RevealedAt.Before(cutoffTime) {
eligible = append(eligible, tx)
}
} else {
eligible = append(eligible, tx)
}
}
// Apply ordering strategy
switch mp.strategy {
case FIFO:
return mp.orderByFIFO(eligible)
case PriorityFee:
return mp.orderByPriorityFee(eligible)
case TimeWeightedFairQueuing:
return mp.orderByTimeWeightedFairQueuing(eligible)
case CommitReveal:
return mp.orderByFIFO(eligible) // After reveal, use FIFO
default:
return eligible
}
}
func (mp *MEVProtector) orderByFIFO(txs []*PendingTransaction) []*PendingTransaction {
sort.Slice(txs, func(i, j int) bool {
return txs[i].ReceivedAt.Before(txs[j].ReceivedAt)
})
if len(txs) > mp.maxBatchSize {
return txs[:mp.maxBatchSize]
}
return txs
}
func (mp *MEVProtector) orderByPriorityFee(txs []*PendingTransaction) []*PendingTransaction {
sort.Slice(txs, func(i, j int) bool {
// Higher priority fee first
cmp := txs[i].PriorityFee.Cmp(txs[j].PriorityFee)
if cmp == 0 {
// If same priority fee, use FIFO
return txs[i].ReceivedAt.Before(txs[j].ReceivedAt)
}
return cmp > 0
})
if len(txs) > mp.maxBatchSize {
return txs[:mp.maxBatchSize]
}
return txs
}
func (mp *MEVProtector) orderByTimeWeightedFairQueuing(txs []*PendingTransaction) []*PendingTransaction {
now := time.Now()
// Calculate weights based on waiting time and fee
for _, tx := range txs {
waitTime := now.Sub(tx.ReceivedAt).Seconds()
feeWeight := float64(tx.PriorityFee.Uint64()) / 1e18 // Convert to ETH
// Time-weighted fair queuing: weight = (fee + waiting_bonus) * time_factor
waitingBonus := waitTime / 60.0 // Bonus increases with waiting time
timeFactor := 1.0 + (waitTime / 300.0) // 5-minute intervals
tx.Weight = (feeWeight + waitingBonus) * timeFactor
}
sort.Slice(txs, func(i, j int) bool {
return txs[i].Weight > txs[j].Weight
})
if len(txs) > mp.maxBatchSize {
return txs[:mp.maxBatchSize]
}
return txs
}
// StartBatchProcessor begins processing transaction batches
func (mp *MEVProtector) StartBatchProcessor(ctx context.Context, processFn func([]*PendingTransaction) error) {
ticker := time.NewTicker(mp.batchInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
batch := mp.GetOrderedBatch()
if len(batch) > 0 {
if err := processFn(batch); err != nil {
slog.Error("Failed to process transaction batch", "error", err)
} else {
// Remove processed transactions
mp.mu.Lock()
for _, tx := range batch {
delete(mp.pendingTxs, tx.Hash)
}
mp.mu.Unlock()
slog.Info("Processed transaction batch",
"count", len(batch),
"strategy", mp.strategy,
)
}
}
}
}
}
// MEVDetector identifies potential MEV opportunities and attacks
type MEVDetector struct {
patterns map[string]MEVPattern
mu sync.RWMutex
}
type MEVPattern struct {
Name string `json:"name"`
Description string `json:"description"`
Detector func([]*PendingTransaction) []MEVOpportunity `json:"-"`
}
type MEVOpportunity struct {
Type string `json:"type"`
Severity string `json:"severity"`
Transactions []common.Hash `json:"transactions"`
Description string `json:"description"`
PotentialProfit *big.Int `json:"potential_profit"`
DetectedAt time.Time `json:"detected_at"`
}
func NewMEVDetector() *MEVDetector {
detector := &MEVDetector{
patterns: make(map[string]MEVPattern),
}
// Register common MEV patterns
detector.RegisterPattern("sandwich_attack", MEVPattern{
Name: "Sandwich Attack",
Description: "Front-run and back-run a large trade",
Detector: detector.detectSandwichAttack,
})
detector.RegisterPattern("arbitrage", MEVPattern{
Name: "Arbitrage Opportunity",
Description: "Price differences across DEXes",
Detector: detector.detectArbitrage,
})
return detector
}
func (md *MEVDetector) RegisterPattern(name string, pattern MEVPattern) {
md.mu.Lock()
defer md.mu.Unlock()
md.patterns[name] = pattern
}
func (md *MEVDetector) DetectMEV(txs []*PendingTransaction) []MEVOpportunity {
md.mu.RLock()
defer md.mu.RUnlock()
var opportunities []MEVOpportunity
for _, pattern := range md.patterns {
detected := pattern.Detector(txs)
opportunities = append(opportunities, detected...)
}
return opportunities
}
func (md *MEVDetector) detectSandwichAttack(txs []*PendingTransaction) []MEVOpportunity {
var opportunities []MEVOpportunity
// Simple sandwich detection: look for high-value DEX transactions
// that could be profitable to front-run and back-run
for _, tx := range txs {
if md.isLargeDEXTrade(tx) {
opportunities = append(opportunities, MEVOpportunity{
Type: "sandwich_attack",
Severity: "medium",
Transactions: []common.Hash{tx.Hash},
Description: fmt.Sprintf("Large DEX trade of %s ETH could be sandwiched",
new(big.Float).Quo(new(big.Float).SetInt(tx.Value), big.NewFloat(1e18)).String()),
PotentialProfit: md.calculateSandwichProfit(tx),
DetectedAt: time.Now(),
})
}
}
return opportunities
}
func (md *MEVDetector) detectArbitrage(txs []*PendingTransaction) []MEVOpportunity {
// Placeholder for arbitrage detection logic
// Would involve checking price feeds and identifying arbitrage opportunities
return []MEVOpportunity{}
}
func (md *MEVDetector) isLargeDEXTrade(tx *PendingTransaction) bool {
// Check if transaction is to a known DEX and above threshold
minValue := big.NewInt(0).Mul(big.NewInt(10), big.NewInt(1e18)) // 10 ETH
return tx.Value.Cmp(minValue) > 0 && md.isKnownDEX(tx.To)
}
func (md *MEVDetector) isKnownDEX(address *common.Address) bool {
if address == nil {
return false
}
// Known DEX addresses (Uniswap, SushiSwap, etc.)
knownDEXes := map[common.Address]bool{
common.HexToAddress("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"): true, // Uniswap V2 Router
common.HexToAddress("0xE592427A0AEce92De3Edee1F18E0157C05861564"): true, // Uniswap V3 Router
// Add more DEX addresses
}
return knownDEXes[*address]
}
func (md *MEVDetector) calculateSandwichProfit(tx *PendingTransaction) *big.Int {
// Simplified profit calculation
// In reality, this would involve complex price impact analysis
slippage := big.NewInt(50) // 0.5% slippage assumption
profit := new(big.Int).Mul(tx.Value, slippage)
profit = profit.Div(profit, big.NewInt(10000))
return profit
}
4. Real-time State Synchronization
// internal/state/synchronizer.go
package state
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/redis/go-redis/v9"
)
// StateChange represents a blockchain state change
type StateChange struct {
ChainID uint64 `json:"chain_id"`
BlockNumber uint64 `json:"block_number"`
Address common.Address `json:"address"`
StorageKey common.Hash `json:"storage_key"`
OldValue common.Hash `json:"old_value"`
NewValue common.Hash `json:"new_value"`
Timestamp time.Time `json:"timestamp"`
}
// StateSynchronizer manages real-time state synchronization across chains
type StateSynchronizer struct {
redis *redis.Client
subscribers map[string][]StateSubscriber
mu sync.RWMutex
}
type StateSubscriber interface {
OnStateChange(change *StateChange) error
GetSubscriptionKey() string
}
func NewStateSynchronizer(redisClient *redis.Client) *StateSynchronizer {
return &StateSynchronizer{
redis: redisClient,
subscribers: make(map[string][]StateSubscriber),
}
}
// Subscribe registers a subscriber for state changes
func (ss *StateSynchronizer) Subscribe(subscriber StateSubscriber) {
ss.mu.Lock()
defer ss.mu.Unlock()
key := subscriber.GetSubscriptionKey()
ss.subscribers[key] = append(ss.subscribers[key], subscriber)
}
// PublishStateChange publishes a state change to all subscribers
func (ss *StateSynchronizer) PublishStateChange(ctx context.Context, change *StateChange) error {
// Publish to Redis for cross-service communication
data, err := json.Marshal(change)
if err != nil {
return fmt.Errorf("failed to marshal state change: %w", err)
}
channel := fmt.Sprintf("state_changes:%d", change.ChainID)
if err := ss.redis.Publish(ctx, channel, data).Err(); err != nil {
return fmt.Errorf("failed to publish to Redis: %w", err)
}
// Notify local subscribers
ss.notifySubscribers(change)
return nil
}
func (ss *StateSynchronizer) notifySubscribers(change *StateChange) {
ss.mu.RLock()
defer ss.mu.RUnlock()
// Notify all subscribers
for _, subscriberList := range ss.subscribers {
for _, subscriber := range subscriberList {
go func(sub StateSubscriber) {
if err := sub.OnStateChange(change); err != nil {
slog.Error("Subscriber failed to handle state change",
"subscriber", sub.GetSubscriptionKey(),
"error", err,
)
}
}(subscriber)
}
}
}
// StartRedisSubscription starts listening for state changes from Redis
func (ss *StateSynchronizer) StartRedisSubscription(ctx context.Context, chainIDs []uint64) error {
// Subscribe to Redis channels for each chain
channels := make([]string, len(chainIDs))
for i, chainID := range chainIDs {
channels[i] = fmt.Sprintf("state_changes:%d", chainID)
}
pubsub := ss.redis.Subscribe(ctx, channels...)
defer pubsub.Close()
ch := pubsub.Channel()
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg := <-ch:
var change StateChange
if err := json.Unmarshal([]byte(msg.Payload), &change); err != nil {
slog.Error("Failed to unmarshal state change", "error", err)
continue
}
ss.notifySubscribers(&change)
}
}
}
่ฟๆฏไธไธช้ซ่ดจ้็Web3 Goๅ็ซฏๆถๆๆ็ซ ็ๅผๅง้จๅใๆ็ซ ๅ ๅซไบ๏ผ
- ๅค้พRPC็ฝๅ ณ - ๆฏๆ่ด่ฝฝๅ่กกใ้่ฏๆบๅถใๅฅๅบทๆฃๆฅ
- ้ซๆง่ฝไบไปถ็ดขๅผ - ๅนถ่กๅค็ใๆน้ๆไฝใ้่ฏฏๆขๅค
- MEVไฟๆคๆบๅถ - ๅค็งๆๅบ็ญ็ฅใๆไบค-ๆญ็คบๆนๆกใMEVๆฃๆต
- ๅฎๆถ็ถๆๅๆญฅ - Redisๅๅธ่ฎข้ ใ่ทจๆๅก้ไฟก
่ฟๅฑ็คบไบ2025ๅนดWeb3ๅ็ซฏๅผๅ็ๆทฑๅบฆๆๆฏๅ ๅฎนใๆจๅธๆๆ็ปง็ปญๅฎๆ่ฟ็ฏๆ็ซ ๏ผ่ฟๆฏๅๅปบๅ ถไปไธป้ข็ๆทฑๅบฆๆ็ซ ๏ผ
WY
Cap
Senior Golang Backend & Web3 Developer with 10+ years of experience building scalable systems and blockchain solutions.
View Full Profile โ