功能说明

最近更新时间: 2019-11-26 14:47:12

当前Flink版本只支持流式数据源,当流数据需要关联外部数据库(如MySQL、Oracle、Redis等),需要采用维表的形式支持。如下图所示,在ETL操作中,每一条消息会lookup一下外部数据库。 数据库的维表查询请求,有大量相同 key 的重复请求。如何减少重复请求?本地缓存是常用的方案。本方案目前提供两种缓存方案:LRU 和 ALL。 cache = ALL(默认): 全量内存缓存 cacheTTLMs:缓存的过期时间(ms) cache = LRU: LRU内存缓存 cachESize: 缓存的条目数量 cacheTTLMs:缓存的过期时间(ms)

  • All缓存 Async 和 LRU-Cache 能极大提高吞吐率并降低数据库的读压力,但是仍然会有大量的 IO 请求存在,尤其是当 miss key(维表中不存在的 key)很多的时候。如果维表数据不大(通常百万级以内),那么其实可以将整个维表缓存到本地。那么 miss key 永远不会去请求数据库。因为本地缓存就是维表的镜像,缓存中不存在那么远程数据库中也不存在。 ALL cache 可以通过 cache='ALL'参数开启,通过cacheTTLMs控制缓存的刷新间隔。会为 updater 节点起一个异步线程去同步缓存。在 Job 刚启动时,会先阻塞主数据流,直到缓存数据加载完毕,保证主数据流流过时缓存就已经 ready。在之后的更新缓存的过程中,不会阻塞主数据流。因为异步更新线程会将维表数据加载到临时缓存中,加载完毕后再与主缓存做原子替换。只有替换操作是加了锁的。 因为几乎没有 IO 操作,所以使用 cache ALL 的维表性能可以非常高。但是由于内存需要能同时容纳下两份维表拷贝,因此需要加大内存的配置。 全量内存缓存模式,用户需要实现一个updater,用以定期全量刷新数据;getKey用以生成关联缓存的key。由框架来管理缓存cacheTTLMs到期和updater调用。 Datasource.select()为具体的查询操作。
class MySQLUpdater(config: Map[String, Any])
extends AppConfigUpdater(config) with Logging {
val MySQLOptions = Map("url"-> config("url").toString,
"drivername"-> config("drivername").toString,
"username"-> config("username").toString,
"password"-> config("password").toString)

val dbKey = MySQLOptions.get("url").toString
val SQL= config("SQL").toString
val rowkeyFields = config("rowkeyFields").toString.split(",")

    Datasource.initIfNeeded(dbKey, MySQLOptions)

override def update(): Map[String, Any] = {
      Datasource.select(dbKey, SQL, rowkeyFields)
    }

override def getKey(params: Map[String, Any]): String = {
      Datasource.getKey(params)
    }
  }

object MySQLUpdater {
val key = "MySQLUpdater"
}
  • LRU缓存 通过 cache='LRU'参数可以开启 LRU 缓存优化,会为每个slot节点创建一个 LRU 本地缓存Map。当每个数据进来的时候,先去缓存中查询,如果存在则直接关联输出,减少了一次 IO 请求。如果不存在,再发起数据库查询请求(异步或同步方式,本系统默认全部采用异步方式处理),请求返回的结果会先存入缓存中以备下次查询。 为了防止缓存无限制增长,所以使用的是 LRU 缓存,并且可以通过 cachESize 调整缓存的大小。为了定期更新维表数据,可以通过 cacheTTLMs 调整缓存的失效时间。cacheTTLMs 是作用于每条缓存数据上的,也就是某条缓存数据在指定 timeout 时间内没有被访问,则会从缓存中移除。 在使用 LRU 缓存时,如果存在大量的 invalid key ,或者数据库中不存在的 key。由于命中不了缓存,导致缓存的收益较低,仍然会有大量请求打到数据库。因此我们将未命中的 key 也加进了缓存,提高了未命中 key 和 invalid key 情况下的缓存命中率。 LRU内存缓存,用户需要实现一个updater,用以单条刷新数据,每一次查询都会尝试调用updater方法。由框架来管理缓存cacheTTLMs到期,查询是否命中及updater调用。 Datasource.select()为具体的查询操作。
class MySQLQueryer(config: Map[String, Any])
extends AppConfigQueryer(config) with Logging {
val MySQLOptions = Map("url"-> config("url").toString,
"drivername"-> config("drivername").toString,
"username"-> config("username").toString,
"password"-> config("password").toString)

val dbKey = MySQLOptions.get("url").toString
val SQL = config("SQL").toString
val rowkeyFields = config.get("rowkeyFields").getOrElse("").toString.split(",")

  Datasource.initIfNeeded(dbKey, MySQLOptions)

override def query(params: collection.Map[String, Any]): Any = {
    Datasource.select(dbKey, SQL, params)
  }

override def getKey(params: collection.Map[String, Any]): String = {
    Datasource.getKey(params)
  }
}
object MySQLQueryer {
val key = "MySQLQueryer"
}
  • 执行流程图如下: