场景三:多源数据整合处理
最近更新时间: 2019-10-30 06:30:22
有时我们就会遇到多种数据源同时存在的情况,此时,通过不同的数据源,构造基于相同的 Schema 的 DataFrame ,进行汇总操作,可用合并操作,这里有一个例子来帮助您理解。 项目名:comprehensive 核心代码(具体代码,您可以点这里,自行下载):
// 使用case 定义类 Log
case class Log(id:String,info:String)
object Demo {
def main(args: Array[String]) {
val dirOut1 = args(1)
val dirOut2 = args(2)
val conf = new SparkConf().setAppName("RDDToDF")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
mix
import sqlContext.implicits._
val df1 = sc.parallelize(Array(("id1","info1"),("id2","info2"))).map(l => Log(l._1,l._2)).toDF
//查看 Schema 架构
df1.printSchema()
//df1 文件保存成 Parquet文件
df1.write.parquet(dirOut1)
val df2 = sc.parallelize(Array(("id3","info3"),("id4","info4"),("id5","info5"))).map(l =>Log(l._1,l._2)).toDF
df2.write.parquet(dirOut2)
//数据源1 进行加载
val data1 = sqlContext.read.parquet(dirOut1)
//数据源2 进行进行加载
val data2 = sqlContext.read.parquet(dirOut2)
//数据源进行整合
val data3 = data1.unionAll(data2)
//注册临时表
data3.registerTempTable("logs")
//查询执行
sqlContext.sql("select * from logs").collect.foreach(println)
sc.stop()
}
}
提交 job,命令如下: