自学内容网 自学内容网

flink kafka sink (scala)

将对象数据通过Gson 转为jsonString,在将数据写到kafka中,这个可以根据需要修改,比如按照\t分开也行,可以节省字段名称的空间。

这里还有一个问题,就是每来一条数据都需要new Gson 对象,有没有办法减少创建呢

我们知道job 和task之间是不能够传输序列化的对象的。

那么如果需要减少Gson的创建,可以自定义map函数,继承并实现RichMapFunction中的方法,其中open就可以只创建一次Gson。

data.map(new Gson().toJson(_))
.addSink(new FlinkKafkaProducer[String]("topicName", new SimpleStringSchema(), props, Optional.ofNullable[FlinkKafkaPartitioner[String]](null)))
.uid("write-to-kafka")
.name("write-to-kafka")

自定义map:

private class DemoMap extends RichMapFunction[Data, String] {
var gson:Gson=_
override def open(parameters: Configuration): Unit = {
gson=new Gson()
}

override def map(value: Data): String = {
gson.toJson(value)
}

override def close(): Unit = {
}
}


原文地址:https://blog.csdn.net/m0_65850671/article/details/142357849

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!