muduo网络库学习
Reactor
Muduo库是基于Reactor模式实现的TCP网络编程库。其中的重要组件是由Event(事件)、Reactor(反应堆)、Demultiplex(事件分发器)和Evanthandler(事件处理器)四部分构成的,其相互关系如下图所示:
- 首先,将事件Event注册到反应堆Reactor上,即将应用程序感兴趣的事件注册到反应堆上,请求反应堆帮助监听,若事件发生,反应堆调用应用程序预制的回调(handler),一个event对应一个handler;
- 反应堆Reactor相当于是一个事件以及事件处理的集合,通过相应的方法在事件分发器Demultiplex里做一个相应的调整(add/mod/del event),然后启动事件循环(epoll_wait),服务器处于阻塞状态等待新用户的连接,或者已连接用户的读写事件;
- 如果epoll_wait监听到有新事件产生,分发器返回事件给反应堆,反应堆调用相应的事件处理器eventhandler;
- eventhandler中读取用户的请求,解码,处理,打包,发送。
在muduo库的Reactor模型上:
- Poller和EPollPoller就是Demultiplex
- Channel封装了Event,里面存在fd,events,revents,以及相应的回调函数callbacks,其中有两种channel,acceptorChannel以及connectionChannel分别对应listenfd以及connfd
- EventLoop就是Reactor
这也就看出muduo库的三个核心组件是:Channel类、Poller/EpollPoller类以及EventLoop类这三个组件之间的关系如下图所示:
muduo库三大核心组件之Channel类
- Channel类封装了一个 fd 、fd感兴趣事件events、该fd实际发生的事件revents。同时Channel类还提供了设置该fd的感兴趣事件,以及相应的回调函数。
channel头文件
1 |
|
channel实现
1 |
|
重要设计
1 |
|
- tie这个成员变量的设计:
- 当客户端正常断开TCP连接,IO事件会触发Channel中的设置的CloseCallback回调,但是用户代码在onClose()中有可能析构Channel对象,导致回调执行到一半的时候,其所属的Channel对象本身被销毁了
- 为了解决这个问题 考虑延长生命周期,怎么延长?
- 如果直接在另一个类声明一个强引用,但是这会出现循环引用问题
- 所以想着使用弱引用,那弱引用该如何延长生命周期呢?
- 可以在调用函数之前 将它提升为强引用赋值给一个强引用 从而增加引用计数
- 在调用完某函数之前都不会引用计数变为0,执行完后出作用域,引用计数-1
3. 我们可以看出handleEvent中tie实际上这是一个弱指针,绑定到TcpConnection的共享指针 ,如果可以原来的弱指针,变成了强指针,这时候tie()的作用 就表明了,延长了TcpConnection的生命周期,使之长过Channel::handleEvent(),保证了TcpConnection不被销毁
muduo库三大核心组件之EventLoop类
- Poller封装了和事件监听有关的方法和成员,调用Poller派生类EpollPoller::poll方法,我们就可以获得发生事件的fd 及其 发生的事件。EventLoop是网络服务器中负责 循环 的重要模块,从而做到持续监听、持续获取监听结果、持续处理监听结果对应的事件。
- 也就是说: EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果,Channel类将fd及其相关属性封装,并将fd及其感兴趣事件和发生的事件以及不同事件对应的回调函数封装在一起,这样在各个模块中传递更加方便。接着被EventLoop调用。
- 可能上面我画的图不能充分表达三者在muduo库中的角色,下面借用我在地铁站里吃闸机博主的图,可能会让大家看的更加直观。
重要成员变量
1 | // 用来存放活跃的channels |
- poller就不用在多说什么了,通过它会返回给EventLoop发生的事件。
- wakeupFd是非常重要的一个成员,与之对应的wakeupChannel,起到了一个唤醒loop所在的线程的作用,因为当前线程主要阻塞在poll函数上,唤醒的方法是手动激活这个wakeupChannel, 写入几个字节让Channel变为可读, 当然这个Channel也注册到Pooll中,在下面的成员函数会详细介绍它的实现。
- threadId创建时要保存当前时间循环所在的线程,用于之后运行时判断使用EventLoop的线程是否时EventLoop所属的线程.
- pollReturnTime保存poll返回的时间,用于计算从激活到调用回调函数的延迟
- activeChannels就是poller返回的所有发生事件的channel列表。
- callingPendingFunctors标识当前loop是否有需要执行的回调操作
- pendingFunctors存储loop需要执行的所有回调操作,避免本来属于当前线程的回调函数被其他线程调用,应该把这个回调函数添加到属于它所属的线程,等待它属于的线程被唤醒后调用,满足线程安全
- mutex互斥锁,用来保护vector容器的线程安全操作
重要成员函数
1 | public: |
其实现
1 | // |
其中的巧妙设计
1 | void EventLoop::doPendingFunctors() { |
- 第一个比较巧妙的思想就是,使用一个局部的
vector
和pendingFunctors_
的交换:- 可以缩小pendingFunctors_的容量, 因为如果只是resize, 它只会重新设定大小(size)而不会重新设定容量
- 最重要的原因:可以最大的减小占用互斥锁的时间,使得其只在swap加锁, 在执行回调函数的时候不加锁,可能还能预防在执行回调函数的时候获取锁而死锁,也能在确保执行回调函数的时候,能够往pendingFunctors_里加数据
1 | void EventLoop::wakeup() const { |
- 第二个比较巧妙的设计就是,使用
wakeupFd_
,这是最巧妙的- 传统的进程/线程间唤醒办法是用pipe或者socketpair,IO线程始终监视管道上的可读事件,在需要唤醒的时候,其他线程向管道中写一个字节,这样IO线程就从IO multiplexing阻塞调用中返回。pipe和socketpair都需要一对文件描述符,且pipe只能单向通信,socketpair可以双向通信。一方面它比 pipe 少用一个 fd,节省了资源;另一方面,wakeupFd的缓冲区管理也简单得多,全部buffer只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。muduo库也没有采用生产者消费者的模型,采用了wakeupFd这种巧妙的思想,在今后的学习中,我们也可以进一步的使用它。
- 什么是eventfd,他是用来记录有多少的事件可读,其有两个函数read 清空计数,write类加技术(具体请参考:https://blog.csdn.net/EDDYCJY/article/details/118980819)
- 他可以用来跨线程通知
- 会有两种唤醒情况
1. 会唤醒被poller_->poll(kPollTimeMs,&activeChannels_);阻塞的事件循环线程 1. 事件循环线程正在执行回调函数,当他执行完后,再次调用'poller_->poll(kPollTimeMs,&activeChannels_);'由于有新的事件发生了(eventfd也就是wakeupChannel_有读事件),就不会被阻塞而继续执行doPendingFunctors();
muduo库三大核心组件之 Poller/EpollPoller类
Poller负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件。在 muduo 中,使用抽象基类 Poller ,并由EpollPoller和PollPoller派生基类中继承实现 epoll 和 poll
重要成员变量
1 | //poller.h |
- 详情见注释
重要成员函数
1 |
|
muduo库Acceptor类
Acceptor重要成员变量
1 | Socket acceptSocket_; |
eventloop__监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。
acceptSocket 服务器监听套接字的文件描述符
acceptChannel把acceptSocket及其感兴趣事件和事件对应的处理函数进行封装。
newConnetionCallback这个是最重要的一个成员了,它的类型是using NewConnectionCallback = std::function
;,在TcpServer构造函数中通过acceptor ->setNewConnetionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::1,std::placeholders::_2));将 TcpServer::newConnection函数注册给了这个成员变量。这个 TcpServer::newConnection函数的功能是通过轮询EventLoop *ioLoop = threadPool->getNextLoop();选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。_listenning是一个标志位
idlefd_是用来防止底层一直通知有事件以及为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作
Acceptor重要成员函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79//
// Created by ubuntu on 24-7-19.
//
static int createNonblocking()
{
int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if(sockfd < 0)
{
LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__,__FUNCTION__,__LINE__,errno);
}
}
Acceptor::Acceptor(std::shared_ptr<EventLoop> event_loop, const InetAddress& address, bool resuseport)
: acceptSocket_(createNonblocking())
, acceptChannel_(acceptSocket_.fd(),event_loop)
, event_loop_(event_loop)
, listening_(false)
, idlefd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {
assert(idlefd_ >= 0);
acceptSocket_.SetReuseaddr(true);
acceptSocket_.SetReuseport(resuseport);
acceptSocket_.bind(address);
setNewConnectionCallBack(std::bind(&Acceptor::handleread, this));
}
Acceptor::~Acceptor() {
acceptChannel_.disableAll();
acceptChannel_.remove();
close(idlefd_);
}
void Acceptor::listen() {
acceptChannel_.enableReading();
listening_ = true;
acceptSocket_.listen(2);
}
void Acceptor::handleread() {
InetAddress clientAddress{};
int connfd = acceptSocket_.accept(clientAddress);
if(connfd >= 0) {
if(connectionCallBack_) {
connectionCallBack_(connfd, clientAddress);
}else {
close(connfd);
}
}
else {
LOG_ERROR("in Acceptor::handleRead");
// 由于采用了LT,为了防止一直通知,使用idlefd_来接受读事件
//`idleFd_`的设计是为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作。
//通过预先打开一个文件描述符(`/dev/null`),在文件描述符耗尽时释放它来接受新连接,
//然后再重新打开`/dev/null`,这种机制可以有效避免程序因无法分配文件描述符而崩溃。
if (errno == EMFILE)
{
::close(idlefd_);
idlefd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idlefd_);
idlefd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}其中值得一提的是handleread的实现:
这里主要是建立连接,通过调用Socket的accept函数,底层调用系统的accept函数,返回一个已连接的socket描述字,这样连接就建立了。同时内部还调用了成员变量newConnectionCallback保存的函数,当mainLoop监听到acceptChannel上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法,内部调用newConnetionCallback_,也就是TcpServer设置的一个回调函数setNewConnetionCallback,绑定了TcpServer::newConnection,通过 轮询算法,选择一个subloop,分发当前的新客户端的Channel,并且绑定了一些回调。
比较好的设计
- idlefd:
- 这个思想也是很巧妙的,在调用accept的过程中,如果已用文件描述符过多,accept会返回-1,构造函数中注册的idleFd就派上用场了。当前文件描述符过多,无法接收新的连接。但是由于我们采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空的idleFd,然后去accept,这样进行了连接,让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。这样就优雅的解决 EMFIFE 问题,也就是为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作。
muduo库Buffer类
(本文参考链接:https://blog.csdn.net/m0_73537205/article/details/138665904)
为什么需要buffer类?
- 首先是为什么要有
Buffer
?为什么不能直接接收和发送呢,如果直接使用 read()、write() 系统调用进行操作,就可以直接将应用层数据发送出去或者将内核数据读取到应用层,加上Buffer
以后,相当于在应用层数据和内核缓冲区之间又多了一层,会不会造成负面影响呢?[2] 中明确,应用层的缓冲区是必要的,因为非阻塞 IO 的核心就是避免阻塞以在 read() 和 write() 为代表的系统调用上。 - 对于发送来说,假设应用程序需要发送 40KB 数据,但是操作系统的 TCP 发送缓冲区只有 25KB 剩余空间,如果等待内核缓冲区可用,就会阻塞当前线程,因为不知道对方什么时候收到并读取数据。因此网络库应该把这 15KB 数据先缓存起来,等 fd 变得可写的时候立刻发送数据,这样操作才不会造成阻塞。需要注意,如果应用程序随后又要发送 50KB 数据,而此时发送缓冲区中尚有未发送的数据,那么网络库应该将这 50KB 数据追加到发送缓冲区的末尾,而不能立刻尝试 write(),因为这样有可能打乱数据的顺序。对于接收来说,假设一次读到的数据不够一个完整的数据包,那么这些已经读到的数据应该先暂存在某个地方,等剩余的数据收到之后再一并处理。所以说,发送缓冲区和接收缓冲区的存在都是必要的。
Buffer设计思想
- muduo的Buffer的定义如下,其内部是 一个 std::vector,且还存在两个sizet类型的readerIndex,writerIndex_标识来表示读写的位置。结构图如下:
readIndex、writeIndex把整个vector内容分为3块:prependable、readable、writable,各块大小关系:
- prependable = readIndex
- readable = writeIndex - readIndex
- writable = buffer.size() - writeIndex
Buffer类是可以动态扩容的,在下面的成员函数中,会详细介绍。
Buffer重要成员变量
1 |
|
- 其中readerIndex_指向的可读地区的起始地址
- writerIndec_指向的是可写位置的起始地址
Buffer重要成员函数
1 | // 返回可读的缓冲区大小 |
- 这几个代码功能如注释所示
1 |
|
比较巧妙的思想是:在进行扩容的时候考虑两种扩容情况
- 预留空间 加上 可写空间(prependableBytes() + writerableBytes())不足的的话 就直接扩容
- 否则 就不直接扩容,而是重新分配下 内部结构
这样可以避免内存空间的浪费(因为更新可读与可写地区的时候是通过移动两个Index实现的,可能readerIndex会一直往右移,导致空间预留空间越来越大,如果直接扩容会导致内存空间的浪费)
1 | // 读len长的数据,并进行移动位置 |
- 这部分函数是对读数据操作的函数,其功能见注释
1 | //从fd上读取数据 |
- 读数据的设计思想:
- 我们在读数据的时候,不知道数据的最终大小是多少,所以采用了如下的方法:
- 首先会在栈区开一个64k的空间,利用栈的好处是可以自动的释放,并计算出目前剩余可写的空间大小;
- 利用结构体 iovec 指定了两块缓冲区,一块是目前剩余的可写的Buffer,一个是临时的缓冲区,指定了起始位置以及缓冲区的大小;
- const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; 如果writable < sizeof extrabuf就选2块内存readv,否则一块就够用;
- 读数据const ssize_t n = ::readv(fd, vec, iovcnt);
- 若读取的数据超过现有内部buffer的writable空间大小时, 启用备用的extrabuf 64KB空间, 并将这些数据添加到内部buffer的末尾。
- 我们在读数据的时候,不知道数据的最终大小是多少,所以采用了如下的方法:
iovec结构体定义
1 |
|
- struct iovec定义了一个向量元素。通常,这个结构用作一个多元素的数组。对于每一个传输的元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是readv所接收的数据或是writev将要发送的数据。成员iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度。
- 其中readv函数可以一次性向多个缓冲区读数据,而不用挨个调用,减少系统调用次数,writev同理
muduo库TcpConnection类
设计思想以及一些感悟
在学习此网络库之前我并不知道到底什么时候该用智能指针,什么时候不该用,在一些情况下使用智能指针会带来额外的性能开销,所以不能无脑梭哈,所以不知道该如何权衡使用它们,但是学习之后我得到了一些启发
1
2
3
4
5
6
7
8
9
10
11
12/******** Callbacks.h ********/
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
/******** TcpServer.cc ********/
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
代码省略
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
connections_[connName] = conn;
代码省略
ioLoop->runInLoop(bind(&TcpConnection::connectEstablished, conn));
}在TcpServer::newConnection()函数中,当接受了一个新用户连接,就要把这个Tcp连接封装成一个TcpConnection对象,也就是上面代码中的new TcpConnection(…)。然后用一个共享型智能指针来管理这个对象。所以为什么这里要把TcpConnection用智能指针来管理?
这里使用智能指针管理TcpConnetion的最重要原因在于防止指针悬空,而指针悬空可能会来自以下这三个方面:
考虑一种情况,如果在通信的时候,用户手贱将TcpConnection给删除了,删除了之后,程序内部还要好几处地方都在使用TcpConnection对象。结果这个对象的内存突然消失了,服务器访问非法内存崩溃。我们不能防止用户做这些事情,所以编程设计不可以依赖用户行为,一定要尽可能地封死用户的误操作。所以这里用了共享智能指针。
但是单单只使用一个shared_ptr还不足以解决上述问题,还需要配合weak_ptr来实现生命周期的延长,在muduo中就有这样的设计,这个设计非常的好,值得学习,以下为具体内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class TcpConnection : noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(EventLoop *loop,
const std::string &name,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);
/***** TcpConnection.cpp *****/
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}
- Tcpconnection设计的时候继承了enable_shared_from_this,这使得能通过调用shared_from_this,获得一个共享指针
- 假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA。
- 接着这个shared_ptr就作为channel_的Channel::tie()函数的函数参数。
- 再根据上述Channel专栏中讲的[重要设计](#重要设计),可知使用智能指针的好处
考虑第二种情况,TcpConnection对象的多线程安全问题:
- 假如服务器要关闭了这个时候MainEventLoop线程中的TcpServer::~TcpServer()函数开始把所有TcpConnection对象都删掉。那么其他线程还在使用这个TcpConnection对象,如果你把它的内存空间都释放了,其他线程访问了非法内存,会直接崩溃。
- 你可能会觉得,反正我都要把服务器给关了,崩就崩了吧。这种想法是错的!因为可能在你关闭服务器的时候,其他线程正在处理TcpConnection的发送消息任务,这个时候你应该等它发完才释放TcpConnection对象的内存才对!
- 使用智能指针就很好的解决了这个问题,即使在一个线程中删除一个TcpConnection,也只是删除了引用,而只要引用技术不为0就不会删除掉
TcpConnection重要成员变量
1 | // 一个subloop |
- 具体含义见注释
- 其中两个输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer中的数据拷贝到Tcp发送缓冲区中。
TcpConnection重要成员函数
首先来看下构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static EventLoop *CheckLoopNotNull(EventLoop* loop)
{
if(loop == nullptr)
{
LOG_FATAL("%s:%s:%d TcpConnection loop is null! \n",__FILE__,__FUNCTION__,__LINE__);
}
return loop;
}
TcpConnection::TcpConnection(EventLoop *loop,
const std::string &nameArg,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr)
: loop_(CheckLoopNotNull(loop))
, name_(nameArg)
, state_(kConnecting)
, reading_(true)
, socket_(std::make_unique<Socket>(sockfd))
, channel_(std::make_unique<Channel>(sockfd, loop))
, localAddr_(localAddr)
, peerAddr_(peerAddr)
, highWaterMark_(64*1024*1024) //64M
{
//下面给channel设置相应的回调函数
//poller给channel通知感兴趣的事件发生了
//channel会回调相应的操作函数
//将TcpConnection自己的成员函数注册给当前accept返回的connfd对应的Channel对象上
channel_->setReadCallBack(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));
channel_->setWriteCallBack(std::bind(&TcpConnection::handleWrite,this));
channel_->setCloseCallBack(std::bind(&TcpConnection::handleClose,this));
channel_->setErrorCallBack(std::bind(&TcpConnection::handleError,this));
LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
//开启Tcp/Ip层的心跳包检测
socket_->SetKeepalive(true);
}- 其功能是创建一个TcpConnection,初始化一些变量,并且设置一些回调函数以及开启心跳包检测
接着我们看看发送数据相关的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99/ 发送数据
void TcpConnection::send(const std::string &buf) //直接引用buffer
{
// 先检查是否处于连接状态
if(state_ == kConnected)
{
// 判断处理的线程是否是事件循环线程
if(loop_->isInLoopThread())
{
//string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。
sendInLoop(buf.c_str(),buf.size());
}
else
{
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop
, this
, buf.c_str()
, buf.size()
));
}
}
}
/**
* 发送数据,应用写得快,内核发送数据慢,
* 需要把待发送的数据写入缓冲区
* 且设置了水位回调,防止发送太快
*/
void TcpConnection::sendInLoop(const void* data, size_t len)
{
ssize_t nwrote = 0; // 已经写了多少数据
size_t remaining = len; //未发送的数据
bool faultError = false; //记录是否产生错误
//之前调用过connection的shutdown 不能在发送了
if(state_ == kDisconnected)
{
LOG_ERROR("disconnected,give up writing!");
return ;
}
//channel 第一次开始写数据,且缓冲区没有待发送数据
if(!channel_->isWriteEvent() && outputBuffer_.readAbleBytes() == 0)
{
nwrote = ::write(channel_->fd(),data,len);
if(nwrote >= 0)
{
// 更新剩余数据
remaining = len - nwrote;
if(remaining == 0 && writeCompleteCallback_)
{
//一次性数据全部发送完成,就不要再给channel设置epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this()));
}
}
else
{
nwrote = 0;
if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写
{
LOG_ERROR("TcpConnection::sendInLoop");
if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET
{
faultError = true;
}
}
}
}
// 说明当前这一次write ,并没有把数据全发送出去,剩余的数据
// 需要保存到缓冲区当中,给channel注册epollout事件
// poller发现tcp发送缓冲区有空间,会通知相应的socket-channel
// 调用相应的writeCallback()回调方法
// 也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去
if(!faultError && remaining > 0)
{
//目前发送缓冲区剩余的待发送数据的长度
size_t oldlen = outputBuffer_.readAbleBytes();
if(oldlen + remaining >= highWaterMark_
&& oldlen < highWaterMark_
&& highWaterMark_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining)
);
}
// 把未发送的数据加入到发送缓冲区
outputBuffer_.append((char*)data + nwrote,remaining);
if(!channel_->isWriteEvent())
{
channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout
}
}
}- 为了秉持一个线程占用一个eventloop所以我们要确保执行发送任务的时候是发生在事件循环线程中 所以要进行一次判断,如果不是得投放到任务容器里等待被唤醒
- 发送数据的时候,会有两种情况,一种是第一次发送直接全部发送完了,这时候将它放入事件循环的处理函数容器,并唤醒对应线程处理,第二种情况是第一次没发完,这时就需要将未发完的数据存入outputBuffer里,存入的时候需要进行比较oldlen + remaining >= highWaterMark && oldlen < highWaterMark&& highWaterMark_,如果满足就进行高水位回调(这有助于防止发送方发送过多数据,导致接收方无法及时处理。)
- 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。
一系列的回调函数处理
- handleread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58void TcpConnection::handleRead(TimeStamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
if(n > 0)
{
// 已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage
// shared_from_this()获取了当前TcpConnection对象的智能指针
messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
}
else if(n==0) //客户端断开
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::hanleRead");
handleError();
}
}
void TcpConnection::handleWrite()
{
if(channel_->isWriteEvent())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n > 0)
{
outputBuffer_.retrieve(n); //处理了n个
if(outputBuffer_.readAbleBytes() == 0) //发送完成
{
channel_->disableWriting(); //不可写了
if(writeCompleteCallback_)
{
//唤醒loop对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
if(state_ == kDisconnecting)
{
shutdownInLoop();// 在当前loop中删除TcpConnection
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
}
}- 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作
onMessage
,shared_from_this()
获取了当前TcpConnection对象的智能指针.
- 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作
handlewirte
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35void TcpConnection::handleWrite()
{
if(channel_->isWriteEvent())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
if(n > 0)
{
outputBuffer_.retrieve(n); //处理了n个
if(outputBuffer_.readAbleBytes() == 0) //发送完成
{
channel_->disableWriting(); //不可写了
if(writeCompleteCallback_)
{
//唤醒loop对应的thread线程,执行回调
loop_->queueInLoop(
std::bind(writeCompleteCallback_,shared_from_this())
);
}
if(state_ == kDisconnecting)
{
shutdownInLoop();// 在当前loop中删除TcpConnection
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
}
}如果可写,通过fd发送数据,直到发送完成
- 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
- 当前TCP正在断开连接,调用
shutdownInLoop
,在当前loop中删除TcpConnection
handleclose
1
2
3
4
5
6
7
8
9
10
11//Poller => Channel::closeCallback => TcpConnection::handlerClose
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); //执行连接关闭的回调
closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
}处理逻辑就是将这个TcpConnection对象中的channel从事件监听器中移除。然后调用connectionCallback和closeCallback保存的回调函数。closeCallback在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。
handleerror
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err = 0;
if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0)
{
err = errno;
}
else
{
err = optval;
}
LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);
}
- 关闭写端函数
1 | //关闭连接 |
- 注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
- 另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。
关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中TcpConnection的引用计数问题:
- 首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
- 客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
- 触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
- 触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
- 主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结用conn为参数构造了bind。引用计数不变 总数:1
- 回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0
连接建立与删除函数
1 | //建立连接 |
muduo库TcpServer类
- TcpServer与用户打交道,用户可以传入读回调函数,写完成回调等函数给TcpConnection
- TcpServer 发送逻辑给Acceptor或者TcpConnection,进行相应处理
TcpServer重要成员变量
1 | //baseloop_用户定义的loop |
- 详解见注释
TcpServer重要成员函数
1 |
|
- 具体功能见注释
muduo库Thread类
Thread重要成员变量
1 | std::string name_;//线程名字 |
- 具体见注释
Thread重要成员函数
1 | std::atomic_int Thread::numCreated_(0); //静态成员变量 要在类外单独进行初始化 |
- 值得注意的是线程并不是在构造函数中创建的,并且必须得创建线程后得到id才能继续运行(因为其他逻辑实现的one loop per thread必须要得到tid)
补充一下sem_t变量类型
1
2
3
4
5
6
7
8
9sem_init()// sem为指向信号量结构的一个指针;pshared不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享;value给出了信号量的初始值。
extern int sem_init __P ((sem_t *__sem, int __pshared, unsigned int __value));
sem_wait( sem_t *sem ) //用来阻塞当前线程直到信号量sem的值大于0,解除阻塞后将sem的值减一,表明公共资源经使用后减少。函数sem_trywait ( sem_t *sem )是函数sem_wait()的非阻塞版本,它直接将信号量sem的值减一。
sem_post( sem_t *sem )//用来增加信号量的值。当有线程阻塞在这个信号量上时,调用这个函数会使其中的一个线程不在阻塞,选择机制同样是由线程的调度策略决定的。
//sem_post和sem_wait函数配合使用来达到线程同步
sem_destroy(sem_t *sem)//用来释放信号量sem。
muduo库EventLoopThread类
EventLoopThread重要成员变量
1 | EventLoop *loop_; //线程内部的eventloop* |
- 具体见注释
EventLoopThread重要成员函数
1 |
|
启动LOOP的过程
- 首先,在startLoop中调用了Thread::start(),而这里的thread的线程函数是threadFunc,在上一篇剖析Thread类时,我们在Thread::start(),看到了一个线程函数func(),所以func就是构造函数中&EventLoopThread::threadFunc,this 传入的,所以这里会创建线程调用threadFunc()函数,并且主线程阻塞等待EventLoop对象的创建
- 此时有两个线程在运行 一个是调用EventLoopThread::startLoop()的线程,一个是执行EventLoopThread::threadFunc()的线程 IO线程
- threadFunc是在单独的新线程里面运行的,创建一个独立的Eventloop,和上面的线程是一一对应的 one loop per thread
- 将IO线程定义好的loop传入回调
- 创建好loop了唤醒主线程,并把loop给主线程,主线程返回IO线程创建的EventLoop对象
- 注意开始执行 loop.loop();,也就是EventLoop loop => Poller.poll,开始一个循环,知道循环结束eventloop析构,把loop设为空。
总体来说,EventLoopThread提供了对应eventloop和thread的封装,意为I/O线程类,EventLoopThread可以创建一个IO线程,通过startLoop返回一个IO线程的loop,threadFunc中开启loop循环。
muduo库EventLoopThreadPool类
EventLoopThreadPool重要成员变量
1 | EventLoop *baseLoop_; //主线程loop |
- 具体含义见注释
EventLoopThreadPool重要成员函数
1 | explicit EventLoopThreadPool(EventLoop* baseloop, const std::string& nameArg = std::string{}); |
- 启动线程池,实际上创建numThreads个线程,并让每个eventloopthread调用startLoop()
- 设置当前状态为true,根据需要的线程数numThreads,创建线程
- _在for循环中,先创建一个EventLoopThread对象,构造线程池内线程集合
- 调用EventLoopThread::startLoop(),创建线程,绑定一个新的EventLoop,并返回loop地址,放入loops中,loops是一个std::vector
类型 - 把每个EventLoopThread线程对应的EventLoop保存在loops中。
- 如果没有其他线程,只有主线程的话,直接调用callback
muduo是支持单线程和多线程的
1
2
3
4
5EventLoop* loop = baseLoop_;,
if(threadNums_ == 0 && cb) {
cb(baseLoop_);
}这两块就实现了单线程功能
EchoServer之建立
1 |
|
具体执行逻辑可以看代码或者访问https://blog.csdn.net/T_Solotov/article/details/124044175这篇大佬的文章
muduo库中的定时器类
Timer类
Timer重要成员变量
1 | // 定时到期需要执行的任务 |
- 具体逻辑见注释
Timer重要成员函数
1 | using TimerCallback = std::function<void()>; |
- restart函数判断是否重复定时器构造到期时间,如果是重复就设置到期时间为设定的时间,否则为0
Timer设计思想
- 首先需要到期时需要执行的任务,然后需要存储到期时间以及是否是重复定时器,以及定时器的唯一id
TimerId类
TimerId重要成员变量
1 |
|
TimerQueue类
TimerQueue重要成员变量
1 | EventLoop* loop_; |
- 具体见注释
TimerQueue重要成员函数
1 | public: |
取消定时器的逻辑
1 | void TimerQueue::cancel(TimerId timerId) { |
- 该功能是提供给用户手动取消不想要的定时器任务,其实现逻辑是根据用户传入的TimerId将其从activeTimer(活跃定时器中删除)以及timers(存储定时器的列表)中删除,如果它并没有被触发就直接删除,否则就判断是否正在处理别的定时器任务,是的话现暂存在cancleTimers取消定时器列表中(因为这样可以防止别的定时器会调用这个定时器而导致未定义行为),cancleTimers这个列表的东西会在准备开始处理下次的一组过期任务的时候被删除
加入定时器的逻辑
1 | void readTimerfd(int timerfd, TimeStamp now) { |
- 加入新的定时器的时候,需要判断新加入的定时器过期时间会不会是最早的(这是通过set红黑数实现的),也就是下一次会先执行,如果是的话 要更新下一次过期时间给timerfd(调用resettimerfd),然后不管是不是都要加入到对应的存储容器中
处理定时器过期逻辑
1 | void TimerQueue::handleRead() { |
- 由于使用了timerfd,所以定时器过期后会有read事件,接收到后会调用读处理函数,读处理函数中会先读取Timerfd,然后获取过期定时器列表(通过一个哨兵,使用二分找到lower_bound找到第一个大于或等于的位置,执行对应删除),获取后执行对应任务,最后重新设置下次过期时间以达到循环目的
- 重新设置的时候需要判断过期任务列表中是否有重复定时器,有的话重新设定,并且更新下一次过期时间给timerfd
Timerfd的使用
1 | int createTimerfd() { |
timerfd 这个名字拆开来看,就是 timer fd,所谓定时器 fd 类型,那么它的可读可写事件一定是跟时间有关系。timerfd 被 new 出来之后 (
timerfd_create
),可以设置超时时间(timerfd_setting
),超时之后,该句柄可读,读出来的是超时的次数。他有三个用法
1
2
3
4
5
6// 创建一个 timerfd 句柄
int timerfd_create(int clockid, int flags);
// 启动或关闭 timerfd 对应的定时器,会返回上次设定时间给传入的参数
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
// 获取指定 timerfd 距离下一次超时还剩的时间
int timerfd_gettime(int fd, struct itimerspec *curr_value);两个需要用得到结构体
- timespec
1
2
3
4
5// timespec 结构体用于表示时间,其定义如下:
struct timespec {
time_t tv_sec; /* 秒 */
long tv_nsec; /* 纳秒 */
};- itimespec
1
2
3
4
5// itimerspec 结构体用于定义定时器的设置值和剩余时间,
struct itimerspec {
struct timespec it_interval; /* 重复周期 */
struct timespec it_value; /* 首次触发时间 */
};用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
timer_t timerid;
void create_timer(void) {
struct itimerspec new_value;
struct itimerspec curr_value;
if (timer_create(CLOCK_REALTIME, NULL, &timerid) == -1) {
perror("timer_create");
exit(1);
}
memset(&new_value, 0, sizeof(new_value));
new_value.it_value.tv_sec = 5;
new_value.it_interval.tv_sec = 2;
if (timer_settime(timerid, 0, &new_value, NULL) == -1) {
perror("timer_settime");
exit(1);
}
}
void signal_handler(int sig, siginfo_t *si, void *uc) {
static int count = 1;
struct itimerspec curr_value;
printf("Signal handler called - count: %d\n", count);
if (timer_gettime(timerid, &curr_value) == -1) {
perror("timer_gettime");
exit(1);
}
count++;
}
int main() {
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_flags = SA_SIGINFO;
sa.sa_sigaction = signal_handler;
if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
perror("sigaction");
exit(1);
}
create_timer();
while(1) {
pause();
}
return 0;
}