深入理解ETH交易池
深入理解ETH 交易池(源码分析)
TxPool 数据结构
- pending : 当前可以处理的(excuteable)
- queue : 当前不能处理的(non-excuteable)
- all : 所有允许查询的交易集合
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
homestead bool
}
交易池RPC
https://ethereum.stackexchange.com/questions/3831/what-is-the-max-size-of-transactions-can-clients-like-geth-keep-in-txpool
-
开始
tx_pool
的RPC接口启动geth, 添加配置选项--rpcapi txpool
-
PublicTxPoolAPI
-
PublicTransactionPoolAPI
TRANSACTION POOL OPTIONS:
--txpool.locals value Comma separated accounts to treat as locals (no flush, priority inclusion) 配置本地的地址列表, 优先包含到交易池中, 且当交易池满了的时候, 不除本地地址的交易
--txpool.nolocals Disables price exemptions for locally submitted transactions
--txpool.journal value Disk journal for local transaction to survive node restarts (default: "transactions.rlp")
--txpool.rejournal value Time interval to regenerate the local transaction journal (default: 1h0m0s)
--txpool.pricelimit value Minimum gas price limit to enforce for acceptance into the pool (default: 1)
--txpool.pricebump value Price bump percentage to replace an already existing transaction (default: 10)
--txpool.accountslots value Minimum number of executable transaction slots guaranteed per account (default: 16)
--txpool.globalslots value Maximum number of executable transaction slots for all accounts (default: 4096)
--txpool.accountqueue value Maximum number of non-executable transaction slots permitted per account (default: 64)
--txpool.globalqueue value Maximum number of non-executable transaction slots for all accounts (default: 1024)
--txpool.lifetime value Maximum amount of time non-executable transaction are queued (default: 3h0m0s)
-
其中
--txpool.lifetime value
为一笔交易的最大的存活时间, 超过这个时间, 交易会被当前节点的txpool清除, 但是如果交易数以local地址
的交易, 则不进行清除,问题: 其他peers节点上的这笔交易是否包含queued交易, 是否会清除这笔交易呢?
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
//.....
//注意: 如果 poolConfig.locals 中包本地地址, 则不删除本地交易
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
//....
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals, 不删除 locals的交易
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
//.....
}
- 将一个交易从
pool.pending
中移除, 并将其子交易(nonce大于此笔交易的)从pool.pending
队列中移除, 加到pool.queue
队列, 将来再处理
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
pool.priced.Removed()
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
// If no more pending transactions are left, remove the list
if pending.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
//将所有子交易放到queued队列中
// Postpone any invalidated transactions
for _, tx := range invalids {
pool.enqueueTx(tx.Hash(), tx)
}
// Update the account nonce if needed
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
return
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
}
}
}
- 将交易加入
pool.queue
队列中
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
}
- 将交易从
pool.queue
中重新拿出来放到pool.pending
队列中
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
// Drop all transactions over the allowed limit
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
}
}
// Notify subsystem for new promoted transactions.
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
// If the pending limit is overflown, start equalizing allowances
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
for addr, list := range pool.pending {
//注意: 如果 poolConfig.locals 中包本地地址, 则不删除本地交易
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()
// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
// If still above threshold, reduce to limit or min allowance
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
// Drop the transaction from the global pools too
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
// Update the account nonce to the dropped transaction
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals, 不删除 locals的交易
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)
// Drop transactions until the total is below the limit or only locals remain
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}
txpool公共接口
HTTP调用
curl -X POST 192.168.10.199:18545 -H 'Content-Type':'application/json' -d '{"method":"txpool_content", "jsonrpc":"2.0", "params":[], "id":1}'
使用 httpie
yum install httpie 或 pip install httpie
http --pretty format POST 192.168.10.199:18545 jsonrpc='2.0' method=txpool_content params:='[]' id=1
txpool_content.json内容
{
"jsonrpc":"2.0",
"method":"txpool_content",
"params":[],
"id":1
}
http POST 192.168.10.199:18545 < txpool_content.json --pretty format > tx_pools.json
//txpool.content
// Content returns the transactions contained within the transaction pool.
func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction {
content := map[string]map[string]map[string]*RPCTransaction{
"pending": make(map[string]map[string]*RPCTransaction),
"queued": make(map[string]map[string]*RPCTransaction),
}
pending, queue := s.b.TxPoolContent()
// Flatten the pending transactions
for account, txs := range pending {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx)
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]*RPCTransaction)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = newRPCPendingTransaction(tx)
}
content["queued"][account.Hex()] = dump
}
return content
}
// Status returns the number of pending and queued transaction in the pool.
func (s *PublicTxPoolAPI) Status() map[string]hexutil.Uint {
pending, queue := s.b.Stats()
return map[string]hexutil.Uint{
"pending": hexutil.Uint(pending),
"queued": hexutil.Uint(queue),
}
}
// Inspect retrieves the content of the transaction pool and flattens it into an
// easily inspectable list.
func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string {
content := map[string]map[string]map[string]string{
"pending": make(map[string]map[string]string),
"queued": make(map[string]map[string]string),
}
pending, queue := s.b.TxPoolContent()
// Define a formatter to flatten a transaction into a string
var format = func(tx *types.Transaction) string {
if to := tx.To(); to != nil {
return fmt.Sprintf("%s: %v wei + %v gas × %v wei", tx.To().Hex(), tx.Value(), tx.Gas(), tx.GasPrice())
}
return fmt.Sprintf("contract creation: %v wei + %v gas × %v wei", tx.Value(), tx.Gas(), tx.GasPrice())
}
// Flatten the pending transactions
for account, txs := range pending {
dump := make(map[string]string)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = format(tx)
}
content["pending"][account.Hex()] = dump
}
// Flatten the queued transactions
for account, txs := range queue {
dump := make(map[string]string)
for _, tx := range txs {
dump[fmt.Sprintf("%d", tx.Nonce())] = format(tx)
}
content["queued"][account.Hex()] = dump
}
return content
}
geth 的 console 中获取 txpool 示例:
> txpool.content.queued['0xD7A4a381Fca4be6b8A645d858f1bDc3107Ac3f5D']
{
313: {
blockHash: null,
blockNumber: null,
from: "0xd7a4a381fca4be6b8a645d858f1bdc3107ac3f5d",
gas: "0x249f0",
gasPrice: "0x4e3b29200",
hash: "0x3b0e83e739d7b0c7be27e7a08d5d4fb8111efa98086bf8ba5da738333508b301",
input: "0xa9059cbb0000000000000000000000009083b493d83912e7bd7435abfeafb99e724540e2000000000000000000000000000000000000000000000000000000007744d62b",
nonce: "0x139",
r: "0x84ff7db834955fb8d6d75e1c1dbd3ed7f45797b79ac25def1f885f69e235384d",
s: "0x71ff38c7ebe55eca6ea6a8dd9bd2a88e2b5639496dad3428eed6adc56c0c72d4",
to: "0xeca059f3d6de135e520e789cdfeecbf5ceca3770",
transactionIndex: null,
v: "0x2b",
value: "0x0"
},
314: {
blockHash: null,
blockNumber: null,
from: "0xd7a4a381fca4be6b8a645d858f1bdc3107ac3f5d",
gas: "0x249f0",
gasPrice: "0x4e3b29200",
hash: "0x9a5ee8870bc39f8bfa7f665a05d3031e752dee5e41b250222877c4a2024e4289",
input: "0xa9059cbb0000000000000000000000009083b493d83912e7bd7435abfeafb99e724540e2000000000000000000000000000000000000000000000000000000007d2b74ec",
nonce: "0x13a",
r: "0x5c9bb58256c875a17a1bc626eb54ed242bb3eafe3639d4f01b5bd88cf8392b7b",
s: "0x2bebf80deac40c2674bc2c494debfa0770adb7a46b5345d123585f6cecb952eb",
to: "0xeca059f3d6de135e520e789cdfeecbf5ceca3770",
transactionIndex: null,
v: "0x2b",
value: "0x0"
}
}
>
问题: 节点是否会将sendRawTransaction
RPC接口收到的交易加入到 local
(本地交易池中)?
-
internal/ethapi/api.go
// SendRawTransaction will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce. func (s *PublicTransactionPoolAPI) SendRawTransaction(ctx context.Context, encodedTx hexutil.Bytes) (common.Hash, error) { tx := new(types.Transaction) if err := rlp.DecodeBytes(encodedTx, tx); err != nil { return common.Hash{}, err } return SubmitTransaction(ctx, s.b, tx) } // SubmitTransaction is a helper function that submits tx to txPool and logs a message. func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { //调用backend提交交易到txpool if err := b.SendTx(ctx, tx); err != nil { return common.Hash{}, err } if tx.To() == nil { signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number()) from, err := types.Sender(signer, tx) if err != nil { return common.Hash{}, err } addr := crypto.CreateAddress(from, tx.Nonce()) log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex()) } else { log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To()) } return tx.Hash(), nil }
-
api/backend.go
//将交易添加到local队列中 func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) }
-
core/tx_pool.go
AddLocal
函数的注释已经说的很清楚 , 会将有效的交易加入到local
队列中// AddLocal enqueues a single transaction into the pool if it is valid, marking // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. func (pool *TxPool) AddLocal(tx *types.Transaction) error { return pool.addTx(tx, !pool.config.NoLocals) } // addTx enqueues a single transaction into the pool if it is valid. func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { // Cache sender in transaction before obtaining lock (pool.signer is immutable) types.Sender(pool.signer, tx) pool.mu.Lock() defer pool.mu.Unlock() // Try to inject the transaction and update any state replace, err := pool.add(tx, local) if err != nil { return err } // If we added a new transaction, run promotion checks and return if !replace { from, _ := types.Sender(pool.signer, tx) // already validated pool.promoteExecutables([]common.Address{from}) } return nil }
-
core/tx_pool.go
add
函数的注释也已经说的很清楚, 如果一笔新的交易被标记为local
, 那么发送地址会被加入到白名单中, 并且阻止交易池因为价格约束
将此地址(发送地址)相关交易移除// add validates a transaction and inserts it into the non-executable queue for // later pending promotion and execution. If the transaction is a replacement for // an already pending or queued one, it overwrites the previous and returns this // so outer code doesn't uselessly call promote. // // If a newly added transaction is marked as local, its sending account will be // whitelisted, preventing any associated transaction from being dropped out of // the pool due to pricing constraints. func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it if !local && pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) underpricedTxCounter.Inc(1) return false, ErrUnderpriced } // New transaction is better than our worse ones, make room for it drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) underpricedTxCounter.Inc(1) pool.removeTx(tx.Hash(), false) } } // If the transaction is replacing an already pending one, do directly from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardCounter.Inc(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed() pendingReplaceCounter.Inc(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) // We've directly injected a replacement transaction, notify subsystems go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) return old != nil, nil } // New transaction isn't replacing a pending one, push into queue replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } // Mark local addresses and journal local transactions if local { if !pool.locals.contains(from) { log.Info("Setting new local account", "address", from) pool.locals.add(from) } } pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil }
问题: 当节点挂了, pool.queue
中的交易是否会丢失?
pool.queue
中的交易会保存在 geth/transaction.rlp
中, 当节点挂掉, 重启之后会从 transaction.rlp
中读取交易, 并加载到txpool.queue
中, 如下:
Offset: 00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F
00000000: F8 AC 82 01 39 85 04 E3 B2 92 00 83 02 49 F0 94 x,..9..c2....Ip.
00000010: EC A0 59 F3 D6 DE 13 5E 52 0E 78 9C DF EE CB F5 l.YsV^.^R.x._nKu
00000020: CE CA 37 70 80 B8 44 A9 05 9C BB 00 00 00 00 00 NJ7p.8D)..;.....
00000030: 00 00 00 00 00 00 00 90 83 B4 93 D8 39 12 E7 BD .........4.X9.g=
00000040: 74 35 AB FE AF B9 9E 72 45 40 E2 00 00 00 00 00 t5+~/9.rE@b.....
00000050: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
00000060: 00 00 00 00 00 00 00 77 44 D6 2B 2B A0 84 FF 7D .......wDV++...}
00000070: B8 34 95 5F B8 D6 D7 5E 1C 1D BD 3E D7 F4 57 97 84._8VW^..=>WtW.
00000080: B7 9A C2 5D EF 1F 88 5F 69 E2 35 38 4D A0 71 FF 7.B]o.._ib58M.q.
00000090: 38 C7 EB E5 5E CA 6E A6 A8 DD 9B D2 A8 8E 2B 56 8Gke^Jn&(].R(.+V
000000a0: 39 49 6D AD 34 28 EE D6 AD C5 6C 0C 72 D4 F8 AC 9Im-4(nV-El.rTx,
000000b0: 82 01 3A 85 04 E3 B2 92 00 83 02 49 F0 94 EC A0 ..:..c2....Ip.l.
000000c0: 59 F3 D6 DE 13 5E 52 0E 78 9C DF EE CB F5 CE CA YsV^.^R.x._nKuNJ
000000d0: 37 70 80 B8 44 A9 05 9C BB 00 00 00 00 00 00 00 7p.8D)..;.......
000000e0: 00 00 00 00 00 90 83 B4 93 D8 39 12 E7 BD 74 35 .......4.X9.g=t5
000000f0: AB FE AF B9 9E 72 45 40 E2 00 00 00 00 00 00 00 +~/9.rE@b.......
00000100: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
00000110: 00 00 00 00 00 7D 2B 74 EC 2B A0 5C 9B B5 82 56 .....}+tl+.\.5.V
00000120: C8 75 A1 7A 1B C6 26 EB 54 ED 24 2B B3 EA FE 36 Hu!z.F&kTm$+3j~6
00000130: 39 D4 F0 1B 5B D8 8C F8 39 2B 7B A0 2B EB F8 0D 9Tp.[X.x9+{.+kx.
00000140: EA C4 0C 26 74 BC 2C 49 4D EB FA 07 70 AD B7 A4 jD.&t<,IMkz.p-7$
00000150: 6B 53 45 D1 23 58 5F 6C EC B9 52 EB F8 6D 82 1B kSEQ#X_ll9Rkxm..
00000160: 95 85 04 A8 17 C8 00 82 52 08 94 95 4D 1A 58 C7 ...(.H..R...M.XG
00000170: AB D4 AC 8E BE 05 F5 91 91 CF 71 8E B0 CB 89 87 +T,.>.u..Oq.0K..
00000180: 03 8D 7E A4 C6 80 00 80 1B A0 47 B7 90 AD 06 3C ..~$F.....G7.-.<
00000190: AE 06 93 B8 B2 DB 41 5D 55 64 5D B2 5D 84 B1 62 ...82[A]Ud]2].1b
000001a0: 2D B9 AC 01 BD 9B A4 32 4F 45 A0 6B 2D E6 8B 30 -9,.=.$2OE.k-f.0
000001b0: 58 F7 8F 79 43 A6 41 A0 11 2F 4C B9 0C 16 6F 07 Xw.yC&A../L9..o.
000001c0: 2A 1E A2 E2 9B 89 63 A0 4C 76 B6 *."b..c.Lv6
- core/tx_pool.go
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
//从文件(默认是transaction.rlp)中加载(本地)交易
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
// Subscribe events from blockchain
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
return pool
}
原文地址:https://blog.csdn.net/qq_67479387/article/details/140226618
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!