博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于zbus的MySQL透明代理(<100行)
阅读量:7041 次
发布时间:2019-06-28

本文共 5723 字,大约阅读时间需要 19 分钟。

  hot3.png

项目地址 

我们上次讲到zbus网络通讯的核心API

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。

202122_bIt8_238589.png

完成大概不到100 行的代码, CoolLet’s roll

    首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage

zbus.NETIoAdaptor提供的个性化事件如下

202243_90ij_238589.png

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据	public IoBuffer encode(Object msg) {		if (msg instanceof IoBuffer) {			IoBuffer buff = (IoBuffer) msg;			return buff;		} else {			throw new RuntimeException("Message Not Support");		}	}	// 透传不需要编解码,简单返回ByteBuffer数据	public Object decode(IoBuffer buff) {		if (buff.remaining() > 0) {			byte[] data = new byte[buff.remaining()];			buff.readBytes(data);			return IoBuffer.wrap(data);		} else {			return null;		}	}

第二步,代理服务接入:

@Override	protected void onSessionAccepted(Session sess) throws IOException {		Session target = null;		Dispatcher dispatcher = sess.getDispatcher();		try {			target = dispatcher.createClientSession(targetAddress, this);		} catch (Exception e) {			sess.asyncClose();			return;		}		sess.chain = target;		target.chain = sess;		dispatcher.registerSession(SelectionKey.OP_CONNECT, target);	}

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Sessionchain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Override	public void onSessionConnected(Session sess) throws IOException {  		Session chain = sess.chain;		if(chain == null){ 			sess.asyncClose();			return; 		}   		if(sess.isActive() && chain.isActive()){ 			sess.register(SelectionKey.OP_READ);			chain.register(SelectionKey.OP_READ);		}	}

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Override	protected void onMessage(Object msg, Session sess) throws IOException {  		Session chain = sess.chain;		if(chain == null){			sess.asyncClose(); 			return;		} 		chain.write(msg); 	}

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Override	public void onSessionToDestroy(Session sess) throws IOException {   		try {			sess.close();		} catch (IOException e) { //ignore		} 		if (sess.chain == null) return; 		try {				sess.chain.close();				sess.chain.chain = null;			sess.chain = null;		} catch (IOException e) { 		}	}

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

public static void main(String[] args) throws Exception {   		Dispatcher dispatcher = new Dispatcher(); 		IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 		final Server server = new Server(dispatcher, ioAdaptor, 3306); 		server.start();	}

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQLgl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

package org.zbus.net;import java.io.IOException;import java.nio.channels.SelectionKey;import org.zbus.net.core.Dispatcher;import org.zbus.net.core.IoAdaptor;import org.zbus.net.core.IoBuffer;import org.zbus.net.core.Session;  public class TcpProxyAdaptor extends IoAdaptor {	private String targetAddress;	public TcpProxyAdaptor(String targetAddress) {		this.targetAddress = targetAddress;	}	// 透传不需要编解码,简单返回ByteBuffer数据	public IoBuffer encode(Object msg) {		if (msg instanceof IoBuffer) {			IoBuffer buff = (IoBuffer) msg;			return buff;		} else {			throw new RuntimeException("Message Not Support");		}	}	// 透传不需要编解码,简单返回ByteBuffer数据	public Object decode(IoBuffer buff) {		if (buff.remaining() > 0) {			byte[] data = new byte[buff.remaining()];			buff.readBytes(data);			return IoBuffer.wrap(data);		} else {			return null;		}	}	@Override	protected void onSessionAccepted(Session sess) throws IOException {		Session target = null;		Dispatcher dispatcher = sess.getDispatcher();		try {			target = dispatcher.createClientSession(targetAddress, this);		} catch (Exception e) {			sess.asyncClose();			return;		}		sess.chain = target;		target.chain = sess;		dispatcher.registerSession(SelectionKey.OP_CONNECT, target);	}		@Override	public void onSessionConnected(Session sess) throws IOException {  		Session chain = sess.chain;		if(chain == null){ 			sess.asyncClose();			return; 		}   		if(sess.isActive() && chain.isActive()){ 			sess.register(SelectionKey.OP_READ);			chain.register(SelectionKey.OP_READ);		}	}	@Override	protected void onMessage(Object msg, Session sess) throws IOException {  		Session chain = sess.chain;		if(chain == null){			sess.asyncClose(); 			return;		} 		chain.write(msg); 	}		@Override	public void onSessionToDestroy(Session sess) throws IOException {   		try {			sess.close();		} catch (IOException e) { //ignore		} 		if (sess.chain == null) return; 		try {				sess.chain.close();				sess.chain.chain = null;			sess.chain = null;		} catch (IOException e) { 		}	}		@SuppressWarnings("resource")	public static void main(String[] args) throws Exception {   		Dispatcher dispatcher = new Dispatcher(); 		IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); 		final Server server = new Server(dispatcher, ioAdaptor, 3306);		server.setServerName("TcpProxyServer");		server.start();	}}

转载于:https://my.oschina.net/sbz/blog/512501

你可能感兴趣的文章
选择生成日报表,月报表,年报表
查看>>
使用位操作
查看>>
Babelfish(二分)
查看>>
JS 中如何判断 undefined 和 null
查看>>
ftk学习记录(一个进度条文章)
查看>>
log4j直接输出日志到flume
查看>>
非正确使用浮点数据由项目产生BUG讨论的问题
查看>>
PHP5中的stdClass
查看>>
IntelliJ IDEA Community Edition 14.1.4下使用 Apache-Subversion搭建代码管理环境
查看>>
四种可变交流swap方法
查看>>
Lucene中的 Query对象
查看>>
二分基础
查看>>
物流英语
查看>>
[iOS]iOS8可用的识别用户方式(idfa、UUID、idfv)
查看>>
hdu1507--二分图最大匹配
查看>>
【数据结构与算法】二叉树深度遍历(递归)
查看>>
iOS开发--基于AFNetWorking3.0的图片缓存分析
查看>>
使用jqMobi开发app基础:弹出内容的设计
查看>>
3.Java集合总结系列:Set接口及其实现
查看>>
ExtJs之Element.select函数
查看>>