同城医药问答网

 找回密码
 立即注册
查看: 82|回复: 0

Spark 的“血液” --Spark RPC(一)简述

[复制链接]

1

主题

1

帖子

3

积分

新手上路

Rank: 1

积分
3
发表于 2022-12-21 19:41:21 | 显示全部楼层 |阅读模式
一. Spark rpc概述

首先说明RPC,引用百度百科:
RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。
Spark RPC可以说 是 Spark 分布式集群的基础,若是将 Spark 类比为一个人的话,Spark RPC 就是这个人的血液部分。
有一位大神将 Spark RPC 中的 RPC 部分剥离出来,弄成一个新的可运行的 RPC 项目,地址在这Spark RPC
虽然名字不一样,但这个项目的类和内容基本和 Spark Core 中 RPC 部分的代码和结构基本是一样的,可以通过这个来学习 Spark RPC。
PS:所用 spark 版本:spark 2.1.0
二. Spark RPC ,从简单的例子开始

接下来我们来演示如何下载并运行最简单的 Hello World 中的例子。
首先,我使用的编译器是 IDEA ,通过 idea 将 github 上的代码 clone 下来。
可以看到项目目录下有两个模块,

  • kraps-rpc
  • kraps-rpc-example
我们要做的即是运行 kraps-rpc-example 中的代码。
启动 PRC 的话首先需要启动 Server 端,开启监听服务,这里在 HelloworldServer.scala 中都已经帮我们写好,不过在 main 方法中需要修改一下内容,就是将 host 改为本机地址。
def main(args: Array[String]): Unit = {
//    val host = args(0)
    val host = "localhost"
    val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }然后我们只需要右键该文件然后执行即可。
然后到 HelloworldClient 文件中,这里面提供了同步和异步两个方法可以运行。代码同样都已经写好,通过修改注释即可使用不同的方法运行。同样是右键点击该文件执行。
def main(args: Array[String]): Unit = {
    //异步方法
    //asyncCall()
    //同步方法
    syncCall()
  }异步方法中, ask 会返回一个 Future 。在 Future 运行结果出来前,我们可以去做其他事情。scala 中的 Future 和 Java 的 Future 有些不同,不过这可以先不去管,先当作 Java 里面的 Future 即可。
def asyncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("3s"))
    //在future结果运行出来前,会先打印这条语句。
    println("print me at first!")
    Thread.sleep(7)
  }而同步方法是直接将结果返回,并且会阻塞,直到结果返回
def syncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val result = endPointRef.askWithRetry[String](SayBye("neo"))
    println(result)

  }很简单是吧,接下来我们先来了解一些 Spark RPC 运行过程中至关重要的两个编程模型,以及在这其中使用到的一些主要的类。
三. Spark RPC 中各类说明

Spark RPC 是使用了 Actor 模型和 Reactor 模型的混合模式,我们结合两种模型分别说明 Spark RPC 中各个类的作用:
首先我们先来看 Spark RPC 的类图。



是不是感觉很乱?没事,我们来逐步剖析各个类。
Spark RPC 主要用到了 Actor 模型 和 Reactor 模型,我们从这两个模型的角度来拆解。
Actor 模型

其实之前也有写过一篇介绍 Actor 模型的文章,感兴趣的同学可以点击这里查看 Actor模型浅析 一致性和隔离性
其实 Actor 主要就是这副图的内容:




RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem
我们逐个来看:
RpcEnv --RPC Environment

RPC Environment 是 RpcEndpoint 的运行环境。它管理 RpcEndpoint 的整个生命周期:

  • 通过名字或 URI 注册 RpcEndpoint。
  • 对到底的消息进行路由,决定分发给哪个 RpcEndpoint。
  • 停止 RpcEndpoint。
RPC Environment在 akka 已经被移除的2.0后面版本中,RPC Environment 的实现类是 NettyRpcEnv。通常是由 NettyRpcEnvFactory.create 创建。
RpcEndpoint

RpcEndpoint 能通过 callbacks 接收消息。通常需要我们自己写一个类继承 RpcEndpoint 。编写自己的接收信息和返回信息规则。
RpcEndpoint 的生命周期被 RPC Environment 管理。其生命周期包括,onStart, receive 和 onStop。
它是作为服务端,比如上面例子中的 HelloworldServer 就是一个 RpcEndpoint 。
RpcEndpointRef

RpcEndpointRef 是 RpcEndpoint 在 RPC Environment 中的一个引用。
它包含一个地址(即 Spark URL)和名字。RpcEndpointRef 作为客户端向服务端发送请求并接收返回信息,通常可以选择使用同步或异步的方式进行发送。
Reactor 模型

我们可以从一张图来看 Reactor 的架构。



使用Reactor模型,由底层netty创建的EventLoop做I/O多路复用,这里使用Multiple Reactors这种形式,如上图所示,从netty的角度而言,Main Reactor 和 Sub Reactor 对应 BossGroup 和 WorkerGroup 的概念,前者负责监听 TCP 连接、建立和断开,后者负责真正的 I/O 读写。
而图中的 ThreadPool 就是的 Dispatcher 中的线程池,它来解耦开来耗时的业务逻辑和 I/O 操作,这样就可以更 scalabe,只需要少数的线程就可以处理成千上万的连接,这种思想是标准的分治策略,offload 非 I/O 操作到另外的线程池。
Dispatcher

Dispatcher 的主要作用是保存注册的RpcEndpoint、分发相应的Message到RpcEndPoint中进行处理。Dispatcher 即是上图中 ThreadPool的角色。它同时也维系一个 threadpool,用来处理每次接受到的 InboxMessage 。而这里处理 InboxMessage 是通过 inbox 实现的。
Inbox

Inbox 其实属于 Actor 模型,是 Actor 中的信箱,不过它和 Dispatcher 联系紧密所以放这边。
InboxMessage 有多个实现它的类,比如 OneWayMessage,RpcMessage,等等。Dispatcher会将接收到的 InboxMessage 分发到对应 RpcEndpoint 的 Inbox 中,然后 Inbox 便会处理这个 InboxMessage 。
OK,这次就先介绍到这里,下次我们从代码的角度来看 Spark RPC 的运行机制
如果觉得对你有帮助,不妨关注一波吧~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|同城医药问答网

GMT+8, 2025-3-16 06:11 , Processed in 0.091972 second(s), 22 queries .

Powered by Discuz! X3.4

© 2001-2013 Comsenz Inc.

快速回复 返回顶部 返回列表