admin

Hadoop RPC通信Client客户端的流程分析

admin 安全防护 2023-01-22 481浏览 0

Hadoop RPC通信Client客户端的流程分析

Hadoop的RPC的通信与其他系统的RPC通信不太一样,作者针对Hadoop的使用特点,专门的设计了一套RPC框架,这套框架个人感觉还是 有点小复杂的。所以我打算分成Client客户端和Server服务端2个模块做分析。如果你对RPC的整套流程已经非常了解的前提下,对于Hadoop 的RPC,你也一定可以非常迅速的了解的。OK,下面切入正题。

Hadoop的RPC的相关代码都在org.apache.hadoop.ipc的包下,首先RPC的通信必须遵守许多的协议,其中最最基本的协议即使如下:

/**
*SuperclassofallprotocolsthatuseHadoopRPC.
*Subclassesofthisinterfacearealsosupposedtohave
*astaticfinallongversionIDfield.
*HadoopRPC所有协议的基类,返回协议版本号
*/
publicinterfaceVersionedProtocol{

/**
*Returnprotocolversioncorrespondingtoprotocolinterface.
*@paramprotocolTheclassnameoftheprotocolinterface
*@paramclientVersionTheversionoftheprotocolthattheclientspeaks
*@returntheversionthattheserverwillspeak
*/
publiclonggetProtocolVersion(Stringprotocol,
longclientVersion)throwsIOException;
}

他是所有协议的基类,他的下面还有一堆的子类,分别对应于不同情况之间的通信,下面是一张父子类图:

Hadoop RPC通信Client客户端的流程分析

顾名思义,只有客户端和服务端遵循相同的版本号,才能进行通信。

RPC客户端的所有相关操作都被封装在了一个叫Client.java的文件中:

/**AclientforanIPCservice.IPCcallstakeasingle{@linkWritable}asa
*parameter,andreturna{@linkWritable}astheirvalue.Aservicerunson
*aportandisdefinedbyaparameterclassandavalueclass.
*RPC客户端类
*@seeServer
*/
publicclassClient{

publicstaticfinalLogLOG=
LogFactory.getLog(Client.class);
//客户端到服务端的连接
privateHashtable<ConnectionId,Connection>connections=
newHashtable<ConnectionId,Connection>();

//回调值类
privateClass<?extendsWritable>valueClass;//classofcallvalues
//call回调id的计数器
privateintcounter;//counterforcallids
//原子变量判断客户端是否还在运行
privateAtomicBooleanrunning=newAtomicBoolean(true);//ifclientruns
finalprivateConfigurationconf;

//socket工厂,用来创建socket
privateSocketFactorysocketFactory;//howtocreatesockets
privateintrefCount=1;
......

从代码中明显的看到,这里存在着一个类似于connections连接池的东西,其实这暗示着连接是可以被复用的,在hashtable中,与每个Connecttion连接的对应的是一个ConnectionId,显然这里不是一个Long类似的数值:

/**
*Thisclassholdstheaddressandtheuserticket.Theclientconnections
*toserversareuniquelyidentifiedby<remoteAddress,protocol,ticket>
*连接的唯一标识,主要通过<远程地址,协议类型,用户组信息>
*/
staticclassConnectionId{
//远程的socket地址
InetSocketAddressaddress;
//用户组信息
UserGroupInformationticket;
//协议类型
Class<?>protocol;
privatestaticfinalintPRIME=16777619;
privateintrpcTimeout;
privateStringserverPrincipal;
privateintmaxIdleTime;//connectionswillbeculledifitwasidlefor
//maxIdleTimemsecs
privateintmaxRetries;//themax.no.ofretriesforsocketconnections
privatebooleantcpNoDelay;//ifTthendisableNagle'sAlgorithm
privateintpingInterval;//howoftensendspingtotheserverinmsecs
....

这里用了3个属性组成唯一的标识属性,为了保证可以进行ID的复用,所以作者对ConnectionId的equal比较方法和hashCode 进行了重写:

/**
*作者重写了equal比较方法,只要成员变量都想等也就想到了
*/
@Override
publicbooleanequals(Objectobj){
if(obj==this){
returntrue;
}
if(objinstanceofConnectionId){
ConnectionIdthat=(ConnectionId)obj;
returnisEqual(this.address,that.address)
&&this.maxIdleTime==that.maxIdleTime
&&this.maxRetries==that.maxRetries
&&this.pingInterval==that.pingInterval
&&isEqual(this.protocol,that.protocol)
&&this.rpcTimeout==that.rpcTimeout
&&isEqual(this.serverPrincipal,that.serverPrincipal)
&&this.tcpNoDelay==that.tcpNoDelay
&&isEqual(this.ticket,that.ticket);
}
returnfalse;
}

/**
*重写了hashCode的生成规则,保证不同的对象产生不同的hashCode值
*/
@Override
publicinthashCode(){
intresult=1;
result=PRIME*result+((address==null)?0:address.hashCode());
result=PRIME*result+maxIdleTime;
result=PRIME*result+maxRetries;
result=PRIME*result+pingInterval;
result=PRIME*result+((protocol==null)?0:protocol.hashCode());
result=PRIME*rpcTimeout;
result=PRIME*result
+((serverPrincipal==null)?0:serverPrincipal.hashCode());
result=PRIME*result+(tcpNoDelay?1231:1237);
result=PRIME*result+((ticket==null)?0:ticket.hashCode());
returnresult;
}

这样就能保证对应同类型的连接就能够完全复用了,而不是仅仅凭借引用的关系判断对象是否相等,这里就是一个不错的设计了。

与连接Id对应的就是Connection了,它里面维护是一下的一些变量;

/**Threadthatreadsresponsesandnotifiescallers.Eachconnectionownsa
*socketconnectedtoaremoteaddress.Callsaremultiplexedthroughthis
*socket:responsesmaybedeliveredoutoforder.*/
privateclassConnectionextendsThread{
//所连接的服务器地址
privateInetSocketAddressserver;//serverip:port
//服务端的krb5的名字,与安全方面相关
privateStringserverPrincipal;//server'skrb5principalname
//连接头部,内部包含了,所用的协议,客户端用户组信息以及验证的而方法
privateConnectionHeaderheader;//connectionheader
//远程连接ID
privatefinalConnectionIdremoteId;//connectionid
//连接验证方法
privateAuthMethodauthMethod;//authenticationmethod
//下面3个变量都是安全方面的
privatebooleanuseSasl;
privateToken<?extendsTokenIdentifier>token;
privateSaslRpcClientsaslRpcClient;

//下面是一组socket通信方面的变量
privateSocketsocket=null;//connectedsocket
privateDataInputStreamin;
privateDataOutputStreamout;
privateintrpcTimeout;
privateintmaxIdleTime;//connectionswillbeculledifitwasidlefor
//maxIdleTimemsecs
privateintmaxRetries;//themax.no.ofretriesforsocketconnections
//tcpNoDelay可设置是否阻塞模式
privatebooleantcpNoDelay;//ifTthendisableNagle'sAlgorithm
privateintpingInterval;//howoftensendspingtotheserverinmsecs

//currentlyactivecalls当前活跃的回调,一个连接可能会有很多个call回调
privateHashtable<Integer,Call>calls=newHashtable<Integer,Call>();
//最后一次IO活动通信的时间
privateAtomicLonglastActivity=newAtomicLong();//lastI/Oactivitytime
//连接关闭标记
privateAtomicBooleanshouldCloseConnection=newAtomicBoolean();//indicateiftheconnectionisclosed
privateIOExceptioncloseException;//closereason
.....

里面维护了大量的和连接通信相关的变量,在这里有一个很有意思的东西connectionHeader,连接头部,里面的数据时为了在通信最开始的时候被使用:

classConnectionHeaderimplementsWritable{
publicstaticfinalLogLOG=LogFactory.getLog(ConnectionHeader.class);

//客户端和服务端通信的协议名称
privateStringprotocol;
//客户端的用户组信息
privateUserGroupInformationugi=null;
//验证的方式,关系到写入数据的时的格式
privateAuthMethodauthMethod;
.....

起到标识验证的作用。一个Client类的基本结构我们基本可以描绘出来了,下面是完整的类关系图:

Hadoop RPC通信Client客户端的流程分析

在上面这幅图中,你肯定会发现我少了一个很关键的类了,就是Call回调类。Call回调在很多异步通信中是经常出现的。因为在通信过程中,当一个对象通 过网络发送请求给另外一个对象的时候,如果采用同步的方式,会一直阻塞在那里,会带来非常不好的效率和体验的,所以很多时候,我们采用的是一种叫回调接口 的方式。在这期间,用户可以继续做自己的事情。所以同样的Call这个概念当然也是适用在Hadoop RPC中。在Hadoop的RPC的核心调 用原理, 简单的说,就是我把parame参数序列化到一个对象中,通过参数的形式把对象传入,进行RPC通信,最后服务端把处理好的结果值放入call对象,在返 回给客户端,也就是说客户端和服务端都是通过Call对象进行操作,Call里面存着,请求的参数,和处理后的结构值2个变量。通过Call对象的封装, 客户单实现了完美的无须知道细节的调用。下面是Call类的类按时:

/**Acallwaitingforavalue.*/
//客户端的一个回调
privateclassCall{
/回调ID
intid;//callid
//被序列化的参数
Writableparam;//parameter
//返回值
Writablevalue;//value,nulliferror
//出错时返回的异常
IOExceptionerror;//exception,nullifvalue
//回调是否已经被完成
booleandone;//truewhencallisdone
....

看到这个Call回调类,也许你慢慢的会明白Hadoop RPC的一个基本原型了,这些Call当然是存在于某个连接中的,一个连接可能会发生多个回调,所以在Connection中维护了calls列表:

privateclassConnectionextendsThread{
....
//currentlyactivecalls当前活跃的回调,一个连接可能会有很多个call回调
privateHashtable<Integer,Call>calls=newHashtable<Integer,Call>();

作者在设计Call类的时候,比较聪明的考虑一种并发情况下的Call调用,所以为此设计了下面这个Call的子类,就是专门用于短时间内的瞬间Call调用:

/**Callimplementationusedforparallelcalls.*/
/**继承自Call回调类,可以并行的使用,通过加了index下标做Call的区分*/
privateclassParallelCallextendsCall{
/每个ParallelCall并行的回调就会有对应的结果类
privateParallelResultsresults;
//index作为Call的区分
privateintindex;
....

如果要查找值,就通过里面的ParallelCall查找,原理是根据index索引:

/**Resultcollectorforparallelcalls.*/
privatestaticclassParallelResults{
//并行结果类中拥有一组返回值,需要ParallelCall的index索引匹配
privateWritable[]values;
//结果值的数量
privateintsize;
//values中已知的值的个数
privateintcount;

.....

/**Collectaresult.*/
publicsynchronizedvoidcallComplete(ParallelCallcall){
//将call中的值赋给result中
values[call.index]=call.value;//storethevalue
count++;//countit
//如果计数的值等到最终大小,通知caller
if(count==size)//ifallvaluesarein
notify();//thennotifywaitingcaller
}
}

因为Call结构集是这些并发Call共有的,所以用的是static变量,都存在在了values数组中了,只有所有的并发Call都把值取出来了,才 算回调成功,这个是个非常细小的辅助设计,这个在有些书籍上并没有多少提及。下面我们看看一般Call回调的流程,正如刚刚说的,最终客户端看到的形式就 是,传入参数,获得结果,忽略内部一切逻辑,这是怎么做到的呢,答案在下面:

在执行之前,你会先得到ConnectionId:

publicWritablecall(Writableparam,InetSocketAddressaddr,
Class<?>protocol,UserGroupInformationticket,
intrpcTimeout)
throwsInterruptedException,IOException{
ConnectionIdremoteId=ConnectionId.getConnectionId(addr,protocol,
ticket,rpcTimeout,conf);
returncall(param,remoteId);
}

接着才是主流程:

publicWritablecall(Writableparam,ConnectionIdremoteId)
throwsInterruptedException,IOException{
//根据参数构造一个Call回调
Callcall=newCall(param);
//根据远程ID获取连接
Connectionconnection=getConnection(remoteId,call);
//发送参数
connection.sendParam(call);//sendtheparameter
booleaninterrupted=false;
synchronized(call){
//如果call.done为false,就是Call还没完成
while(!call.done){
try{
//等待远端程序的执行完毕
call.wait();//waitfortheresult
}catch(InterruptedExceptionie){
//savethefactthatwewereinterrupted
interrupted=true;
}
}

//如果是异常中断,则终止当前线程
if(interrupted){
//settheinterruptflagnowthatwearedonewaiting
Thread.currentThread().interrupt();
}

//如果call回到出错,则返回call出错信息
if(call.error!=null){
if(call.errorinstanceofRemoteException){
call.error.fillInStackTrace();
throwcall.error;
}else{//localexception
//usetheconnectionbecauseitwillreflectanipchange,unlike
//theremoteId
throwwrapException(connection.getRemoteAddress(),call.error);
}
}else{
//如果是正常情况下,返回回调处理后的值
returncall.value;
}
}
}

在这上面的操作步骤中,重点关注2个函数,获取连接操作,看看人家是如何保证连接的复用性的:

privateConnectiongetConnection(ConnectionIdremoteId,
Callcall)
throwsIOException,InterruptedException{
.....
/*wecouldavoidthisallocationforeachRPCbyhavinga
*connectionsIdobjectandwithset()method.Weneedtomanagethe
*refsforkeysinHashMapproperly.Fornowitsok.
*/
do{
synchronized(connections){
//从connection连接池中获取连接,可以保证相同的连接ID可以复用
connection=connections.get(remoteId);
if(connection==null){
connection=newConnection(remoteId);
connections.put(remoteId,connection);
}
}
}while(!connection.addCall(call));

有点单例模式的味道哦,还有一个方法叫sendParam发送参数方法:

publicvoidsendParam(Callcall){
if(shouldCloseConnection.get()){
return;
}

DataOutputBufferd=null;
try{
synchronized(this.out){
if(LOG.isDebugEnabled())
LOG.debug(getName()+"sending#"+call.id);

//forserializingthe
//datatobewritten
//将call回调中的参数写入到输出流中,传向服务端
d=newDataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[]data=d.getData();
intdataLength=d.getLength();
out.writeInt(dataLength);//firstputthedatalength
out.write(data,0,dataLength);//writethedata
out.flush();
}
....

代码只发送了Call的id,和请求参数,并没有把所有的Call的内容都扔出去了,一定是为了减少数据量的传输,这里还把数据的长度写入了,这是为了方 便服务端准确的读取到不定长的数据。这服务端中间的处理操作不是今天讨论的重点。Call的执行过程就是这样。那么Call是如何被调用的呢,这又要重新 回到了Client客户端上去了,Client有一个run()函数,所有的操作都是始于此的;

publicvoidrun(){
if(LOG.isDebugEnabled())
LOG.debug(getName()+":starting,havingconnections"
+connections.size());

//等待工作,等待请求调用
while(waitForWork()){//waithereforwork-readorcloseconnection
//调用完请求,则立即获取回复
receiveResponse();
}

close();

if(LOG.isDebugEnabled())
LOG.debug(getName()+":stopped,remainingconnections"
+connections.size());
}

操作很简单,程序一直跑着,有请求,处理请求,获取请求,没有请求,就死等。

privatesynchronizedbooleanwaitForWork(){
if(calls.isEmpty()&&!shouldCloseConnection.get()&&running.get()){
longtimeout=maxIdleTime-
(System.currentTimeMillis()-lastActivity.get());
if(timeout>0){
try{
wait(timeout);
}catch(InterruptedExceptione){}
}
}
....

获取回复的操作如下:

/*Receivearesponse.
*Becauseonlyonereceiver,sonosynchronizationonin.
*获取回复值
*/
privatevoidreceiveResponse(){
if(shouldCloseConnection.get()){
return;
}
//更新最近一次的call活动时间
touch();

try{
intid=in.readInt();//trytoreadanid

if(LOG.isDebugEnabled())
LOG.debug(getName()+"gotvalue#"+id);

//从获取call中取得相应的call
Callcall=calls.get(id);

//判断该结果状态
intstate=in.readInt();//readcallstatus
if(state==Status.SUCCESS.state){
Writablevalue=ReflectionUtils.newInstance(valueClass,conf);
value.readFields(in);//readvalue
call.setValue(value);
calls.remove(id);
}elseif(state==Status.ERROR.state){
call.setException(newRemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
calls.remove(id);
}elseif(state==Status.FATAL.state){
//Closetheconnection
markClosed(newRemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
}
.....
}catch(IOExceptione){
markClosed(e);
}
}

从之前维护的Call列表中取出,做判断。Client本身的执行流程比较的简单:

Hadoop RPC通信Client客户端的流程分析

Hadoop RPC客户端的通信模块的部分大致就是我上面的这个流程,中间其实还忽略了很多的细节,大家学习的时候,针对源码会有助于更好的理解,Hadoop RPC的服务端的实现更加复杂,所以建议采用分模块的学习或许会更好一点。

继续浏览有关 Hadoop 的文章
发表评论