spark-2.11-thriftServer

本文记录Thrift server的学习,先从一个服务异常为切入点:

Thrift Server的本地启动

– 感谢韦大侠帮助
直接运行 HiveThriftServer2是无法正常启动的,首先需要配置启动参数:
pimg_5c1771760f7a5.png
在 Program arguments 中添加:-hiveconf spark.master=local[2] -hiveconf spark.driver.bindAddress=127.0.0.1
在Working directory中添加:/Users/renweiming/spark/spark-2.x
这样启动的话还是会抛出异常信息:
pimg_5c1771cf3f9a6.png
解决缺少类的方法是 File -> Project Structure…
pimg_5c177201e0a2a.png
打开项目结构对话框
pimg_5c177250538bb.png
点击左下角的“➕”来增加Jar。一个小建议,如果你的Spark项目已经构建过了,那么其实Spark所需的jar都已经存在于本地的maven仓库里了,直接去仓库中,将对应的jar添加即可,就不再去下载jar包了。
常见的一些所需jar包:
| 异常 | 所需jar | 说明 |
| ———— | ————- | —————- |
| java.lang.ClassNotFoundException: com.google.common.cache.CacheLoader | com.google.guava | 该问题,一般是因为Scope设置的是Provided,修改为Compile即可,见表格下的截图 |
| 我擦,一下子启动起来了,以前配置过,以后遇到再补充吧,不给表现机会。。| …| …|
pimg_5c1774288b08d.png

带着问题学习

在Thrift server中,出现了如下异常:
18/12/17 15:01:00 INFO AbstractService: Service:HiveServer2 is stopped.
通过向上追日志,发现还有如下信息:
18/12/17 15:00:59 INFO HiveServer2: Shutting down HiveServer2
继续向上追:
18/12/17 15:00:59 ERROR SparkExecuteStatementOperation: Error running hive query as user : livepm
继续向上追:

从代码结构来看:
HiveServer2继承CompositeService,继承AbstractService。上面的异常就是从 AbstractService的stop()方法中报出来的。而这个方法调用是从如下的流程中调用的:HiveServer2.stop() -> CompositeService.stop() -> AbstractService.stop()。
所以,从代码上来分析,肯定是什么地方调用了stop方法。
在HiveServer的init方法中,添加了程序关闭的钩子函数:
pimg_5c1778027340a.png
所以肯定是主动调用了stop方法,或者是异常导致HiveServer关闭了。

开始看到什么记什么,稍后在整理
Thrift server的UI其实是在Spark的UI上增加了一个tab页,默认端口为4040,可以通过 spark.ui.port 进行自定义。Thrift Server的展示效果,是通过类 ThriftServerPage 和 ThriftServerSessionPage 来定义的。

在初始化HiveThriftServer的时候,会根据Hive的配置项hive.server2.transport.mode的值(可选的有http和binary)来决定生成哪种 ThriftCLIService。如果是http则生成 ThriftHttpCLIService,否则生成 ThriftBinaryCLIService。

启动流程

入口必然是HiveThriftServer2,HiveThriftServer2的main方法,在方法的开头就生成了一个HiveServer2对象,并调用它的parse方法得到一个ServerOptionsProcessorResponse对象,该对象中包含的ServerOptionsExecutor是 StartOptionExecutor对象。 – 但是这个服务好像就没有被调用,也就是说 HiveServer2并没有被调用。

接下来
创建了一个HiveThriftServer2对象,并调用了这个服务的init和start。
pimg_5c18ae94ed28f.png
然而start方法只是调用了父类的start,并将自身的启动标示started设置为true。父类(HiveServer2)的start方法,同样又调用了父类(CompositeService)的start方法。在这个类的方法中,它将serviceList中的service(通过addService添加的)分别调用start方法,然后又调用了父类(AbstractService)的start方法。在这个顶级抽象方法中,设置了服务的开始时间、检查了当前状态为INITED,并将服务的状态设置为 STARTED。
所以我们的关注点就落在了 serviceList中service是如何添加进去的。我们返回看一下HiveThriftServer2的init方法,就是上图中我们调用的init方法。
pimg_5c18aee6c6ac3.png
这样就添加了两个service,分别是 SparkSQLCLIService 和 ThriftCLIService。针对THriftCLIService,会根据 hive.server2.transport.mode 的配置项(binary 或 http)(我们公司使用的是默认值“binary”,也就是用的 ThriftBinaryCLIService )具体生成ThriftHttpCLIService或ThriftBinaryCLIService。
继续将此方法看完,在init的最后执行的是 initCompositeService(hiveConf),此方法是ReflectedCompositeService定义的一个方法,具体如下:
pimg_5c18b128e9a91.png
这个方法,要把它作为HiveThriftServer2的一部分来读,那么它的功能就是初始化父类ServiceList中的service:

得到父类(CompositeService)的serviceList,并对里面的service进行初始化(执行init方法)
设置hiveConf信息,通过反射设置。
检查服务的当前状态(确定必须是NOTINITED状态)并设置新的状态(INITED状态)。 – 为啥不直接调用AbstractService的init方法呢?

继续按着顺序来吧,看一看前面通过addService添加到serviceList中的每个service吧。

首先看看 SparkSQLCLIService 的init

为父类设置HiveConf
生成SparkSQLSessionManager,并将SparkSQLSessionManager通过addService添加到CompositeService的serviceList中。注意这里的CompositeService是SparkSQLCLIService自己的CompositeService。之后又调用了 initCompositeService。相当于将添加进去的service进行初始化。

SparkSQLSessionManager的初始化

首先,将hiveConf设置给父类(SparkSQLSessionManager的父类 SessionManager )。
然后,判断是否开启了 hive.server2.logging.operation.enabled 配置(默认为true),如果开启了,则执行initOperationLogRootDir来初始化操作日志目录。
然后,根据配置项 hive.server2.async.exec.threads 的值,生成一个固定线程数的线程池,并将线程池设置给父类的 backgroundOperationPool。
最后,生成一个 SparkSQLOperationManager 对象,并将此对象 设置给父类(SessionManager)的 operationManager。然后对 operationManager 进行初始化。

SparkSQLOperationManager的初始化

根据配置 hive.server2.logging.operation.enabled 的值(默认为true),来确定是否要启动一个 hive.server2.logging.operation.level 配置级别的Appender。

再看看 ThriftBinaryCLIService

首先,根据系统环境或配置项 hive.server2.thrift.bind.host 中得到需要绑定的host,如果没有则绑定本地地址。
然后,根据配置项 hive.server2.thrift.worker.keepalive.time 获取worker保持活跃的时长(默认为60秒)。
继续然后,从系统变量 HIVE_SERVER2_THRIFT_PORT 或配置项 hive.server2.thrift.port 中获取服务绑定的端口号,默认为10000。
最后,从配置项 hive.server2.thrift.min.worker.threads 和 hive.server2.thrift.max.worker.threads 中分别读取worker的最小线程数和最大线程数。
调用父类(AbastractService)的init方法。
worker活跃时长、最大线程数和最小线程数,用来初始化Service内部的线程池。

综上来看,其实就是一层一层的进行初始化,但是为啥不直接调用初始化方法呢?个人推测是覆盖性操作。

继续回到 HiveThriftServer2 初始化的位置,接下来就是启动 HiveThriftServer2,调用start方法,调用链如下:
HiveThriftServer.start() -> HiveServer2.start() -> CompositeService.start()(–此方法中会将serviceList中的所有service进行启动)
所以,接下来我们对 SparkSQLCLIService 和 ThriftBinaryCLIService 的start方法进行分析。

SparkSQLCLIService的启动

SparkSQLCLIService类中并没有定义start方法,但是它的父类CLIService中定义了。父类的启动很简单,首先调用父类(CompositeService)的start方法,相当于对serviceList中的各个service进行启动。然后自己内部生成一个 HiveMetaStoreClient,并链接获取默认的数据库(只是为了测试链接)。 至于serviceList中service的启动,需要结合上面的初始化来看看需要具体启动那些service。稍后上图。

ThriftBinaryCLIService的启动

ThriftBinaryCLIService自身也没有定义start方法,其父类(ThriftCLISService)定义了start方法。具体逻辑如下:调用父类(AbstractService)的start方法。然后,将自己作为参数生成一个Thread,并进行启动(因为ThriftCLIService实现了Runnable接口)。然而,ThriftCLIService自身只是定义了抽象方法run,具体实现由 ThriftBinaryCLIService 或 ThrfitHttpCLIService 来实现。因为我们使用的是 ThriftBinaryCLIService ,因此我们只对此类进行分析。
对于ThriftBinaryCLIService的启动,我们进行分析:
创建线程池、创建TTransportFactory、创建TProcessorFactory、创建serverSocket,这些是用来创建TThreadPoolServer.Args,然后使用它来创建 TThreadPoolServer,然后调用service的serve方法来启动服务。这里就会输出 Starting … on port … with … … worker threads的日志。