JAR开发
最近更新时间: 2019-11-11 08:00:19
UDAF功能采用Flink自身UDF语法,详细说明参见https://ci.Apache.org/projects/Flink/Flink-docs-release-1.7/dev/table/UDFs.html
- 代码编写 UDAF功能需要继承基类AggregateFunction,并重写createAccumulator、getValue、accumulate、getRESultType四个方法(有些可选方法见官网)。
import org.Apache.Flink.table.functions.AggregateFunction
import Java.lang.Long
import Java.util.concurrent.atomic.AtomicLong
import org.Apache.Flink.api.common.typeinfo.{TypeInformation, TypES}
class UDAFTestScala/*占位符*/ extends AggregateFunction[Long, AtomicLong] {
//初始化count UDAF的accumulator
override def createAccumulator: AtomicLong = new AtomicLong(0L)
//getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法
override def getValue(accumulator: AtomicLong): Long = accumulator.get()
//accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator
def accumulate(accumulator: AtomicLong, iValue: Long): Unit = {
accumulator.addAndGet(iValue)
}
override def getRESultType: TypeInformation[Long] = TypES.LONG
}
- 代码打包
mvn clean package -DskipTests -Dcheckstyle.skip=true
- 页面配置 在资源管理功能中将打包后的streaming-flink-test-1.0-SANPSHOT.jar上传。