Kafka 和 Spark 来实现流式处理
最近更新时间: 2019-10-30 06:10:35
项目名:KafkaWordCount 输入源:伪造的数据(KafkaWordCountProducer) 模拟 Kafka 过程: Xshell 远程登录 MASTER 主机 用 ssh 切换到任意一台核心节的主机上( CORE 节点),具体步骤如下 #查看 CORE 的主机名 #cat /etc/hosts 172.31.. 托管Hadoop-core-1-001.ksc.com 托管Hadoop-core-1-001 172.31.. 托管Hadoop-core-1-002.ksc.com 托管Hadoop-core-1-002 172.31.. 托管Hadoop-core-1-003.ksc.com 托管Hadoop-core-1-003 172.31.. 托管Hadoop-master-1-001.ksc.com 托管Hadoop-master-1-001 172.31.. 托管Hadoop-master-2-001.ksc.com 托管Hadoop-master-2-001
ssh 进入任意 CORE 节点
#ssh 托管Hadoop-core-1-001.ksc.com #查看 kafka 所在目录 #ps aux | grep kafka (producer)发送消息,具体执行过程如下: #切入 kafka 工作主目录
cd /usr/hdp/2.4.0.0-169/kafka/bin
#创建 topic #./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #检验 topic 创建是否成功(如果正常返回 test) #./kafka-topics.sh --list --zookeeper localhost:2181 #打开 producer,发送消息 #./kafka-console-producer.sh --broker-list 托管Hadoop-core-1-001.ksc.com:6667 --topic test #####启动成功后,输入以下内容测试 hdfs hdfs spark spark spark storm streaming DF DStream (consumer)接受消息,具体执行如下过程: #切入kafka工作主目录
cd /usr/hdp/2.4.0.0-169/kafka/bin
#保持 producer 端不动,另起,一个 shell 窗口登入当前 CORE 节点(托管Hadoop-core-1-001.ksc.com) #./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #####启动成功后,如果一切正常将会显示 producer 端输入的内容 源码路径: /usr/hdp/2.4.0.0-169/spark/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala 提交 job : 停止运行刚才的 kafka-console-producer 和 kafka-console-consumer 运行 KafkaWordCountProducer #切换到 MASTER 主机, spark 工作目录下 #cd /usr/hdp/2.4.0.0-169/spark/bin/
./run-example org.apache.spark.examples.streaming.KafkaWordCountProducer 托管Hadoop-core-1-001.ksc.com:6667 test 3 5
运行 KafkaWordCount #保持上一步骤的producer端口,另起,一个 shell 窗口连接 MASTER #切换到spark工作目录下 #cd /usr/hdp/2.4.0.0-169/spark/bin/
sudo -u spark ./run-example org.apache.spark.examples.streaming.KafkaWordCount 托管Hadoop-core-1-001.ksc.com:2181 test-consumer-group test 1
参数解释: KafkaWordCountProducer KafkaWordCountProducer 托管Hadoop-core-1-001.ksc.com:6667 表示 producer 的地址和端口 test 表示 topic 3 表示每秒发多少条消息 5 表示每条消息中有几个单词 KafkaWordCount 托管Hadoop-core-1-001.ksc.com:2181 表示 zookeeper 的监听地址 test-consumer-group 表示 consumer-group 的名称,必须和 $KAFKA_HOME/config/consumer.properties 中的 group.id 的配置内容一致 test 表示 topic 1 表示线程数。 到此您已经基本掌握了在托管HADOOP上使用 Spark 。