
[Iceberg Analysis] Collecting Metrics Output with Spark and Iceberg

Iceberg provides two types of metrics reports and two built-in metrics reporters.

ScanReport

Contains metrics collected while planning a scan of a given table. Besides general information about the table (such as the snapshot id or table name), it includes the following metrics:

  • total scan planning duration
  • number of data/delete files included in the result
  • number of data/delete manifests scanned/skipped
  • number of data/delete files scanned/skipped
  • number of equality/positional delete files scanned

CommitReport

Contains metrics collected after committing changes to a table (i.e., producing a snapshot). Besides general information about the table (such as the snapshot id or table name), it includes the following metrics:

  • total duration
  • number of attempts required for the commit to succeed
  • number of data/delete files added/removed
  • number of equality/positional delete files added/removed
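Both report types are delivered through Iceberg's `MetricsReporter` interface, which is how the built-in reporters below receive them. A minimal custom reporter might look like the following sketch (the class name and stdout destination are illustrative, assuming the Iceberg 1.6.x API):

```java
package com.donny.demo;

import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;

/**
 * Illustrative reporter that routes scan and commit reports to stdout.
 * Iceberg instantiates reporters reflectively, so a no-arg constructor is required.
 */
public class StdoutMetricsReporter implements MetricsReporter {

    @Override
    public void report(MetricsReport report) {
        if (report instanceof ScanReport) {
            ScanReport scan = (ScanReport) report;
            System.out.println("scan of " + scan.tableName()
                    + " planned in " + scan.scanMetrics().totalPlanningDuration());
        } else if (report instanceof CommitReport) {
            CommitReport commit = (CommitReport) report;
            System.out.println("commit to " + commit.tableName()
                    + " op=" + commit.operation());
        }
    }
}
```

Such a class could be set as the value of the `metrics.reporters` table property in place of `LoggingMetricsReporter` in the example later in this article.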

LoggingMetricsReporter

A logging metrics reporter: reports are written to the log file.

RESTMetricsReporter

A REST metrics reporter: reports are sent to a REST service.

This reporter can only be used with a RESTCatalog.
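Besides setting `metrics.reporters` per table (as done in the example below), a reporter can also be registered for an entire catalog. Assuming the catalog property is `metrics-reporter-impl` (the configured class must expose a no-arg constructor), the Spark session configuration for the `local` catalog used in this article would look roughly like:

```
# Hedged sketch: register a reporter for the whole "local" catalog
spark.sql.catalog.local.metrics-reporter-impl=org.apache.iceberg.metrics.LoggingMetricsReporter
```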

Verification Example


Environment Configuration

pom.xml for the iceberg-demo project:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.donny.demo</groupId>
    <artifactId>iceberg-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>iceberg-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.4.2</spark.version>
        <iceberg.version>1.6.1</iceberg.version>
        <parquet.version>1.13.1</parquet.version>
        <avro.version>1.11.3</avro.version>
        <parquet.hadoop.bundle.version>1.8.1</parquet.hadoop.bundle.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-column</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-hadoop-bundle</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-hadoop</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-3.4_2.12</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-extensions-3.4_2.12</artifactId>
            <version>${iceberg.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.antlr</groupId>
                    <artifactId>antlr4</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.antlr</groupId>
                    <artifactId>antlr4-runtime</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-column</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop-bundle</artifactId>
            <version>${parquet.hadoop.bundle.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

Override the logging configuration with a log4j2.properties file so that metric logs are written to a dedicated log file. Spark's default logging configuration comes from the spark-core jar: org.apache.spark.log4j2-defaults.properties.

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console
logger.icebergMetric.appenderRef.file.ref = RollingFile
logger.icebergMetric.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %p %c{1}: %m%n%ex

appender.CUSTOM.type = RollingFile
appender.CUSTOM.name = RollingFile
appender.CUSTOM.fileName = logs/iceberg_metrics.log
appender.CUSTOM.filePattern = logs/iceberg_metrics.%d{yyyy-MM-dd}-%i.log.gz
appender.CUSTOM.layout.type = PatternLayout
appender.CUSTOM.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %-5p %c{1}:%L - %m%n
appender.CUSTOM.strategy.type = DefaultRolloverStrategy
appender.CUSTOM.strategy.delete.type = Delete
appender.CUSTOM.strategy.delete.basePath = logs
appender.CUSTOM.strategy.delete.0.type = IfFileName
appender.CUSTOM.strategy.delete.0.regex = iceberg_metrics.*.log.gz
appender.CUSTOM.strategy.delete.1.type = IfLastModified
appender.CUSTOM.strategy.delete.1.age = P15D
appender.CUSTOM.policy.type = TimeBasedTriggeringPolicy

# Settings to quiet third party logs that are too verbose
logger.jetty.name = org.sparkproject.jetty
logger.jetty.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.repl1.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.repl1.level = info
logger.repl2.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.repl2.level = info

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
logger.metastore.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.metastore.level = fatal
logger.hive_functionregistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.hive_functionregistry.level = error

# Parquet related logging
logger.parquet.name = org.apache.parquet.CorruptStatistics
logger.parquet.level = error
logger.parquet2.name = parquet.CorruptStatistics
logger.parquet2.level = error

# Custom logger for your application
logger.icebergMetric.name = org.apache.iceberg.metrics.LoggingMetricsReporter
logger.icebergMetric.level = info
logger.icebergMetric.additivity = false

The Java main class. The metrics reporter must be configured as a table property before any metrics are emitted.

package com.donny.demo;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.LoggingMetricsReporter;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.IOException;


/**
 * @author 1792998761@qq.com
 * @version 1.0
 */
public class IcebergSparkDemo {

    public static void main(String[] args) throws AnalysisException, IOException, InterruptedException {
        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Iceberg spark example")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.local.type", "hadoop") // specify the catalog type
                .config("spark.sql.catalog.local.warehouse", "iceberg_warehouse")
                .getOrCreate();

        spark.sql("CREATE TABLE local.iceberg_db.table2( id bigint, data string, ts timestamp) USING iceberg PARTITIONED BY (day(ts))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (1, 'a', cast(1727601585 as timestamp)),(2, 'b', cast(1724923185 as timestamp)),(3, 'c', cast(1724919585 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1727605185 as timestamp)),(5, 'e', cast(1725963585 as timestamp)),(6, 'f', cast(1726827585 as timestamp))");
        spark.sql("DELETE FROM local.iceberg_db.table2 WHERE id IN (2)");

        org.apache.iceberg.Table table = Spark3Util.loadIcebergTable(spark, "local.iceberg_db.table2");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (4, 'd', cast(1724750385 as timestamp)),(5, 'e', cast(1724663985 as timestamp)),(6, 'f', cast(1727342385 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (7, 'h', cast(1727601585 as timestamp)),(8, 'i', cast(1724923185 as timestamp)),(9, 'j', cast(1724836785 as timestamp))");
        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (10, 'k', cast(1727601585 as timestamp)),(11, 'l', cast(1724923185 as timestamp)),(12, 'm', cast(1724836785 as timestamp))");
        // Configure the table's metrics reporter
        table.updateProperties()
                .set("metrics.reporters", LoggingMetricsReporter.class.getName())
                .commit();
        // Explicitly trigger a table scan; planFiles() emits a ScanReport
        TableScan tableScan = table.newScan();
        try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
            // no-op: planning the scan is enough to produce the report
        }

        spark.sql("INSERT INTO local.iceberg_db.table2 VALUES (30, 't', cast(1727605185 as timestamp)),(31, 'y', cast(1725963585 as timestamp)),(32, 'i', cast(1726827585 as timestamp))");

        Dataset<Row> result = spark.sql("SELECT * FROM local.iceberg_db.table2 where ts >= '2024-09-20'");
        result.show();
        spark.close();
    }
}

Results

During verification, the ScanReport below was only observed after explicitly invoking a scan (actively emitted metrics):

2024-10-07 09:38:11.903 INFO  LoggingMetricsReporter:38 - Received metrics report: ScanReport{
tableName=local.iceberg_db.table2,
snapshotId=3288641599702333945,
filter=true,
schemaId=0,
projectedFieldIds=[1, 2, 3],
projectedFieldNames=[id, data, ts],
scanMetrics=ScanMetricsResult{
totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.294853952S, count=1}, 
resultDataFiles=CounterResult{unit=COUNT, value=0},
    resultDeleteFiles=CounterResult{unit=COUNT, value=0},
    totalDataManifests=CounterResult{unit=COUNT, value=6},
    totalDeleteManifests=CounterResult{unit=COUNT, value=0},
    scannedDataManifests=CounterResult{unit=COUNT, value=0},
    skippedDataManifests=CounterResult{unit=COUNT, value=0},
    totalFileSizeInBytes=CounterResult{unit=BYTES, value=0},
    totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0},
    skippedDataFiles=CounterResult{unit=COUNT, value=0},
    skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
    scannedDeleteManifests=CounterResult{unit=COUNT, value=0},
    skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
    indexedDeleteFiles=CounterResult{unit=COUNT, value=0},
    equalityDeleteFiles=CounterResult{unit=COUNT, value=0},
    positionalDeleteFiles=CounterResult{unit=COUNT, value=0}},
metadata={
engine-version=3.4.2, 
iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086), 
app-id=local-1728265088818, 
engine-name=spark}}
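When downstream tooling needs these values, the logged report can be parsed out of the metrics log file. A self-contained sketch (the class name and regex are illustrative, not part of Iceberg) that extracts the planning duration from such a line:

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScanReportLogParser {

    // Matches the totalDuration field inside a logged TimerResult, e.g.
    // totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.294853952S, count=1}
    private static final Pattern PLANNING_DURATION =
            Pattern.compile("totalPlanningDuration=TimerResult\\{[^}]*totalDuration=([^,}]+)");

    public static Duration planningDuration(String logLine) {
        Matcher m = PLANNING_DURATION.matcher(logLine);
        if (!m.find()) {
            throw new IllegalArgumentException("no totalPlanningDuration found");
        }
        // TimerResult prints durations in ISO-8601 form, which Duration.parse accepts
        return Duration.parse(m.group(1).trim());
    }

    public static void main(String[] args) {
        String sample = "totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, "
                + "totalDuration=PT0.294853952S, count=1}";
        System.out.println(planningDuration(sample).toMillis()); // prints 294
    }
}
```

The same pattern-matching approach extends to the counter fields (`CounterResult{unit=COUNT, value=...}`) if more of the report needs to be recovered from the log.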

ScanReport triggered by the DELETE statement (passively emitted metrics):

2024-10-07 11:15:54.708 INFO  LoggingMetricsReporter:38 - Received metrics report: ScanReport{
tableName=local.iceberg_db.table2,
snapshotId=7181960343136679052,
filter=ref(name="id") == "(1-digit-int)",
schemaId=0,
projectedFieldIds=[1, 2, 3],
projectedFieldNames=[id, data, ts],
scanMetrics=ScanMetricsResult{
totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.098792497S, count=1},
resultDataFiles=CounterResult{unit=COUNT, value=1},
    resultDeleteFiles=CounterResult{unit=COUNT, value=0},
    totalDataManifests=CounterResult{unit=COUNT, value=2},
    totalDeleteManifests=CounterResult{unit=COUNT, value=0},
    scannedDataManifests=CounterResult{unit=COUNT, value=2},
    skippedDataManifests=CounterResult{unit=COUNT, value=0},
    totalFileSizeInBytes=CounterResult{unit=BYTES, value=898},
    totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0},
    skippedDataFiles=CounterResult{unit=COUNT, value=4},
    skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
    scannedDeleteManifests=CounterResult{unit=COUNT, value=0},
    skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
    indexedDeleteFiles=CounterResult{unit=COUNT, value=0},
    equalityDeleteFiles=CounterResult{unit=COUNT, value=0},
    positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, 
metadata={
engine-version=3.4.2, 
iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086), 
app-id=local-1728270940331, 
engine-name=spark}}

CommitReport triggered by an INSERT (passively emitted metrics):

2024-10-06 15:48:47 INFO  LoggingMetricsReporter:38 - Received metrics report: 
CommitReport{
tableName=local.iceberg_db.table2, 
snapshotId=3288641599702333945, 
sequenceNumber=6, 
operation=append, 
commitMetrics=CommitMetricsResult{
totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.430784537S, count=1}, 
attempts=CounterResult{unit=COUNT, value=1}, 
addedDataFiles=CounterResult{unit=COUNT, value=3}, 
removedDataFiles=null, 
totalDataFiles=CounterResult{unit=COUNT, value=14}, 
addedDeleteFiles=null,
        addedEqualityDeleteFiles=null,
        addedPositionalDeleteFiles=null,
        removedDeleteFiles=null,
        removedEqualityDeleteFiles=null,
        removedPositionalDeleteFiles=null,
        totalDeleteFiles=CounterResult{unit=COUNT, value=0}, addedRecords=CounterResult{unit=COUNT, value=3}, 
        removedRecords=null, 
        totalRecords=CounterResult{unit=COUNT, value=14}, 
        addedFilesSizeInBytes=CounterResult{unit=BYTES, value=2646}, 
        removedFilesSizeInBytes=null, 
        totalFilesSizeInBytes=CounterResult{unit=BYTES, value=12376}, 
        addedPositionalDeletes=null,
        removedPositionalDeletes=null, 
        totalPositionalDeletes=CounterResult{unit=COUNT, value=0}, 
        addedEqualityDeletes=null, 
        removedEqualityDeletes=null, 
        totalEqualityDeletes=CounterResult{unit=COUNT, value=0}}, 
    metadata={
        engine-version=3.4.2, 
        app-id=local-1728200916879, 
        engine-name=spark, 
        iceberg-version=Apache Iceberg 1.6.1 (commit 8e9d59d299be42b0bca9461457cd1e95dbaad086)}}

Original article: https://blog.csdn.net/weixin_43820556/article/details/142737924
