场景一:结构化数据处理

最近更新时间: 2019-10-30 06:18:08

这里分别以 JSON 文件和 Hive 表为例 JSON文件操作( people.json --> RDD --> DataFrame -->注册临时表 --> SQL查询) 项目名:JSONFile 项目核心代码(具体代码,您可以点这里,自行下载):

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Demo {
    def main(args: Array[String]) {
      val dirIn = args(0) //数据输入PATH

      val conf = new SparkConf().setAppName("JSON")
      // SparkContext 是程序和集群的唯一通道
      val sc = new SparkContext(conf)
      // 通过SparkContext 创建SQLContext
      val sqlContext = new SQLContext(sc)

      val people = sqlContext.read.json(dirIn)

      people.printSchema()

      //注册DataFrame作为一个临时表
      people.registerTempTable("jsonTable")

      //使用SQL 语句操作
    val teenagers = sqlContext.sql("select name from jsonTable where age >=13 and age <= 19")
    teenagers.map(t => "Name:" + t(0)).collect.foreach(println)

    // val anotherRDD = sc.parallelize("""{"name":"spark","address":{"city":"USA","avenue":"SEVEN"}}""" :: Nil)
    // val anotherPeople = sqlContext.read.json(anotherRDD)
    sc.stop()
  }
}

提交 job,同上RDDToDataFrame submit 过程

sudo -u spark spark-submit --class com.托管Hadoop.JSON.Demo --master yarn-client /home/spark/JsonFile-1.0-SNAPSHOT.jar "file:///usr/hdp/2.4.0.0-169/spark/examples/src/main/resources/people.json"

Hive 表操作 项目名:SparkSQL_Hive INPUT:/usr/hdp/2.4.0.0-169/spark/examples/src/main/resources/kv1.txt 格式为:string + 空格 + string (一行) 核心代码(具体代码,您可以点这里,自行下载):

 object Demo  {
        def main(args: Array[String]) {
            val dirIn = args(0)
            val conf = new SparkConf().setAppName("RDDToDF")
            val sc = new SparkContext(conf)

            //通过sc创建HiveContext的实例hiveContext
            val hiveContext = new HiveContext(sc)
           //通过HiveContext的sql命令创建表
           hiveContext.sql("create table if not exists src (key int,value string)")
           //加载数据
           hiveContext.sql("load data local inpath '"+ dirIn +"' into table src")
           //HiveQL 的查询表达
           hiveContext.sql("from src select key,value").collect.foreach(println)
           sc.stop()
       }
    }

提交 job ,命令如下:

sudo -u spark spark-submit --class com.托管Hadoop.sparkHive.Demo --master yarn-client /home/spark/sparkSQL_Hive-1.0-SNAPSHOT.jar "file:///usr/hdp/2.4.0.0-169/spark/examples/src/main/resources/kv1.txt"