JAR开发

最近更新时间: 2019-11-26 14:47:12

  • 代码编写 ETL功能需要继承基类ETLFunction,并重写ETL和tableSchema两个方法。
trait ETLFunction extends Serializable {
/**
    * 把row暴露给用户,进行自定义操作
    *
    * @param row 任何文本
    */
def ETL(row: Row): Seq[Array[Any]]

/**
    * 定义ETL输出的Schema
    * 使用JSONRowSchemaConverter.convert(tableScheme())解析
    */
def tableScheme(): String

}

举例ETLTestSimple为一个ETL简单实现:

  1. params为页面设置的自定义参数,以map形式输入,此处只简单输出。
  2. with Logging,代码中可直接使用log实例,日志会打印在TaskManager的log信息中。
  3. ETL函数,重载父类函数。每一条流消息调用一次本方法。本例中将第一列取出,并拼接一个固定列值。实际使用时以具体业务逻辑替换。
  4. tableSchema函数,重载父类函数。因处理逻辑不确定,所以返回Schema需算子自己定义。以标准JSON Schema返回。 注意:必须要和ETL函数的返回值对应上,不然后续转换过程和报错。
class ETLTestSimple (dac: AppConfig, params: Map[String, Any]) extends ETLFunction with Logging {
  params.foreach(kv => {
    log.info(kv._1 + "   =>"+ kv._2)
  })
override def ETL(row: Row): Seq[Array[Any]] = {
    log.info("in: "+ row.toString)
val rESult: Array[Any] = Array(
      row.getField(0),"fieldAddInETL"
)
    log.info("out:"+ rESult.mkString(","))
Seq(rESult)
  }

/**
    * 定义ETL输出的Schema
    */
override def tableScheme(): String = {
val Schema = "{'type':'object','propertiES':{'id':{'type':'integer'},'name':{'type':'string'}}}"
Schema
  }
}
  • 代码打包 mvn clean package -DskipTests -Dcheckstyle.skip=true
  • 页面配置
  1. 在资源管理功能中将打包后的【streaming-flink-test-1.0-SANPSHOT.jar】上传。
  2. 在作业开发界面选择依赖的JAR。
  3. 拖拽算子生成graph,并设置ETL Operator参数。其中类名称需为全路径,在执行时从依赖JAR包中反射注入。