博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[hadoop源码阅读][6]-org.apache.hadoop.ipc-protocol和心跳分析
阅读量:6271 次
发布时间:2019-06-22

本文共 7480 字,大约阅读时间需要 24 分钟。

1.      protocolrpc中的作用

通过对org.apache.hadoop.ipc包分析中,Hadoop实现了基于IPC模型的RPC机制,可以不需要像Java中实现的RMI机制一样,在RPC调用的C/S两端分别创建StubSkeleton,而是通过一组协议来进行RPC调用就可以实现通信。这主要是由于Hadoop所采用的序列化机制简化了RPC调用的复杂性。Hadoop定义了自己的通信协议,这些协议都是建立在TCP/IP协议之上的,规范了通信两端的约定。

Hadoop集群中,不同进程之间通信需要使用合适的协议才能够进行交互。在协议接口中约定了通信双方的特定行为,那么,在实现这些通信协议的实现类中,就能看到指定进程是如何实现协议接口中约定的行为的。

 

2.      hadoop实现了那些protocol

所有要使用RPC服务的类都要实现该接口VersionedProtocol,我们可以来看一下有哪些接口继承了该接口。

VersionedProtocol协议是Hadoop的最顶层协议接口的抽象;

1HDFS相关

  • ClientDatanodeProtocolclientdatanode交互的接口,操作不多,只有一个block恢复的方法。那么,其它数据请求的方法呢?clientdatanode主要交互是通过流式的socket实现,源码在DataXceiver,在这里先不说了;

  • ClientProtocolclientNamenode交互的接口,所有控制流的请求均在这里,如:创建文件、删除文件等;

  • DatanodeProtocolDatanodeNamenode交互的接口,如心跳、blockreport等;

  • NamenodeProtocolSecondaryNodeNamenode交互的接口。

2Mapreduce相关

  • InterDatanodeProtocolDatanode内部交互的接口,用来更新block的元数据;

  • InnerTrackerProtocolTaskTrackerJobTracker交互的接口,功能与DatanodeProtocol相似;

  • JobSubmissionProtocolJobClientJobTracker交互的接口,用来提交Job、获得Job等与Job相关的操作;

  • TaskUmbilicalProtocolTask中子进程与母进程交互的接口,子进程即mapreduce等操作,母进程即TaskTracker,该接口可以回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。

3)其它

  • AdminOperationProtocol:不用用户操作的接口,提供一些管理操作,如刷新JobTrackernode列表;

  • RefreshAuthorizationPolicyProtocolRefreshUserMappingsProtocol:暂不明白。

3.      心跳机制分析

hadoop的集群是基于master/slave模式,namenodejobtracker属于master,而datanode/tasktracker属于slavesmaster只有一个,而slaves有多个。 namenodedatanode之间的通信,jobtrackertasktracker直接的通信,都是通过“心跳”完成的。

心跳的机制大概是这样的:

1) master启动的时候,会开一个ipc server在那里。

2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。

3.1   namenodedatanode的心跳

datanode.java代码里面offerService方法中,每隔3秒向namenode发送心跳的代码

 
public
void
offerService()
throws
Exception {
while
(shouldRun) {
try
{
long
startTime
=
now();
//
如果到了3秒钟,就向namenode发心跳
if
(startTime
-
lastHeartbeat
>
heartBeatInterval) {
//
//
All heartbeat messages include following info:
//
-- Datanode name
//
-- data transfer port
//
-- Total capacity
//
-- Bytes remaining
//
lastHeartbeat
=
startTime; DatanodeCommand[] cmds
=
namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount() );
//
注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法??
//
处理对心跳的返回值(namenode传给datanode的指令)
if
(
!
processCommand(cmds))
continue
; } } ... }
//
while (shouldRun)
}
//
offerService

 

现在的问题是datanode怎么获得namenode对象的?继续往下看

 
public
class
DataNode
extends
Configured
implements
InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable { ...
public
DatanodeProtocol namenode
=
null
;
//
DatanodeProtocol
...
void
startDataNode(Configuration conf, AbstractList
<
File
>
dataDirs, SecureResources resources)
throws
IOException {
this
.namenode
=
(DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.
class
, DatanodeProtocol.versionID, nameNodeAddr, conf);
//
DatanodeProtocol
} }
public
class
NameNode
implements
ClientProtocol, DatanodeProtocol, NamenodeProtocol, FSConstants, RefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol
//
NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。
public
interface
DatanodeProtocol
extends
VersionedProtocol { ...
public
DatanodeRegistration register(DatanodeRegistration registration)
throws
IOException;
/**
* sendHeartbeat() tells the NameNode that the DataNode is still alive and well. Includes some status info, too. * It also gives the NameNode a chance to return an array of "DatanodeCommand" objects. * A DatanodeCommand tells the DataNode to invalidate local block(s), or to copy them to other DataNodes, etc.
*/
public
DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
long
capacity,
long
dfsUsed,
long
remaining,
int
xmitsInProgress,
int
xceiverCount)
throws
IOException;
public
DatanodeCommand blockReport(DatanodeRegistration registration,
long
[] blocks)
throws
IOException; ... }

 

  • 1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!!
  • 2) 上面这段代码中,protocol的类型是DatanodeProtocol.class
  • 3) 对namenode的所有调用,都被委托(delegate)给了Invoker

剩下的就是和之前分析的client端和server端的流程一样了,总结一下流程就是:

datanode向namenode发送heartbeat过程是这样的:

    a) 在datanode初始化获得namenode的proxy

    b) 在datanode上,调用namenode proxy的heartbeat方法:
        namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());
    c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法
    d) client call方法将Invocation转化为Call对象
    e) client 将call发送到真正的namenode服务器
    f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!
    g) datanode接收结果,并将结果转化为DatanodeCommand[]

3.2 jobtracker和tasktracker的心跳

org.apache.hadoop.mapred.TaskTracker

 
//
代码一:
State offerService()
throws
Exception {
long
lastHeartbeat
=
System.currentTimeMillis();
while
(running
&&
!
shuttingDown) {
//
发送心跳,调用代码二
HeartbeatResponse heartbeatResponse
=
transmitHeartBeat(now);
return
State.NORMAL; } }
//
代码二:
HeartbeatResponse transmitHeartBeat(
long
now)
throws
IOException { HeartbeatResponse heartbeatResponse
=
jobClient.heartbeat(status, justStarted, justInited, askForNewTask, heartbeatResponseId);
return
heartbeatResponse; }

剩下的基本上和上面流程差不多....

 

4.      用户自定义的protocol使用hadoop rpc

MyProtocol.java

 

 
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.ipc.VersionedProtocol;
public
interface
MyProtocol
extends
VersionedProtocol {
public
Text println(Text t); }

 

MyServer.java

 

 
import
java.io.IOException;
import
java.net.UnknownHostException;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.ipc.RPC;
import
org.apache.hadoop.ipc.RPC.Server;
public
class
MyServer
implements
MyProtocol {
private
Server server;
public
MyServer() {
try
{ server
=
RPC.getServer(
this
,
"
localhost
"
,
8888
,
new
Configuration()); server.start(); server.join(); }
catch
(UnknownHostException e) { e.printStackTrace(); }
catch
(IOException e) { e.printStackTrace(); }
catch
(InterruptedException e) { e.printStackTrace(); } } @Override
public
Text println(Text t) { System.out.println(t);
return
new
Text(
"
finish
"
); } @Override
public
long
getProtocolVersion(String protocol,
long
clientVersion)
throws
IOException {
return
1
; }
public
static
void
main(String[] args) {
new
MyServer(); } }

 

MyClient.java

 

 
import
java.io.IOException;
import
java.net.InetSocketAddress;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.ipc.RPC;
public
class
MyClient {
private
MyProtocol proxy;
public
MyClient() { InetSocketAddress addr
=
new
InetSocketAddress(
"
localhost
"
,
8888
);
try
{ proxy
=
(MyProtocol) RPC.waitForProxy(MyProtocol.
class
,
1
, addr ,
new
Configuration()); }
catch
(IOException e) { e.printStackTrace(); } }
public
void
println(String s) { System.out.println(proxy.println(
new
Text(s))); }
public
void
close() { RPC.stopProxy(proxy); }
public
static
void
main(String[] args) { MyClient c
=
new
MyClient(); c.println(
"
123
"
); c.close(); } }

 

结果输出:

在server端输出

 
12
/
06
/
23
17
:
11
:
30
INFO ipc.Server: Starting Socket Reader #
1
for
port
6789
12
/
06
/
23
17
:
11
:
30
INFO metrics.RpcMetrics: Initializing RPC Metrics with hostName
=
MyServer, port
=
6789
12
/
06
/
23
17
:
11
:
30
INFO metrics.RpcDetailedMetrics: Initializing RPC Metrics with hostName
=
MyServer, port
=
6789
12
/
06
/
23
17
:
11
:
30
INFO ipc.Server: IPC Server Responder: starting
12
/
06
/
23
17
:
11
:
30
INFO ipc.Server: IPC Server listener on
6789
: starting
12
/
06
/
23
17
:
11
:
30
INFO ipc.Server: IPC Server handler
0
on
6789
: starting
123

 

在client端输出

finish

 

5.      参考url

转载地址:http://nvlpa.baihongyu.com/

你可能感兴趣的文章
fmt标签如何计算两个日期之间相隔的天数
查看>>
Spark核心技术原理透视一(Spark运行原理)
查看>>
《Gradle权威指南》--Gradle任务
查看>>
IntelliJ IDEA创建文件时自动填入作者时间 定制格式
查看>>
Android app启动activity并调用onCreate()方法时都默默地干了什么?
查看>>
远程监视jboss应用java内存的配置
查看>>
前端如何接收 websocket 发送过来的实时数据
查看>>
JavaWeb下载文件response
查看>>
Laravel的三种安装方法总结
查看>>
SpringMVC加载配置Properties文件的几种方式
查看>>
C#设计模式总结 C#设计模式(22)——访问者模式(Vistor Pattern) C#设计模式总结 .NET Core launch.json 简介 利用Bootstrap Paginat...
查看>>
java 项目相关 学习笔记
查看>>
numpy opencv matlab eigen SVD结果对比
查看>>
WPF获取某控件的位置,也就是偏移量
查看>>
Boost C++ 库 中文教程(全)
查看>>
solr查询优化(实践了一下效果比较明显)
查看>>
jdk目录详解及其使用方法
查看>>
说说自己对RESTful API的理解s
查看>>
通过layout实现可拖拽自动排序的UICollectionView
查看>>
服务器错误码
查看>>