【HBase代码笔记】RPC

 

Hbase RPC

@todo 草稿,未整理

在客户端调用一个接口方法的过程

 

对应的接口创建了一个代理

HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(

??????????????? HMasterInterface.class, HBaseRPCProtocolVersion.versionID,

???????????? ???masterLocation.getInetSocketAddress(), this.conf);

 

Java的反射机制可以使得对这个方法的调用被派发到这个代理类上面。

VersionedProtocol proxy =

??????? (VersionedProtocol) Proxy.newProxyInstance(

??????????? protocol.getClassLoader(), new Class[] { protocol },

??????????? new Invoker(addr, ticket, conf, factory));

??? long serverVersion = proxy.getProtocolVersion(protocol.getName(),

????????????????????????????????????????????????? clientVersion);

代理类的对应方法的调用其实封装到了一个Invoker对象上。该对象实现了InvocationHandler, class Invoker implements InvocationHandler
当某个方法调用时,Invoker的invoke 方法会被调用。在该方法中用HBaseClient句柄向服务端发起请求。

public Object invoke(Object proxy, Method method, Object[] args){

????? HbaseObjectWritable value = (HbaseObjectWritable)

??????? client.call(new Invocation(method, args), address, ticket);

????? return value.get();

??? }

在HBaseClient发起请求的时候,将要调用的方法和参数封装成一个Invocation对象。 

该对象有两个静态map,在HBaseRPC静态加载的时候会在Invocation类上创建这两个静态映射表。记录的方法名和code的相互映射关系,在传输的时候传输code即可。

? private static class Invocation implements Writable, Configurable {

??? // Here, for hbase, we maintain two static maps of method names to code and

??? // vice versa.

??? static final Map<Byte, String> CODE_TO_METHODNAME =

????? new HashMap<Byte, String>();

??? private static final Map<String, Byte> METHODNAME_TO_CODE =

????? new HashMap<String, Byte>();

HBaseClient的call其实是通过一个connection对象连接服务器地址,然后发送参数Writable,这里其实就是实现了Writable接口的Invocation对象。

public Writable call(Writable param, InetSocketAddress addr,

?????????????????????? UserGroupInformation ticket)

?????????????????????? throws IOException {

??? Call call = new Call(param);

??? Connection connection = getConnection(addr, ticket, call);

??? connection.sendParam(call);

 

 

在服务端调响应一个方法请求的过程

 

HMaster和HRegionServer等服务类等要响应一个方法请求的时候,其内部的HBaseServer负责接收到请求,HbaseServer再使用HMaster和HRegionServer等服务类的实例来做事情,结果给客户端,以完成客户端对服务端对应方法的调用。

在HMaster和HRegionServer等需要响应请求的服务中都有一个HBaseServer句柄,响应客户端的请求 private final HBaseServer server;
在这两个服务类初始化的时候会初始化这个句柄,绑定到其服务地址,在某个端口上侦听。

this.server = HBaseRPC.getServer(this, address.getBindAddress(),

??????? address.getPort(), conf.getInt(“hbase.regionserver.handler.count”, 10),

??????? false, conf);

Listener线程侦听请求,调用Connection从连接中读取Call,并把其缓存到队列中,供 handler处理。

? private void processData() throws? IOException, InterruptedException {

????? DataInputStream dis =

??????? new DataInputStream(new ByteArrayInputStream(data.array()));

????? int id = dis.readInt();??????????????????? // try to read an id

????? Writable param = ReflectionUtils.newInstance(paramClass, conf);

????? param.readFields(dis);

????? Call call = new Call(id, param, this);

????? callQueue.put(call);??? // queue the call; maybe blocked here

??? }

?

HBaseServer中有一组Handler线程类响应客户端请求。

public synchronized void start() {

??? responder.start();

??? listener.start();

??? handlers = new Handler[handlerCount];

??? for (int i = 0; i < handlerCount; i++) {

????? handlers[i] = new Handler(i);

????? handlers[i].start();

??? }

在Handler的run方法中,通过call执行请求,并把结果写入到respone中。

? value = call(call.param, call.timestamp);

? DataOutputStream out = new DataOutputStream(buf);

??????????? value.write(out);

call.setResponse(ByteBuffer.wrap(buf.toByteArray()));

Server的call函数中把对应的方法名和参数解析出来,在服务端执行,并返回执行结果。

public Writable call(Writable param, long receivedTime) {

Invocation call = (Invocation)param;

?Method method =

//?? implementation是构建HBaseServer是传入的实例的class,如HMaster或HRegionServer,HBaseServer就是代理这些类做事情。

mentation.getMethod(call.getMethodName(),?????????????????????????????????? call.getParameterClasses());

// instance是传入的实例,其实是这些实例在做事情。

Object value = method.invoke(instance, call.getParameters());

 

 

 

public static VersionedProtocol getProxy(Class<?> protocol,

????? long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,

????? Configuration conf, SocketFactory factory)

throws IOException {

// Returns an instance of a proxy class for the specified interfaces that // dispatches method invocations to the specified invocation handler.

?? ?VersionedProtocol proxy =

??????? (VersionedProtocol) Proxy.newProxyInstance(

??????????? protocol.getClassLoader(), new Class[] { protocol },

??????????? new Invoker(addr, ticket, conf, factory));

??? long serverVersion = proxy.getProtocolVersion(protocol.getName(),

????????????????????????????????????????????????? clientVersion);

??? if (serverVersion == clientVersion) {

????? return proxy;

??? }

??? throw new VersionMismatch(protocol.getName(), clientVersion,

????????????????????????????? serverVersion);

? }

 

private static class Invoker implements InvocationHandler

 

实现了InvocationHandler的接口方法

? public Object invoke(Object proxy, Method method, Object[] args)

??????? throws Throwable {

????? HbaseObjectWritable value = (HbaseObjectWritable)

?????? ?client.call(new Invocation(method, args), address, ticket);

??? // 将method 和参数封装为一个Invocation,通过HBaseClient发送到服务端,并得到返回结果。

????? return value.get();

}

 

 

public Writable call(Writable param, InetSocketAddress addr,

?????????????????????? UserGroupInformation ticket)

?????????????????????? throws IOException {

Call call = new Call(param);

// 连接到服务器,发送请求

??? Connection connection = getConnection(addr, ticket, call);

?connection.sendParam(call);

}

 

public class HBaseRPC

/** Add a new interface to the ipc map.

?? * @param c Class whose methods we’ll add to the map of methods to codes

?? * (and vice versa).

?? * @param startCode Current state of the byte code.

?? * @return State of <code>code</code> when this method is done.

?? */

? public static byte addToMap(final Class<?> c, final byte startCode) {

??? if (Invocation.CODE_TO_METHODNAME.get(startCode) != null) {

????? throw new IllegalArgumentException(“Code ” + startCode +

??????? “already had entry”);

??? }

??? byte localCode = startCode;

??? Method [] methods = c.getMethods();

??? // There are no guarantees about the order in which items are returned in

??? // so do a sort (Was seeing that sort was one way on one server and then

??? // another on different server).

??? Arrays.sort(methods, new Comparator<Method>() {

????? public int compare(Method left, Method right) {

??? ????return left.getName().compareTo(right.getName());

????? }

??? });

??? for (int i = 0; i < methods.length; i++) {

????? Invocation.addToMap(methods[i].getName(), localCode++);

??? }

??? return localCode;

? }

? HBaseRPC初始化的时候会静态的把以下四个类的方法和code的关系缓存在Invocation中,这样在传输时候只用传输code,在服务端根据code再还原成方法名。

? static {

??? code = HBaseRPC.addToMap(VersionedProtocol.class, code);

??? code = HBaseRPC.addToMap(HMasterInterface.class, code);

??? code = HBaseRPC.addToMap(HMasterRegionInterface.class, code);

??? code = HBaseRPC.addToMap(HRegionInterface.class, code);

? }

 

 

使用例子:利用java的代理机制,在客户端为对应的接口生成一个代理类。

HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(

??????????????? HMasterInterface.class, HBaseRPCProtocolVersion.versionID,

??????????????? masterLocation.getInetSocketAddress(), this.conf);

 

 

 

org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion

记录RPC的版本,只有客户端和服务端的版本一致了,才可以通信。

There is one version id for all the RPC interfaces. If any interface is changed, the versionID must be changed here.

 

Interface version. HMasterInterface version history:

  • Version was incremented to 2 when we brought the hadoop RPC local to hbase HADOOP-2495
  • Version was incremented to 3 when we changed the RPC to send codes instead of actual class names (HADOOP-2519).
  • Version 4 when we moved to all byte arrays (HBASE-42).
  • Version 5 HBASE-576.
  • Version 6 modifyTable.

HMasterRegionInterface version history:

  • Version 2 was when the regionServerStartup was changed to return a MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes.
  • Version 3 was when HMsg was refactored so it could carry optional messages (HBASE-504).
  • HBASE-576 we moved this to 4.

HRegionInterface version history:

  • Upped to 5 when we added scanner caching
  • HBASE-576, we moved this to 6.

TransactionalRegionInterface version history:

  • Moved to 2 for hbase-576.

Unified RPC version number history:

  • Version 10: initial version (had to be > all other RPC versions
  • Version 11: Changed getClosestRowBefore signature.
  • Version 12: HServerLoad extensions (HBASE-1018).
  • Version 13: HBASE-847
  • Version 14: HBASE-900
  • Version 15: HRegionInterface.exists
  • Version 16: Removed HMasterRegionInterface.getRootRegionLocation and HMasterInterface.findRootRegion. We use ZooKeeper to store root region location instead.
  • Version 17: Added incrementColumnValue.
  • Version 18: HBASE-1302.
  • Version 19: Added getClusterStatus().
  • Version 20: Backed Transaction HBase out of HBase core.
  • Version 21: HBASE-1665.

 

org.apache.hadoop.hbase.ipc.HBaseRPC

A simple RPC mechanism. This is a local hbase copy of the hadoop RPC so we can do things like address HADOOP-414 for hbase-only and try other hbase-specific optimizations like using our own version of ObjectWritable. Class has been renamed to avoid confusing it w/ hadoop versions.

A protocol is a Java interface. All parameters and return types must be one of:

  • a primitive type, boolean, byte, char, short, int, long, float, double, or void; or
  • a String; or
  • a Writable; or
  • an array of the above types

All methods in the protocol should throw only IOException. No field data of the protocol instance is transmitted.

原创文章。为了维护文章的版本一致、最新、可追溯,转载请注明: 转载自idouba

本文链接地址: 【HBase代码笔记】RPC


,

No comments yet.

发表评论