Custom Operator
最近更新时间: 2019-11-26 14:47:12
Custom Operator功能提供一个自定义的全新算子,可以充当ETL,也可以充当source或者sink。需要自己维护TypeInformation并通过flatMap函数实现业务逻辑,Custom Operator只支持JAR开发模式。
代码编写 ETL功能需要继承基类tableOperateProcESsor,并重写innerbuild函数。举例CustomTestSimple为一个Custom Operator简单实现:
构造器中定义了四个参数: name:本算子的name,通常设置为在jobGraph中显示名称。 childs:子算子集合 configs: Map格式参数集合 sm: StreamingMate可获取环境变量、维表等全局参数
with Logging,代码中可直接使用log实例,日志会打印在TaskManager的log信息中。
innerBuild函数,重载父类函数。将dataStream直接暴露给用户,本例中不做处理直接转给下一个算子。实际使用时以具体业务逻辑替换。
dataStream.dataType.asInstanceOf[RowTypeInfo]获取输入流的Schema 注意:必须要隐式声明implicit val tpe: TypeInformation[Row],供下游算子使用
class CustomTestSimple(name: String, childs: mutable.MutableList[ProcESsor[DataStream[Row]]], configs: Map[Any, Any], sm: StreamingMate)
extends tableOperateProcESsor(name, childs, configs, sm) {
override def innerBuild(dataStream1: DataStream[Row],dataStream2: DataStream[Row]): DataStream[Row] = {
configs.foreach(kv => {
println(kv._1 + " =>"+ kv._2)
})
val rowType = dataStream1.dataType.asInstanceOf[RowTypeInfo]
implicit val tpe: TypeInformation[Row] = rowType
val finalStream = dataStream1.flatMap[Row](new MapMapper(rowType)).setParallelism(1)
finalStream
}
}
class MapMapper(rowTypeInfo: RowTypeInfo) extends RichFlatMapFunction[Row, Row] {
override def flatMap(row: Row, collector: Collector[Row]): Unit = {
println("CustomTestSimple:"+ row)
collector.collect(row)
}
}
- 代码打包 同ETL
- 页面配置 同ETL