JAR开发
最近更新时间: 2019-11-26 14:47:12
- 代码编写 ETL功能需要继承基类ETLFunction,并重写ETL和tableSchema两个方法。
trait ETLFunction extends Serializable {
/**
* 把row暴露给用户,进行自定义操作
*
* @param row 任何文本
*/
def ETL(row: Row): Seq[Array[Any]]
/**
* 定义ETL输出的Schema
* 使用JSONRowSchemaConverter.convert(tableScheme())解析
*/
def tableScheme(): String
}
举例ETLTestSimple为一个ETL简单实现:
- params为页面设置的自定义参数,以map形式输入,此处只简单输出。
- with Logging,代码中可直接使用log实例,日志会打印在TaskManager的log信息中。
- ETL函数,重载父类函数。每一条流消息调用一次本方法。本例中将第一列取出,并拼接一个固定列值。实际使用时以具体业务逻辑替换。
- tableSchema函数,重载父类函数。因处理逻辑不确定,所以返回Schema需算子自己定义。以标准JSON Schema返回。 注意:必须要和ETL函数的返回值对应上,不然后续转换过程和报错。
class ETLTestSimple (dac: AppConfig, params: Map[String, Any]) extends ETLFunction with Logging {
params.foreach(kv => {
log.info(kv._1 + " =>"+ kv._2)
})
override def ETL(row: Row): Seq[Array[Any]] = {
log.info("in: "+ row.toString)
val rESult: Array[Any] = Array(
row.getField(0),"fieldAddInETL"
)
log.info("out:"+ rESult.mkString(","))
Seq(rESult)
}
/**
* 定义ETL输出的Schema
*/
override def tableScheme(): String = {
val Schema = "{'type':'object','propertiES':{'id':{'type':'integer'},'name':{'type':'string'}}}"
Schema
}
}
- 代码打包 mvn clean package -DskipTests -Dcheckstyle.skip=true
- 页面配置
- 在资源管理功能中将打包后的【streaming-flink-test-1.0-SANPSHOT.jar】上传。
- 在作业开发界面选择依赖的JAR。
- 拖拽算子生成graph,并设置ETL Operator参数。其中类名称需为全路径,在执行时从依赖JAR包中反射注入。