与ETL结合方式

最近更新时间: 2019-11-11 08:08:56

本版本维表功能只支持在ETL算子中使用。本文提供MySQL版的维表实现案例,如需其他版本请用户自定义实现。

  • 代码编写 dac.getConfigByKey(MySQLUpdater.key)为从运行时环境获取缓存对象,其中MySQLUpdater.key为对应实现的key。
class ETLWithSide (dac: AppConfig, params: Map[String, Any]) extends ETLFunction with Logging {

/**
    * 把一个字符串解析成表(多行 多列)
    *
    * @param row 任何文本
    */
override def ETL(row: Row): Seq[Array[Any]] = {
//ALL 类型返回ALLCACHE
val cache1: Cache = dac.getConfigByKey(MySQLUpdater.key).get.asInstanceOf[AllCache]
//LRU和NONE 类型返回LRUCache
val cache2: Cache = dac.getConfigByKey(MySQLQueryer.key).get.asInstanceOf[LRUCache]

val result: Array[Any] = Array(
      row.getField(0),
cache1.get(Map("id"->row.getField(0))).getOrElse(Map()).asInstanceOf[Map[String,Any]].getOrElse("name",""),
cache2.get(Map("id"->row.getField(0))).getOrElse(Map()).asInstanceOf[Map[String,Any]].getOrElse("name","")
    )
println("ETLWithSide:"+ result.mkString(","))
Seq(result)
  }

/**
    * 定义ETL输出的Schema
    */
override def tableScheme(): String = {
val Schema = "{'type':'object','properties':{'id':{'type':'integer'},'nameFromAllCache':{'type':'string'},'nameFromLRUCache':{'type':'string'}}}"
Schema
  }
}
  • 代码打包 同ETL
  • 页面配置 其中数据源非必选参数,如使用外部数据源,可完全由自定义参数配置数据库连接。维表可设置多个。 如选择内部数据源,对应的参数对照表见【维表参数说明】