菜鸟笔记
提升您的技术认知

系统间通信1:阻塞与非阻塞式通信a-ag真人游戏

从这篇博文开始,我们将进入一个新文章系列。这个文章系列专门整理总结了目前系统间通信的主要原理、手段和实现。我们将讲解典型的信息格式讲解传统的rmi调用并延伸出来重点讲解rpc调用和使用案例;最后我们还会讲到soa架构的实现,包括esb实现和服务注册/治理的实现,同样包括原理、实现和使用案例。
系统间通信是架构师需要掌握的又一个关键技术领域,如果说理解和掌握负载均衡层技术需要您有一定的linux系统知识和操作系统知识的话,那么理解和掌握系统间通信层技术,需要您有一定的编程经验(最好是java编程经验,因为我们会主要以java技术作为实例演示)。

首先我们来看一个显示场景:在现实生活中有两个人技术人员a和b,在进行一问一答形式的交流。如下图所示:

我们来看这幅图的中的几个要点:

  • 他们两都使用中文进行交流。如果他们一人使用的是南斯拉夫语另一人使用的是索马里语,并且相互都不能理解对方的语系,很显然a所要表达的内容b是无法理解的。
  • 他们的声音是在空气中进行传播的。空气除了支撑他们的呼吸外,还支撑了他们声音的传播。如果没有空气他们是无法知道对方用中文说了什么。
  • 他们的交流方式是协调一致的,即a问完一个问题后,等待b进行回答。收到b的回答后,a才能问下一个问题。
  • 由于都是人类,所以他们处理信息的方式也是一样的:用嘴说话,用耳朵听话,用大脑处理形成结果。
  • 目前这个交流场景下,只有a和b两个人。但是随时有可能增加n个人进来。第n个人可能不是采用中文进行交流。

很明显通过中文的交谈,两个人相互明白了对方的意图。为了保证信息传递的高效性,我们一定会将信息做成某种参与者都理解的格式。例如:中文有其特定的语法结构,例如主谓宾,定状补。

在计算机领域为了保证信息能够被处理,信息也会被做成特定的格式,而且要确保目标能够明白这种格式。常用的信息格式包括:

2.1 xml

可扩展标记语言,这个语言由w3c(万维网联盟)进行发布和维护。xml语言应用之广泛,扩展之丰富。适合做网络通信的信息描述格式(一般是“应用层”协议了)。例如google 定义的xmpp通信协议就是使用xml进行描述的;不过xml的更广泛使用场景是对系统环境进行描述(因为它会造成较多的不必要的内容传输),例如服务器的配置描述、spring的配置描述、maven仓库描述等等。

2.2 json

json(javascript object notation) 是一种轻量级的数据交换格式。它和xml的设计思路是一致的:和语言无关(流行的语言都支持json格式描述:go、python、c、c 、c#、java、erlang、javascript等等);但是和xml不同,json的设计目标就是为了进行通信。要描述同样的数据,json格式的容量会更小。

2.3 protocol buffer

protocol buffer(以下简称pb)是google 的一种数据交换的格式,它独立于语言,独立于平台。google 提供了三种语言的实现:java、c 和 python,每一种实现都包含了相应语言的编译器以及库文件。

2.4 tlv

三元组编码,t(标记/类型域)l(长度/大小域)v(值/内容域),通常这种信息格式用于金融、军事领域。它通过字节的位运算来进行信息的序列化/反序列化(据说微信的信息格式也采用的是tlv,但实际情况我不清楚):

这里有一篇介绍tlv的文章:《通信协议之序列化tlv》,tlv格式所携带的内容是最有效的,它就连json中用于分割层次的“{}”符号都没有。

2.5 自定义

当然,如果您的两个内部系统已经约定好了一种信息格式,您当然可以使用自己定制的格式进行描述。您可以使用c 描述一个结构体,然后序列化/反序列它,或者使用一个纯文本,以“|”号分割这些字符串,然后序列化/反序列它。

在这个系列的博文中,我们不会把信息格式作为一个重点,但是会花一些篇幅去比较各种信息格式在网络上传输的速度、性能,并为大家介绍几种典型的信息格式选型场景。

如文中第一张图描述的场景,有一个我们看不到但是却很重要的元素:空气。声音在空气中完成传播,真空无法传播声音。同样信息是在网络中完成传播的,没有网络就没法传播信息。网络协议就是计算机领域的“空气”,下图中我们以osi模型作为参考:

  • 物理层:物理层就是我们的网络设备层,例如我们的网卡、交换机等设备,在他们之间我们一般传递的是电信号或者光信号。
  • 数据链路层:数据链路又分为物理链路和逻辑链路。物理链路负责组合一组电信号,称之为“帧”;逻辑链路层通过一些规则和协议保证帧传输的正确性,并且可以使来自于多个源/目标 的帧在同一个物理链路上进行传输,实现“链路复用”。
  • 网络层:网络层使用最广泛的协议是ip协议(又分为ipv4协议和ipv6协议),ipx协议。这些协议解决的是源和目标的定位问题,以及从源如何到达目标的问题。
  • 传输层:tcp、udp是传输层最常使用的协议,传输层的最重要工作就是携带内容信息了,并且通过他们的协议规范提供某种通信机制。举例来说,tcp协议中的通信机制是:首先进行三次通信握手,然后再进行正式数据的传送,并且通过校验机制保证每个数据报文的正确性,如果数据报文错误了,则重新发送。
  • 应用层:http协议、ftp协议、telnet协议这些都是应用层协议。应用层协议是最灵活的协议,甚至可以由程序员自行定义应用层协议。下图我们表示了http协议的工作方式:

    在这个系列的博文中,我们不会把网络协议作为一个重点。这是因为网络网络协议的知识是一个相对独立的的知识领域,十几篇文章都不一定讲得清楚。如果您对网络协议有兴趣,这里推荐两本书:《tcp/ip详解.卷1-协议》和《tcp/ip详解.卷2-实现》。

在文章最前面我们看到其中一个人规定了一种沟通方式:“你必须把我说的话听完,然后给我反馈后。我才会问第二个问题”。这种沟通方式虽然沟通效率不高,但是很有效:一个问题一个问题的处理。

但是如果参与沟通的人处理信息的能力比较强,那么他们还可以采用另一种沟通方式:“我给我提的问题编了一个号,在问完第x个问题后,我不会等待你返回,就会问第x 1个问题,同样你在听完我第x个问题后,一边处理我的问题,一边听我第x 1个问题。”

实际上以上两种现实中的沟通方式,在计算机领域是可以找到对应的通信方式的,这就是我们这个系列的博文会着重讲的bio(阻塞模式)通信和nio(非阻塞模式)。

4.1 bio通信模式

以前大多数网络通信方式都是阻塞模式的,即:

客户端向服务器端发出请求后,客户端会一直等待(不会再做其他事情),直到服务器端返回结果或者网络出现问题。

服务器端同样的,当在处理某个客户端a发来的请求时,另一个客户端b发来的请求会等待,直到服务器端的这个处理线程完成上一个处理。

如下图所示:

传统的bio通信方式存在几个问题:

同一时间,服务器只能接受来自于客户端a的请求信息;虽然客户端a和客户端b的请求是同时进行的,但客户端b发送的请求信息只能等到服务器接受完a的请求数据后,才能被接受。

由于服务器一次只能处理一个客户端请求,当处理完成并返回后(或者异常时),才能进行第二次请求的处理。很显然,这样的处理方式在高并发的情况下,是不能采用的。

上面说的情况是服务器只有一个线程的情况,那么读者会直接提出我们可以使用多线程技术来解决这个问题:

当服务器收到客户端x的请求后,(读取到所有请求数据后)将这个请求送入一个独立线程进行处理,然后主线程继续接受客户端y的请求。

客户端一侧,也可以使用一个子线程和服务器端进行通信。这样客户端主线程的其他工作就不受影响了,当服务器端有响应信息的时候再由这个子线程通过 监听模式/观察模式(等其他设计模式)通知主线程。

如下图所示:

但是使用线程来解决这个问题实际上是有局限性的:

  • 虽然在服务器端,请求的处理交给了一个独立线程进行,但是操作系统通知accept()的方式还是单个的。也就是,实际上是服务器接收到数据报文后的“业务处理过程”可以多线程,但是数据报文的接受还是需要一个一个的来(下文的示例代码和debug过程我们可以明确看到这一点)
  • 在linux系统中,可以创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令查看可以创建的最大线程数。当然这个值是可以更改的,但是线程越多,cpu切换所需的时间也就越长,用来处理真正业务的需求也就越少。
  • 创建一个线程是有较大的资源消耗的。jvm创建一个线程的时候,即使这个线程不做任何的工作,jvm都会分配一个堆栈空间。这个空间的大小默认为128k,您可以通过-xss参数进行调整。
  • 当然您还可以使用threadpoolexecutor线程池来缓解线程的创建问题,但是又会造成blockingqueue积压任务的持续增加,同样消耗了大量资源。另外,如果您的应用程序大量使用长连接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。

那么,如果你真想单纯使用线程解决阻塞的问题,那么您自己都可以算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。

4.2 bio通信方式深入分析

在这个系列的博文中,通信方式/框架将作为一个重点进行讲解。包括nio的原理,并通过讲解netty的使用、java原生nio框架的使用,去熟悉这些核心原理。

实际上从上文中我们可以看出,bio的问题关键不在于是否使用了多线程(包括线程池)处理这次请求,而在于accept()、read()的操作点都是被阻塞。要测试这个问题,也很简单。我们模拟了20个客户端(用20根线程模拟),利用java的同步计数器countdownlatch,保证这20个客户都初始化完成后然后同时向服务器发送请求,然后我们来观察一下server这边接受信息的情况。

4.2.1 模拟20个客户端并发请求,服务器端使用单线程:

  • 客户端代码(socketclientdaemon)
package testbsocket;
import java.util.concurrent.countdownlatch;
public class socketclientdaemon {
  
    public static void main(string[] args) throws exception {
  
        integer clientnumber = 20;
        countdownlatch countdownlatch = new countdownlatch(clientnumber);
        //分别开始启动这20个客户端
        for(int index = 0 ; index < clientnumber ; index   , countdownlatch.countdown()) {
  
            socketclientrequestthread client = new socketclientrequestthread(countdownlatch, index);
            new thread(client).start();
        }
        //这个wait不涉及到具体的实验逻辑,只是为了保证守护线程在启动所有线程后,进入等待状态
        synchronized (socketclientdaemon.class) {
  
            socketclientdaemon.class.wait();
        }
    }
}
  • 客户端代码(socketclientrequestthread模拟请求)
package testbsocket;
import java.io.ioexception;
import java.io.inputstream;
import java.io.outputstream;
import java.net.socket;
import java.util.concurrent.countdownlatch;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.apache.log4j.basicconfigurator;
/**
 * 一个socketclientrequestthread线程模拟一个客户端请求。
 * @author yinwenjie
 */
public class socketclientrequestthread implements runnable {
  
    static {
  
        basicconfigurator.configure();
    }
    /**
     * 日志
     */
    private static final log logger = logfactory.getlog(socketclientrequestthread.class);
    private countdownlatch countdownlatch;
    /**
     * 这个线层的编号
     * @param countdownlatch
     */
    private integer clientindex;
    /**
     * countdownlatch是java提供的同步计数器。
     * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性
     * @param countdownlatch
     */
    public socketclientrequestthread(countdownlatch countdownlatch , integer clientindex) {
  
        this.countdownlatch = countdownlatch;
        this.clientindex = clientindex;
    }
    @override
    public void run() {
  
        socket socket = null;
        outputstream clientrequest = null;
        inputstream clientresponse = null;
        try {
  
            socket = new socket("localhost",83);
            clientrequest = socket.getoutputstream();
            clientresponse = socket.getinputstream();
            //等待,直到socketclientdaemon完成所有线程的启动,然后所有线程一起发送请求
            this.countdownlatch.await();
            //发送请求信息
            clientrequest.write(("这是第"   this.clientindex   " 个客户端的请求。").getbytes());
            clientrequest.flush();
            //在这里等待,直到服务器返回信息
            socketclientrequestthread.logger.info("第"   this.clientindex   "个客户端的请求发送完成,等待服务器返回信息");
            int maxlen = 1024;
            byte[] contextbytes = new byte[maxlen];
            int reallen;
            string message = "";
            //程序执行到这里,会一直等待服务器返回信息(注意,前提是in和out都不能close,如果close了就收不到服务器的反馈了)
            while((reallen = clientresponse.read(contextbytes, 0, maxlen)) != -1) {
  
                message  = new string(contextbytes , 0 , reallen);
            }
            socketclientrequestthread.logger.info("接收到来自服务器的信息:"   message);
        } catch (exception e) {
  
            socketclientrequestthread.logger.error(e.getmessage(), e);
        } finally {
  
            try {
  
                if(clientrequest != null) {
  
                    clientrequest.close();
                }
                if(clientresponse != null) {
  
                    clientresponse.close();
                }
            } catch (ioexception e) {
  
                socketclientrequestthread.logger.error(e.getmessage(), e);
            }
        }
    }
}
  • 服务器端(socketserver1)单个线程
package testbsocket;
import java.io.inputstream;
import java.io.outputstream;
import java.net.serversocket;
import java.net.socket;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.apache.log4j.basicconfigurator;
public class socketserver1 {
  
    static {
  
        basicconfigurator.configure();
    }
    /**
     * 日志
     */
    private static final log logger = logfactory.getlog(socketserver1.class);
    public static void main(string[] args) throws exception{
  
        serversocket serversocket = new serversocket(83);
        try {
  
            while(true) {
  
                socket socket = serversocket.accept();
                //下面我们收取信息
                inputstream in = socket.getinputstream();
                outputstream out = socket.getoutputstream();
                integer sourceport = socket.getport();
                int maxlen = 2048;
                byte[] contextbytes = new byte[maxlen];
                //这里也会被阻塞,直到有数据准备好
                int reallen = in.read(contextbytes, 0, maxlen);
                //读取信息
                string message = new string(contextbytes , 0 , reallen);
                //下面打印信息
                socketserver1.logger.info("服务器收到来自于端口:"   sourceport   "的信息:"   message);
                //下面开始发送信息
                out.write("回发响应信息!".getbytes());
                //关闭
                out.close();
                in.close();
                socket.close();
            }
        } catch(exception e) {
  
            socketserver1.logger.error(e.getmessage(), e);
        } finally {
  
            if(serversocket != null) {
  
                serversocket.close();
            }
        }
    }
}

4.2.2 使用多线程来优化服务器端的处理过程

客户端代码和上文一样,最主要是更改服务器端的代码:

package testbsocket;
import java.io.ioexception;
import java.io.inputstream;
import java.io.outputstream;
import java.net.serversocket;
import java.net.socket;
import org.apache.commons.logging.log;
import org.apache.commons.logging.logfactory;
import org.apache.log4j.basicconfigurator;
public class socketserver2 {
  
    static {
  
        basicconfigurator.configure();
    }
    private static final log logger = logfactory.getlog(socketserver2.class);
    public static void main(string[] args) throws exception{
  
        serversocket serversocket = new serversocket(83);
        try {
  
            while(true) {
  
                socket socket = serversocket.accept();
                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
                socketserverthread socketserverthread = new socketserverthread(socket);
                new thread(socketserverthread).start();
            }
        } catch(exception e) {
  
            socketserver2.logger.error(e.getmessage(), e);
        } finally {
  
            if(serversocket != null) {
  
                serversocket.close();
            }
        }
    }
}
/**
 * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
 * 但还是改变不了socket被一个一个的做accept()的情况。
 * @author yinwenjie
 */
class socketserverthread implements runnable {
  
    /**
     * 日志
     */
    private static final log logger = logfactory.getlog(socketserverthread.class);
    private socket socket;
    public socketserverthread (socket socket) {
  
        this.socket = socket;
    }
    @override
    public void run() {
  
        inputstream in = null;
        outputstream out = null;
        try {
  
            //下面我们收取信息
            in = socket.getinputstream();
            out = socket.getoutputstream();
            integer sourceport = socket.getport();
            int maxlen = 1024;
            byte[] contextbytes = new byte[maxlen];
            //使用线程,同样无法解决read方法的阻塞问题,
            //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
            int reallen = in.read(contextbytes, 0, maxlen);
            //读取信息
            string message = new string(contextbytes , 0 , reallen);
            //下面打印信息
            socketserverthread.logger.info("服务器收到来自于端口:"   sourceport   "的信息:"   message);
            //下面开始发送信息
            out.write("回发响应信息!".getbytes());
        } catch(exception e) {
  
            socketserverthread.logger.error(e.getmessage(), e);
        } finally {
  
            //试图关闭
            try {
  
                if(in != null) {
  
                    in.close();
                }
                if(out != null) {
  
                    out.close();
                }
                if(this.socket != null) {
  
                    this.socket.close();
                }
            } catch (ioexception e) {
  
                socketserverthread.logger.error(e.getmessage(), e);
            }
        }
    }
}

4.2.3 服务器端的执行效果

我相信服务器使用单线程的效果就不用看了,我们主要看一看服务器使用多线程处理时的情况:

4.2.4

那么重点的问题并不是“是否使用了多线程”,而是为什么accept()、read()方法会被阻塞。即:异步io模式 就是为了解决这样的并发性存在的。但是为了说清楚异步io模式,在介绍io模式的时候,我们就要首先了解清楚,什么是 阻塞式同步、非阻塞式同步、多路复用同步模式。
api文档中对于 serversocket.accept() 方法的使用描述:

listens for a connection to be made to this socket and accepts it. the method blocks until a connection is made.

那么我们首先来看看为什么serversocket.accept()会被阻塞。这里涉及到阻塞式同步io的工作原理:

  • 服务器线程发起一个accept动作,询问操作系统 是否有新的socket套接字信息从端口x发送过来。
  • 注意,是询问操作系统。也就是说socket套接字的io模式支持是基于操作系统的,那么自然同步io/异步io的支持就是需要操作系统级别的了。如下图:
  • 如果操作系统没有发现有套接字从指定的端口x来,那么操作系统就会等待。这样serversocket.accept()方法就会一直等待。这就是为什么accept()方法为什么会阻塞:它内部的实现是使用的操作系统级别的同步io。

阻塞io 和 非阻塞io 这两个概念是程序级别的。主要描述的是程序请求操作系统io操作后,如果io资源没有准备好,那么程序该如何处理的问题:前者等待;后者继续执行(并且使用线程一直轮询,直到有io资源准备好了)

同步io 和 非同步io,这两个概念是操作系统级别的。主要描述的是操作系统在收到程序请求io操作后,如果io资源没有准备好,该如何相应程序的问题:前者不响应,直到io资源准备好以后;后者返回一个标记(好让程序和自己知道以后的数据往哪里通知),当io资源准备好以后,再用事件机制返回给程序。

网站地图