一、需求分析 目前的 RPC 框架,我们使用 Vert.x 的 HttpServer 作为服务提供者的服务器,代码实现比较简单,其底层网络传输使用的是 HTTP 协议。
问题来了,使用 HTTP 协议会有什么问题么?或者说,有没有更好的选择?
一般情况下,RPC 框架会比较注重性能,而 HTTP 协议中的头部信息、请求响应格式较 “重”,会影响网络传输性能。
举个例子,利用浏览器网络控制台随便查看一个请求,能看到大量的请求和响应标头。
所以,我们需要自己自定义一套 RPC 协议,比如利用 TCP 等传输层协议、自己定义请求响应结构,来实现性能更高、更灵活、更安全的 RPC 框架。
本节会自定义 RPC 协议,巩固计算机网络知识,并提升自己的系统设计能力。
二、设计方案 自定义 RPC 协议可以分为 2 大核心部分:
1、网络传输设计 网络传输设计的目标是:选择一个能够高性能通信的网络协议和传输方式。
需求分析中已经提到了,HTTP 协议的头信息是比较大的,会影响传输性能。但其实除了这点外,HTTP 本身属于无状态协议,这意味着每个 HTTP 请求都是独立的,每次请求 / 响应都要重新建立和关闭连接,也会影响性能。
考虑到这点,在 HTTP/1.1 中引入了持久连接(Keep-Alive),允许在单个 TCP 连接上发送多个 HTTP 请求和响应,避免了每次请求都要重新建立和关闭连接的开销。
虽然如此,HTTP 本身是应用层协议,我们现在设计的 RPC 协议也是应用层协议,性能肯定是不如底层(传输层)的 TCP 协议要高的。所以我们想要追求更高的性能,还是选择使用 TCP 协议完成网络传输,有更多的自主设计空间。
2、消息结构设计 消息结构设计的目标是:用 最少的 空间传递 需要的 信息。
1)如何使用最少的空间呢?
之前接触到的数据类型可能都是整型、长整型、浮点数类型等等,这些类型其实都比较 “重”,占用的字节数较多。比如整型要占用 4 个字节、32 个 bit 位。
我们在自定义消息结构时,想要节省空间,就要尽可能使用更轻量的类型,比如 byte 字节类型,只占用 1 个字节、8 个 bit 位。
需要注意的是,Java 中实现 bit 位运算拼接相对比较麻烦,所以权衡开发成本,我们设计消息结构时,尽量给每个数据凑到整个字节。
2)消息内需要哪些信息呢?
目标肯定是能够完成请求嘛,那我们何不从之前的 HTTP 请求方式中,找到一些线索?
分析 HTTP 请求结构,我们能够得到 RPC 消息所需的信息:
魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似 HTTPS 的安全证书)
版本号:保证请求和响应的一致性(类似 HTTP 协议有 1.0/2.0 等版本)
序列化方式:来告诉服务端和客户端如何解析数据(类似 HTTP 的 Content-Type 内容类型)
类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP 有请求头和响应头)
状态:如果是响应,记录响应的结果(类似 HTTP 的 200 状态代码)
此外,还需要有请求 id,唯一标识某个请求,因为 TCP 是双向通信的,需要有个唯一标识来追踪每个请求。
最后,也是最重要的,要发送 body 内容数据。我们暂时称它为 请求体 ,类似于我们之前 HTTP 请求中发送的 RpcRequest。
如果是 HTTP 这种协议,有专门的 key / value 结构,很容易找到完整的 body 数据。但基于 TCP 协议,想要获取到完整的 body 内容数据,就需要一些 “小心思” 了,因为 TCP 协议本身会存在半包和粘包问题,每次传输的数据可能是不完整的,具体的后面会讲。
所以我们需要在消息头中新增一个字段 请求体数据长度
,保证能够完整地获取 body 内容信息。
基于以上的思考,我们可以得到最终的消息结构设计,如下图:
实际上,这些数据应该是紧凑的,请求头信息总长 17 个字节。也就是说,上述消息结构,本质上就是拼接在一起的一个字节数组。我们后续实现时,需要有 消息编码器 和 消息解码器 ,编码器先 new 一个空的 Buffer 缓冲区,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据。
通过这种约定的方式,我们就不用记录头信息了。比如 magic 魔数,不用存储 “magic” 这个字符串,而是读取第一个字节(前 8 bit)就能获取到。
Redis 底层很多数据结构都是这种设计。
明确了设计后,我们来开发实现,就比较简单了。
三、开发实现 1、消息结构 新建 protocol
包,将所有和自定义协议有关的代码都放到该包下。
1)新建协议消息类 ProtocolMessage
。
将消息头单独封装为一个内部类,消息体可以使用泛型类型,完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Data @AllArgsConstructor @NoArgsConstructor public class ProtocolMessage <T> { private Header header; private T body; @Data public static class Header { private byte magic; private byte version; private byte serializer; private byte type; private byte status; private long requestId; private int bodyLength; } }
2)新建协议常量类 ProtocolConstant
。
记录了和自定义协议有关的关键信息,比如消息头长度、魔数、版本号。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.yupi.yurpc.protocol;public interface ProtocolConstant { int MESSAGE_HEADER_LENGTH = 17 ; byte PROTOCOL_MAGIC = 0x1 ; byte PROTOCOL_VERSION = 0x1 ; }
3)新建消息字段的枚举类,比如:
协议状态枚举,暂时只定义成功、请求失败、响应失败三种枚举值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Getter public enum ProtocolMessageStatusEnum { OK("ok" , 20 ), BAD_REQUEST("badRequest" , 40 ), BAD_RESPONSE("badResponse" , 50 ); private final String text; private final int value; ProtocolMessageStatusEnum(String text, int value) { this .text = text; this .value = value; } public static ProtocolMessageStatusEnum getEnumByValue (int value) { for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) { if (anEnum.value == value) { return anEnum; } } return null ; } }
协议消息类型枚举,包括请求、响应、心跳、其他。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter public enum ProtocolMessageTypeEnum { REQUEST(0 ), RESPONSE(1 ), HEART_BEAT(2 ), OTHERS(3 ); private final int key; ProtocolMessageTypeEnum(int key) { this .key = key; } public static ProtocolMessageTypeEnum getEnumByKey (int key) { for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) { if (anEnum.key == key) { return anEnum; } } return null ; } }
协议消息的序列化器枚举,跟 RPC 框架已支持的序列化器对应。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Getter public enum ProtocolMessageSerializerEnum { JDK(0 , "jdk" ), JSON(1 , "json" ), KRYO(2 , "kryo" ), HESSIAN(3 , "hessian" ); private final int key; private final String value; ProtocolMessageSerializerEnum(int key, String value) { this .key = key; this .value = value; } public static List<String> getValues () { return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList()); } public static ProtocolMessageSerializerEnum getEnumByKey (int key) { for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) { if (anEnum.key == key) { return anEnum; } } return null ; } public static ProtocolMessageSerializerEnum getEnumByValue (String value) { if (ObjectUtil.isEmpty(value)) { return null ; } for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) { if (anEnum.value.equals(value)) { return anEnum; } } return null ; } }
2、网络传输 RPC 框架使用了高性能的 Vert.x 作为网络传输服务器,之前用的是 HttpServer。同样,Vert.x 也支持 TCP 服务器,相比于 Netty 或者自己写 Socket 代码,更加简单易用。
首先新建 server.tcp
包,将所有 TCP 服务相关的代码放到该包中。
1)TCP 服务器实现。
新建 VertxTcpServer
类,跟之前写的 VertxHttpServer
类似,先创建 Vert.x 的服务器实例,然后定义处理请求的方法,比如回复 “Hello, client!”,最后启动服务器。
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class VertxTcpServer implements HttpServer { private byte [] handleRequest(byte [] requestData) { return "Hello, client!" .getBytes(); } @Override public void doStart (int port) { Vertx vertx = Vertx.vertx(); NetServer server = vertx.createNetServer(); server.connectHandler(socket -> { socket.handler(buffer -> { byte [] requestData = buffer.getBytes(); byte [] responseData = handleRequest(requestData); socket.write(Buffer.buffer(responseData)); }); }); server.listen(port, result -> { if (result.succeeded()) { System.out.println("TCP server started on port " + port); } else { System.err.println("Failed to start TCP server: " + result.cause()); } }); } public static void main (String[] args) { new VertxTcpServer ().doStart(8888 ); } }
上述代码中的 socket.write
方法,就是在向连接到服务器的客户端发送数据。注意发送的数据格式为 Buffer,这是 Vert.x 为我们提供的字节数组缓冲区实现。
2)TCP 客户端实现。
新建 VertxTcpClient
类,先创建 Vert.x 的客户端实例,然后定义处理请求的方法,比如回复 “Hello, server!”,并建立连接。
示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class VertxTcpClient { public void start () { Vertx vertx = Vertx.vertx(); vertx.createNetClient().connect(8888 , "localhost" , result -> { if (result.succeeded()) { System.out.println("Connected to TCP server" ); io.vertx.core.net.NetSocket socket = result.result(); socket.write("Hello, server!" ); socket.handler(buffer -> { System.out.println("Received response from server: " + buffer.toString()); }); } else { System.err.println("Failed to connect to TCP server" ); } }); } public static void main (String[] args) { new VertxTcpClient ().start(); } }
3、编码 / 解码器 在上一步中,我们也注意到了,Vert.x 的 TCP 服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此,我们需要编码器和解码器,将 Java 的消息对象和 Buffer 进行相互转换。
下图演示了整个请求和响应的过程,可以了解编码器和解码器的作用。
之前 HTTP 请求和响应时,直接从请求 body 处理器中获取到 body 字节数组,再通过序列化(反序列化)得到 RpcRequest 或 RpcResponse 对象。使用 TCP 服务器后,只不过改为从 Buffer 中获取字节数组,然后编解码为 RpcRequest 或 RpcResponse 对象。其他的后续处理流程都是可复用的。
1)首先实现消息编码器。
在 protocol 包下新建 ProtocolMessageEncoder
,核心流程是依次向 Buffer 缓冲区写入消息对象里的字段。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class ProtocolMessageEncoder { public static Buffer encode (ProtocolMessage<?> protocolMessage) throws IOException { if (protocolMessage == null || protocolMessage.getHeader() == null ) { return Buffer.buffer(); } ProtocolMessage.Header header = protocolMessage.getHeader(); Buffer buffer = Buffer.buffer(); buffer.appendByte(header.getMagic()); buffer.appendByte(header.getVersion()); buffer.appendByte(header.getSerializer()); buffer.appendByte(header.getType()); buffer.appendByte(header.getStatus()); buffer.appendLong(header.getRequestId()); ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer()); if (serializerEnum == null ) { throw new RuntimeException ("序列化协议不存在" ); } Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue()); byte [] bodyBytes = serializer.serialize(protocolMessage.getBody()); buffer.appendInt(bodyBytes.length); buffer.appendBytes(bodyBytes); return buffer; } }
2)实现消息解码器。
在 protocol 包下新建 ProtocolMessageDecoder
,核心流程是依次从 Buffer 缓冲区的指定位置读取字段,构造出完整的消息对象。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class ProtocolMessageDecoder { public static ProtocolMessage<?> decode(Buffer buffer) throws IOException { ProtocolMessage.Header header = new ProtocolMessage .Header(); byte magic = buffer.getByte(0 ); if (magic != ProtocolConstant.PROTOCOL_MAGIC) { throw new RuntimeException ("消息 magic 非法" ); } header.setMagic(magic); header.setVersion(buffer.getByte(1 )); header.setSerializer(buffer.getByte(2 )); header.setType(buffer.getByte(3 )); header.setStatus(buffer.getByte(4 )); header.setRequestId(buffer.getLong(5 )); header.setBodyLength(buffer.getInt(13 )); byte [] bodyBytes = buffer.getBytes(17 , 17 + header.getBodyLength()); ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer()); if (serializerEnum == null ) { throw new RuntimeException ("序列化消息的协议不存在" ); } Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue()); ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType()); if (messageTypeEnum == null ) { throw new RuntimeException ("序列化消息的类型不存在" ); } switch (messageTypeEnum) { case REQUEST: RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class); return new ProtocolMessage <>(header, request); case RESPONSE: RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class); return new ProtocolMessage <>(header, response); case HEART_BEAT: case OTHERS: default : throw new RuntimeException ("暂不支持该消息类型" ); } } }
3)编写单元测试类,先编码再解码,以测试编码器和解码器的正确性。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class ProtocolMessageTest { @Test public void testEncodeAndDecode () throws IOException { ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage <>(); ProtocolMessage.Header header = new ProtocolMessage .Header(); header.setMagic(ProtocolConstant.PROTOCOL_MAGIC); header.setVersion(ProtocolConstant.PROTOCOL_VERSION); header.setSerializer((byte ) ProtocolMessageSerializerEnum.JDK.getKey()); header.setType((byte ) ProtocolMessageTypeEnum.REQUEST.getKey()); header.setStatus((byte ) ProtocolMessageStatusEnum.OK.getValue()); header.setRequestId(IdUtil.getSnowflakeNextId()); header.setBodyLength(0 ); RpcRequest rpcRequest = new RpcRequest (); rpcRequest.setServiceName("myService" ); rpcRequest.setMethodName("myMethod" ); rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION); rpcRequest.setParameterTypes(new Class []{String.class}); rpcRequest.setArgs(new Object []{"aaa" , "bbb" }); protocolMessage.setHeader(header); protocolMessage.setBody(rpcRequest); Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage); ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer); Assert.assertNotNull(message); } }
4、请求处理器(服务提供者) 可以使用 netty 的 pipeline 组合多个 handler(比如编码 => 解码 => 请求 / 响应处理)
请求处理器的作用是接受请求,然后通过反射调用服务实现类。
类似之前的 HttpServerHandler,我们需要开发一个 TcpServerHandler,用于处理请求。和 HttpServerHandler 的区别只是在获取请求、写入响应的方式上,需要调用上面开发好的编码器和解码器。
通过实现 Vert.x 提供的 Handler<NetSocket>
接口,可以定义 TCP 请求处理器。
完整代码如下,大多数代码都是从之前写好的 HttpServerHandler 复制来的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class TcpServerHandler implements Handler <NetSocket> { @Override public void handle (NetSocket netSocket) { netSocket.handler(buffer -> { ProtocolMessage<RpcRequest> protocolMessage; try { protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer); } catch (IOException e) { throw new RuntimeException ("协议消息解码错误" ); } RpcRequest rpcRequest = protocolMessage.getBody(); RpcResponse rpcResponse = new RpcResponse (); try { Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName()); Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs()); rpcResponse.setData(result); rpcResponse.setDataType(method.getReturnType()); rpcResponse.setMessage("ok" ); } catch (Exception e) { e.printStackTrace(); rpcResponse.setMessage(e.getMessage()); rpcResponse.setException(e); } ProtocolMessage.Header header = protocolMessage.getHeader(); header.setType((byte ) ProtocolMessageTypeEnum.RESPONSE.getKey()); ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage <>(header, rpcResponse); try { Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage); netSocket.write(encode); } catch (IOException e) { throw new RuntimeException ("协议消息编码错误" ); } }); } }
5、请求发送(服务消费者) 调整服务消费者发送请求的代码,改 HTTP 请求为 TCP 请求。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public class ServiceProxy implements InvocationHandler { @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer()); String serviceName = method.getDeclaringClass().getName(); RpcRequest rpcRequest = RpcRequest.builder() .serviceName(serviceName) .methodName(method.getName()) .parameterTypes(method.getParameterTypes()) .args(args) .build(); try { byte [] bodyBytes = serializer.serialize(rpcRequest); RpcConfig rpcConfig = RpcApplication.getRpcConfig(); Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry()); ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo (); serviceMetaInfo.setServiceName(serviceName); serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION); List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey()); if (CollUtil.isEmpty(serviceMetaInfoList)) { throw new RuntimeException ("暂无服务地址" ); } ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0 ); Vertx vertx = Vertx.vertx(); NetClient netClient = vertx.createNetClient(); CompletableFuture<RpcResponse> responseFuture = new CompletableFuture <>(); netClient.connect(selectedServiceMetaInfo.getServicePort(), selectedServiceMetaInfo.getServiceHost(), result -> { if (result.succeeded()) { System.out.println("Connected to TCP server" ); io.vertx.core.net.NetSocket socket = result.result(); ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage <>(); ProtocolMessage.Header header = new ProtocolMessage .Header(); header.setMagic(ProtocolConstant.PROTOCOL_MAGIC); header.setVersion(ProtocolConstant.PROTOCOL_VERSION); header.setSerializer((byte ) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey()); header.setType((byte ) ProtocolMessageTypeEnum.REQUEST.getKey()); header.setRequestId(IdUtil.getSnowflakeNextId()); protocolMessage.setHeader(header); protocolMessage.setBody(rpcRequest); try { Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage); socket.write(encodeBuffer); } catch (IOException e) { throw new RuntimeException ("协议消息编码错误" ); } socket.handler(buffer -> { try { ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer); responseFuture.complete(rpcResponseProtocolMessage.getBody()); } catch (IOException e) { throw new RuntimeException ("协议消息解码错误" ); } }); } else { System.err.println("Failed to connect to TCP server" ); } }); RpcResponse rpcResponse = responseFuture.get(); netClient.close(); return rpcResponse.getData(); } catch (IOException e) { e.printStackTrace(); } return null ; } }
这里的代码看着比较复杂,但只需要关注上述代码中注释了 “发送 TCP 请求” 的部分即可。由于 Vert.x 提供的请求处理器是异步、反应式的,我们为了更方便地获取结果,可以使用 CompletableFuture
转异步为同步,参考代码如下:
1 2 3 4 5 6 7 8 9 CompletableFuture<RpcResponse> responseFuture = new CompletableFuture <>(); netClient.connect(xxx, result -> { responseFuture.complete(rpcResponseProtocolMessage.getBody()); }); ); RpcResponse rpcResponse = responseFuture.get();
四、测试 编写好上述代码后,就可以先测试请求响应流程是否跑通了。
修改服务提供者 ProviderExample
代码,改为启动 TCP 服务器。完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class ProviderExample { public static void main (String[] args) { RpcApplication.init(); String serviceName = UserService.class.getName(); LocalRegistry.register(serviceName, UserServiceImpl.class); RpcConfig rpcConfig = RpcApplication.getRpcConfig(); RegistryConfig registryConfig = rpcConfig.getRegistryConfig(); Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry()); ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo (); serviceMetaInfo.setServiceName(serviceName); serviceMetaInfo.setServiceHost(rpcConfig.getServerHost()); serviceMetaInfo.setServicePort(rpcConfig.getServerPort()); try { registry.register(serviceMetaInfo); } catch (Exception e) { throw new RuntimeException (e); } VertxTcpServer vertxTcpServer = new VertxTcpServer (); vertxTcpServer.doStart(8080 ); } }
然后启动消费者示例项目,应该能够正常完成调用。如果不能,那可能就是出现了粘包半包问题。
五、粘包半包问题解决 什么是粘包和半包? 使用 TCP 协议网络通讯时,可能会出现半包和粘包问题。
举个例子。
理想情况下,假如我们客户端 连续 2 次 要发送的消息是:
1 2 3 4 Hello, server!Hello, server!Hello, server!Hello, server! Hello, server!Hello, server!Hello, server!Hello, server!
但服务端收到的消息情况可能是:
1)每次收到的数据更少了,这种情况叫做 半包
:
1 2 3 4 Hello, server!Hello, server! Hello, server!Hello, server!Hello, server!
2)每次收到的数据更多了,这种情况叫做 粘包
:
1 2 Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!
半包粘包问题演示 为了更好地理解半包和粘包,可以编写代码来测试。
1)修改 TCP 客户端代码,连续发送 1000 次消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class VertxTcpClient { public void start () { Vertx vertx = Vertx.vertx(); vertx.createNetClient().connect(8888 , "localhost" , result -> { if (result.succeeded()) { System.out.println("Connected to TCP server" ); io.vertx.core.net.NetSocket socket = result.result(); for (int i = 0 ; i < 1000 ; i++) { socket.write("Hello, server!Hello, server!Hello, server!Hello, server!" ); } socket.handler(buffer -> { System.out.println("Received response from server: " + buffer.toString()); }); } else { System.err.println("Failed to connect to TCP server" ); } }); } public static void main (String[] args) { new VertxTcpClient ().start(); } }
2)修改 TCP 服务端代码,打印出每次收到的消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Slf4j public class VertxTcpServer implements HttpServer { @Override public void doStart (int port) { Vertx vertx = Vertx.vertx(); NetServer server = vertx.createNetServer(); server.connectHandler(socket -> { socket.handler(buffer -> { String testMessage = "Hello, server!Hello, server!Hello, server!Hello, server!" ; int messageLength = testMessage.getBytes().length; if (buffer.getBytes().length < messageLength) { System.out.println("半包, length = " + buffer.getBytes().length); return ; } if (buffer.getBytes().length > messageLength) { System.out.println("粘包, length = " + buffer.getBytes().length); return ; } String str = new String (buffer.getBytes(0 , messageLength)); System.out.println(str); if (testMessage.equals(str)) { System.out.println("good" ); } }); }); server.listen(port, result -> { if (result.succeeded()) { log.info("TCP server started on port " + port); } else { log.info("Failed to start TCP server: " + result.cause()); } }); } public static void main (String[] args) { new VertxTcpServer ().doStart(8888 ); } }
3)测试运行,查看服务端控制台,发现服务端接受消息时,出现了半包和粘包:
下面分别解决半包和粘包问题。
如何解决半包? 解决半包的核心思路是:在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取。
示例代码如下:
1 2 3 4 5 6 if (buffer == null || buffer.length() == 0 ) { throw new RuntimeException ("消息 buffer 为空" ); } if (buffer.getBytes().length < ProtocolConstant.MESSAGE_HEADER_LENGTH) { throw new RuntimeException ("出现了半包问题" ); }
如何解决粘包? 解决粘包的核心思路也是类似的:每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取。
示例代码如下:
1 2 byte [] bodyBytes = buffer.getBytes(17 , 17 + header.getBodyLength());
听上去简单,但实现起来还是比较麻烦的,要记录每次接收到的消息位置,维护字节数组缓存。有没有更简单的方式呢?
Vert.x 解决半包和粘包 在 Vert.x 框架中,可以使用内置的 RecordParser
完美解决半包粘包,它的作用是:保证下次读取到 特定长度 的字符。
先学会该类库的使用,跑通测试流程,再引入到业务代码中。
基础代码 1)先使用 RecordParser
来读取固定长度的消息,示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Slf4j public class VertxTcpServer implements HttpServer { @Override public void doStart (int port) { Vertx vertx = Vertx.vertx(); NetServer server = vertx.createNetServer(); server.connectHandler(socket -> { String testMessage = "Hello, server!Hello, server!Hello, server!Hello, server!" ; int messageLength = testMessage.getBytes().length; RecordParser parser = RecordParser.newFixed(messageLength); parser.setOutput(new Handler <Buffer>() { @Override public void handle (Buffer buffer) { String str = new String (buffer.getBytes()); System.out.println(str); if (testMessage.equals(str)) { System.out.println("good" ); } } }); socket.handler(parser); }); server.listen(port, result -> { if (result.succeeded()) { log.info("TCP server started on port " + port); } else { log.info("Failed to start TCP server: " + result.cause()); } }); } public static void main (String[] args) { new VertxTcpServer ().doStart(8888 ); } }
上述代码的核心是 RecordParser.newFixed(messageLength)
,为 Parser 指定每次读取固定值长度的内容。
测试发现,这次的输出结果非常整齐,解决了半包和粘包:
2)实际运用中,消息体的长度是不固定的,所以要通过调整 RecordParser 的固定长度(变长)来解决。
那我们的思路可以是,将读取完整的消息拆分为 2 次:
先完整读取请求头信息,由于请求头信息长度是固定的,可以使用 RecordParser
保证每次都完整读取。
再根据请求头长度信息更改 RecordParser
的固定长度,保证完整获取到请求体。
修改测试 TCP Server 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Slf4j public class VertxTcpServer implements HttpServer { @Override public void doStart (int port) { Vertx vertx = Vertx.vertx(); NetServer server = vertx.createNetServer(); server.connectHandler(socket -> { RecordParser parser = RecordParser.newFixed(8 ); parser.setOutput(new Handler <Buffer>() { int size = -1 ; Buffer resultBuffer = Buffer.buffer(); @Override public void handle (Buffer buffer) { if (-1 == size) { size = buffer.getInt(4 ); parser.fixedSizeMode(size); resultBuffer.appendBuffer(buffer); } else { resultBuffer.appendBuffer(buffer); System.out.println(resultBuffer.toString()); parser.fixedSizeMode(8 ); size = -1 ; resultBuffer = Buffer.buffer(); } } }); socket.handler(parser); }); server.listen(port, result -> { if (result.succeeded()) { log.info("TCP server started on port " + port); } else { log.info("Failed to start TCP server: " + result.cause()); } }); } public static void main (String[] args) { new VertxTcpServer ().doStart(8888 ); } }
修改测试 TCP client 代码如下,自己构造了一个变长、长度信息不在 Buffer 最开头(而是有一定偏移量)的消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class VertxTcpClient { public void start () { Vertx vertx = Vertx.vertx(); vertx.createNetClient().connect(8888 , "localhost" , result -> { if (result.succeeded()) { System.out.println("Connected to TCP server" ); io.vertx.core.net.NetSocket socket = result.result(); for (int i = 0 ; i < 1000 ; i++) { Buffer buffer = Buffer.buffer(); String str = "Hello, server!Hello, server!Hello, server!Hello, server!" ; buffer.appendInt(0 ); buffer.appendInt(str.getBytes().length); buffer.appendBytes(str.getBytes()); socket.write(buffer); } socket.handler(buffer -> { System.out.println("Received response from server: " + buffer.toString()); }); } else { System.err.println("Failed to connect to TCP server" ); } }); } public static void main (String[] args) { new VertxTcpClient ().start(); } }
测试结果应该也是能够正常读取到消息的,不会出现半包和粘包。
封装半包粘包处理器 我们会发现,解决半包粘包问题还是有一定的代码量的,而且由于 ServiceProxy(消费者)和请求 Handler(提供者)都需要接受 Buffer,所以都需要半包粘包问题处理。
那我们就应该要想到:需要对代码进行封装复用了。
这里我们可以使用设计模式中的 装饰者模式 ,使用 RecordParser 对原有的 Buffer 处理器的能力进行增强。
装饰者模式可以简单理解为给对象穿装备,增强对象的能力。
在 server.tcp
包下新建 TcpBufferHandlerWrapper
类,实现并增强 Handler<Buffer>
接口。
完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class TcpBufferHandlerWrapper implements Handler <Buffer> { private final RecordParser recordParser; public TcpBufferHandlerWrapper (Handler<Buffer> bufferHandler) { recordParser = initRecordParser(bufferHandler); } @Override public void handle (Buffer buffer) { recordParser.handle(buffer); } private RecordParser initRecordParser (Handler<Buffer> bufferHandler) { RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH); parser.setOutput(new Handler <Buffer>() { int size = -1 ; Buffer resultBuffer = Buffer.buffer(); @Override public void handle (Buffer buffer) { if (-1 == size) { size = buffer.getInt(13 ); parser.fixedSizeMode(size); resultBuffer.appendBuffer(buffer); } else { resultBuffer.appendBuffer(buffer); bufferHandler.handle(resultBuffer); parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH); size = -1 ; resultBuffer = Buffer.buffer(); } } }); return parser; } }
其实就是把 RecordParser 的代码粘了过来,当调用处理器的 handle
方法时,改为调用 recordParser.handle
。
优化客户端调用代码 有了半包粘包处理器,我们就可以很轻松地在业务代码中运用它了。
1)修改 TCP 请求处理器。
使用 TcpBufferHandlerWrapper
来封装之前处理请求的代码,请求逻辑不用变,需要修改的部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class TcpServerHandler implements Handler <NetSocket> { @Override public void handle (NetSocket socket) { TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper (buffer -> { }); socket.handler(bufferHandlerWrapper); } }
其实就是使用一个 Wrapper 对象 包装 了之前的代码,就解决了半包粘包。
2)修改客户端处理响应的代码。
之前我们是把所有发送请求、处理响应的代码都写到了 ServiceProxy
中,使得这个类的代码 “臃肿不堪”。
我们干脆做个优化,把所有的请求响应逻辑提取出来,封装为单独的 VertxTcpClient
类,放在 server.tcp
包下。
VertxTcpClient 的完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class VertxTcpClient { public static RpcResponse doRequest (RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException { Vertx vertx = Vertx.vertx(); NetClient netClient = vertx.createNetClient(); CompletableFuture<RpcResponse> responseFuture = new CompletableFuture <>(); netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(), result -> { if (!result.succeeded()) { System.err.println("Failed to connect to TCP server" ); return ; } NetSocket socket = result.result(); ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage <>(); ProtocolMessage.Header header = new ProtocolMessage .Header(); header.setMagic(ProtocolConstant.PROTOCOL_MAGIC); header.setVersion(ProtocolConstant.PROTOCOL_VERSION); header.setSerializer((byte ) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey()); header.setType((byte ) ProtocolMessageTypeEnum.REQUEST.getKey()); header.setRequestId(IdUtil.getSnowflakeNextId()); protocolMessage.setHeader(header); protocolMessage.setBody(rpcRequest); try { Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage); socket.write(encodeBuffer); } catch (IOException e) { throw new RuntimeException ("协议消息编码错误" ); } TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper ( buffer -> { try { ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer); responseFuture.complete(rpcResponseProtocolMessage.getBody()); } catch (IOException e) { throw new RuntimeException ("协议消息解码错误" ); } } ); socket.handler(bufferHandlerWrapper); }); RpcResponse rpcResponse = responseFuture.get(); netClient.close(); return rpcResponse; } }
注意,上述代码中,也使用了 TcpBufferHandlerWrapper
对处理响应的代码进行了封装。
修改 ServiceProxy 代码,调用 VertxTcpClient,修改后的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class ServiceProxy implements InvocationHandler { @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer()); String serviceName = method.getDeclaringClass().getName(); RpcRequest rpcRequest = RpcRequest.builder() .serviceName(serviceName) .methodName(method.getName()) .parameterTypes(method.getParameterTypes()) .args(args) .build(); try { RpcConfig rpcConfig = RpcApplication.getRpcConfig(); Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry()); ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo (); serviceMetaInfo.setServiceName(serviceName); serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION); List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey()); if (CollUtil.isEmpty(serviceMetaInfoList)) { throw new RuntimeException ("暂无服务地址" ); } ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0 ); RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo); return rpcResponse.getData(); } catch (Exception e) { throw new RuntimeException ("调用失败" ); } } ... }