博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(7):实例-wordcount统计结果写入到MySQL
阅读量:4280 次
发布时间:2019-05-27

本文共 2247 字,大约阅读时间需要 7 分钟。

一、功能概述

DStreams的输出操作,即将DStreams输出到对应的目的地。输出操作包括:print、saveAsTextFiles、saveAsObjectFiles、saveAsHadoopFiles、foreachRDD。本例将使用foreachRDD把数据输出到外部mysql数据库。

【参考:】

二、功能实现

1.前提工作:在mysql数据库中创建表

(1)开启数据库

mysql -h192.168.31.3 -uroot -p 

(2) 创建数据库

create database spark;

(3)创建表

use spark;create table wordcount(word varchar(50) default null,wordcount int(10) default null);

 

2.scala代码

(1)依赖

mysql
mysql-connector-java
5.1.27

(2)代码

package Sparkimport java.sql.DriverManagerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}/**  * 使用spark streaming完成有状态统计,并且将结果写入到mysql数据库中  *  */object ForeachRDDApp {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]")    val ssc=new StreamingContext(sparkConf,Seconds(5))    val lines=ssc.socketTextStream("bigdata.ibeifeng.com",6789)    val results=lines.flatMap( _.split(" "))      .map((_,1)).reduceByKey(_+_)    //TODO... 将结果写入到MYSQL    //参考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html    results.foreachRDD { rdd =>      rdd.foreachPartition { partitionOfRecords =>{   //partitionOfRecords是整个分区的数据                  val connection = createConnection()          partitionOfRecords.foreach(record =>{     //record这个record才是每一条数据            val sql=" insert into wordcount(word, wordcount) values('" + record._1 + "'," + record._2 + ")"            connection.createStatement().execute(sql)          })          connection.close()      }      }    }    ssc.start()    ssc.awaitTermination()  }  /**    * 获取mysql连接    * @return    */  def createConnection()={    Class.forName("com.mysql.jdbc.Driver")    DriverManager.getConnection("jdbc:mysql://bigdata.ibeifeng.com:3306/spark","root","123456")  }}

三、测试

1.启动nc -lk 6789,输入测试数据

aa bb cc dd ee dd aa bb cc dd ee dd aa bb cc dd ee dd 

2.mysql中的是

        mysql> select * from wordcount;        +------+-----------+        | word | wordcount |        +------+-----------+        | ee   |         3 |        | aa   |         3 |        | dd   |         6 |        | bb   |         3 |        | cc   |         3 |        +------+-----------+        5 rows in set (0.00 sec)

(经测试,成功!)

转载地址:http://gvygi.baihongyu.com/

你可能感兴趣的文章
我的四轴专用PID参数整定方法及原理
查看>>
四轴自适应控制算法的一些尝试开源我的山猫飞控和梯度在线辨识自适应等算法—(转)
查看>>
LynxFly科研小四轴横空出世,开源,F4,WIFI --(转)
查看>>
对Kalman(卡尔曼)滤波器的理解
查看>>
用bochs调试自己写的系统引导代码
查看>>
经验法PID参数工程整定口诀浅析 - 全文
查看>>
四旋翼飞行器的飞控实现
查看>>
四轴PID讲解
查看>>
四旋翼飞行器基本知识(四旋翼飞行器结构和原理+四轴飞行diy全套入门教程)
查看>>
四旋翼飞行器的控制规律及算法实现
查看>>
微型四旋翼飞行器的设计与制作
查看>>
时下最强大疆无人机飞控技术大揭秘
查看>>
基于数据融合和串级PID的小型四旋翼无人机高度测量与控制系统
查看>>
四旋翼飞行器Quadrotor飞控之 PID调节(参考APM程序)
查看>>
最流行的开源飞控项目ArduPilot Mega(APM)介绍及发展历史
查看>>
利用加速度求解位置的算法——三轴传感器
查看>>
基于STM32的开源微型四轴飞行器
查看>>
Crazyflie微型四轴 深入解读1
查看>>
Crazyflie微型四轴 深入解读2
查看>>
四旋翼微型飞行器设计
查看>>