深入解析Spark中的RPC
Spark是一个快速的、通用的分布式计算系统,而分布式的特性就意味着,必然存在节点间的通信。本文主要介绍不同的Spark组件之间是如何通过RPC(Remote Procedure Call) 进行点对点通信的,分为三个章节:
♦ Spark RPC的简单示例和实际应用;
♦ Spark RPC模块的设计原理;
♦ Spark RPC核心技术总结。
一、Spark RPC的简单示例和实际应用
Spark的RPC主要在两个模块中:
在Spark-core中,主要承载了更好的封装server和client的作用,以及和scala语言的融合,它依赖于模块org.apache.spark.spark-network-common;
在org.apache.spark.spark-network-common中,该模块是java语言编写的,最新版本是基于netty4开发的,提供全双工、多路复用I/O模型的Socket I/O能力,Spark的传输协议结构(wire protocol)也是自定义的。
为了更好的了解Spark RPC的内部实现细节,我基于Spark 2.1版本抽离了RPC通信的部分,单独启了一个项目,放到了github以及发布到Maven中央仓库做学习使用,提供了比较好的上手文档、参数设置和性能评估。下面就通过这个模块对Spark RPC先做一个感性的认识。
以下的代码均可以在kraps-rpc找到。
1.1 简单示例
假设我们要开发一个Hello服务,客户端可以传输string,服务端响应hi或者bye,并echo回去输入的string。
第一步,定义一个HelloEndpoint继承自RpcEndpoint表明可以并发的调用该服务,如果继承自ThreadSafeRpcEndpoint则表明该Endpoint不允许并发。
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
override def onStart(): Unit = {
println("start hello endpoint")
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case SayHi(msg) => {
println(s"receive $msg")
context.reply(s"hi, $msg")
}
case SayBye(msg) => {
println(s"receive $msg")
context.reply(s"bye, $msg")
}
}
override def onStop(): Unit = {
println("stop hello endpoint")
}
}
case class SayHi(msg: String)
case class SayBye(msg: String)
和Java传统的RPC解决方案对比,可以看出这里不用定义接口或者方法标示(比如通常的id或者name),使用scala的模式匹配进行方法的路由。虽然点对点通信的契约交换受制于语言,这里就是SayHi和SayBye两个case class,但是Spark RPC定位于内部组件通信,所以无伤大雅。
第二步,把刚刚开发好的Endpoint交给Spark RPC管理其生命周期,用于响应外部请求。RpcEnvServerConfig可以定义一些参数、server名称(仅仅是一个标识)、bind地址和端口。通过NettyRpcEnvFactory这个工厂方法,生成RpcEnv,RpcEnv是整个Spark RPC的核心所在,后文会详细展开,通过setupEndpoint将”hello-service”这个名字和第一步定义的Endpoint绑定,后续client调用路由到这个Endpoint就需要”hello-service”这个名字。调用awaitTermination来阻塞服务端监听请求并且处理。
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", "localhost", 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()
第三步,开发一个client调用刚刚启动的server,首先RpcEnvClientConfig和RpcEnv都是必须的,然后通过刚刚提到的”hello-service”名字新建一个远程Endpoint的引用(Ref),可以看做是stub,用于调用,这里首先展示通过异步的方式来做请求。
val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hell-service")
val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
future.onComplete {
case scala.util.Success(value) => println(s"Got the result = $value")
case scala.util.Failure(e) => println(s"Got error: $e")
}
Await.result(future, Duration.apply("30s"))
也可以通过同步的方式,在最新的Spark中askWithRetry实际已更名为askSync。
val result = endPointRef.askWithRetry[String](SayBye("neo"))
这就是Spark RPC的通信过程,使用起来易用性可想而知,非常简单,RPC框架屏蔽了Socket I/O模型、线程模型、序列化/反序列化过程、使用netty做了包识别,长连接,网络重连重试等机制。
1.2 实际应用
在Spark内部,很多的Endpoint以及EndpointRef与之通信都是通过这种形式的,举例来说比如driver和executor之间的交互用到了心跳机制,使用HeartbeatReceiver来实现,这也是一个Endpoint,它的注册在SparkContext初始化的时候做的,代码如下:
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
而它的调用在Executor内的方式如下:
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message, RpcTimeout(conf,
时间:2018-10-09 22:44 来源: 转发量:次
声明:本站部分作品是由网友自主投稿和发布、编辑整理上传,对此类作品本站仅提供交流平台,转载的目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,不为其版权负责。如果您发现网站上有侵犯您的知识产权的作品,请与我们取得联系,我们会及时修改或删除。
相关文章:
- [数据挖掘]Spark 迁移到 K8S 在有赞的实践与经验
- [数据挖掘]盘点大数据处理引擎
- [数据挖掘]Spark Operator 初体验
- [数据挖掘]如何实现Spark on Kubernetes?
- [数据挖掘]Spark SQL 物化视图技术原理与实践
- [数据挖掘]Spark on K8S 的最佳实践和需要注意的坑
- [数据挖掘]Spark 3.0重磅发布!开发近两年,流、Python、SQL重
- [数据挖掘]Spark 3.0开发近两年终于发布,流、Python、SQL重大
- [数据挖掘]Apache Spark 3.0.0 正式版终于发布了,重要特性全面
- [数据挖掘]Spark 3.0 自适应查询优化介绍,在运行时加速 Sp
相关推荐:
网友评论: