自学内容网 自学内容网

SeaTunnel 本地部署

SeaTunnel简介:Apache SeaTunnel 介绍-CSDN博客

部署

准备工作

在开始本地运行前,您需要确保您已经安装了SeaTunnel所需要的以下软件:

  • 安装Java (Java 8 或 11, 其他高于Java 8的版本理论上也可以工作) 以及设置 JAVA_HOME

下载 SeaTunnel 发行包

下载二进制包

进入SeaTunnel下载页面下载最新版本的二进制安装包seatunnel-<version>-bin.tar.gz

或者您也可以通过终端下载:

export version="2.3.8"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

下载连接器插件

从2.2.0-beta版本开始,二进制包不再默认提供连接器依赖,因此在第一次使用时,您需要执行以下命令来安装连接器:(当然,您也可以从 Apache Maven Repository 手动下载连接器,然后将其移动至connectors/目录下,如果是2.3.5之前则需要放入connectors/seatunnel目录下)。

sh bin/install-plugin.sh

如果您需要指定的连接器版本,以2.3.7为例,您需要执行如下命令:

sh bin/install-plugin.sh 2.3.8

通常情况下,你不需要所有的连接器插件。你可以通过配置config/plugin_config来指定所需的插件。例如,如果你想让示例应用程序正常工作,你将需要connector-consoleconnector-fake插件。你可以修改plugin_config配置文件,如下所示:

--seatunnel-connectors--
connector-fake
connector-console
--end--

您可以在${SEATUNNEL_HOME}/connectors/plugins-mapping.properties下找到所有支持的连接器和相应的plugin_config配置名称。

提示

如果您想通过手动下载连接器的方式来安装连接器插件,则需要下载您所需要的连接器插件即可,并将它们放在${SEATUNNEL_HOME}/connectors/目录下。

从源码构建SeaTunnel

下载源码

从源码构建SeaTunnel。下载源码的方式与下载二进制包的方式相同。 您可以从下载页面下载源码,或者从GitHub仓库克隆源码。

构建源码

cd seatunnel
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
# 获取构建好的二进制包
cp seatunnel-dist/target/apache-seatunnel-2.3.8-bin.tar.gz /The-Path-You-Want-To-Copy

cd /The-Path-You-Want-To-Copy
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"

当从源码构建时,所有的连接器插件和一些必要的依赖(例如:mysql驱动)都包含在二进制包中。您可以直接使用连接器插件,而无需单独安装它们。

SeaTunnel 引擎快速开始

步骤 1: 部署SeaTunnel及连接器

在开始前,请确保您已经按照安装中的描述下载并部署了SeaTunnel。

步骤 2: 添加作业配置文件来定义作业

编辑config/v2.batch.config.template,它决定了当seatunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}

关于配置的更多信息请查看配置的基本概念

步骤 3: 运行SeaTunnel应用程序

您可以通过以下命令启动应用程序:

提示

从2.3.1版本开始,seatunnel.sh中的-e参数被废弃,请改用-m参数。

cd "apache-seatunnel-${version}"
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local

查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。

SeaTunnel控制台将会打印一些如下日志信息:

2022-12-19 11:01:45,417 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1:  SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO  org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438

扩展示例:从 MySQL 到 Doris 批处理模式

步骤1:下载连接器

首先,您需要在${SEATUNNEL_HOME}/config/plugin_config文件中加入连接器名称,然后,执行命令来安装连接器(当然,您也可以从 Apache Maven Repository 手动下载连接器,然后将其移动至connectors/目录下),最后,确认连接器connector-jdbcconnector-doris${SEATUNNEL_HOME}/connectors/目录下即可。

# 配置连接器名称
--seatunnel-connectors--
connector-jdbc
connector-doris
--end--

# 安装连接器
sh bin/install-plugin.sh

步骤2:放入 MySQL 驱动

您需要下载 jdbc driver jar package 驱动,并放置在 ${SEATUNNEL_HOME}/lib/目录下

步骤3:添加作业配置文件来定义作业

cd seatunnel/job/

vim st.conf

env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://localhost:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "pwd"
        table_path = "test.table_name"
        query = "select  * from test.table_name"
    }
}

sink {
   Doris {
          fenodes = "doris_ip:8030"
          username = "user"
          password = "pwd"
          database = "test_db"
          table = "table_name"
          sink.enable-2pc = "true"
          sink.label-prefix = "test-cdc"
          doris.config = {
            format = "json"
            read_json_by_line="true"
          }
      }
}

关于配置的更多信息请查看配置的基本概念

步骤 4: 运行SeaTunnel应用程序

您可以通过以下命令启动应用程序:

cd seatunnel/
./bin/seatunnel.sh --config ./job/st.conf -m local

查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。

SeaTunnel控制台将会打印一些如下日志信息:

***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2024-08-13 10:21:49
End Time                  : 2024-08-13 10:21:53
Total Time(s)             :                   4
Total Read Count          :                1000
Total Write Count         :                1000
Total Failed Count        :                   0
***********************************************

Flink 引擎快速开始

步骤 1: 部署SeaTunnel及连接器

在开始前,请确保您已经按照部署中的描述下载并部署了SeaTunnel。

Flink安装可参考:Flink入门-CSDN博客

配置SeaTunnel: 修改config/seatunnel-env.sh中的设置,将FLINK_HOME配置设置为Flink的部署目录。

步骤 3: 添加作业配置文件来定义作业

编辑config/v2.streaming.conf.template,它决定了SeaTunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}

关于配置的更多信息请查看配置的基本概念

步骤 4: 运行SeaTunnel应用程序

您可以通过以下命令启动应用程序:

Flink版本1.12.x1.14.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template

Flink版本1.15.x1.18.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template

查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。

SeaTunnel控制台将会打印一些如下日志信息:

fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144

Spark 引擎快速开始

步骤 1: 部署SeaTunnel及连接器

在开始前,请确保您已经按照部署中的描述下载并部署了SeaTunnel。

步骤 2: 部署并配置Spark

请先下载Spark(需要版本 >= 2.4.0)。 更多信息您可以查看入门: Standalone模式

配置SeaTunnel: 修改config/seatunnel-env.sh中的设置,它是基于你的引擎在部署时的安装路径。 将SPARK_HOME修改为Spark的部署目录。

步骤 3: 添加作业配置文件来定义作业

编辑config/v2.streaming.conf.template,它决定了当SeaTunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

transform {
  FieldMapper {
    source_table_name = "fake"
    result_table_name = "fake1"
    field_mapper = {
      age = age
      name = new_name
    }
  }
}

sink {
  Console {
    source_table_name = "fake1"
  }
}

关于配置的更多信息请查看配置的基本概念

步骤 4: 运行SeaTunnel应用程序

您可以通过以下命令启动应用程序:

Spark 2.4.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

Spark 3.x.x

cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-3-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template

查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。

SeaTunnel控制台将会打印一些如下日志信息:

fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144


原文地址:https://blog.csdn.net/qq_36070104/article/details/142914212

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!