29、Spark写数据到Hudi时,同步hive表的一些坑
1.hudi的同步hive表没有comment
原以为hudi同步的hive表是根据数据写入的dataframe的schema创建的。就和spark write hive时类似,查看源码后发现不是。
1.1 hudi同步hive的模式
HMS , JDBC , HIVESQL。我这儿常用的是HMS和JDBC
各个同步模式对应的执行器:
1.2 schema生成
我们可以看到schema生成的代码块。先从提交的commit中获取元数据信息,没有的话则从数据文件中获取schema。两种方式获取到的schema都是没有comment信息的。
org.apache.hudi.common.table.TableSchemaResolver#getTableParquetSchema
/**
* Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
* commit. We will assume that the schema has not changed within a single atomic write.
*
* @return Parquet schema for this table
* @throws Exception
*/
private MessageType getTableParquetSchemaFromDataFile() throws Exception {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
// If this is COW, get the last commit and read the schema from a file written in the
// last commit
HoodieInstant lastCommit =
activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit "
+ lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :"
+ commitMetadata));
return readSchemaFromBaseFile(new Path(filePath));
case MERGE_ON_READ:
// If this is MOR, depending on whether the latest commit is a delta commit or
// compaction commit
// Get a datafile written and get the schema from that file
Option<HoodieInstant> lastCompactionCommit =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
LOG.info("Found the last compaction commit as " + lastCompactionCommit);
Option<HoodieInstant> lastDeltaCommit;
if (lastCompactionCommit.isPresent()) {
lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant();
} else {
lastDeltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
}
LOG.info("Found the last delta commit " + lastDeltaCommit);
if (lastDeltaCommit.isPresent()) {
HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
// read from the log file wrote
commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
HoodieCommitMetadata.class);
Pair<String, HoodieFileFormat> filePathWithFormat =
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
.filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
.map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
// No Log files in Delta-Commit. Check if there are any parquet files
return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
.filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
.findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() ->
new IllegalArgumentException("Could not find any data file written for commit "
+ lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath()
+ ", CommitMetadata :" + commitMetadata));
});
switch (filePathWithFormat.getRight()) {
case HOODIE_LOG:
return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft()));
case PARQUET:
return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft()));
default:
throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight()
+ " for file " + filePathWithFormat.getLeft());
}
} else {
return readSchemaFromLastCompaction(lastCompactionCommit);
}
default:
LOG.error("Unknown table type " + metaClient.getTableType());
throw new InvalidTableException(metaClient.getBasePath());
}
} catch (IOException e) {
throw new HoodieException("Failed to read data schema", e);
}
}
1.3建表DDL
获取到schema后,我们再看建表行为。
org.apache.hudi.hive.ddl.DDLExecutor#createTable 定义了这个接口建表方法。有两个实现类,一个是
org.apache.hudi.hive.ddl.HMSDDLExecutor。另一个是 org.apache.hudi.hive.ddl.QueryBasedDDLExecutor
首先,看org.apache.hudi.hive.ddl.HMSDDLExecutor#createTable方法:
ddl操作中使用的字段信息在HiveSchemaUtil.convertMapSchemaToHiveFieldSchema生成,可以直接在这个方法里看到字段的comment信息是直接写死为空字符串的。
再看,org.apache.hudi.hive.ddl.QueryBasedDDLExecutor#createTable方法。
方法里是通过HiveSchemaUtil.generateCreateDDL方法直接生成的ddl建表语句的。这个方法里generateSchemaString方法来生成字段信息的。在这个方法里,也是没有涉及comment信息的。
1.4结论
同步hive表是在 数据写入hudi目录后,根据目录里的schema来创建的hive表,所以创建的hive表没有带着dataframe的comment信息。需要手动执行修改字段comment。
2.追加comment
2.1.使用spark.sql的方式修改comment
用spark.sql()的方式执行 修改comment的sql语句,会调用hudi里的AlterHoodieTableChangeColumnCommand类。这个里面会比较schema,刷新sparksession里的catalog信息,会让任务hang住。(为什么hang住没去排查)大概操作就是写一个使用新的schema的空数据集到hudi来实现schema更新。
org.apache.spark.sql.hudi.command.AlterHoodieTableChangeColumnCommand。
2.2使用hive-sql的方式修改comment
用hive-jdbc的方式执行修改sql语句。这个方式不会更新hive表里的 TBLPROPERTIES 的 'spark.sql.sources.schema.part.0’信息。
使用dataframe的schame.tojson ,去修改 ‘spark.sql.sources.schema.part.0’ 信息
/**
* 将 dataframe 中的comment加到 hudi的hive表中
*
* @param df dataframe
* @param dbTable hive表
* @param spark spark session
*/
def addCommentForSyncHive(df: DataFrame, dbTable: String, spark: SparkSession, writeOptions: mutable.Map[String, String]): Unit = {
val comment: Map[String, String] = df.schema.map(sf => (sf.name, sf.getComment().getOrElse(""))).toMap
info(s"数据集的字段名->备注为:\n${comment.mkString("\n")}")
val jdbcUrlOption = writeOptions.get(DataSourceWriteOptions.HIVE_URL.key())
val jdbcUserOption = writeOptions.get(DataSourceWriteOptions.HIVE_USER.key())
val jdbcPassOption = writeOptions.get(DataSourceWriteOptions.HIVE_PASS.key())
assert(jdbcUrlOption.isDefined, s"${DataSourceWriteOptions.HIVE_URL.key()} 必须被指定")
val connection = DbUtil.createHiveConnection(
jdbcUrlOption.get, jdbcUserOption.getOrElse(""), jdbcPassOption.getOrElse("")
)
val stmt = connection.createStatement()
//需要手动更新hive表中的spark.sql.sources.schema.part.0信息
stmt.execute(s"ALTER TABLE $dbTable SET TBLPROPERTIES ('spark.sql.sources.schema.part.0' = '${df.schema.json}')")
// 获取表字段和类型
val tableSchema = spark.sql(s"DESCRIBE $dbTable")
.select("col_name", "data_type")
.collect()
.map(row => (row.getString(0), row.getString(1)))
tableSchema.foreach { case (column, dataType) =>
if (comment.contains(column) && !Seq("ym", "ymd").contains(column)) {
val newComment = comment.getOrElse(column, "")
val sql = s"""ALTER TABLE $dbTable CHANGE COLUMN $column $column $dataType COMMENT '$newComment'"""
info(s"添加备注执行sql:$sql")
try {
stmt.execute(sql)
} catch {
case e:Throwable =>
warn("添加备注sql执行失败")
}
}
}
stmt.close()
connection.close()
}
修改’spark.sql.sources.schema.part.0’时,因为schema带有备注,会很长,导致超过hive表元数据mysql表字段的长度限制。去mysql中修改这个长度限制(table_params表PARAM_VALUE字段)。
原文地址:https://blog.csdn.net/weixin_42960808/article/details/145110400
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!