场景一:结构化数据处理
最近更新时间: 2019-10-30 06:18:08
这里分别以 JSON 文件和 Hive 表为例 JSON文件操作( people.json --> RDD --> DataFrame -->注册临时表 --> SQL查询) 项目名:JSONFile 项目核心代码(具体代码,您可以点这里,自行下载):
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object Demo {
def main(args: Array[String]) {
val dirIn = args(0) //数据输入PATH
val conf = new SparkConf().setAppName("JSON")
// SparkContext 是程序和集群的唯一通道
val sc = new SparkContext(conf)
// 通过SparkContext 创建SQLContext
val sqlContext = new SQLContext(sc)
val people = sqlContext.read.json(dirIn)
people.printSchema()
//注册DataFrame作为一个临时表
people.registerTempTable("jsonTable")
//使用SQL 语句操作
val teenagers = sqlContext.sql("select name from jsonTable where age >=13 and age <= 19")
teenagers.map(t => "Name:" + t(0)).collect.foreach(println)
// val anotherRDD = sc.parallelize("""{"name":"spark","address":{"city":"USA","avenue":"SEVEN"}}""" :: Nil)
// val anotherPeople = sqlContext.read.json(anotherRDD)
sc.stop()
}
}
提交 job,同上RDDToDataFrame submit 过程
sudo -u spark spark-submit --class com.托管Hadoop.JSON.Demo --master yarn-client /home/spark/JsonFile-1.0-SNAPSHOT.jar "file:///usr/hdp/2.4.0.0-169/spark/examples/src/main/resources/people.json"
Hive 表操作 项目名:SparkSQL_Hive INPUT:/usr/hdp/2.4.0.0-169/spark/examples/src/main/resources/kv1.txt 格式为:string + 空格 + string (一行) 核心代码(具体代码,您可以点这里,自行下载):
object Demo {
def main(args: Array[String]) {
val dirIn = args(0)
val conf = new SparkConf().setAppName("RDDToDF")
val sc = new SparkContext(conf)
//通过sc创建HiveContext的实例hiveContext
val hiveContext = new HiveContext(sc)
//通过HiveContext的sql命令创建表
hiveContext.sql("create table if not exists src (key int,value string)")
//加载数据
hiveContext.sql("load data local inpath '"+ dirIn +"' into table src")
//HiveQL 的查询表达
hiveContext.sql("from src select key,value").collect.foreach(println)
sc.stop()
}
}
提交 job ,命令如下: