自学内容网 自学内容网

深入理解ETH交易池

深入理解ETH 交易池(源码分析)

TxPool 数据结构

  • pending : 当前可以处理的(excuteable)
  • queue : 当前不能处理的(non-excuteable)
  • all : 所有允许查询的交易集合

// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config       TxPoolConfig
chainconfig  *params.ChainConfig
chain        blockChain
gasPrice     *big.Int
txFeed       event.Feed
scope        event.SubscriptionScope
chainHeadCh  chan ChainHeadEvent
chainHeadSub event.Subscription
signer       types.Signer
mu           sync.RWMutex

currentState  *state.StateDB      // Current state in the blockchain head
pendingState  *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas uint64              // Current gas limit for transaction caps

locals  *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal  // Journal of local transaction to back up to disk

pending map[common.Address]*txList   // All currently processable transactions
queue   map[common.Address]*txList   // Queued but non-processable transactions
beats   map[common.Address]time.Time // Last heartbeat from each known account
all     *txLookup                    // All transactions to allow lookups
priced  *txPricedList                // All transactions sorted by price

wg sync.WaitGroup // for shutdown sync

homestead bool
}

交易池RPC

https://ethereum.stackexchange.com/questions/3831/what-is-the-max-size-of-transactions-can-clients-like-geth-keep-in-txpool

  • 开始 tx_pool 的RPC接口启动geth, 添加配置选项 --rpcapi txpool

  • PublicTxPoolAPI

  • PublicTransactionPoolAPI

TRANSACTION POOL OPTIONS:

--txpool.locals value               Comma separated accounts to treat as locals (no flush, priority inclusion)  配置本地的地址列表, 优先包含到交易池中, 且当交易池满了的时候, 不除本地地址的交易


--txpool.nolocals                   Disables price exemptions for locally submitted transactions


--txpool.journal value              Disk journal for local transaction to survive node restarts (default: "transactions.rlp")


--txpool.rejournal value            Time interval to regenerate the local transaction journal (default: 1h0m0s)


--txpool.pricelimit value           Minimum gas price limit to enforce for acceptance into the pool (default: 1)


--txpool.pricebump value            Price bump percentage to replace an already existing transaction (default: 10)


--txpool.accountslots value         Minimum number of executable transaction slots guaranteed per account (default: 16)


--txpool.globalslots value          Maximum number of executable transaction slots for all accounts (default: 4096)


--txpool.accountqueue value         Maximum number of non-executable transaction slots permitted per account (default: 64)


--txpool.globalqueue value          Maximum number of non-executable transaction slots for all accounts (default: 1024)


--txpool.lifetime value             Maximum amount of time non-executable transaction are queued (default: 3h0m0s)   

  • 其中 --txpool.lifetime value 为一笔交易的最大的存活时间, 超过这个时间, 交易会被当前节点的txpool清除, 但是如果交易数以local地址的交易, 则不进行清除,

    问题: 其他peers节点上的这笔交易是否包含queued交易, 是否会清除这笔交易呢?

func (pool *TxPool) promoteExecutables(accounts []common.Address) {
    //.....
    //注意: 如果 poolConfig.locals 中包本地地址, 则不删除本地交易
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
    }
    //....
    for addr := range pool.queue {
        if !pool.locals.contains(addr) { // don't drop locals, 不删除 locals的交易
            addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
        }
}
    //.....
}
  • 将一个交易从pool.pending中移除, 并将其子交易(nonce大于此笔交易的)从pool.pending队列中移除, 加到 pool.queue队列, 将来再处理
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

// Remove it from the list of known transactions
pool.all.Remove(hash) 
if outofbound {
pool.priced.Removed()
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
// If no more pending transactions are left, remove the list
if pending.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
            
            //将所有子交易放到queued队列中
// Postpone any invalidated transactions
for _, tx := range invalids {
pool.enqueueTx(tx.Hash(), tx)
}
// Update the account nonce if needed
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
return
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
}
}
}
  • 将交易加入 pool.queue 队列中
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
}
  • 将交易从pool.queue中重新拿出来放到 pool.pending队列中
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction

// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
}
}
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
for addr, list := range pool.pending {
            
            //注意: 如果 poolConfig.locals 中包本地地址, 则不删除本地交易
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))

// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()

// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()

// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
// If still above threshold, reduce to limit or min allowance
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()

// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals, 不删除 locals的交易
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)

// Drop transactions until the total is below the limit or only locals remain
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]

addresses = addresses[:len(addresses)-1]

// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}

txpool公共接口

HTTP调用

curl -X POST 192.168.10.199:18545 -H 'Content-Type':'application/json' -d '{"method":"txpool_content", "jsonrpc":"2.0", "params":[], "id":1}'

使用 httpie
yum install httpie  或  pip install httpie

http --pretty format  POST  192.168.10.199:18545 jsonrpc='2.0' method=txpool_content params:='[]' id=1



txpool_content.json内容

{
    "jsonrpc":"2.0",
    "method":"txpool_content",
    "params":[],
    "id":1
}

http POST 192.168.10.199:18545 < txpool_content.json   --pretty format > tx_pools.json
//txpool.content

// Content returns the transactions contained within the transaction pool.
func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
content := map[string]map[string]map[string]*RPCTransaction{
"pending": make(map[string]map[string]*RPCTransaction),
"queued":  make(map[string]map[string]*RPCTransaction),
}
pending, queue := s.b.TxPoolContent()

// Flatten the pending transactions
for account, txs := range pending {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx)
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx)
}
content["queued"][account.Hex()] = dump
}
return content
}

// Status returns the number of pending and queued transaction in the pool.
func (s *PublicTxPoolAPI) Status() map[string]hexutil.Uint {
pending, queue := s.b.Stats()
return map[string]hexutil.Uint{
"pending": hexutil.Uint(pending),
"queued":  hexutil.Uint(queue),
}
}

// Inspect retrieves the content of the transaction pool and flattens it into an
// easily inspectable list.
func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string {
content := map[string]map[string]map[string]string{
"pending": make(map[string]map[string]string),
"queued":  make(map[string]map[string]string),
}
pending, queue := s.b.TxPoolContent()

// Define a formatter to flatten a transaction into a string
var format = func(tx *types.Transaction) string {
if to := tx.To(); to != nil {
return fmt.Sprintf("%s: %v wei + %v gas × %v wei", tx.To().Hex(), tx.Value(), tx.Gas(), tx.GasPrice())
}
return fmt.Sprintf("contract creation: %v wei + %v gas × %v wei", tx.Value(), tx.Gas(), tx.GasPrice())
}
// Flatten the pending transactions
for account, txs := range pending {
dump := make(map[string]string)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = format(tx)
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]string)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = format(tx)
}
content["queued"][account.Hex()] = dump
}
return content
}

geth 的 console 中获取 txpool 示例:

> txpool.content.queued['0xD7A4a381Fca4be6b8A645d858f1bDc3107Ac3f5D']
{
  313: {
    blockHash: null,
    blockNumber: null,
    from: "0xd7a4a381fca4be6b8a645d858f1bdc3107ac3f5d",
    gas: "0x249f0",
    gasPrice: "0x4e3b29200",
    hash: "0x3b0e83e739d7b0c7be27e7a08d5d4fb8111efa98086bf8ba5da738333508b301",
    input: "0xa9059cbb0000000000000000000000009083b493d83912e7bd7435abfeafb99e724540e2000000000000000000000000000000000000000000000000000000007744d62b",
    nonce: "0x139",
    r: "0x84ff7db834955fb8d6d75e1c1dbd3ed7f45797b79ac25def1f885f69e235384d",
    s: "0x71ff38c7ebe55eca6ea6a8dd9bd2a88e2b5639496dad3428eed6adc56c0c72d4",
    to: "0xeca059f3d6de135e520e789cdfeecbf5ceca3770",
    transactionIndex: null,
    v: "0x2b",
    value: "0x0"
  },
  314: {
    blockHash: null,
    blockNumber: null,
    from: "0xd7a4a381fca4be6b8a645d858f1bdc3107ac3f5d",
    gas: "0x249f0",
    gasPrice: "0x4e3b29200",
    hash: "0x9a5ee8870bc39f8bfa7f665a05d3031e752dee5e41b250222877c4a2024e4289",
    input: "0xa9059cbb0000000000000000000000009083b493d83912e7bd7435abfeafb99e724540e2000000000000000000000000000000000000000000000000000000007d2b74ec",
    nonce: "0x13a",
    r: "0x5c9bb58256c875a17a1bc626eb54ed242bb3eafe3639d4f01b5bd88cf8392b7b",
    s: "0x2bebf80deac40c2674bc2c494debfa0770adb7a46b5345d123585f6cecb952eb",
    to: "0xeca059f3d6de135e520e789cdfeecbf5ceca3770",
    transactionIndex: null,
    v: "0x2b",
    value: "0x0"
  }
}
> 

问题: 节点是否会将sendRawTransaction RPC接口收到的交易加入到 local(本地交易池中)?
  • internal/ethapi/api.go

    // SendRawTransaction will add the signed transaction to the transaction pool.
    // The sender is responsible for signing the transaction and using the correct nonce.
    func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) {
    tx := new(types.Transaction)
    if err := rlp.DecodeBytes(encodedTx, tx); err != nil {
    return common.Hash{}, err
    }
    return SubmitTransaction(ctx, s.b, tx)
    }
    
    
    // SubmitTransaction is a helper function that submits tx to txPool and logs a message.
    func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
        
        //调用backend提交交易到txpool
    if err := b.SendTx(ctx, tx); err != nil {
    return common.Hash{}, err
    }
    if tx.To() == nil {
    signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
    from, err := types.Sender(signer, tx)
    if err != nil {
    return common.Hash{}, err
    }
    addr := crypto.CreateAddress(from, tx.Nonce())
    log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex())
    } else {
    log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To())
    }
    return tx.Hash(), nil
    }
    
    
  • api/backend.go

    //将交易添加到local队列中
    func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
    return b.eth.txPool.AddLocal(signedTx)
    }
    
  • core/tx_pool.go

    AddLocal函数的注释已经说的很清楚 , 会将有效的交易加入到local队列中

    // AddLocal enqueues a single transaction into the pool if it is valid, marking
    // the sender as a local one in the mean time, ensuring it goes around the local
    // pricing constraints.
    func (pool *TxPool) AddLocal(tx *types.Transaction) error {
    return pool.addTx(tx, !pool.config.NoLocals)
    }
    
    // addTx enqueues a single transaction into the pool if it is valid.
    func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
    // Cache sender in transaction before obtaining lock (pool.signer is immutable)
    types.Sender(pool.signer, tx)
    
    pool.mu.Lock()
    defer pool.mu.Unlock()
    
    // Try to inject the transaction and update any state
    replace, err := pool.add(tx, local)
    if err != nil {
    return err
    }
    // If we added a new transaction, run promotion checks and return
    if !replace {
    from, _ := types.Sender(pool.signer, tx) // already validated
    pool.promoteExecutables([]common.Address{from})
    }
    return nil
    }
    
    
  • core/tx_pool.go

    add函数的注释也已经说的很清楚, 如果一笔新的交易被标记为local, 那么发送地址会被加入到白名单中, 并且阻止交易池因为价格约束将此地址(发送地址)相关交易移除

    
    // add validates a transaction and inserts it into the non-executable queue for
    // later pending promotion and execution. If the transaction is a replacement for
    // an already pending or queued one, it overwrites the previous and returns this
    // so outer code doesn't uselessly call promote.
    //
    // If a newly added transaction is marked as local, its sending account will be
    // whitelisted, preventing any associated transaction from being dropped out of
    // the pool due to pricing constraints.
    func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
    // If the transaction is already known, discard it
    hash := tx.Hash()
    if pool.all.Get(hash) != nil {
    log.Trace("Discarding already known transaction", "hash", hash)
    return false, fmt.Errorf("known transaction: %x", hash)
    }
    // If the transaction fails basic validation, discard it
    if err := pool.validateTx(tx, local); err != nil {
    log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
    invalidTxCounter.Inc(1)
    return false, err
    }
    // If the transaction pool is full, discard underpriced transactions
    if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
    // If the new transaction is underpriced, don't accept it
    if !local && pool.priced.Underpriced(tx, pool.locals) {
    log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
    underpricedTxCounter.Inc(1)
    return false, ErrUnderpriced
    }
    // New transaction is better than our worse ones, make room for it
    drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
    for _, tx := range drop {
    log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
    underpricedTxCounter.Inc(1)
    pool.removeTx(tx.Hash(), false)
    }
    }
    // If the transaction is replacing an already pending one, do directly
    from, _ := types.Sender(pool.signer, tx) // already validated
    if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
    // Nonce already pending, check if required price bump is met
    inserted, old := list.Add(tx, pool.config.PriceBump)
    if !inserted {
    pendingDiscardCounter.Inc(1)
    return false, ErrReplaceUnderpriced
    }
    // New transaction is better, replace old one
    if old != nil {
    pool.all.Remove(old.Hash())
    pool.priced.Removed()
    pendingReplaceCounter.Inc(1)
    }
    pool.all.Add(tx)
    pool.priced.Put(tx)
    pool.journalTx(from, tx)
    
    log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
    
    // We've directly injected a replacement transaction, notify subsystems
    go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
    
    return old != nil, nil
    }
    // New transaction isn't replacing a pending one, push into queue
    replace, err := pool.enqueueTx(hash, tx)
    if err != nil {
    return false, err
    }
    // Mark local addresses and journal local transactions
    if local {
    if !pool.locals.contains(from) {
    log.Info("Setting new local account", "address", from)
    pool.locals.add(from)
    }
    }
    pool.journalTx(from, tx)
    
    log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
    return replace, nil
    }
    
问题: 当节点挂了, pool.queue中的交易是否会丢失?

pool.queue中的交易会保存在 geth/transaction.rlp中, 当节点挂掉, 重启之后会从 transaction.rlp中读取交易, 并加载到txpool.queue中, 如下:

  Offset: 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F 
00000000: F8 AC 82 01 39 85 04 E3 B2 92 00 83 02 49 F0 94    x,..9..c2....Ip.
00000010: EC A0 59 F3 D6 DE 13 5E 52 0E 78 9C DF EE CB F5    l.YsV^.^R.x._nKu
00000020: CE CA 37 70 80 B8 44 A9 05 9C BB 00 00 00 00 00    NJ7p.8D)..;.....
00000030: 00 00 00 00 00 00 00 90 83 B4 93 D8 39 12 E7 BD    .........4.X9.g=
00000040: 74 35 AB FE AF B9 9E 72 45 40 E2 00 00 00 00 00    t5+~/9.rE@b.....
00000050: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00    ................
00000060: 00 00 00 00 00 00 00 77 44 D6 2B 2B A0 84 FF 7D    .......wDV++...}
00000070: B8 34 95 5F B8 D6 D7 5E 1C 1D BD 3E D7 F4 57 97    84._8VW^..=>WtW.
00000080: B7 9A C2 5D EF 1F 88 5F 69 E2 35 38 4D A0 71 FF    7.B]o.._ib58M.q.
00000090: 38 C7 EB E5 5E CA 6E A6 A8 DD 9B D2 A8 8E 2B 56    8Gke^Jn&(].R(.+V
000000a0: 39 49 6D AD 34 28 EE D6 AD C5 6C 0C 72 D4 F8 AC    9Im-4(nV-El.rTx,
000000b0: 82 01 3A 85 04 E3 B2 92 00 83 02 49 F0 94 EC A0    ..:..c2....Ip.l.
000000c0: 59 F3 D6 DE 13 5E 52 0E 78 9C DF EE CB F5 CE CA    YsV^.^R.x._nKuNJ
000000d0: 37 70 80 B8 44 A9 05 9C BB 00 00 00 00 00 00 00    7p.8D)..;.......
000000e0: 00 00 00 00 00 90 83 B4 93 D8 39 12 E7 BD 74 35    .......4.X9.g=t5
000000f0: AB FE AF B9 9E 72 45 40 E2 00 00 00 00 00 00 00    +~/9.rE@b.......
00000100: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00    ................
00000110: 00 00 00 00 00 7D 2B 74 EC 2B A0 5C 9B B5 82 56    .....}+tl+.\.5.V
00000120: C8 75 A1 7A 1B C6 26 EB 54 ED 24 2B B3 EA FE 36    Hu!z.F&kTm$+3j~6
00000130: 39 D4 F0 1B 5B D8 8C F8 39 2B 7B A0 2B EB F8 0D    9Tp.[X.x9+{.+kx.
00000140: EA C4 0C 26 74 BC 2C 49 4D EB FA 07 70 AD B7 A4    jD.&t<,IMkz.p-7$
00000150: 6B 53 45 D1 23 58 5F 6C EC B9 52 EB F8 6D 82 1B    kSEQ#X_ll9Rkxm..
00000160: 95 85 04 A8 17 C8 00 82 52 08 94 95 4D 1A 58 C7    ...(.H..R...M.XG
00000170: AB D4 AC 8E BE 05 F5 91 91 CF 71 8E B0 CB 89 87    +T,.>.u..Oq.0K..
00000180: 03 8D 7E A4 C6 80 00 80 1B A0 47 B7 90 AD 06 3C    ..~$F.....G7.-.<
00000190: AE 06 93 B8 B2 DB 41 5D 55 64 5D B2 5D 84 B1 62    ...82[A]Ud]2].1b
000001a0: 2D B9 AC 01 BD 9B A4 32 4F 45 A0 6B 2D E6 8B 30    -9,.=.$2OE.k-f.0
000001b0: 58 F7 8F 79 43 A6 41 A0 11 2F 4C B9 0C 16 6F 07    Xw.yC&A../L9..o.
000001c0: 2A 1E A2 E2 9B 89 63 A0 4C 76 B6                   *."b..c.Lv6
  • core/tx_pool.go

// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

// Create the transaction pool with its initial settings
pool := &TxPool{
config:      config,
chainconfig: chainconfig,
chain:       chain,
signer:      types.NewEIP155Signer(chainconfig.ChainID),
pending:     make(map[common.Address]*txList),
queue:       make(map[common.Address]*txList),
beats:       make(map[common.Address]time.Time),
all:         newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice:    new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())

// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
        
        //从文件(默认是transaction.rlp)中加载(本地)交易
pool.journal = newTxJournal(config.Journal)

if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
// Subscribe events from blockchain
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

// Start the event loop and return
pool.wg.Add(1)
go pool.loop()

return pool
}


原文地址:https://blog.csdn.net/qq_67479387/article/details/140226618

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!