基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性。这 里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算。
我们的应用场景是分析用户使用手机App的行为,描述如下所示:
手机客户端会收集用户的行为事件(我们以点击事件为例),将数据发送到数据服务器,我们假设这里直接进入到Kafka消息队列
后端的实时服务会从Kafka消费数据,将数据读出来并进行实时分析,这里选择Spark Streaming,因为Spark Streaming提供了与Kafka整合的内置支持
经过Spark Streaming实时计算程序分析,将结果写入Redis,可以实时获取用户的行为数据,并可以导出进行离线综合统计分析
Spark Streaming介绍
Spark Streaming提供了一个叫做DStream(Discretized Stream)的高级抽象,DStream表示一个持续不断输入的数据流,可以基于Kafka、TCP Socket、Flume等输入数据流创建。在内部,一个DStream实际上是由一个RDD序列组成的。Sparking Streaming是基于Spark平台的,也就继承了Spark平台的各种特性,如容错(Fault-tolerant)、可扩展 (Scalable)、高吞吐(High-throughput)等。
在Spark Streaming中,每个DStream包含了一个时间间隔之内的数据项的集合,我们可以理解为指定时间间隔之内的一个batch,每一个batch就 构成一个RDD数据集,所以DStream就是一个个batch的有序序列,时间是连续的,按照时间间隔将数据流分割成一个个离散的RDD数据集,如图所 示(来自官网):
我们都知道,Spark支持两种类型操作:Transformations和Actions。Transformation从一个已知的RDD数据集经过 转换得到一个新的RDD数据集,这些Transformation操作包括map、filter、flatMap、union、join等,而且 Transformation具有lazy的特性,调用这些操作并没有立刻执行对已知RDD数据集的计算操作,而是在调用了另一类型的Action操作才 会真正地执行。Action执行,会真正地对RDD数据集进行操作,返回一个计算结果给Driver程序,或者没有返回结果,如将计算结果数据进行持久 化,Action操作包括reduceByKey、count、foreach、collect等。关于Transformations和Actions 更详细内容,可以查看官网文档。
同样、Spark Streaming提供了类似Spark的两种操作类型,分别为Transformations和Output操作,它们的操作对象是DStream,作 用也和Spark类似:Transformation从一个已知的DStream经过转换得到一个新的DStream,而且Spark Streaming还额外增加了一类针对Window的操作,当然它也是Transformation,但是可以更灵活地控制DStream的大小(时间 间隔大小、数据元素个数),例如window(windowLength, slideInterval)、countByWindow(windowLength, slideInterval)、reduceByWindow(func, windowLength, slideInterval)等。Spark Streaming的Output操作允许我们将DStream数据输出到一个外部的存储系统,如数据库或文件系统等,执行Output操作类似执行 Spark的Action操作,使得该操作之前lazy的Transformation操作序列真正地执行。
Kafka+Spark Streaming+Redis编程实践
下面,我们根据上面提到的应用场景,来编程实现这个实时计算应用。首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下:
1 | {"uid":"068b746ed4620d25e26055a9f804385f","event_time":"1430204612405","os_type":"Android","click_count":6} |
一个事件包含4个字段:
uid:用户编号
event_time:事件发生时间戳
os_type:手机App操作系统类型
click_count:点击次数
下面是我们实现的代码,如下所示:
01 | package org.shirdrn.spark.streaming.utils |
03 | import java.util.Properties |
04 | import scala.util.Properties |
05 | import org.codehaus.jettison.json.JSONObject |
06 | import kafka.javaapi.producer.Producer |
07 | import kafka.producer.KeyedMessage |
08 | import kafka.producer.KeyedMessage |
09 | import kafka.producer.ProducerConfig |
10 | import scala.util.Random |
12 | object KafkaEventProducer { |
14 | private val users = Array( |
15 | "4A4D769EB9679C054DE81B973ED5D768" , "8dfeb5aaafc027d89349ac9a20b3930f" , |
16 | "011BBF43B89BFBF266C865DF0397AA71" , "f2a8474bf7bd94f0aabbd4cdd2c06dcf" , |
17 | "068b746ed4620d25e26055a9f804385f" , "97edfc08311c70143401745a03a50706" , |
18 | "d7f141563005d1b5d0d3dd30138f3f62" , "c8ee90aade1671a21336c721512b817a" , |
19 | "6b67c8c700427dee7552f81f3228c927" , "a95f22eabc4fd4b580c011a3161a9d9d" ) |
21 | private val random = new Random() |
23 | private var pointer = - 1 |
25 | def getUserID() : String = { |
27 | if (pointer > = users.length) { |
35 | def click() : Double = { |
39 | // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --create --topic user_events --replication-factor 2 --partitions 2 |
40 | // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --list |
41 | // bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2181,zk3:2181/kafka --describe user_events |
42 | // bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:22181/kafka --topic test_json_basis_event --from-beginning |
43 | def main(args : Array[String]) : Unit = { |
44 | val topic = "user_events" |
45 | val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
46 | val props = new Properties() |
47 | props.put( "metadata.broker.list" , brokers) |
48 | props.put( "serializer.class" , "kafka.serializer.StringEncoder" ) |
50 | val kafkaConfig = new ProducerConfig(props) |
51 | val producer = new Producer[String, String](kafkaConfig) |
55 | val event = new JSONObject() |
57 | .put( "uid" , getUserID) |
58 | .put( "event_time" , System.currentTimeMillis.toString) |
59 | .put( "os_type" , "Android" ) |
60 | .put( "click_count" , click) |
62 | // produce event message |
63 | producer.send( new KeyedMessage[String, String](topic, event.toString)) |
64 | println( "Message sent: " + event) |
通过控制上面程序最后一行的时间间隔来控制模拟写入速度。下面我们来讨论实现实时统计每个用户的点击次数,它是按照用户分组进行累加次数,逻辑比较简单,关键是在实现过程中要注意一些问题,如对象序列化等。先看实现代码,稍后我们再详细讨论,代码实现如下所示:
01 | object UserClickCountAnalytics { |
03 | def main(args : Array[String]) : Unit = { |
04 | var masterUrl = "local[1]" |
05 | if (args.length > 0 ) { |
09 | // Create a StreamingContext with the given master URL |
10 | val conf = new SparkConf().setMaster(masterUrl).setAppName( "UserClickCountStat" ) |
11 | val ssc = new StreamingContext(conf, Seconds( 5 )) |
13 | // Kafka configurations |
14 | val topics = Set( "user_events" ) |
15 | val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
16 | val kafkaParams = Map[String, String]( |
17 | "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" ) |
20 | val clickHashKey = "app::users::click" |
22 | // Create a direct stream |
23 | val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) |
25 | val events = kafkaStream.flatMap(line = > { |
26 | val data = JSONObject.fromObject(line. _ 2 ) |
30 | // Compute user click times |
31 | val userClicks = events.map(x = > (x.getString( "uid" ), x.getInt( "click_count" ))).reduceByKey( _ + _ ) |
32 | userClicks.foreachRDD(rdd = > { |
33 | rdd.foreachPartition(partitionOfRecords = > { |
34 | partitionOfRecords.foreach(pair = > { |
36 | val clickCount = pair. _ 2 |
37 | val jedis = RedisClient.pool.getResource |
39 | jedis.hincrBy(clickHashKey, uid, clickCount) |
40 | RedisClient.pool.returnResource(jedis) |
46 | ssc.awaitTermination() |
上面代码使用了Jedis客户端来操作Redis,将分组计数结果数据累加写入Redis存储,如果其他系统需要实时获取该数据,直接从Redis实时读取即可。RedisClient实现代码如下所示:
01 | object RedisClient extends Serializable { |
02 | val redisHost = "10.10.4.130" |
04 | val redisTimeout = 30000 |
05 | lazy val pool = new JedisPool( new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) |
07 | lazy val hook = new Thread { |
09 | println( "Execute hook thread: " + this ) |
13 | sys.addShutdownHook(hook.run) |
上面代码我们分别在local[K]和Spark Standalone集群模式下运行通过。
如果我们是在开发环境进行调试的时候,也就是使用local[K]部署模式,在本地启动K个Worker线程来计算,这K个Worker在同一个JVM实 例里,上面的代码默认情况是,如果没有传参数则是local[K]模式,所以如果使用这种方式在创建Redis连接池或连接的时候,可能非常容易调试通 过,但是在使用Spark Standalone、YARN Client(YARN Cluster)或Mesos集群部署模式的时候,就会报错,主要是由于在处理Redis连接池或连接的时候出错了。我们可以看一下Spark架构,如图 所示(来自官网):
无论是在本地模式、Standalone模式,还是在Mesos或YARN模式下,整个Spark集群的结构都可以用上图抽象表示,只是各个组件的运行环 境不同,导致组件可能是分布式的,或本地的,或单个JVM实例的。如在本地模式,则上图表现为在同一节点上的单个进程之内的多个组件;而在YARN Client模式下,Driver程序是在YARN集群之外的一个节点上提交Spark Application,其他的组件都运行在YARN集群管理的节点上。
在Spark集群环境部署Application后,在进行计算的时候会将作用于RDD数据集上的函数(Functions)发送到集群中Worker上 的Executor上(在Spark Streaming中是作用于DStream的操作),那么这些函数操作所作用的对象(Elements)必须是可序列化的,通过Scala也可以使用 lazy引用来解决,否则这些对象(Elements)在跨节点序列化传输后,无法正确地执行反序列化重构成实际可用的对象。上面代码我们使用lazy引 用(Lazy Reference)来实现的,代码如下所示:
01 | // lazy pool reference |
02 | lazy val pool = new JedisPool( new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) |
04 | partitionOfRecords.foreach(pair = > { |
06 | val clickCount = pair. _ 2 |
07 | val jedis = RedisClient.pool.getResource |
09 | jedis.hincrBy(clickHashKey, uid, clickCount) |
10 | RedisClient.pool.returnResource(jedis) |
另一种方式,我们将代码修改为,把对Redis连接的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理,如下所示:
001 | package org.shirdrn.spark.streaming |
003 | import org.apache.commons.pool 2 .impl.GenericObjectPoolConfig |
004 | import org.apache.spark.SparkConf |
005 | import org.apache.spark.streaming.Seconds |
006 | import org.apache.spark.streaming.StreamingContext |
007 | import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions |
008 | import org.apache.spark.streaming.kafka.KafkaUtils |
010 | import kafka.serializer.StringDecoder |
011 | import net.sf.json.JSONObject |
012 | import redis.clients.jedis.JedisPool |
014 | object UserClickCountAnalytics { |
016 | def main(args : Array[String]) : Unit = { |
017 | var masterUrl = "local[1]" |
018 | if (args.length > 0 ) { |
022 | // Create a StreamingContext with the given master URL |
023 | val conf = new SparkConf().setMaster(masterUrl).setAppName( "UserClickCountStat" ) |
024 | val ssc = new StreamingContext(conf, Seconds( 5 )) |
026 | // Kafka configurations |
027 | val topics = Set( "user_events" ) |
028 | val brokers = "10.10.4.126:9092,10.10.4.127:9092" |
029 | val kafkaParams = Map[String, String]( |
030 | "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder" ) |
033 | val clickHashKey = "app::users::click" |
035 | // Create a direct stream |
036 | val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) |
038 | val events = kafkaStream.flatMap(line = > { |
039 | val data = JSONObject.fromObject(line. _ 2 ) |
043 | // Compute user click times |
044 | val userClicks = events.map(x = > (x.getString( "uid" ), x.getInt( "click_count" ))).reduceByKey( _ + _ ) |
045 | userClicks.foreachRDD(rdd = > { |
046 | rdd.foreachPartition(partitionOfRecords = > { |
047 | partitionOfRecords.foreach(pair = > { |
050 | * Internal Redis client for managing Redis connection { Jedis} based on { RedisPool} |
052 | object InternalRedisClient extends Serializable { |
054 | @ transient private var pool : JedisPool = null |
056 | def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, |
057 | maxTotal : Int, maxIdle : Int, minIdle : Int) : Unit = { |
058 | makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true , false , 10000 ) |
061 | def makePool(redisHost : String, redisPort : Int, redisTimeout : Int, |
062 | maxTotal : Int, maxIdle : Int, minIdle : Int, testOnBorrow : Boolean, |
063 | testOnReturn : Boolean, maxWaitMillis : Long) : Unit = { |
065 | val poolConfig = new GenericObjectPoolConfig() |
066 | poolConfig.setMaxTotal(maxTotal) |
067 | poolConfig.setMaxIdle(maxIdle) |
068 | poolConfig.setMinIdle(minIdle) |
069 | poolConfig.setTestOnBorrow(testOnBorrow) |
070 | poolConfig.setTestOnReturn(testOnReturn) |
071 | poolConfig.setMaxWaitMillis(maxWaitMillis) |
072 | pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout) |
074 | val hook = new Thread{ |
075 | override def run = pool.destroy() |
077 | sys.addShutdownHook(hook.run) |
081 | def getPool : JedisPool = { |
087 | // Redis configurations |
091 | val redisHost = "10.10.4.130" |
093 | val redisTimeout = 30000 |
095 | InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle) |
098 | val clickCount = pair. _ 2 |
099 | val jedis = InternalRedisClient.getPool.getResource |
100 | jedis.select(dbIndex) |
101 | jedis.hincrBy(clickHashKey, uid, clickCount) |
102 | InternalRedisClient.getPool.returnResource(jedis) |
108 | ssc.awaitTermination() |
上面代码实现,得益于Scala语言的特性,可以在代码中任何位置进行class或object的定义,我们将用来管理Redis连接的代码放在了 特定操作的内部,就避免了瞬态(Transient)对象跨节点序列化的问题。这样做还要求我们能够了解Spark内部是如何操作RDD数据集的,更多可 以参考RDD或Spark相关文档。
在集群上,以Standalone模式运行,执行如下命令:
2 | ./bin/spark-submit --class org.shirdrn.spark.streaming.UserClickCountAnalytics --master spark://hadoop1:7077 --executor-memory 1G --total-executor-cores 2 ~/spark-0.0.SNAPSHOT.jar spark://hadoop1:7077 |
可以查看集群中各个Worker节点执行计算任务的状态,也可以非常方便地通过Web页面查看。
下面,看一下我们存储到Redis中的计算结果,如下所示:
01 | 127.0.0.1:6379[1]> HGETALL app::users::click |
02 | 1) "4A4D769EB9679C054DE81B973ED5D768" |
04 | 3) "8dfeb5aaafc027d89349ac9a20b3930f" |
06 | 5) "011BBF43B89BFBF266C865DF0397AA71" |
08 | 7) "97edfc08311c70143401745a03a50706" |
10 | 9) "d7f141563005d1b5d0d3dd30138f3f62" |
12 | 11) "a95f22eabc4fd4b580c011a3161a9d9d" |
14 | 13) "6b67c8c700427dee7552f81f3228c927" |
16 | 15) "f2a8474bf7bd94f0aabbd4cdd2c06dcf" |
18 | 17) "c8ee90aade1671a21336c721512b817a" |
20 | 19) "068b746ed4620d25e26055a9f804385f" |
有关更多关于Spark Streaming的详细内容,可以参考官方文档。
附录
这里,附上前面开发的应用所对应的依赖,以及打包Spark Streaming应用程序的Maven配置,以供参考。如果使用maven-shade-plugin插件,配置有问题的话,打包后在Spark集群上 提交Application时候可能会报错Invalid signature file digest for Manifest main attributes。参考的Maven配置,如下所示:
001 | < project xmlns = "" xmlns:xsi = "" |
002 | xsi:schemaLocation = " " > |
003 | < modelVersion >4.0.0</ modelVersion > |
004 | < groupId >org.shirdrn.spark</ groupId > |
005 | < artifactId >spark</ artifactId > |
006 | < version >0.0.1-SNAPSHOT</ version > |
010 | < groupId >org.apache.spark</ groupId > |
011 | < artifactId >spark-core_2.10</ artifactId > |
012 | < version >1.3.0</ version > |
015 | < groupId >org.apache.spark</ groupId > |
016 | < artifactId >spark-streaming_2.10</ artifactId > |
017 | < version >1.3.0</ version > |
020 | < groupId >net.sf.json-lib</ groupId > |
021 | < artifactId >json-lib</ artifactId > |
022 | < version >2.3</ version > |
025 | < groupId >org.apache.spark</ groupId > |
026 | < artifactId >spark-streaming-kafka_2.10</ artifactId > |
027 | < version >1.3.0</ version > |
030 | < groupId >redis.clients</ groupId > |
031 | < artifactId >jedis</ artifactId > |
032 | < version >2.5.2</ version > |
035 | < groupId >org.apache.commons</ groupId > |
036 | < artifactId >commons-pool2</ artifactId > |
037 | < version >2.2</ version > |
042 | < sourceDirectory >${basedir}/src/main/scala</ sourceDirectory > |
043 | < testSourceDirectory >${basedir}/src/test/scala</ testSourceDirectory > |
046 | < directory >${basedir}/src/main/resources</ directory > |
051 | < directory >${basedir}/src/test/resources</ directory > |
056 | < artifactId >maven-compiler-plugin</ artifactId > |
057 | < version >3.1</ version > |
064 | < groupId >org.apache.maven.plugins</ groupId > |
065 | < artifactId >maven-shade-plugin</ artifactId > |
066 | < version >2.2</ version > |
068 | < createDependencyReducedPom >true</ createDependencyReducedPom > |
072 | < phase >package</ phase > |
079 | < include >*:*</ include > |
084 | < artifact >*:*</ artifact > |
086 | < exclude >META-INF/*.SF</ exclude > |
087 | < exclude >META-INF/*.DSA</ exclude > |
088 | < exclude >META-INF/*.RSA</ exclude > |
094 | implementation = "org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> |
096 | implementation = "org.apache.maven.plugins.shade.resource.AppendingTransformer" > |
097 | < resource >reference.conf</ resource > |
100 | implementation = "org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer" > |
101 | < resource >log4j.properties</ resource > |
参考链接
本文基于许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我。