租户端 大数据 托管Hadoop 最佳实践 场景三:多源数据整合处理

场景三:多源数据整合处理

最近更新时间: 2019-10-30 06:30:22

有时我们就会遇到多种数据源同时存在的情况,此时,通过不同的数据源,构造基于相同的 Schema 的 DataFrame ,进行汇总操作,可用合并操作,这里有一个例子来帮助您理解。 项目名:comprehensive 核心代码(具体代码,您可以点这里,自行下载):


// 使用case 定义类 Log
case class Log(id:String,info:String)

object Demo  {
def main(args: Array[String]) {

val dirOut1 = args(1)
val dirOut2 = args(2)
val conf = new SparkConf().setAppName("RDDToDF")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
mix
import sqlContext.implicits._
val df1 = sc.parallelize(Array(("id1","info1"),("id2","info2"))).map(l => Log(l._1,l._2)).toDF

//查看 Schema 架构
df1.printSchema()

//df1 文件保存成 Parquet文件
df1.write.parquet(dirOut1)

val df2 = sc.parallelize(Array(("id3","info3"),("id4","info4"),("id5","info5"))).map(l =>Log(l._1,l._2)).toDF
df2.write.parquet(dirOut2)

//数据源1 进行加载
val data1 = sqlContext.read.parquet(dirOut1)
//数据源2 进行进行加载
val data2 = sqlContext.read.parquet(dirOut2)
//数据源进行整合
val data3 = data1.unionAll(data2)
//注册临时表
data3.registerTempTable("logs")
//查询执行
sqlContext.sql("select * from logs").collect.foreach(println)

sc.stop()
}
}       

提交 job,命令如下:

sudo -u spark spark-submit --class com.托管Hadoop.comprehensive.Demo --master yarn-client /home/spark/comprehensive-1.0-SNAPSHOT.jar "hdfs:///user/spark/out/log1.parqute" "hdfs:///user/spark/out/log2.parqute"