Custom Operator

最近更新时间: 2019-11-26 14:47:12

Custom Operator功能提供一个自定义的全新算子,可以充当ETL,也可以充当source或者sink。需要自己维护TypeInformation并通过flatMap函数实现业务逻辑,Custom Operator只支持JAR开发模式。

  • 代码编写 ETL功能需要继承基类tableOperateProcESsor,并重写innerbuild函数。举例CustomTestSimple为一个Custom Operator简单实现:

  • 构造器中定义了四个参数: name:本算子的name,通常设置为在jobGraph中显示名称。 childs:子算子集合 configs: Map格式参数集合 sm: StreamingMate可获取环境变量、维表等全局参数

  • with Logging,代码中可直接使用log实例,日志会打印在TaskManager的log信息中。

  • innerBuild函数,重载父类函数。将dataStream直接暴露给用户,本例中不做处理直接转给下一个算子。实际使用时以具体业务逻辑替换。

  • dataStream.dataType.asInstanceOf[RowTypeInfo]获取输入流的Schema 注意:必须要隐式声明implicit val tpe: TypeInformation[Row],供下游算子使用

class CustomTestSimple(name: String, childs: mutable.MutableList[ProcESsor[DataStream[Row]]], configs: Map[Any, Any], sm: StreamingMate)
extends tableOperateProcESsor(name, childs, configs, sm) {

override def innerBuild(dataStream1: DataStream[Row],dataStream2: DataStream[Row]): DataStream[Row] = {
    configs.foreach(kv => {
println(kv._1 + "   =>"+ kv._2)
    })

val rowType = dataStream1.dataType.asInstanceOf[RowTypeInfo]
implicit val tpe: TypeInformation[Row] = rowType

val finalStream = dataStream1.flatMap[Row](new MapMapper(rowType)).setParallelism(1)
    finalStream
  }
}
class MapMapper(rowTypeInfo: RowTypeInfo) extends RichFlatMapFunction[Row, Row] {
override def flatMap(row: Row, collector: Collector[Row]): Unit = {
println("CustomTestSimple:"+ row)
    collector.collect(row)
  }
}
  • 代码打包 同ETL
  • 页面配置 同ETL