本文共 2247 字,大约阅读时间需要 7 分钟。
DStreams的输出操作,即将DStreams输出到对应的目的地。输出操作包括:print、saveAsTextFiles、saveAsObjectFiles、saveAsHadoopFiles、foreachRDD。本例将使用foreachRDD把数据输出到外部mysql数据库。
【参考:以下为 MySQL 环境准备步骤】
(1)开启数据库
mysql -h192.168.31.3 -uroot -p
(2) 创建数据库
create database spark;
(3)创建表
use spark;create table wordcount(word varchar(50) default null,wordcount int(10) default null);
(1)依赖
Maven 依赖:groupId 为 mysql,artifactId 为 mysql-connector-java,version 为 5.1.27
(2)代码
package Spark

import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming word count that writes its per-batch results into MySQL.
 *
 * Reads lines from a socket, splits them into words, counts each word within
 * the 5-second batch, and inserts the (word, count) pairs into the
 * `wordcount` table via `foreachRDD`.
 */
object ForeachRDDApp {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("bigdata.ibeifeng.com", 6789)
    val results = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Write the batch results to MySQL. Recommended pattern: one connection and
    // one prepared statement per partition, reused for every record in it.
    // Reference: http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html
    results.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // partitionOfRecords holds all records of this partition.
        val connection = createConnection()
        try {
          // PreparedStatement instead of string-concatenated SQL: avoids
          // quoting bugs / SQL injection and is created once per partition
          // rather than once per record.
          val statement = connection.prepareStatement(
            "insert into wordcount(word, wordcount) values (?, ?)")
          try {
            partitionOfRecords.foreach { case (word, count) =>
              statement.setString(1, word)
              statement.setInt(2, count)
              statement.executeUpdate()
            }
          } finally {
            statement.close()
          }
        } finally {
          // Always release the connection, even if an insert fails mid-partition.
          connection.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Obtain a MySQL JDBC connection.
   *
   * @return a live [[java.sql.Connection]] to the `spark` database
   */
  def createConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://bigdata.ibeifeng.com:3306/spark", "root", "123456")
  }
}
aa bb cc dd ee dd aa bb cc dd ee dd aa bb cc dd ee dd
mysql> select * from wordcount; +------+-----------+ | word | wordcount | +------+-----------+ | ee | 3 | | aa | 3 | | dd | 6 | | bb | 3 | | cc | 3 | +------+-----------+ 5 rows in set (0.00 sec)
转载地址:http://gvygi.baihongyu.com/