MTK7621: Switch-Chip Workqueue
After the mt7530 switch chip raises its data-receive interrupt, the actual receive work is delegated to a workqueue and handled by the kernel's worker_thread() kworker threads; see the description in 《workqueue工作原理》 (workqueue working principles) for the background.
The basic workqueue framework works as follows:
Roles in the framework:
1. The program that adds a work item to a workqueue has, in effect, filed a work order: it is the producer and dispatcher of the order (a minimal producer-side sketch follows this list);
2. The worker_pool is the factory that provides the workplace, and its workers are the labourers that execute the orders, i.e. the consumers;
3. The PWQ (pool_workqueue) dispatches work orders to the factory, coordinating the match between producers and consumers.
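To make these roles concrete, here is a minimal producer-side sketch (a hypothetical demo module, not part of the mt7530 driver): it files a work order with INIT_WORK(), hands it to the default workqueue with schedule_work(), and a kworker from the per-CPU worker_pool later consumes it.

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/workqueue.h>

/* The "work order": a work item plus whatever data the worker needs. */
struct demo_ctx {
	struct work_struct work;
	int payload;
};

static struct demo_ctx demo;

/* Consumed later by a kworker thread taken from the per-CPU worker_pool. */
static void demo_work_func(struct work_struct *work)
{
	struct demo_ctx *ctx = container_of(work, struct demo_ctx, work);

	pr_info("work executed by %s, payload=%d\n", current->comm, ctx->payload);
}

static int __init demo_init(void)
{
	INIT_WORK(&demo.work, demo_work_func);	/* fill in the work order */
	demo.payload = 42;
	schedule_work(&demo.work);		/* hand it to system_wq; the PWQ
						 * routes it to a worker_pool */
	return 0;
}

static void __exit demo_exit(void)
{
	cancel_work_sync(&demo.work);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");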
Workqueue framework start-up flow:
1. The kernel initialization function kernel_init_freeable() (in init/main.c) calls workqueue_init();
2. workqueue_init() sets up a worker_pool for each CPU;
3. kthread_create_on_node() creates a worker thread, and worker_attach_to_pool(worker, pool) attaches it to the pool.
Worker execution flow: kernel/workqueue.c --> static int worker_thread(void *__worker)
/*
* Finish PREP stage. We're guaranteed to have at least one idle
* worker or that someone else has already assumed the manager
* role. This is where @worker starts participating in concurrency
* management if applicable and concurrency management is restored
* after being rebound. See rebind_workers() for details.
*/
worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
struct work_struct *work =
list_first_entry(&pool->worklist,
struct work_struct, entry);
pool->watchdog_ts = jiffies;
if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
/* optimization path, not strictly necessary */
process_one_work(worker, work); /* the worker executes one work item from the pool's worklist */
if (unlikely(!list_empty(&worker->scheduled)))
process_scheduled_works(worker);
} else {
move_linked_works(work, &worker->scheduled, NULL);
process_scheduled_works(worker);
}
} while (keep_working(pool));
Who actually executes the data-receive work item?
After the hardware interrupt, the driver eventually calls insert_work() to place the pending data task on a workqueue. As described at the start, the worker_pool already has worker threads that execute work items one by one; in other words, the data is picked up by process_one_work(worker, work) and sent on towards the network protocol stack.
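Before looking at process_one_work(), here is a rough sketch of that interrupt-to-workqueue hand-off (hypothetical names, not the actual mt7621 driver code; the probe-time INIT_WORK() and request_irq() calls are omitted):

#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/workqueue.h>

/* Hypothetical private data; the real driver uses struct fe_priv. */
struct demo_priv {
	struct work_struct rx_work;
};

/* Bottom half: later executed by a kworker through process_one_work(). */
static void demo_rx_work(struct work_struct *work)
{
	struct demo_priv *priv = container_of(work, struct demo_priv, rx_work);

	/* ... drain the RX ring, build skbs, hand them to the stack ... */
	(void)priv;
}

/* Top half: acknowledge the hardware and defer the heavy lifting. */
static irqreturn_t demo_isr(int irq, void *dev_id)
{
	struct demo_priv *priv = dev_id;

	/* ... ack/mask the RX interrupt source in hardware ... */
	schedule_work(&priv->rx_work);	/* queue_work() -> insert_work() */
	return IRQ_HANDLED;
}

The function installed with INIT_WORK() is exactly what process_one_work(), shown next, eventually invokes through worker->current_func(work).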
/**
* process_one_work - process single work
* @worker: self
* @work: work to process
*
* Process @work. This function contains all the logics necessary to
* process a single work including synchronization against and
* interaction with other workers on the same cpu, queueing and
* flushing. As long as context requirement is met, any worker can
* call this function to process a work.
*
* CONTEXT:
* spin_lock_irq(pool->lock) which is released and regrabbed.
*/
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
struct pool_workqueue *pwq = get_work_pwq(work);
struct worker_pool *pool = worker->pool;
bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
int work_color;
struct worker *collision;
#ifdef CONFIG_LOCKDEP
/*
* It is permissible to free the struct work_struct from
* inside the function that is called from it, this we need to
* take into account for lockdep too. To avoid bogus "held
* lock freed" warnings as well as problems when looking into
* work->lockdep_map, make a copy and use that here.
*/
struct lockdep_map lockdep_map;
lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
/* ensure we're on the correct CPU */
WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
raw_smp_processor_id() != pool->cpu);
/*
* A single work shouldn't be executed concurrently by
* multiple workers on a single cpu. Check whether anyone is
* already processing the work. If so, defer the work to the
* currently executing one.
*/
collision = find_worker_executing_work(pool, work);
if (unlikely(collision)) {
move_linked_works(work, &collision->scheduled, NULL);
return;
}
/* claim and dequeue */
debug_work_deactivate(work);
hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
worker->current_work = work;
worker->current_func = work->func;
worker->current_pwq = pwq;
work_color = get_work_color(work);
list_del_init(&work->entry);
/*
* CPU intensive works don't participate in concurrency management.
* They're the scheduler's responsibility. This takes @worker out
* of concurrency management and the next code block will chain
* execution of the pending work items.
*/
if (unlikely(cpu_intensive))
worker_set_flags(worker, WORKER_CPU_INTENSIVE);
/*
* Wake up another worker if necessary. The condition is always
* false for normal per-cpu workers since nr_running would always
* be >= 1 at this point. This is used to chain execution of the
* pending work items for WORKER_NOT_RUNNING workers such as the
* UNBOUND and CPU_INTENSIVE ones.
*/
if (need_more_worker(pool))
wake_up_worker(pool);
/*
* Record the last pool and clear PENDING which should be the last
* update to @work. Also, do this inside @pool->lock so that
* PENDING and queued state changes happen together while IRQ is
* disabled.
*/
set_work_pool_and_clear_pending(work, pool->id);
spin_unlock_irq(&pool->lock);
lock_map_acquire(&pwq->wq->lockdep_map);
lock_map_acquire(&lockdep_map);
/*
* Strictly speaking we should mark the invariant state without holding
* any locks, that is, before these two lock_map_acquire()'s.
*
* However, that would result in:
*
* A(W1)
* WFC(C)
 *      A(W1)
 *      C(C)
*
* Which would create W1->C->W1 dependencies, even though there is no
* actual deadlock possible. There are two solutions, using a
* read-recursive acquire on the work(queue) 'locks', but this will then
* hit the lockdep limitation on recursive locks, or simply discard
* these locks.
*
* AFAICT there is no possible deadlock scenario between the
* flush_work() and complete() primitives (except for single-threaded
* workqueues), so hiding them isn't a problem.
*/
lockdep_invariant_state(true);
trace_workqueue_execute_start(work);
worker->current_func(work); /* !!! this callback is the entry point that runs the work item, so we need to look at what the work order actually does */
/*
* While we must be careful to not use "work" after this, the trace
* point will only record its address.
*/
trace_workqueue_execute_end(work);
lock_map_release(&lockdep_map);
lock_map_release(&pwq->wq->lockdep_map);
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
" last function: %pf\n",
current->comm, preempt_count(), task_pid_nr(current),
worker->current_func);
debug_show_held_locks(current);
dump_stack();
}
/*
* The following prevents a kworker from hogging CPU on !PREEMPT
* kernels, where a requeueing work item waiting for something to
* happen could deadlock with stop_machine as such work item could
* indefinitely requeue itself while all other CPUs are trapped in
* stop_machine. At the same time, report a quiescent RCU state so
* the same condition doesn't freeze RCU.
*/
cond_resched_rcu_qs();
spin_lock_irq(&pool->lock);
/* clear cpu intensive status */
if (unlikely(cpu_intensive))
worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
/* we're done with it, release */
hash_del(&worker->hentry);
worker->current_work = NULL;
worker->current_func = NULL;
worker->current_pwq = NULL;
worker->desc_valid = false;
pwq_dec_nr_in_flight(pwq, work_color);
}
How is the data received?
The Ethernet driver's fe_probe() function runs when its platform device is probed (the interface is then brought up with ifup / ifconfig up); the call relationships inside it are as follows:
static int fe_probe(struct platform_device *pdev)
{
fe_base = devm_ioremap_resource(&pdev->dev, res);
netdev = alloc_etherdev(sizeof(*priv));
SET_NETDEV_DEV(netdev, &pdev->dev);
netdev->netdev_ops = &fe_netdev_ops;
netdev->base_addr = (unsigned long)fe_base;
netdev->irq = platform_get_irq(pdev, 0);
priv = netdev_priv(netdev);
INIT_WORK(&priv->pending_work, fe_pending_work); /* initialize the work item; fe_pending_work becomes the work->func callback */
netif_napi_add(netdev, &priv->rx_napi, fe_poll, napi_weight); /* register fe_poll as the NAPI receive poll function */
fe_set_ethtool_ops(netdev);
err = register_netdev(netdev);
}
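fe_probe() only registers fe_poll with netif_napi_add(); the poll function is actually driven by the hardware interrupt through NAPI scheduling. A simplified sketch of that pattern (hypothetical names and trimmed-down private data, not the driver's real interrupt handler):

#include <linux/interrupt.h>
#include <linux/netdevice.h>

/* Hypothetical cut-down version of the driver's private data. */
struct demo_fe_priv {
	struct napi_struct rx_napi;
};

/*
 * Hard IRQ handler: mask further TX/RX interrupts and let NAPI invoke
 * the registered poll function (fe_poll here) from softirq context.
 */
static irqreturn_t demo_fe_irq(int irq, void *dev_id)
{
	struct demo_fe_priv *priv = dev_id;

	if (napi_schedule_prep(&priv->rx_napi)) {
		/* ... disable the TX/RX interrupt sources in hardware ... */
		__napi_schedule(&priv->rx_napi);
	}
	return IRQ_HANDLED;
}

When the poll function later consumes less than its budget and calls napi_complete_done(), the interrupt sources are re-enabled (fe_int_enable() in fe_poll() below), closing the loop.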
static void fe_pending_work(struct work_struct *work)
{
struct fe_priv *priv = container_of(work, struct fe_priv, pending_work);
int i;
bool pending;
for (i = 0; i < ARRAY_SIZE(fe_work); i++) {
pending = test_and_clear_bit(fe_work[i].bitnr,
priv->pending_flags);
if (pending)
fe_work[i].action(priv); /* run the deferred action whose pending bit was set */
}
}
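fe_pending_work() only runs the actions whose bits are set in priv->pending_flags; elsewhere the driver marks a bit and schedules the work, roughly like the following sketch (hypothetical helper name, assuming struct fe_priv and the fe_work[] table from the driver above):

#include <linux/bitops.h>
#include <linux/workqueue.h>

/*
 * Hypothetical helper: mark one deferred action as pending and let a
 * kworker run fe_pending_work() -> fe_work[i].action() later on.
 */
static void demo_fe_schedule_pending(struct fe_priv *priv, int bitnr)
{
	set_bit(bitnr, priv->pending_flags);
	schedule_work(&priv->pending_work);
}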
/* entry point of the NAPI receive poll routine */
static int fe_poll(struct napi_struct *napi, int budget)
{
struct fe_priv *priv = container_of(napi, struct fe_priv, rx_napi);
struct fe_hw_stats *hwstat = priv->hw_stats;
int tx_done, rx_done, tx_again;
u32 status, fe_status, status_reg, mask;
u32 tx_intr, rx_intr, status_intr;
status = fe_reg_r32(FE_REG_FE_INT_STATUS);
fe_status = status;
tx_intr = priv->soc->tx_int;
rx_intr = priv->soc->rx_int;
status_intr = priv->soc->status_int;
tx_done = 0;
rx_done = 0;
tx_again = 0;
if (fe_reg_table[FE_REG_FE_INT_STATUS2]) {
fe_status = fe_reg_r32(FE_REG_FE_INT_STATUS2);
status_reg = FE_REG_FE_INT_STATUS2;
} else {
status_reg = FE_REG_FE_INT_STATUS;
}
if (status & tx_intr)
tx_done = fe_poll_tx(priv, budget, tx_intr, &tx_again); /* call the TX handling function fe_poll_tx() */
if (status & rx_intr)
rx_done = fe_poll_rx(napi, budget, priv, rx_intr); /* call the RX handling function fe_poll_rx() */
if (unlikely(fe_status & status_intr)) {
if (hwstat && spin_trylock(&hwstat->stats_lock)) {
fe_stats_update(priv);
spin_unlock(&hwstat->stats_lock);
}
fe_reg_w32(status_intr, status_reg);
}
if (unlikely(netif_msg_intr(priv))) {
mask = fe_reg_r32(FE_REG_FE_INT_ENABLE);
netdev_info(priv->netdev,
"done tx %d, rx %d, intr 0x%08x/0x%x\n",
tx_done, rx_done, status, mask);
}
if (!tx_again && (rx_done < budget)) {
status = fe_reg_r32(FE_REG_FE_INT_STATUS);
if (status & (tx_intr | rx_intr)) {
/* let napi poll again */
rx_done = budget;
goto poll_again;
}
napi_complete_done(napi, rx_done);
fe_int_enable(tx_intr | rx_intr);
} else {
rx_done = budget;
}
poll_again:
return rx_done;
}
/* how the data is pulled out of the RX ring and passed to the kernel network protocol stack */
static int fe_poll_rx(struct napi_struct *napi, int budget,
struct fe_priv *priv, u32 rx_intr)
{
struct net_device *netdev = priv->netdev;
struct net_device_stats *stats = &netdev->stats;
struct fe_soc_data *soc = priv->soc;
struct fe_rx_ring *ring = &priv->rx_ring;
int idx = ring->rx_calc_idx;
u32 checksum_bit;
struct sk_buff *skb;
u8 *data, *new_data;
struct fe_rx_dma *rxd, trxd;
int done = 0, pad;
if (netdev->features & NETIF_F_RXCSUM)
checksum_bit = soc->checksum_bit;
else
checksum_bit = 0;
if (priv->flags & FE_FLAG_RX_2B_OFFSET)
pad = 0;
else
pad = NET_IP_ALIGN;
while (done < budget) {
unsigned int pktlen;
dma_addr_t dma_addr;
/* fetch the next descriptor from the RX ring buffer */
idx = NEXT_RX_DESP_IDX(idx);
rxd = &ring->rx_dma[idx];
data = ring->rx_data[idx];
fe_get_rxd(&trxd, rxd);
if (!(trxd.rxd2 & RX_DMA_DONE))
break;
/* alloc new buffer */
new_data = page_frag_alloc(&ring->frag_cache, ring->frag_size,
GFP_ATOMIC);
if (unlikely(!new_data)) {
stats->rx_dropped++;
goto release_desc;
}
dma_addr = dma_map_single(&netdev->dev,
new_data + NET_SKB_PAD + pad,
ring->rx_buf_size,
DMA_FROM_DEVICE);
if (unlikely(dma_mapping_error(&netdev->dev, dma_addr))) {
skb_free_frag(new_data);
goto release_desc;
}
/* receive data */
skb = build_skb(data, ring->frag_size);
if (unlikely(!skb)) {
skb_free_frag(new_data);
goto release_desc;
}
skb_reserve(skb, NET_SKB_PAD + NET_IP_ALIGN);
dma_unmap_single(&netdev->dev, trxd.rxd1,
ring->rx_buf_size, DMA_FROM_DEVICE);
pktlen = RX_DMA_GET_PLEN0(trxd.rxd2);
skb->dev = netdev;
skb_put(skb, pktlen);
if (trxd.rxd4 & checksum_bit)
skb->ip_summed = CHECKSUM_UNNECESSARY;
else
skb_checksum_none_assert(skb);
skb->protocol = eth_type_trans(skb, netdev); /* determine the Ethernet protocol type */
if (netdev->features & NETIF_F_HW_VLAN_CTAG_RX &&
RX_DMA_VID(trxd.rxd3))
__vlan_hwaccel_put_tag(skb, htons(ETH_P_8021Q),
RX_DMA_VID(trxd.rxd3)); /* attach the hardware-extracted VLAN tag */
#ifdef CONFIG_NET_MEDIATEK_OFFLOAD /* MediaTek hardware flow offload */
if (mtk_offload_check_rx(priv, skb, trxd.rxd4) == 0) {
#endif
stats->rx_packets++;
stats->rx_bytes += pktlen;
napi_gro_receive(napi, skb); /* hand the skb to napi_gro_receive() for reception */
#ifdef CONFIG_NET_MEDIATEK_OFFLOAD
} else {
dev_kfree_skb(skb);
}
#endif
ring->rx_data[idx] = new_data;
rxd->rxd1 = (unsigned int)dma_addr;
release_desc:
if (priv->flags & FE_FLAG_RX_SG_DMA)
rxd->rxd2 = RX_DMA_PLEN0(ring->rx_buf_size);
else
rxd->rxd2 = RX_DMA_LSO;
ring->rx_calc_idx = idx;
/* make sure that all changes to the dma ring are flushed before
* we continue
*/
wmb();
fe_reg_w32(ring->rx_calc_idx, FE_REG_RX_CALC_IDX0);
done++;
}
if (done < budget)
fe_reg_w32(rx_intr, FE_REG_FE_INT_STATUS);
return done;
}
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
skb_mark_napi_id(skb, napi);
trace_napi_gro_receive_entry(skb);
skb_gro_reset_offset(skb);
return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}
EXPORT_SYMBOL(napi_gro_receive);
static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
switch (ret) {
case GRO_NORMAL:
if (netif_receive_skb_internal(skb)) /* pass the skb up towards the IP layer */
ret = GRO_DROP;
break;
case GRO_DROP:
kfree_skb(skb);
break;
case GRO_MERGED_FREE:
if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
napi_skb_free_stolen_head(skb);
else
__kfree_skb(skb);
break;
case GRO_HELD:
case GRO_MERGED:
case GRO_CONSUMED:
break;
}
return ret;
}
static int netif_receive_skb_internal(struct sk_buff *skb)
{
int ret;
net_timestamp_check(netdev_tstamp_prequeue, skb);
if (skb_defer_rx_timestamp(skb))
return NET_RX_SUCCESS;
if (static_key_false(&generic_xdp_needed)) {
int ret;
preempt_disable();
rcu_read_lock();
ret = do_xdp_generic(rcu_dereference(skb->dev->xdp_prog), skb);
rcu_read_unlock();
preempt_enable();
if (ret != XDP_PASS)
return NET_RX_DROP;
}
rcu_read_lock();
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu >= 0) {
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}
}
#endif
ret = __netif_receive_skb(skb); /* call the receive function __netif_receive_skb() */
rcu_read_unlock();
return ret;
}
The call chain is __netif_receive_skb() --> __netif_receive_skb_core() --> deliver_skb(); in the end the protocol-stack dispatch function deliver_skb() is invoked.
/* deliver the skb to the registered protocol handler */
static inline int deliver_skb(struct sk_buff *skb,
struct packet_type *pt_prev,
struct net_device *orig_dev)
{
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
return -ENOMEM;
refcount_inc(&skb->users);
return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}
The pt_prev->func(skb, skb->dev, pt_prev, orig_dev) callback invoked by the dispatch function deliver_skb() is a function pointer installed when the network protocol stack is initialized; that initialization is analyzed next.
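For IPv4, that function pointer ends up being ip_rcv(): during protocol-stack initialization, inet_init() registers a struct packet_type whose .func is ip_rcv via dev_add_pack(). A condensed sketch of that registration (field layout varies slightly between kernel versions):

#include <linux/netdevice.h>
#include <linux/if_ether.h>
#include <net/ip.h>

/* Condensed from net/ipv4/af_inet.c: the IPv4 receive handler. */
static struct packet_type ip_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),
	.func = ip_rcv,			/* becomes pt_prev->func for IPv4 frames */
};

static int __init inet_init_excerpt(void)
{
	dev_add_pack(&ip_packet_type);	/* hashed into ptype_base[] by .type */
	return 0;
}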
Original article: https://blog.csdn.net/l00102795/article/details/140098803