自学内容网 自学内容网

Teledyne LeCroy:800G高速以太网一站式自动化测试解决方案(网络打流测试+物理层加压干扰+协议分析)

LinkExpert一站式测试解决方案

LinkExpert 是一款软件应用程序,可对Teledyne LeCroy的协议分析仪和训练器进行自动化硬件控制和管理。除了作为合规性、一致性和验证测试的便捷接口外,它还能轻松地将这些测试添加到自动回归测试流程中。

现在,新的 LinkExpert集成可实现800G以太网连接的自动协商和链路训练 (AN/LT)。该集成使工程师能够轻松利用Xena以太网流量发生器的AN/LT测试功能和SierraNet协议分析仪。

该解决方案为800G以太网提供了一站式集成自动化测试,可将端口的物理层性能测试与分析和Layer23层流量压力测试无缝衔接,提供了跨以太网协议层级的Layer123完整验证,提供了端口一致性验证、端口性能压力测试、协议可视化分析一站式测试服务。

端口一致性测试

可作为Golden Sample,对接DUT进行端口一致性测试校验

端口兼容性测试

可调节Pre-Coding,PPM Adjustment将端口进行劣化,以模拟特定DUT的性能

端口性能测试

线速和超线速的极限压力测试,微突发场景等

真实业务压力测试

RDMA,PFC,ECQCN,ECN,QP业务模型仿真

速率支持

OSFP接口和QSFP-DD原生接口

56G PAM4- 50G/100G/200G/400G

112G PAM4 - 100G/200G/400G/800G

功能举例

800G ANLT测试界面

  • 作为端节点,进行以太网协议的握手适配,对接
  • 作为监控节点,串行接入在链路中,进行物理层链路和数据链路层的报文捕获和分析,即Layer1,Layer2,Layer3等全覆盖
  • 负面压力环境仿真,如FEC Error Injection测试验证,Link Flap验证,Packet Loss验证 

硬件构成:Xena Freya Z800+SierraNet M1288

800G以太网测试与分析仪表

Xena Ethernet Tester

  • 10M-800G以太网端口速率支持

  • 支持全线速的以太网流量生成和收发统计

  • Layer1、2、3层测试和自动化支持

  • 固定式1U桌面便携型和模块化4U机架式

  • 多种不同速率测试板卡和报文损伤仿真板卡

  • 高可扩展性、多业务槽位后期扩展支持

  • RFC2544,RFC2889,RFC3918,Y.1564等基准性能测试套件支持

SierraNet Protocol Analyzer/Jammer

  • 10G至800G以太网和FC光纤通道支持
  • 链路可视化分析和干扰注入
  • 高阶多态触发和过滤功能
  • 全栈(Layer1-7)线速捕获分析和错误注入
  • Infusion Jammer干扰注入支持
  • 业内唯一的L1层协议捕获和ANLT分析仪
  • 支持脚本引擎VSE用于自动化后期分析

仪表典型应用场景

XENA以太网测试仪

XENA 800G Freya以太网测试仪

  • 灵活生成以太网数据流量流进行收发包测试,以评估网络设备(交换机、路由器、防火墙)在压力条件下的运行情况
  • 模拟多个用户或设备同时访问网络,了解网络负载均衡性能
  • 测量网络设备或链路可处理的最大数据传输速率,以测试网络吞吐量
  • 利用 Chimera 网络仿真的各种故障(延迟/抖动/丢包)在实验室中模拟真实生产环境
  • 创建具有不同优先级的不同类型流量,以测试 PFC、QoS机制
  • 模拟分布式拒绝服务 (DDoS) 等攻击,或创建模仿恶意行为的流量模式
  • 有助于在受控环境中创建逼真的网络条件,作为网络模拟和建模的一部分

SierraNet 协议分析仪

力科800G M1288协议分析仪

  • 监控和捕获所有网络流量,以便进行详细分析
  • 加快故障排除速度并识别故障的链接
  • 验证数据包是否符合所需的协议标准和规范
  • 检测和分析互操作性问题,确保不同的网络设备和系统正常工作
  • 以太网和光纤通道的质量保证和验证
  • 产品基准和竞争分析
  • 可全面查看整个网络的所有通信情况
  • 检测和分析安全威胁,如未经授权的访问和可疑的流量模式
  • 监控带宽使用情况,识别大流量用户和应用程序
  • 故障排除和现场支持,以诊断故障链路、互操作性问题或网络性能问题

典型待测物

网卡,线材类

AI HPC高速网卡,SmartNIC,DPU等,用以服务器之间的连接或服务器与交换机的连接

AEC,ACC 线材,验证其ANLT的生效情况

交换机类

端口性能验证

业务模型搭建测试极限并发性能

负面压力模拟,将真实的用户场景搬进实验室进行测试验证,并可定量精准,可重复的测试

规模组网验证

RDMA ,QP业务模型搭建

友好的API二次开发接口

signal_integrity_hist_plot.py

################################################################
#
#              SIGNAL INTEGRITY HISTOGRAM PLOTTING
#
# What this script example does:
# 1. Connect to a tester
# 2. Reserve a port. 
# 3. Collecting SIV data from all 8 lanes
# 4. Plot the live histogram 
#
################################################################

import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt
from xoa_driver.misc import Hex
import logging
import math
from typing import List

import matplotlib.pyplot as plt
from collections import deque

#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "10.165.136.60"
USERNAME = "xoa"
PORT = "6/0"
FIGURE_TITLE = "Z800 Freya Signal Integrity Histogram Plot"
DENSITY = 1 # how many batches of siv data to show on the plot. A higher density means more data and slower plotting.
LANES = [0,1,2,3,4,5,6,7] # select lanes to display, ranging from 0 to 7
PLOTTING_INTERVAL = 1 # plots refreshed every n second
PLOTTING_DURATION = 120 # number of seconds for plotting

async def siv_plot(
        chassis: str, 
        username: str, 
        port_str: str,
        figure_title: str,
        density: int,
        plotting_interval: int,
        plotting_duration: int,
        lanes: List[int],
        ):
    
    # configure basic logger
    logging.basicConfig(
        format="%(asctime)s  %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.FileHandler(filename="siv_plot.log", mode="a"),
            logging.StreamHandler()]
        )
    
    # disable matplotlib.font_manager logging
    logging.getLogger('matplotlib.font_manager').disabled = True

    # remove duplicates and sort list
    lanes = list(set(lanes))
    lanes.sort()
    
    async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
        logging.info(f"#####################################################################")
        logging.info(f"Chassis:                 {chassis}")
        logging.info(f"Username:                {username}")
        logging.info(f"Port:                    {port_str}")
        logging.info(f"Lanes:                   {lanes}")
        logging.info(f"Figure Title:            {figure_title}")
        logging.info(f" Data Density:            {density}")
        logging.info(f" Plot Refresh Interval:   {plotting_interval} s")
        logging.info(f" Plot Duration:           {plotting_duration} s")
        logging.info(f"#####################################################################")

        # Access module on the tester
        _mid = int(port_str.split("/")[0])
        _pid = int(port_str.split("/")[1])
        module_obj = tester.modules.obtain(_mid)

        if not isinstance(module_obj, modules.Z800FreyaModule):
            logging.info(f"Module {_mid} is not Z800 Freya module. Abort.")
            return None
        
        port_obj = module_obj.ports.obtain(_pid)
        await mgmt.free_module(module=module_obj, should_free_ports=False)
        await mgmt.reserve_port(port_obj)

        resp = await port_obj.capabilities.get()
        max_serdes = resp.serdes_count
        serdes_cnt_to_show = len(lanes)
        if max(lanes) > 7:
            logging.warning(f"Exceed max serdes index. Abort.")
            return None
        if serdes_cnt_to_show > max_serdes:
            logging.warning(f"Exceed max serdes count. Abort.")
            return None
        if serdes_cnt_to_show == 0:
            logging.warning(f"Nothing to show Abort.")
            return None

        # figure config
        plt.ion()
        fig = plt.figure(constrained_layout=True)
        fig.suptitle(f"{figure_title}\nChassis {chassis}, Port {port_str}, L={lanes}, D={density}")

        # grid spec
        if serdes_cnt_to_show == 1:
            gs = fig.add_gridspec(nrows=1, ncols=1)
        if serdes_cnt_to_show > 1:
            gs = fig.add_gridspec(nrows=math.ceil(serdes_cnt_to_show/2), ncols=2)

        # add subplots
        siv_subplots = []
        for i in range(serdes_cnt_to_show):
            siv_subplots.append(fig.add_subplot(gs[i%gs.nrows, int(i/gs.nrows)]))
        
        # data dequeue for each serdes lane. queue depth = density*2000
        INT_CNT_PER_DATA = 2000
        data_queue = []
        for _ in range(serdes_cnt_to_show):
            data_queue.append(deque((), maxlen=density*INT_CNT_PER_DATA))

        # set x and y label for each subplot
        for i in range(serdes_cnt_to_show):
            siv_subplots[i].set(xlabel=f"Value", ylabel=f"Lane {lanes[i]}")
        
        # group control commands for each serdes lane together to later send it as a command group.
        control_cmd_group = []
        for i in range(serdes_cnt_to_show):
            control_cmd_group.append(port_obj.l1.serdes[lanes[i]].medium.siv.control.set(opcode=enums.Layer1Opcode.START_SCAN))
        
        # get commands for each serdes lane together to later send it as a command group.
        get_cmd_group = []
        for i in range(serdes_cnt_to_show):
            get_cmd_group.append(port_obj.l1.serdes[lanes[i]].medium.siv.data.get())

        resp_group = ()
        plot_count = math.ceil(plotting_duration/plotting_interval)
        for _ in range(plot_count):
            await utils.apply(*control_cmd_group)
            while True:
                # get responses from all lanes
                resp_group = await utils.apply(*get_cmd_group)
                result_flags = [x.result for x in resp_group]
                if 0 in result_flags:
                    # if not all lanes are ready in data, query again.
                    continue
                else:
                    for i in range(serdes_cnt_to_show):
                        siv_raw_levels = resp_group[i].value[0:12]
                        siv_raw_values = resp_group[i].value[12:]

                        # convert from 12 raw bytes into 6 signed int
                        siv_int_levels = []
                        for x in zip(siv_raw_levels[0::2], siv_raw_levels[1::2]):
                            siv_int_levels.append(int.from_bytes(bytes(x), byteorder='big', signed=True))
                        # Please note: only the first slicer data is used here.

                        # convert from 4000 bytes into 2000 signed int
                        siv_int_values = []
                        for x in zip(siv_raw_values[0::2], siv_raw_values[1::2]):
                            siv_int_values.append(int.from_bytes(bytes(x), byteorder='big', signed=True))
                        # put value data in queue
                        data_queue[i].extend(tuple(siv_int_values))

                        # siv data ranges from -64 to 63, thus 128 bins in total.
                        siv_subplots[i].cla()
                        siv_subplots[i].relim()
                        siv_subplots[i].autoscale_view()
                        siv_subplots[i].set(xlabel=f"Value", ylabel=f"Lane {lanes[i]}")
                        siv_subplots[i].hist(x=[*data_queue[i]], bins=128, range=(-64, 63), density=False, color="blue", orientation="horizontal")

                        # levels contains 6 values, 4 average pam4 levels and 2 slicers, (<p1> <p2> <p3> <m1> <m2> <m3>)
                        # add base slicer (this is always at 0)
                        y = 0
                        siv_subplots[i].axhline(y, color='black', linestyle='-', linewidth=0.5)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'base={y}', fontsize="small")
                        # add upper slicer <p2>
                        y = siv_int_levels[1]
                        siv_subplots[i].axhline(y, color='green', linestyle='dashed', linewidth=0.5)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'slicer={y}', fontsize="small")
                        # add lower slicer <m2>
                        y = siv_int_levels[4]
                        siv_subplots[i].axhline(y, color='green', linestyle='dashed', linewidth=0.5)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'slicer={y}', fontsize="small")
                        # add average level 3 <p3>
                        y = siv_int_levels[2]
                        siv_subplots[i].axhline(y, color='black', linestyle='dashed', linewidth=0.1)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'level3={y}', fontsize="small")
                        # add average level 2 <p1>
                        y = siv_int_levels[0]
                        siv_subplots[i].axhline(y, color='black', linestyle='dashed', linewidth=0.1)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'level2={y}', fontsize="small")
                        # add average level 1 <m3>
                        y = siv_int_levels[5]
                        siv_subplots[i].axhline(y, color='black', linestyle='dashed', linewidth=0.1)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'level1={y}', fontsize="small")
                        # add average level 0 <m1>
                        y = siv_int_levels[3]
                        siv_subplots[i].axhline(y, color='black', linestyle='dashed', linewidth=0.1)
                        siv_subplots[i].text(siv_subplots[i].get_xlim()[1] + 0.1, y, f'level0={y}', fontsize="small")
                        

                    plt.show()
                    plt.pause(plotting_interval)
                    break
        
        await mgmt.free_port(port_obj)
        logging.info(f"Bye!")

async def main():
    stop_event = asyncio.Event()
    try:
        await siv_plot(
            chassis=CHASSIS_IP,
            username=USERNAME,
            port_str=PORT,
            figure_title=FIGURE_TITLE,
            density=DENSITY,
            plotting_interval=PLOTTING_INTERVAL,
            plotting_duration = PLOTTING_DURATION,
            lanes=LANES
            )
    except KeyboardInterrupt:
        stop_event.set()

if __name__ == "__main__":
    asyncio.run(main())

fec_error_dist_plot.py 

################################################################
#
#                   PRE-FEC ERROR DIST PLOT
#
# What this script example does:
# 1. Connect to a tester
# 2. Reserve a all ports on a module
# 3. Set the port FEC mode on
# 4. Clear FEC stats
# 5. Query FEC Blocks (symbol error) and FEC stats
# 6. Plot the Pre-FEC error distribution for all ports
#
################################################################

import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt
from xoa_driver.misc import Hex
import logging
import math

import matplotlib.pyplot as plt
import numpy as np
from collections import deque

#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "10.165.136.66"
USERNAME = "xoa"
MODULE = "4"
FIGURE_TITLE = "Pre-FEC Error Distribution Plot (log10)"
PLOTTING_INTERVAL = 1 # plots refreshed every n second
PLOTTING_DURATION = 120 # number of seconds for plotting

# Enable the FEC mode you want
FEC_MODE = enums.FECMode.ON # either RS FEC KR or KP. Determined by the port automatically
# FEC_MODE = enums.FECMode.FC_FEC
# FEC_MODE = enums.FECMode.RS_FEC_INT
        
#---------------------------
# pre_fec_error_dist_plot
#---------------------------
async def pre_fec_error_dist_plot(
        chassis: str,
        username: str,
        module_str: str,
        figure_title: str,
        plotting_interval: int,
        plotting_duration: int,
        fec_mode: enums.FECMode
        ):
    # configure basic logger
    logging.basicConfig(
        format="%(asctime)s  %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.FileHandler(filename="fec_plot.log", mode="a"),
            logging.StreamHandler()]
        )
    
    # disable matplotlib.font_manager logging
    logging.getLogger('matplotlib.font_manager').disabled = True

    # Establish connection to a Valkyrie tester using Python context manager
    # The connection will be automatically terminated when it is out of the block
    async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
        logging.info(f"#####################################################################")
        logging.info(f"Chassis:                 {chassis}")
        logging.info(f"Username:                {username}")
        logging.info(f"Port:                    {module_str}")
        logging.info(f"Figure Title:            {figure_title}")
        logging.info(f" Plot Refresh Interval:   {plotting_interval} s")
        logging.info(f" Plot Duration:           {plotting_duration} s")
        logging.info(f"#####################################################################")

        # Access module on the tester
        _mid = int(module_str)
        module_obj = tester.modules.obtain(_mid)

        if isinstance(module_obj, modules.E100ChimeraModule):
            logging.info(f"FEC not supported on E100 Chimera modules")
            return None
        
        if isinstance(module_obj, modules.Z10OdinModule):
            logging.info(f"FEC not supported on Z10 Odin modules")
            return None 
        
        # reserve all ports on a module
        port_objs = [x for x in module_obj.ports]
        port_cnt = len(port_objs)

        # Forcibly reserve the port
        await mgmt.free_module(module=module_obj, should_free_ports=False)
        resp = await module_obj.revision.get()
        module_module_name = resp.revision
        for p in port_objs:
            await mgmt.reserve_port(p)

        await asyncio.sleep(1)
        
        # figure config
        plt.ion()
        fig = plt.figure(constrained_layout=True)
        fig.suptitle(f"{figure_title}\nChassis {chassis}, Module {module_str}, {module_module_name}")
        
        # grid spec
        if port_cnt == 1:
            gs = fig.add_gridspec(nrows=1, ncols=1)
        if port_cnt > 1:
            gs = fig.add_gridspec(nrows=math.ceil(port_cnt/2), ncols=2)

        # add subplots
        pre_fec_subplots = []
        for i in range(port_cnt):
            pre_fec_subplots.append(fig.add_subplot(gs[i%gs.nrows, int(i/gs.nrows)]))

        # set x and y label for each subplot
        for i in range(port_cnt):
            pre_fec_subplots[i].set(xlabel=f"Symbol Errors", ylabel=f"FEC Codewords ({module_str}/{i}) (log10)")

        # set FEC mode on
        logging.info(f"Set FEC Mode = {fec_mode.name}")
        for p in port_objs:
            await p.fec_mode.set(mode=fec_mode)

        # clear FEC counter
        logging.info(f"Clear FEC counter")
        for p in port_objs:
            await p.pcs_pma.rx.clear.set()

        # query FEC Totals and Pre-FEC Error Distribution
        plot_count = math.ceil(plotting_duration/plotting_interval)
        pre_fec_error_dist_data = [[0]*17]*port_cnt
        print(len(pre_fec_subplots))
        for _ in range(plot_count):
            logging.info(f"PRE-FEC ERROR DISTRIBUTION")
            for i in range(port_cnt):
                port_obj = port_objs[i]
                logging.info(f"Port {port_obj.kind.module_id}/{port_obj.kind.port_id}")
                # await port_obj.pcs_pma.rx.clear.set()
                _total_status, _fec_status = await utils.apply(
                    port_obj.pcs_pma.rx.total_status.get(),
                    port_obj.pcs_pma.rx.fec_status.get()
                )
                n = _fec_status.data_count - 2
                for j in range(n):
                    logging.info(f"  FEC Blocks (Symbol Errors = {j}): {_fec_status.stats[j]}")
                logging.info(f"  FEC Blocks (Symbol Errors > {n-1}): {_fec_status.stats[n]}")
                
                x_axis = [str(x) for x in range(n)]
                x_axis.append(f"> {n-1}")
                color_array = ['y']*(n-1)
                color_array.insert(0, 'g')
                color_array.append('r')

                pre_fec_error_dist_data[i] = [x + y for x, y in zip(pre_fec_error_dist_data[i], _fec_status.stats[0:n+1])]
                pre_fec_ber_str = ""
                if _total_status.total_pre_fec_ber == 0:
                    pre_fec_ber_str = f"Pre-FEC BER = 0"
                else:
                    pre_fec_ber_str = f"Pre-FEC BER = {abs(1/_total_status.total_pre_fec_ber)}"

                pre_fec_subplots[i].cla()
                pre_fec_subplots[i].relim()
                pre_fec_subplots[i].autoscale_view()
                pre_fec_subplots[i].set(xlabel=f"Symbol Errors", ylabel=f"FEC Codewords ({module_str}/{i})")
                pre_fec_error_dist_data_log10 = []
                for x in pre_fec_error_dist_data[i]:
                    if x > 0:
                        pre_fec_error_dist_data_log10.append(np.log10(x))
                    else:
                        pre_fec_error_dist_data_log10.append(0)
                # print(pre_fec_error_dist_data_log10)
                tmp = pre_fec_subplots[i].bar(x=x_axis, height=pre_fec_error_dist_data_log10, color=color_array,)
                pre_fec_subplots[i].bar_label(container=tmp, fmt='%.1f')
                x0, xmax = pre_fec_subplots[i].get_xbound()
                y0, ymax = pre_fec_subplots[i].get_ybound()

                pre_fec_subplots[i].text((x0+xmax)*0.7, (y0+ymax)*0.9, pre_fec_ber_str, fontsize="small")

            plt.show()
            logging.info(f"Clear FEC counter")
            for p in port_objs:
                await p.pcs_pma.rx.clear.set()
            plt.pause(plotting_interval)
            


async def main():
    stop_event = asyncio.Event()
    try:
        await pre_fec_error_dist_plot(
            chassis=CHASSIS_IP,
            username=USERNAME,
            module_str=MODULE,
            figure_title=FIGURE_TITLE,
            plotting_interval=PLOTTING_INTERVAL,
            plotting_duration=PLOTTING_DURATION,
            fec_mode=FEC_MODE
            )
    except KeyboardInterrupt:
        stop_event.set()


if __name__ == "__main__":
    asyncio.run(main())


原文地址:https://blog.csdn.net/pangzi_sh/article/details/142885438

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!