Reactor

  • Muduo库是基于Reactor模式实现的TCP网络编程库。其中的重要组件是由Event(事件)、Reactor(反应堆)、Demultiplex(事件分发器)和Evanthandler(事件处理器)四部分构成的,其相互关系如下图所示:

    image-20240725100624027

    • 首先,将事件Event注册到反应堆Reactor上,即将应用程序感兴趣的事件注册到反应堆上,请求反应堆帮助监听,若事件发生,反应堆调用应用程序预制的回调(handler),一个event对应一个handler;
    • 反应堆Reactor相当于是一个事件以及事件处理的集合,通过相应的方法在事件分发器Demultiplex里做一个相应的调整(add/mod/del event),然后启动事件循环(epoll_wait),服务器处于阻塞状态等待新用户的连接,或者已连接用户的读写事件;
    • 如果epoll_wait监听到有新事件产生,分发器返回事件给反应堆,反应堆调用相应的事件处理器eventhandler;
    • eventhandler中读取用户的请求,解码,处理,打包,发送。
  • 在muduo库的Reactor模型上:

    • Poller和EPollPoller就是Demultiplex
    • Channel封装了Event,里面存在fd,events,revents,以及相应的回调函数callbacks,其中有两种channel,acceptorChannel以及connectionChannel分别对应listenfd以及connfd
    • EventLoop就是Reactor
  • 这也就看出muduo库的三个核心组件是:Channel类、Poller/EpollPoller类以及EventLoop类这三个组件之间的关系如下图所示:

    image-20240725101825823

muduo库三大核心组件之Channel类

  • Channel类封装了一个 fd 、fd感兴趣事件events、该fd实际发生的事件revents。同时Channel类还提供了设置该fd的感兴趣事件,以及相应的回调函数。

channel头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#pragma once

#include "Socket.h"
#include <memory>
#include <functional>

/*
理清楚 EventLoop Channel,Poller之间的关系 他们在Reactor上面对应的Demultiplex
Channel 理解为通道,封装了sockfd和其感兴趣的event,如EPOLLIN EPOLLOUT事件
还绑定了poller返回的具体事件
*/
class EventLoop;
class TimeStamp;

// 一个监听端口和一个epoll即为一个channel
class Channel{
public:
using EventCallback = std::function<void()>;
using ReadEventCallback = std::function<void(TimeStamp)>;

Channel(int fd, EventLoop* event_loop);
Channel(const Channel&) = delete;
Channel& operator=(const Channel&) = delete;
~Channel() = default;
//返回fd_
int fd() const;

// 采用边缘触发
void usetET();

// 设置inepoll成员为true
void setinepoll();

// 设置revents成员的值为参数ev
void setrevents(uint32_t ev);

// 返回 eventsl成员
uint32_t events() const;

// 返回 inepoll成员
bool inepoll() const;

// 返回 revents成员
uint32_t revents() const;

// 处理epoll_wait返回的事件
void handleevent(TimeStamp receiveTime) const;

// 防止当channel被手动remove掉,channel还在执行回调操作时失效
void tie(const std::shared_ptr<void>& obj);

// 设置fd相应的状态 update()相当于调用epoll_ctl
void enableReading() { events_ |= kReadEvent; update();} //相当于把读事件给events相应的位置位了
void disableReading() { events_ &= ~kReadEvent; update();}
void enableWriting() { events_ |= kWriteEvent; update();}
void disableWriting() { events_ &= ~kWriteEvent; update();}
void disableAll() { events_ = kNoneEvent; update();}

// 返回fd当前的事件状态
bool isNoneEvent() const { return events_ == kNoneEvent;}
bool isReadEvent() const { return events_ & kReadEvent;}
bool isWriteEvent() const { return events_ & kWriteEvent;}

// one loop per thread
EventLoop* onwerLoop() {return event_loop_;}

void remove();

// 设置写事件回调函数
void setWriteCallBack(EventCallback wcb);

// 设置读事件回调函数
void setReadCallBack(ReadEventCallback rcb);

// 设置连接断开回调函数
void setCloseCallBack(EventCallback rcb);

// 设置连接错误回调函数
void setErrorCallBack(EventCallback rcb);

private:

// 表示没有感兴趣的事件
static const int kNoneEvent;
// 表示感兴趣的是读事件
static const int kReadEvent;
// 表示感兴趣的是写事件
static const int kWriteEvent;

void update();

void handleEventWithGuard(TimeStamp receiveTime) const;

// channel拥有的fd,Channel和fd是一一对应的关系
int fd_;

// channel对应的红黑树,channel与EpollLoop是多对一的关系,一个Channel只对应一个EpollLoop
// 一个EpollLoop可以对应多个Channel
EventLoop* event_loop_;

// Channel是已经添加到对应的epoll树中,false为未添加,true表示已经添加
// 如果已经添加用EPOLL_CTL_MOD 否则用 EPOLL_CTL_ADD
bool inepoll_;

// 当客户端正常断开TCP连接,IO事件会触发Channel中的设置的CloseCallback回调
// 但是用户代码在onClose()中有可能析构Channel对象,导致回调执行到一半的时候,其所属的Channel对象本身被销毁了
// 为了解决这个问题 考虑延长生命周期,怎么延长?
// 如果直接在另一个类声明一个强引用,但是这会出现循环引用问题
// 所以想着使用弱引用,那弱引用该如何延长生命周期呢?
// 可以在调用函数之前 将它提升为强引用赋值给一个强引用 从而增加引用计数
// 在调用完某函数之前都不会引用计数变为0,执行完后出作用域,引用计数-1

std::weak_ptr<void> tie_; // 一方面这个若引用可以做到避免循环引用的现象,另一方面可以增加引用计数

// 表示当前的 Channel对象 是否和一个生命周期受控的对象(如 `TcpServer` 或 `EventLoop`)关联。
// 如果 `tied_` 为 `true`,则表示需要检查关联对象的生命周期。
bool tied_;


// fd需要监听的事件,listenfd和clientfd需要监听EPOLLIN,
// clientfd还可能监听EPOLLOUT事件
uint32_t events_;
// fd_中已发生的事件
uint32_t revents_;


// 因为channel可以获得fd最终发生的具体事件revent,所以他负责回调
// 读事件回调函数
ReadEventCallback readcallback_;
// 连接错误回调函数
EventCallback errorcallback_;
// 连接断开回调函数
EventCallback closecallback_;
// 写事件回调函数
EventCallback writecallback_;

};

channel实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include <iostream>
#include <TcpServer.h>

#include "EPollPoller.h"
#include "Channel.h"
#include "InetAddress.h"
#include "logger.h"
#include "TimeStamp.h"

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;


Channel::Channel(int fd, EventLoop* event_loop)
: fd_(fd)
, event_loop_(event_loop)
, inepoll_(false)
, tied_(false)
, events_(0)
, revents_(0)
{

}

int Channel::fd() const
{
return fd_;
}

void Channel::usetET()
{
events_ = events_|EPOLLET;
}

void Channel::setinepoll()
{
inepoll_ = true;
}

void Channel::setrevents(uint32_t ev)
{
revents_ = ev;
}

uint32_t Channel::events() const
{
return events_;
}

bool Channel::inepoll() const
{
return inepoll_;
}

uint32_t Channel::revents() const
{
return revents_;
}

void Channel::setReadCallBack(ReadEventCallback rcb){
readcallback_ = std::move(rcb);
}

void Channel::setCloseCallBack(EventCallback rcb) {
closecallback_ = std::move(rcb);
}

void Channel::setErrorCallBack(EventCallback rcb) {
errorcallback_ = std::move(rcb);
}

void Channel::setWriteCallBack(EventCallback wcb) {
writecallback_ = std::move(wcb);
}

void Channel::handleevent(TimeStamp receiveTime) const {
if(tied_) {
std::shared_ptr<void> guard = tie_.lock();
if(guard) {
handleEventWithGuard(receiveTime);
}
}else {
handleEventWithGuard(receiveTime);
}
}

void Channel::handleEventWithGuard(TimeStamp receiveTime) const{
LOG_INFO("channel handleEvent revents:%d\n",revents_);

// EPOLLRDHUP 表示对方已经关闭
if((revents_ & EPOLLRDHUP) && !(revents_ & EPOLLIN)){
if (closecallback_) {
closecallback_();
}
return;
}
// EPOLLPRI 表示外带数据
if(revents_ & (EPOLLIN | EPOLLPRI) ) {
if (readcallback_) {
readcallback_(receiveTime);
} else {
LOG_ERROR("Read callback not set for fd= %d\n", fd_);
}
}
else if(revents_ & EPOLLOUT)
{
if(writecallback_)
{
writecallback_();
}
}
else
{
if(errorcallback_) errorcallback_();
}
}

void Channel::update() {

//通过channel所属的EventLoop,把当前的channel删除掉
event_loop_->updatechannel(this);
}

void Channel::remove() {
//通过channel所属的EventLoop,调用Poller相应的方法,移除fd的events事件
event_loop_->removechannel(this);
}

void Channel::tie(const std::shared_ptr<void> &obj) {
tie_ = obj;
tied_ = true;
}


重要设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

// 当客户端正常断开TCP连接,IO事件会触发Channel中的设置的CloseCallback回调
// 但是用户代码在onClose()中有可能析构Channel对象,导致回调执行到一半的时候,其所属的Channel对象本身被销毁了
// 为了解决这个问题 考虑延长生命周期,怎么延长?
// 如果直接在另一个类声明一个强引用,但是这会出现循环引用问题
// 所以想着使用弱引用,那弱引用该如何延长生命周期呢?
// 可以在调用函数之前 将它提升为强引用赋值给一个强引用 从而增加引用计数
// 在调用完某函数之前都不会引用计数变为0,执行完后出作用域,引用计数-1

std::weak_ptr<void> tie_; // 一方面这个若引用可以做到避免循环引用的现象,另一方面可以增加引用计数

// 表示当前的 Channel对象 是否和一个生命周期受控的对象(如 `TcpServer` 或 `EventLoop`)关联。
// 如果 `tied_` 为 `true`,则表示需要检查关联对象的生命周期。
bool tied_;

void Channel::tie(const std::shared_ptr<void> &obj) {
tie_ = obj;
tied_ = true;
}

void Channel::handleEvent(TimeStamp receiveTime)
{

if(tied_)
{
std::shared_ptr<void> guard;
guard = tie_.lock(); //提升
if(guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}


void Channel::handleEventWithGuard(TimeStamp receiveTime)
{
LOG_INFO("channel handleEvent revents:%d\n",revents_);

// 连接断开,并且fd上没有可读数据(默认水平触发)
if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
if(closeCallback_)
{
closeCallback_();
}
}

if(revents_ & EPOLLERR)
{
if (errorCallback_)
{
errorCallback_();
}
}

if(revents_ & (EPOLLIN | EPOLLPRI))
{
if(readCallback_)
{
readCallback_(receiveTime);
}
}

if(revents_ & EPOLLOUT)
{
if(writeCallback_)
{
writeCallback_();
}
}

}

  • tie这个成员变量的设计:
    1. 当客户端正常断开TCP连接,IO事件会触发Channel中的设置的CloseCallback回调,但是用户代码在onClose()中有可能析构Channel对象,导致回调执行到一半的时候,其所属的Channel对象本身被销毁了
    2. 为了解决这个问题 考虑延长生命周期,怎么延长?
      1. 如果直接在另一个类声明一个强引用,但是这会出现循环引用问题
      2. 所以想着使用弱引用,那弱引用该如何延长生命周期呢?
      3. 可以在调用函数之前 将它提升为强引用赋值给一个强引用 从而增加引用计数
      4. 在调用完某函数之前都不会引用计数变为0,执行完后出作用域,引用计数-1

​ 3. 我们可以看出handleEvent中tie实际上这是一个弱指针,绑定到TcpConnection的共享指针 ,如果可以原来的弱指针,变成了强指针,这时候tie()的作用 就表明了,延长了TcpConnection的生命周期,使之长过Channel::handleEvent(),保证了TcpConnection不被销毁

muduo库三大核心组件之EventLoop类

  • Poller封装了和事件监听有关的方法和成员,调用Poller派生类EpollPoller::poll方法,我们就可以获得发生事件的fd 及其 发生的事件。EventLoop是网络服务器中负责 循环 的重要模块,从而做到持续监听、持续获取监听结果、持续处理监听结果对应的事件。
  • 也就是说: EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果,Channel类将fd及其相关属性封装,并将fd及其感兴趣事件和发生的事件以及不同事件对应的回调函数封装在一起,这样在各个模块中传递更加方便。接着被EventLoop调用。
  • 可能上面我画的图不能充分表达三者在muduo库中的角色,下面借用我在地铁站里吃闸机博主的图,可能会让大家看的更加直观。

img

重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 用来存放活跃的channels
ChannelList activeChannels_;

// 指向poller 来调用他的API
std::unique_ptr<Poller> poller_;

// 标志事件循环是否在执行
std::atomic_bool looping_;

// 标志事件循环是否停止
std::atomic_bool quit_;

// poller返回发生事件的channels的时间点
TimeStamp pollReturnTime_;

// 用来记录事件循环线程的tid
const pid_t threadId_;

/** 这是用来唤醒事件循环线程(就是执行EventLoop.loop()的线程)
// 其设计思想是当另外一个线程,调用了此EventLoop并往里面加入回调函数的时候,唤醒事件循环线程
// 会有两种唤醒情况
* 1. 会唤醒被poller_->poll(kPollTimeMs,&activeChannels_);阻塞的事件循环线程
* 2. 事件循环线程正在执行回调函数,当他执行完后,再次调用poller_->poll(kPollTimeMs,&activeChannels_);
* 由于有新的事件发生了(eventfd也就是wakeupChannel_有读事件)
* 就不会被阻塞而继续执行doPendingFunctors();
**/
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;

// 存放回调函数的所有集合
std::vector<Functor> pendingFunctors_;

// 用来标志是否正在处理回调函数
std::atomic_bool pcallingPendingFunctors_;

// 用来实现共享数据的互斥访问
std::mutex mutex_;
  • poller就不用在多说什么了,通过它会返回给EventLoop发生的事件。
  • wakeupFd是非常重要的一个成员,与之对应的wakeupChannel,起到了一个唤醒loop所在的线程的作用,因为当前线程主要阻塞在poll函数上,唤醒的方法是手动激活这个wakeupChannel, 写入几个字节让Channel变为可读, 当然这个Channel也注册到Pooll中,在下面的成员函数会详细介绍它的实现。
  • threadId创建时要保存当前时间循环所在的线程,用于之后运行时判断使用EventLoop的线程是否时EventLoop所属的线程.
  • pollReturnTime保存poll返回的时间,用于计算从激活到调用回调函数的延迟
  • activeChannels就是poller返回的所有发生事件的channel列表。
  • callingPendingFunctors标识当前loop是否有需要执行的回调操作
  • pendingFunctors存储loop需要执行的所有回调操作,避免本来属于当前线程的回调函数被其他线程调用,应该把这个回调函数添加到属于它所属的线程,等待它属于的线程被唤醒后调用,满足线程安全
  • mutex互斥锁,用来保护vector容器的线程安全操作

重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public:
using Functor = std::function<void()>;
using ChannelList = std::vector<Channel*>;

EventLoop();
~EventLoop();
EventLoop(const EventLoop&) = delete;
EventLoop& operator=(const EventLoop&) = delete;

//开启事件循环
void loop();

//退出事件循环
void quit();

//返回处理的时间辍
TimeStamp pollReturnTime() const { return pollReturnTime_; }

//EventLoop的方法=> poller的方法
void updatechannel(Channel* channel);
void removechannel(Channel* channel);
bool hasChannel(Channel* channel);

// 往存放回调函数的所有集合加数据
void queueInLoop(const Functor& cb);
void runInLoop(const Functor& cb);

// 用来唤醒事件循环线程
void wakeup() const;

// 证明EventLoop创建时的线程id与当前线程id是否相等
// 相等表示EventLoop就在所创建他的loop线程里面,可以执行回调
// 不相等就需要queueInLoop,等待唤醒它自己的线程时,在执行回调
bool isInLoopThread() const{ return threadId_ == CurrentThread::tid();}
//[[nodiscard]] EPollPoller* ReturnEPollPoller() const;
private:
//唤醒用的 wake up
void handleRead() const;

//执行回调函数用
void doPendingFunctors();

其实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
//
// Created by ubuntu on 24-7-19.
//

#include "EventLoop.h"
#include "Channel.h"
#include "logger.h"
#include <sys/eventfd.h>

//防止一个线程创建多个EventLoop
//当创建了一个EventLoop对象时,*t_loopInThisThread就指向这个对象
//在一个线程里面在创建EventLoop时,指针不为空就不会创建了
//从而控制了一个线程里面只有一个EventLoop
__thread EventLoop *t_loopInThisThread = nullptr;

//定义默认的Poller IO复用接口的超时时间
const int kPollTimeMs = 10000;

//创建wakeupfd 用来notify唤醒subReactor处理新来的channel
int createEventfd()
{
//eventfd 计数不为零表示有可读事件发生,read 之后计数会清零,write 则会递增计数器。
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_FATAL("eventfd error:%d \n",errno);
}
return evtfd;
}

EventLoop::EventLoop()
: activeChannels_(0)
, poller_(Poller::newDefaultPoller(this))
, looping_(false)
, quit_(false)
, pollReturnTime_(0)
, threadId_(CurrentThread::tid())
, wakeupFd_(createEventfd())
, wakeupChannel_(std::make_unique<Channel>(wakeupFd_, this))
{
LOG_DEBUG("EventLoop created %p in thread %d \n",this threadId_);
if(t_loopInThisThread)
{
LOG_FATAL("Another EvnetLoop %p exists in this thread %d \n",t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}

//设置wakeupfd的事件类型以及发生事件后的回调操作
wakeupChannel_->setReadCallBack(std::bind(&EventLoop::handleRead, this));

//每一个eventloop都将监听wakeupchannel的EPOLLIN读事件了
//minreactor通过给subreactor写东西,通知其苏醒
wakeupChannel_->enableReading();
}

EventLoop::~EventLoop() {
wakeupChannel_->disableAll();
wakeupChannel_->remove();
close(wakeupFd_);
t_loopInThisThread = nullptr;
}

void EventLoop::loop() {

looping_.store(true);
quit_.store(false);

LOG_INFO("EventLoop %p start looping \n",this);

while(!quit_) {
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, activeChannels_);
for(auto activeChannel: activeChannels_) {
//poller监听哪些channel发生事件了,然后上报给EventLoop,通知channel处理相应的事件
activeChannel->handleevent(pollReturnTime_);
}

/**
* IO线程 mainloop accept fd <= channel subloop
* mainloop事先注册一个回调cb,需要subloop执行
* wakeup subloop后执行下面的方法 执行之前mainloop注册的cb回调
*
*/
//执行当前EventLoop事件循环需要处理的回调操作
doPendingFunctors();
}
LOG_INFO("EventLoop %p stop looping,\n",this);
looping_ = false;
}

//退出事件循环
//1. loop在自己的线程中调用quit
//2. 在其他线程中调用的quit(在一个subloop(woker)中,调用了mainloop(IO)的quit)
/*
mainloop

****************************** 生产者-消费者的线程安全的队列(no)

subloop1 subloop2 subloop3

*/
void EventLoop::quit() {
quit_.store(true);

// 如果是在其它线程中,调用的quit 在一个subloop(woker)中,调用了mainLoop(IO)的quit
if(!isInLoopThread())
{
wakeup();
}
}

void EventLoop::wakeup() const {
uint64_t one = 1;
ssize_t evnums = write(wakeupFd_, &one, sizeof one);
if(evnums != sizeof one) {
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n",evnums);
}
}

void EventLoop::handleRead() const {
uint64_t one{};
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8",n);
}
}

void EventLoop::removechannel(Channel* channel) {
poller_->removeChannel(channel);
}

void EventLoop::updatechannel(Channel* channel) {
poller_->updateChannel(channel);
}

bool EventLoop::hasChannel(Channel* channel) {
return poller_->hasChannel(channel);
}

void EventLoop::doPendingFunctors() {
// 使用一个局部的vector和pendingFunctors_的交换,有两种好处
// 1. 可以缩小pendingFunctors_的容量, 因为如果只是resize
// 它只会重新设定大小(size)而不会重新设定容量
// 2. 最重要的原因:可以最大的减小占用互斥锁的时间,使得其只在swap加锁
// 在执行回调函数的时候不加锁,可能还能预防在执行回调函数的时候获取锁而死锁
// 也能在确保执行回调函数的时候,能够往pendingFunctors_里加数据
std::vector<Functor> functors;
pcallingPendingFunctors_.store(true);
{
std::lock_guard<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}

// 此处可以优化,将处理回调函数单独交给一个逻辑系统(线程池)来处理
// 实现解耦
for(const Functor& func: functors) {
func();
}

pcallingPendingFunctors_.store(false);
}

// 用来与上处设计比较
/*
void EventLoop::doPendingFunctors() {

pcallingPendingFunctors_.store(true);
{
std::lock_guard<std::mutex> lock(mutex_);

// 此处可以优化,将处理回调函数单独交给一个逻辑系统(线程池)来处理
// 实现解耦
for(const Functor& func: functors) {
func();
}
}

pcallingPendingFunctors_.store(false);
}
*/

void EventLoop::runInLoop(const Functor &cb) {
//如果调用的线程是事件循环线程直接调用回调函数
if(isInLoopThread()) {
cb();
}
else {
queueInLoop(cb);
}
}

void EventLoop::queueInLoop(const Functor &cb) {
{
std::lock_guard<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}

// 当出现以下两种情况的时候进行唤醒
/** 这是用来唤醒事件循环线程(就是执行EventLoop.loop()的线程)
// 其设计思想是当另外一个线程,调用了此EventLoop并往里面加入回调函数的时候,唤醒事件循环线程
// 会有两种唤醒情况
* 1. 会唤醒被poller_->poll(kPollTimeMs,&activeChannels_);阻塞的事件循环线程
* 2. 事件循环线程正在执行回调函数,当他执行完后,再次调用poller_->poll(kPollTimeMs,&activeChannels_);
* 由于有新的事件发生了(eventfd也就是wakeupChannel_有读事件)
* 就不会被阻塞而继续执行doPendingFunctors();
**/
if(!isInLoopThread() || pcallingPendingFunctors_.load()) {
wakeup();
}
}

其中的巧妙设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void EventLoop::doPendingFunctors() {
// 使用一个局部的vector和pendingFunctors_的交换,有两种好处
// 1. 可以缩小pendingFunctors_的容量, 因为如果只是resize
// 它只会重新设定大小(size)而不会重新设定容量
// 2. 最重要的原因:可以最大的减小占用互斥锁的时间,使得其只在swap加锁
// 在执行回调函数的时候不加锁,可能还能预防在执行回调函数的时候获取锁而死锁
// 也能在确保执行回调函数的时候,能够往pendingFunctors_里加数据
std::vector<Functor> functors;
pcallingPendingFunctors_.store(true);
{
std::lock_guard<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}

// 此处可以优化,将处理回调函数单独交给一个逻辑系统(线程池)来处理
// 实现解耦
for(const Functor& func: functors) {
func();
}

pcallingPendingFunctors_.store(false);
}


// 这个与上面比较
void EventLoop::doPendingFunctors() {

pcallingPendingFunctors_.store(true);
{
std::lock_guard<std::mutex> lock(mutex_);

// 此处可以优化,将处理回调函数单独交给一个逻辑系统(线程池)来处理
// 实现解耦
for(const Functor& func: functors) {
func();
}
}

pcallingPendingFunctors_.store(false);
}
  • 第一个比较巧妙的思想就是,使用一个局部的vectorpendingFunctors_的交换:
    • 可以缩小pendingFunctors_的容量, 因为如果只是resize, 它只会重新设定大小(size)而不会重新设定容量
    • 最重要的原因:可以最大的减小占用互斥锁的时间,使得其只在swap加锁, 在执行回调函数的时候不加锁,可能还能预防在执行回调函数的时候获取锁而死锁,也能在确保执行回调函数的时候,能够往pendingFunctors_里加数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void EventLoop::wakeup() const {
uint64_t one = 1;
ssize_t evnums = write(wakeupFd_, &one, sizeof one);
if(evnums != sizeof one) {
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n",evnums);
}
}

void EventLoop::handleRead() const {
uint64_t one{};
ssize_t n = read(wakeupFd_, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR("EventLoop::handleRead() reads %d bytes instead of 8",n);
}
}

void EventLoop::runInLoop(const Functor &cb) {
//如果调用的线程是事件循环线程直接调用回调函数
if(isInLoopThread()) {
cb();
}
else {
queueInLoop(cb);
}
}

void EventLoop::queueInLoop(const Functor &cb) {
{
std::lock_guard<std::mutex> lock(mutex_);
pendingFunctors_.emplace_back(cb);
}

// 当出现以下两种情况的时候进行唤醒
/** 这是用来唤醒事件循环线程(就是执行EventLoop.loop()的线程)
// 其设计思想是当另外一个线程,调用了此EventLoop并往里面加入回调函数的时候,唤醒事件循环线程
// 会有两种唤醒情况
* 1. 会唤醒被poller_->poll(kPollTimeMs,&activeChannels_);阻塞的事件循环线程
* 2. 事件循环线程正在执行回调函数,当他执行完后,再次调用poller_->poll(kPollTimeMs,&activeChannels_);
* 由于有新的事件发生了(eventfd也就是wakeupChannel_有读事件)
* 就不会被阻塞而继续执行doPendingFunctors();
**/
if(!isInLoopThread() || pcallingPendingFunctors_.load()) {
wakeup();
}
}
  • 第二个比较巧妙的设计就是,使用wakeupFd_,这是最巧妙的
    • 传统的进程/线程间唤醒办法是用pipe或者socketpair,IO线程始终监视管道上的可读事件,在需要唤醒的时候,其他线程向管道中写一个字节,这样IO线程就从IO multiplexing阻塞调用中返回。pipe和socketpair都需要一对文件描述符,且pipe只能单向通信,socketpair可以双向通信。一方面它比 pipe 少用一个 fd,节省了资源;另一方面,wakeupFd的缓冲区管理也简单得多,全部buffer只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。muduo库也没有采用生产者消费者的模型,采用了wakeupFd这种巧妙的思想,在今后的学习中,我们也可以进一步的使用它。
    • 什么是eventfd,他是用来记录有多少的事件可读,其有两个函数read 清空计数,write类加技术(具体请参考:https://blog.csdn.net/EDDYCJY/article/details/118980819)
    • 他可以用来跨线程通知
    • 会有两种唤醒情况
      1. 会唤醒被poller_->poll(kPollTimeMs,&activeChannels_);阻塞的事件循环线程
      1.  事件循环线程正在执行回调函数,当他执行完后,再次调用'poller_->poll(kPollTimeMs,&activeChannels_);'由于有新的事件发生了(eventfd也就是wakeupChannel_有读事件),就不会被阻塞而继续执行doPendingFunctors();
      

muduo库三大核心组件之 Poller/EpollPoller类

image-20240725101825823

Poller负责监听文件描述符事件是否触发以及返回发生事件的文件描述符以及具体事件。在 muduo 中,使用抽象基类 Poller ,并由EpollPoller和PollPoller派生基类中继承实现 epoll 和 poll

重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//poller.h
protected:
//map的key表示 sockfd value表示所属的channel通道类型
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap channels_;
private:
// 定义Poller所属的事件循环EventLoop
EventLoop* owernLoop_;


// EpollPoller

private:
using EventList = std::vector<epoll_event>;

//epoll_event初始的长度
static constexpr int kInitEventListSize = 100;


//epoll 的句柄
int epollfd_;

// 存放epoll返回的事件的容器
EventList events_;
  • 详情见注释

重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152

#include "Poller.h"

#include <Channel.h>

Poller::Poller(EventLoop* loop):owernLoop_(loop) {

}

bool Poller::hasChannel(Channel* channel) {
auto it = channels_.find(channel->fd());
return it != channels_.end() && it->second == channel;
}




//============== EpollPoller==============

#include <iostream>
#include <memory>
#include <unistd.h>
#include <unordered_map>

#include "EPollPoller.h"
#include "logger.h"
#include "TimeStamp.h"
#include "Channel.h"

EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop)
, epollfd_(epoll_create1(EPOLL_CLOEXEC))
, events_(kInitEventListSize)
{
if (epollfd_ == -1) {
LOG_FATAL("epoll_create error:%d \n", errno);
}
LOG_INFO("epoll_create() successed");
}

EPollPoller::~EPollPoller()
{
close(epollfd_);
}

//通过epoll_wait将发生事件的channel通过activeChannels告知给EventLoop
TimeStamp EPollPoller::poll(int timeoutMs, ChannelList &activeChannels) {

LOG_INFO("func=%s => fd total count:%lu \n",__FUNCTION__, channels_.size());
//events_是vector类型,
//events_.begin()返回首元素的地址,
//*events_.begin()为首元素的值
//&*events_.begin()存放首元素的地址
//这就得到了vector底层首元素的起始地址
int EventNums = epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);
//记录最开始poll里面的错误值
int saveErrno = errno;

// 获取发生事件的时间
TimeStamp now(TimeStamp::now());

if(EventNums > 0) {
LOG_INFO("%d events happended!\n", EventNums);
fillActiveChannels(EventNums, activeChannels);
// 为了防止频繁开辟空间,选择提前开辟
if(EventNums == events_.size())
{
events_.resize(events_.size() * 2);
//说明当前发生的事件可能多于vector能存放的 ,需要扩容,等待下一轮处理
}
}
else if(EventNums == 0) {
LOG_DEBUG("%s timeout! \n",__FUNCTION__);
}
else {
if(saveErrno != EINTR) //不是外部中断引起的
{
errno = saveErrno;
LOG_ERROR("EPollPoller::poll() errno!");
}
}

return now;
}

void EPollPoller::fillActiveChannels(int numEvents, ChannelList &activeChannels) const {
for(int i = 0; i < numEvents; i ++) {
Channel *channel = static_cast<Channel*>(events_[i].data.ptr);
// 设置channel的返回事件
channel->setrevents(events_[i].events);
// EventLoop就拿到了他的poller给他返回的所有发生事件的channel列表了
activeChannels.push_back(channel);
}
}

//channel update remove => EventLoop updateChannel removeChannel =>Poller updateChannel removeChannel
/**
* EventLoop => poller.poll
* ChannelList Poller
* ChannelMap <fd,channel*> epollfd
*/

void EPollPoller::updateChannel(Channel* channel) {
LOG_INFO("func=%s => fd=%d events=%d\n",__FUNCTION__, channel->fd(),channel->events());
// false 表示不再epoll的红黑树中
if(!channel->inepoll()) {
update(EPOLL_CTL_ADD, channel);
channels_[channel->fd()] = channel;
}else {
if(channel->isNoneEvent()) {
update(EPOLL_CTL_DEL, channel);
channels_.erase(channel->fd());
}else {
update(EPOLL_CTL_MOD,channel);
}
}
}


void EPollPoller::removeChannel(Channel* channel) {
if(!channel->inepoll()) {
LOG_ERROR("channel not in epoll");
}else {
channels_.erase(channel->fd());
LOG_INFO("func=%s => fd=%d \n",__FUNCTION__, channel->fd());
update(EPOLL_CTL_DEL, channel);
}
}

// operation分别是epoll_(ADD/MOD/DEL)
void EPollPoller::update(int operation, Channel* channel) const {

epoll_event event{0};
event.data.ptr = channel;
// 把感兴趣的事件加入到events中
event.events = channel->events();
event.data.fd = channel->fd();

if(epoll_ctl(epollfd_, operation, channel->fd(), &event) < 0) {
if(operation == EPOLL_CTL_DEL)
{
LOG_ERROR("epoll_ctl del error:%d\n",errno);
}
else
{
LOG_FATAL("epoll_ctl add/mod error:%d\n",errno);
}
}
}



muduo库Acceptor类

Acceptor重要成员变量

1
2
3
4
5
6
Socket acceptSocket_;
Channel acceptChannel_;
std::shared_ptr<EventLoop> event_loop_;
NewConnectionCallBack connectionCallBack_;
bool listening_;
int idlefd_;
  • eventloop__监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,其实这个EventLoop就是main EventLoop。

  • acceptSocket 服务器监听套接字的文件描述符

  • acceptChannel把acceptSocket及其感兴趣事件和事件对应的处理函数进行封装。

  • newConnetionCallback这个是最重要的一个成员了,它的类型是using NewConnectionCallback = std::function;,在TcpServer构造函数中通过acceptor->setNewConnetionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::1,std::placeholders::_2));将 TcpServer::newConnection函数注册给了这个成员变量。这个 TcpServer::newConnection函数的功能是通过轮询EventLoop *ioLoop = threadPool->getNextLoop();选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。_

  • listenning是一个标志位

  • idlefd_是用来防止底层一直通知有事件以及为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作

    Acceptor重要成员函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    //
    // Created by ubuntu on 24-7-19.
    //

    #include "Acceptor.h"

    #include <cassert>
    #include <fcntl.h>

    #include "Connection.h"
    #include "logger.h"
    #include <iostream>

    #include <utility>
    #include <bits/fcntl-linux.h>


    static int createNonblocking()
    {
    int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if(sockfd < 0)
    {
    LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__,__FUNCTION__,__LINE__,errno);
    }
    }

    Acceptor::Acceptor(std::shared_ptr<EventLoop> event_loop, const InetAddress& address, bool resuseport)
    : acceptSocket_(createNonblocking())
    , acceptChannel_(acceptSocket_.fd(),event_loop)
    , event_loop_(event_loop)
    , listening_(false)
    , idlefd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {
    assert(idlefd_ >= 0);
    acceptSocket_.SetReuseaddr(true);
    acceptSocket_.SetReuseport(resuseport);
    acceptSocket_.bind(address);
    setNewConnectionCallBack(std::bind(&Acceptor::handleread, this));
    }

    Acceptor::~Acceptor() {
    acceptChannel_.disableAll();
    acceptChannel_.remove();
    close(idlefd_);
    }

    void Acceptor::listen() {
    acceptChannel_.enableReading();
    listening_ = true;
    acceptSocket_.listen(2);
    }

    void Acceptor::handleread() {
    InetAddress clientAddress{};
    int connfd = acceptSocket_.accept(clientAddress);
    if(connfd >= 0) {
    if(connectionCallBack_) {
    connectionCallBack_(connfd, clientAddress);
    }else {
    close(connfd);
    }
    }
    else {
    LOG_ERROR("in Acceptor::handleRead");
    // 由于采用了LT,为了防止一直通知,使用idlefd_来接受读事件
    //`idleFd_`的设计是为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作。
    //通过预先打开一个文件描述符(`/dev/null`),在文件描述符耗尽时释放它来接受新连接,
    //然后再重新打开`/dev/null`,这种机制可以有效避免程序因无法分配文件描述符而崩溃。
    if (errno == EMFILE)
    {
    ::close(idlefd_);
    idlefd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
    ::close(idlefd_);
    idlefd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
    }
    }



  • 其中值得一提的是handleread的实现:

    这里主要是建立连接,通过调用Socket的accept函数,底层调用系统的accept函数,返回一个已连接的socket描述字,这样连接就建立了。同时内部还调用了成员变量newConnectionCallback保存的函数,当mainLoop监听到acceptChannel上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法,内部调用newConnetionCallback_,也就是TcpServer设置的一个回调函数setNewConnetionCallback,绑定了TcpServer::newConnection,通过 轮询算法,选择一个subloop,分发当前的新客户端的Channel,并且绑定了一些回调。

比较好的设计

  • idlefd:
  • 这个思想也是很巧妙的,在调用accept的过程中,如果已用文件描述符过多,accept会返回-1,构造函数中注册的idleFd就派上用场了。当前文件描述符过多,无法接收新的连接。但是由于我们采用LT模式,如果无法接收,可读事件会一直触发。那么在这个地方的处理机制就是,关掉之前创建的空的idleFd,然后去accept,这样进行了连接,让这个事件不会一直触发,然后再关掉该文件描述符,重新将它设置为空文件描述符。这样就优雅的解决 EMFIFE 问题,也就是为了优雅地处理文件描述符耗尽的情况,确保系统在高负载下仍能稳定工作。

muduo库Buffer类

(本文参考链接:https://blog.csdn.net/m0_73537205/article/details/138665904)

为什么需要buffer类?

  • 首先是为什么要有 Buffer?为什么不能直接接收和发送呢,如果直接使用 read()、write() 系统调用进行操作,就可以直接将应用层数据发送出去或者将内核数据读取到应用层,加上 Buffer 以后,相当于在应用层数据和内核缓冲区之间又多了一层,会不会造成负面影响呢?[2] 中明确,应用层的缓冲区是必要的,因为非阻塞 IO 的核心就是避免阻塞以在 read() 和 write() 为代表的系统调用上。
  • 对于发送来说,假设应用程序需要发送 40KB 数据,但是操作系统的 TCP 发送缓冲区只有 25KB 剩余空间,如果等待内核缓冲区可用,就会阻塞当前线程,因为不知道对方什么时候收到并读取数据。因此网络库应该把这 15KB 数据先缓存起来,等 fd 变得可写的时候立刻发送数据,这样操作才不会造成阻塞。需要注意,如果应用程序随后又要发送 50KB 数据,而此时发送缓冲区中尚有未发送的数据,那么网络库应该将这 50KB 数据追加到发送缓冲区的末尾,而不能立刻尝试 write(),因为这样有可能打乱数据的顺序。对于接收来说,假设一次读到的数据不够一个完整的数据包,那么这些已经读到的数据应该先暂存在某个地方,等剩余的数据收到之后再一并处理。所以说,发送缓冲区和接收缓冲区的存在都是必要的。

Buffer设计思想

  • muduo的Buffer的定义如下,其内部是 一个 std::vector,且还存在两个sizet类型的readerIndex,writerIndex_标识来表示读写的位置。结构图如下:

img

  • readIndex、writeIndex把整个vector内容分为3块:prependable、readable、writable,各块大小关系:

    • prependable = readIndex
    • readable = writeIndex - readIndex
    • writable = buffer.size() - writeIndex
  • Buffer类是可以动态扩容的,在下面的成员函数中,会详细介绍。

Buffer重要成员变量

1
2
3
4
5
6
7
8
9
10

//缓冲区头部
static constexpr size_t kCheapPrepend = 8;

//缓冲区读写初始大小
static constexpr size_t kInitialSize = 1024;

std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
  • 其中readerIndex_指向的可读地区的起始地址
  • writerIndec_指向的是可写位置的起始地址

Buffer重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 返回可读的缓冲区大小
[[nodiscard]] size_t readAbleBytes() const { return writerIndex_ - readerIndex_;}

// 返回可写的缓冲区大小
[[nodiscard]] size_t writeAbleBytes() const { return buffer_.size() - writerIndex_;}

// 返回缓冲区头部大小
[[nodiscard]] size_t prependAbleBytes() const { return readerIndex_;}

// 返回缓冲区中可读数据的起始地址
[[nodiscard]] const char* peek() const{ return begin() + readerIndex_;}

// 返回缓冲区可以写的起始地址
[[nodiscard]] char* beginWrite() { return begin() + writerIndex_; }

[[nodiscard]] const char* beginWrite() const { return begin() + writerIndex_; }

// 返回数据的首地址
[[nodiscard]] const char* begin() const {
//it.operator*() 首元素 it.operator*().operator&() 首元素的地址
return &*buffer_.begin();
}

[[nodiscard]] char* begin() {
//it.operator*() 首元素 it.operator*().operator&() 首元素的地址
return &*buffer_.begin();
}
  • 这几个代码功能如注释所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

// 把[data ,data+len]内存上的数据,添加到writeable缓冲区当中
void append(const char* data, size_t len);

// 确定是否可以写入
void ensureWriterableBytes(size_t len);

// 扩容操作
void makespace(size_t len);

//===================================以下为对应实现============================================

// 考虑两种扩容情况
// 1. 预留空间 加上 可写空间不足的的话 就直接扩容
// 2. 否则 就不直接扩容,而是重新分配下 内部结构
// 这样可以避免内存空间的浪费
void Buffer::makespace(size_t len) {
if(prependAbleBytes() + writeAbleBytes() < len + kCheapPrepend) {
buffer_.resize(writerIndex_ + len);
}else {
size_t readable = readAbleBytes();
std::copy(begin() + readerIndex_, begin() + writerIndex_, begin() + kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
}
}

void Buffer::ensureWriterableBytes(size_t len) {
// 不够写就扩容
if(writeAbleBytes() < len) {
makespace(len);
}
}

void Buffer::append(const char *data, size_t len) {
ensureWriterableBytes(len);
std::copy(data, data + len, beginWrite());
writerIndex_ += len;
}
  • 比较巧妙的思想是:在进行扩容的时候考虑两种扩容情况

    1. 预留空间 加上 可写空间(prependableBytes() + writerableBytes())不足的的话 就直接扩容
    2. 否则 就不直接扩容,而是重新分配下 内部结构

    这样可以避免内存空间的浪费(因为更新可读与可写地区的时候是通过移动两个Index实现的,可能readerIndex会一直往右移,导致空间预留空间越来越大,如果直接扩容会导致内存空间的浪费

img

在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    // 读len长的数据,并进行移动位置
void retrieve(size_t len); //len表示已经读了的

// 如果都读完了
void retrieveAll();

// 把onMessage函数上报的Buffer数据,转成string类型的数据返回
std::string retrieveAllAsString();

//从起始位置读len长
std::string retrieveAsString(size_t len);


//===================================以下为对应实现============================================

void Buffer::retrieve(size_t len) {
if(len < readAbleBytes())
{
//已经读的小于可读的,只读了一部分len
//还剩readerIndex_ += len 到 writerIndex_
readerIndex_ += len;
}
else //len == readableBytes()
{
retrieveAll();
}
}

void Buffer::retrieveAll() {
readerIndex_ = writerIndex_ = kCheapPrepend;
}

std::string Buffer::retrieveAllAsString() {
return retrieveAsString(readAbleBytes());//应用可读取数据的长度
}

std::string Buffer::retrieveAsString(size_t len) {
// 从起始位置读len长
std::string result{peek(), len};

// 上面一句吧缓冲区可读的数据读出来,这里对缓冲区进行复位操作
retrieve(len);

return result;
}
  • 这部分函数是对读数据操作的函数,其功能见注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
 //从fd上读取数据
ssize_t readFd(int fd, int* saveErrno);

//通过fd发送数据
ssize_t writeFd(int fd, int* saveErrno);

// ===========================其对应实现===================================

ssize_t Buffer::readFd(int fd, int *saveErrno) {
// 栈上的临时空间,分配64K
char extrabuf[65536] = { 0 };
struct iovec vec[2];
// buffer底层缓冲区剩余可以写的空间
const size_t writable = writeAbleBytes();
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;

vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;

// writable < sizeof extrabuf就选2块,否则一块就够用
const int iovcnt = (writable < sizeof extrabuf)? 2 : 1;

const ssize_t n = readv(fd, vec, iovcnt);

if(n < 0)
{
*saveErrno = errno;
}
else if(n <= writable) //buffer可写的缓冲区已经够存储读取出来的数据
{
writerIndex_ += n;
}
else //extrabufl里面也写入了数据
{
writerIndex_ = buffer_.size();

//writerIndex_ 开始写n-writable的数据
append(extrabuf,n-writable);
}
return n;
}

ssize_t Buffer::writeFd(int fd, int *saveErrno) {
const ssize_t n = write(fd, peek(), readAbleBytes());
if(n < 0)
{
*saveErrno = errno;
}
return n;
}
  • 读数据的设计思想:
    • 我们在读数据的时候,不知道数据的最终大小是多少,所以采用了如下的方法:
      1. 首先会在栈区开一个64k的空间,利用栈的好处是可以自动的释放,并计算出目前剩余可写的空间大小;
      2. 利用结构体 iovec 指定了两块缓冲区,一块是目前剩余的可写的Buffer,一个是临时的缓冲区,指定了起始位置以及缓冲区的大小;
      3. const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; 如果writable < sizeof extrabuf就选2块内存readv,否则一块就够用;
      4. 读数据const ssize_t n = ::readv(fd, vec, iovcnt);
      5. 若读取的数据超过现有内部buffer的writable空间大小时, 启用备用的extrabuf 64KB空间, 并将这些数据添加到内部buffer的末尾。

iovec结构体定义

1
2
3
4
5
6
7
#include <sys/uio.h>

struct iovec {
ptr_t iov_base; /* Starting address */
size_t iov_len; /* Length in bytes */
};

  • struct iovec定义了一个向量元素。通常,这个结构用作一个多元素的数组。对于每一个传输的元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是readv所接收的数据或是writev将要发送的数据。成员iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度。
  • 其中readv函数可以一次性向多个缓冲区读数据,而不用挨个调用,减少系统调用次数,writev同理

muduo库TcpConnection类

设计思想以及一些感悟

  • 在学习此网络库之前我并不知道到底什么时候该用智能指针,什么时候不该用,在一些情况下使用智能指针会带来额外的性能开销,所以不能无脑梭哈,所以不知道该如何权衡使用它们,但是学习之后我得到了一些启发

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /******** Callbacks.h  ********/
    using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
    /******** TcpServer.cc ********/
    void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
    {
    代码省略
    TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;
    代码省略
    ioLoop->runInLoop(bind(&TcpConnection::connectEstablished, conn));
    }

  • 在TcpServer::newConnection()函数中,当接受了一个新用户连接,就要把这个Tcp连接封装成一个TcpConnection对象,也就是上面代码中的new TcpConnection(…)。然后用一个共享型智能指针来管理这个对象。所以为什么这里要把TcpConnection用智能指针来管理?

  • 这里使用智能指针管理TcpConnetion的最重要原因在于防止指针悬空,而指针悬空可能会来自以下这三个方面:

    • 考虑一种情况,如果在通信的时候,用户手贱将TcpConnection给删除了,删除了之后,程序内部还要好几处地方都在使用TcpConnection对象。结果这个对象的内存突然消失了,服务器访问非法内存崩溃。我们不能防止用户做这些事情,所以编程设计不可以依赖用户行为,一定要尽可能地封死用户的误操作。所以这里用了共享智能指针。

    • 但是单单只使用一个shared_ptr还不足以解决上述问题,还需要配合weak_ptr来实现生命周期的延长,在muduo中就有这样的设计,这个设计非常的好,值得学习,以下为具体内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

class TcpConnection : noncopyable,public std::enable_shared_from_this<TcpConnection>
{
public:

TcpConnection(EventLoop *loop,
const std::string &name,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);

/***** TcpConnection.cpp *****/
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件
//新连接建立,执行回调
connectionCallback_(shared_from_this());
}


- Tcpconnection设计的时候继承了enable_shared_from_this,这使得能通过调用shared_from_this,获得一个共享指针 - 假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA。 - 接着这个shared_ptr就作为channel_的Channel::tie()函数的函数参数。 - 再根据上述Channel专栏中讲的[重要设计](#重要设计),可知使用智能指针的好处
  • 考虑第二种情况,TcpConnection对象的多线程安全问题:

    • 假如服务器要关闭了这个时候MainEventLoop线程中的TcpServer::~TcpServer()函数开始把所有TcpConnection对象都删掉。那么其他线程还在使用这个TcpConnection对象,如果你把它的内存空间都释放了,其他线程访问了非法内存,会直接崩溃。
    • 你可能会觉得,反正我都要把服务器给关了,崩就崩了吧。这种想法是错的!因为可能在你关闭服务器的时候,其他线程正在处理TcpConnection的发送消息任务,这个时候你应该等它发完才释放TcpConnection对象的内存才对!
    • 使用智能指针就很好的解决了这个问题,即使在一个线程中删除一个TcpConnection,也只是删除了引用,而只要引用技术不为0就不会删除掉

TcpConnection重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 一个subloop
EventLoop *loop_;

// Connection的名字
const std::string name_;

//Connection的状态
std::atomic_int state_;

// 是否正在读
bool reading_;

// Connection对应的socket
std::unique_ptr<Socket> socket_;

// Connection对应的channel
std::unique_ptr<Channel> channel_;

// 本地的地址
InetAddress localAddr_;

// 对端的地址
InetAddress peerAddr_;

// 以下三个回调都是用户设置给TcpServer=>(传给)TcpConnection=>Channel=>Poller=>notify channel调用回调
ConnectionCallback connectionCallback_; //有新连接时的回调
MessageCallback messageCallback_; //有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; //消息发送完成以后的回调
CloseCallback closeCallback_; // 关闭回调
HighWaterMarkCallback highWaterMarkCallback_; // 高水位回调

// 水位的阀值
size_t highWaterMark_;

// 接受数据的缓冲区
Buffer inputBuffer_;

// 发送数据的缓冲区
Buffer outputBuffer_;
  • 具体含义见注释
  • 其中两个输入输出缓冲区,在输出缓冲区是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如达到了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer中的数据拷贝到Tcp发送缓冲区中。

TcpConnection重要成员函数

  • 首先来看下构造函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38

    static EventLoop *CheckLoopNotNull(EventLoop* loop)
    {
    if(loop == nullptr)
    {
    LOG_FATAL("%s:%s:%d TcpConnection loop is null! \n",__FILE__,__FUNCTION__,__LINE__);
    }
    return loop;
    }

    TcpConnection::TcpConnection(EventLoop *loop,
    const std::string &nameArg,
    int sockfd,
    const InetAddress &localAddr,
    const InetAddress &peerAddr)
    : loop_(CheckLoopNotNull(loop))
    , name_(nameArg)
    , state_(kConnecting)
    , reading_(true)
    , socket_(std::make_unique<Socket>(sockfd))
    , channel_(std::make_unique<Channel>(sockfd, loop))
    , localAddr_(localAddr)
    , peerAddr_(peerAddr)
    , highWaterMark_(64*1024*1024) //64M
    {
    //下面给channel设置相应的回调函数
    //poller给channel通知感兴趣的事件发生了
    //channel会回调相应的操作函数
    //将TcpConnection自己的成员函数注册给当前accept返回的connfd对应的Channel对象上
    channel_->setReadCallBack(std::bind(&TcpConnection::handleRead,this,std::placeholders::_1));
    channel_->setWriteCallBack(std::bind(&TcpConnection::handleWrite,this));
    channel_->setCloseCallBack(std::bind(&TcpConnection::handleClose,this));
    channel_->setErrorCallBack(std::bind(&TcpConnection::handleError,this));

    LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n",name_.c_str(),sockfd);
    //开启Tcp/Ip层的心跳包检测
    socket_->SetKeepalive(true);
    }
    • 其功能是创建一个TcpConnection,初始化一些变量,并且设置一些回调函数以及开启心跳包检测
  • 接着我们看看发送数据相关的函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    / 发送数据
    void TcpConnection::send(const std::string &buf) //直接引用buffer
    {
    // 先检查是否处于连接状态
    if(state_ == kConnected)
    {
    // 判断处理的线程是否是事件循环线程
    if(loop_->isInLoopThread())
    {
    //string.c_str是Borland封装的String类中的一个函数,它返回当前字符串的首字符地址。
    sendInLoop(buf.c_str(),buf.size());
    }
    else
    {
    loop_->runInLoop(std::bind(&TcpConnection::sendInLoop
    , this
    , buf.c_str()
    , buf.size()
    ));
    }
    }

    }

    /**
    * 发送数据,应用写得快,内核发送数据慢,
    * 需要把待发送的数据写入缓冲区
    * 且设置了水位回调,防止发送太快
    */
    void TcpConnection::sendInLoop(const void* data, size_t len)
    {
    ssize_t nwrote = 0; // 已经写了多少数据
    size_t remaining = len; //未发送的数据
    bool faultError = false; //记录是否产生错误

    //之前调用过connection的shutdown 不能在发送了
    if(state_ == kDisconnected)
    {
    LOG_ERROR("disconnected,give up writing!");
    return ;
    }

    //channel 第一次开始写数据,且缓冲区没有待发送数据
    if(!channel_->isWriteEvent() && outputBuffer_.readAbleBytes() == 0)
    {
    nwrote = ::write(channel_->fd(),data,len);
    if(nwrote >= 0)
    {
    // 更新剩余数据
    remaining = len - nwrote;
    if(remaining == 0 && writeCompleteCallback_)
    {
    //一次性数据全部发送完成,就不要再给channel设置epollout事件了
    loop_->queueInLoop(
    std::bind(writeCompleteCallback_,shared_from_this()));
    }
    }
    else
    {
    nwrote = 0;
    if(errno != EWOULDBLOCK) //用于非阻塞模式,不需要重新读或者写
    {
    LOG_ERROR("TcpConnection::sendInLoop");
    if(errno == EPIPE || errno == ECONNRESET) //SIGPIPE RESET
    {
    faultError = true;
    }
    }
    }
    }

    // 说明当前这一次write ,并没有把数据全发送出去,剩余的数据
    // 需要保存到缓冲区当中,给channel注册epollout事件
    // poller发现tcp发送缓冲区有空间,会通知相应的socket-channel
    // 调用相应的writeCallback()回调方法
    // 也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去
    if(!faultError && remaining > 0)
    {
    //目前发送缓冲区剩余的待发送数据的长度
    size_t oldlen = outputBuffer_.readAbleBytes();

    if(oldlen + remaining >= highWaterMark_
    && oldlen < highWaterMark_
    && highWaterMark_)
    {
    loop_->queueInLoop(
    std::bind(highWaterMarkCallback_,shared_from_this(),oldlen + remaining)
    );
    }

    // 把未发送的数据加入到发送缓冲区
    outputBuffer_.append((char*)data + nwrote,remaining);

    if(!channel_->isWriteEvent())
    {
    channel_->enableWriting(); //注册channel写事件,否则poller不会向channel通知epollout
    }
    }
    }
    • 为了秉持一个线程占用一个eventloop所以我们要确保执行发送任务的时候是发生在事件循环线程中 所以要进行一次判断,如果不是得投放到任务容器里等待被唤醒
    • 发送数据的时候,会有两种情况,一种是第一次发送直接全部发送完了,这时候将它放入事件循环的处理函数容器,并唤醒对应线程处理,第二种情况是第一次没发完,这时就需要将未发完的数据存入outputBuffer里,存入的时候需要进行比较oldlen + remaining >= highWaterMark && oldlen < highWaterMark&& highWaterMark_,如果满足就进行高水位回调(这有助于防止发送方发送过多数据,导致接收方无法及时处理。)
    • 剩余的数据保存到缓冲区当中,要给给channel注册epollout事件(切记,一定要注册channel的写事件,否则poller不会向channel通知epollout),这样poller发现tcp发送缓冲区有空间,会通知相应的socket-channel调用相应的writeCallback()回调方法,也就是调用TcpConnection::handleWrite,把发送缓冲区中数据全部发送出去。
  • 一系列的回调函数处理

    • handleread
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    void TcpConnection::handleRead(TimeStamp receiveTime)
    {
    int savedErrno = 0;
    ssize_t n = inputBuffer_.readFd(channel_->fd(),&savedErrno);
    if(n > 0)
    {
    // 已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessage
    // shared_from_this()获取了当前TcpConnection对象的智能指针
    messageCallback_(shared_from_this(),&inputBuffer_,receiveTime);
    }
    else if(n==0) //客户端断开
    {
    handleClose();
    }
    else
    {
    errno = savedErrno;
    LOG_ERROR("TcpConnection::hanleRead");
    handleError();
    }

    }

    void TcpConnection::handleWrite()
    {
    if(channel_->isWriteEvent())
    {
    int savedErrno = 0;
    ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
    if(n > 0)
    {
    outputBuffer_.retrieve(n); //处理了n个
    if(outputBuffer_.readAbleBytes() == 0) //发送完成
    {
    channel_->disableWriting(); //不可写了
    if(writeCompleteCallback_)
    {
    //唤醒loop对应的thread线程,执行回调
    loop_->queueInLoop(
    std::bind(writeCompleteCallback_,shared_from_this())
    );
    }
    if(state_ == kDisconnecting)
    {
    shutdownInLoop();// 在当前loop中删除TcpConnection
    }
    }
    }
    else
    {
    LOG_ERROR("TcpConnection::handleWrite");
    }
    }
    else
    {
    LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
    }
    }
      • 关于readFd在Buffer类中我们已经剖析过了Buffer类,接着已建立连接的用户,有可读事件发生了,调用用户传入的回调操作onMessageshared_from_this()获取了当前TcpConnection对象的智能指针.
  • handlewirte

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    void TcpConnection::handleWrite()
    {
    if(channel_->isWriteEvent())
    {
    int savedErrno = 0;
    ssize_t n = outputBuffer_.writeFd(channel_->fd(),&savedErrno);
    if(n > 0)
    {
    outputBuffer_.retrieve(n); //处理了n个
    if(outputBuffer_.readAbleBytes() == 0) //发送完成
    {
    channel_->disableWriting(); //不可写了
    if(writeCompleteCallback_)
    {
    //唤醒loop对应的thread线程,执行回调
    loop_->queueInLoop(
    std::bind(writeCompleteCallback_,shared_from_this())
    );
    }
    if(state_ == kDisconnecting)
    {
    shutdownInLoop();// 在当前loop中删除TcpConnection
    }
    }
    }
    else
    {
    LOG_ERROR("TcpConnection::handleWrite");
    }
    }
    else
    {
    LOG_ERROR("TcpConnection fd=%d is down, no more writing \n",channel_->fd());
    }
    }
  • 如果可写,通过fd发送数据,直到发送完成

  • 设置不可写,如果writeCompleteCallback_,唤醒loop对应的thread线程,执行回调
  • 当前TCP正在断开连接,调用shutdownInLoop,在当前loop中删除TcpConnection
  • handleclose

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    //Poller => Channel::closeCallback => TcpConnection::handlerClose
    void TcpConnection::handleClose()
    {
    LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n",channel_->fd(),(int)state_);
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr connPtr(shared_from_this());
    connectionCallback_(connPtr); //执行连接关闭的回调
    closeCallback_(connPtr); //关闭连接的回调 TcpServer => TcpServer::removeConnection
    }
  • 处理逻辑就是将这个TcpConnection对象中的channel从事件监听器中移除。然后调用connectionCallback和closeCallback保存的回调函数。closeCallback在TcpServer::newConnection()为新连接新建TcpConnection时,已设为TcpServer::removeConnection(),而removeConnection()最终会调用TcpConnection::connectDestroyed()来销毁连接资源。

  • handleerror

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    void TcpConnection::handleError()
    {
    int optval;
    socklen_t optlen = sizeof optval;
    int err = 0;
    if(::getsockopt(channel_->fd(),SOL_SOCKET,SO_ERROR,&optval,&optlen) < 0)
    {
    err = errno;
    }
    else
    {
    err = optval;
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n",name_.c_str(),err);

    }
  • 关闭写端函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//关闭连接
void TcpConnection::shutdown()
{
if(state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop,this)
);
}
}

void TcpConnection::shutdownInLoop()
{
if(!channel_->isWriteEvent()) //说明当前outputBuffer中的数据已经全部发送完成
{
socket_->shutDownWrite(); // 关闭写端

}
}
  • 注意: 为什么是关闭了写端呢?在TcpConnection::shutdownInLoop()中,我们会发现它调用了Socket的shutdowmWrite,这里并没有使用close,陈硕大佬原话是这样的:Muduo TcpConnection 没有提供 close,而只提供 shutdown ,这么做是为了收发数据的完整性。因为TCP 是一个全双工协议,同一个文件描述符既可读又可写, shutdownWrite() 关闭了“写”方向的连接,保留了“读”方向,这称为 TCP half-close。如果直接 close(socket_fd),那么 socket_fd 就不能读或写了。用 shutdown 而不用 close 的效果是,如果对方已经发送了数据,这些数据还“在路上”,那么 muduo 不会漏收这些数据。换句话说,muduo 在 TCP 这一层面解决了“当你打算关闭网络连接的时候,如何得知对方有没有发了一些数据而你还没有收到?”这一问题。当然,这个问题也可以在上面的协议层解决,双方商量好不再互发数据,就可以直接断开连接。等于说 muduo 把“主动关闭连接”这件事情分成两步来做,如果要主动关闭连接,它会先关本地“写”端,等对方关闭之后,再关本地“读”端。
  • 另外如果当前outputbuffer里面还有数据尚未发出的话,Muduo也不会立刻调用shutwownWrite,而是等到数据发送完毕再shutdown,可以避免对方漏收数据。
  • 关闭连接事件很重要,涉及到TcpConnection和Channel的生命周期以及是否能合理销毁,用了智能指针来管理和控制生命周期。下面我们就来分析一下断开流程中TcpConnection的引用计数问题

    1. 首先连接到来创建TcpConnection,并存入容器。引用计数+1 总数:1
    2. 客户端断开连接,在Channel的handleEvent函数中会将Channel中的TcpConnection弱指针提升,引用计数+1 总数:2
    3. 触发HandleRead ,可读字节0,进而触发HandleClose,HandleClose函数中栈上的TcpConnectionPtr guardThis会继续将引用计数+1 总数:3
    4. 触发HandleClose的回调函数 在TcpServer::removeConnection结束后(回归主线程队列),释放HandleClose的栈指针,以及Channel里提升的指针引用计数-2 总数:1
    5. 主线程执行回调removeConnectionInLoop,在函数内部将tcpconnection从TcpServer中保存连接容器中erase掉。但在removeConnectionInLoop结用conn为参数构造了bind。引用计数不变 总数:1
    6. 回归次线程处理connectDestroyed事件,结束完释放参数传递的最后一个shard_ptr,释放TcpConnection。引用计数-1 总数:0
  • 连接建立与删除函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//建立连接
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); //向poller注册channel的epollin事件

//新连接建立 执行回调
connectionCallback_(shared_from_this());
}

//销毁连接
void TcpConnection::connectDestroyed()
{
if(state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll(); //把channel所有感兴趣的事件,从poller中del掉
connectionCallback_(shared_from_this());
}
channel_->remove();//把channel从poller中删除掉
}

muduo库TcpServer类

  • TcpServer与用户打交道,用户可以传入读回调函数,写完成回调等函数给TcpConnection
  • TcpServer 发送逻辑给Acceptor或者TcpConnection,进行相应处理

TcpServer重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//baseloop_用户定义的loop
EventLoop *loop_;

// 本地ip端口
const std::string ipPort_;

// 名称
std::string name_;

// 用来连接新到的客户端
std::unique_ptr<Acceptor> acceptor_;

// 指向一个线程池
std::shared_ptr<EventLoopThreadPool> threadPool_;

// 有新连接时的回调
ConnectionCallback connectionCallback_;

// 有读写消息时的回调
MessageCallback messageCallback_;

//消息发送完成以后的回调
WriteCompleteCallback writeCompleteCallback_;

//LOOP线程初始化的回调 std::function类型 调用者,调用回调函数
ThreadInitCallback threadInitCallback_;

//防止一个TcpServer对象被start多次
std::atomic_int started_;

int nextConnId_;

// 用来存储Connection的map
ConnectionMap connMap_;
  • 详解见注释

TcpServer重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "TcpServer.h"

#include <strings.h>
#include <TcpConnection.h>

#include "logger.h"

static EventLoop* CheckLoopNotNull(EventLoop* loop) {
if(loop == nullptr) {
LOG_FATAL("%s:%s:%d mainloop is null! \n",__FILE__,__FUNCTION__,__LINE__);
}
return loop;
}

TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(std::make_unique<Acceptor>(loop, listenAddr, option == kReusePort))
, threadPool_(std::make_shared<EventLoopThreadPool>(loop, nameArg))
, messageCallback_()
, writeCompleteCallback_()
, started_(0)
, nextConnId_(0)
, connMap_()
{
acceptor_->setNewConnectionCallBack(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}

// 遍历connMap_, 挨个删除map中的内容,并将TcpConnection::connectDestroyed投递到conn的loop中
TcpServer::~TcpServer() {
for(auto& item: connMap_) {
//这个局部的shared_ptr智能指针对象,出右括号
//可以自动释放new出来的TcpConnetion对象资源
TcpConnectionPtr conn(item.second);
item.second.reset();
//销毁连接
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed,conn)
);
}
}

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) {
// 1.先用轮询算法选出一个subloop
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf,"-%s#%d",ipPort_.c_str(),nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(),connName.c_str(),peerAddr.toIpPort().c_str());

//2.通过socket获取其绑定的本机的ip地址和端口信息
sockaddr_in local;
::bzero(&local,sizeof local);
socklen_t addrlen = sizeof local;
if(::getsockname(sockfd,(sockaddr*) &local,&addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}

InetAddress localAddr(local);

//3.根据连接成功的sockfd,创建 TcpConnection连接对象conn
TcpConnectionPtr conn = std::make_shared<TcpConnection>(
ioLoop, connName, sockfd, localAddr, peerAddr);

//4.下面的回调都是用户设置给TcpServer=>TcpConnection=>Channel=>Poller=>notify channel调用回调
connMap_[connName] = conn;
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);

//5.设置了如何关闭连接的回调 conn->shutdown
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection,this,std::placeholders::_1)
);

//6.直接调用TcpConnection::connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished,conn));
}

// 启动线程池,并把Acceptor监听加到事件循环中
void TcpServer::start() {
//防止一个TcpServer对象被start多次
if(started_++ == 0) {
threadPool_->start(threadInitCallback_);
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}

// 设置threadPool_的线程数量
void TcpServer::setThreadNums(const int threadNums) {
threadPool_->setThreadNums(threadNums);
}

// 将删除操作投递到事件循环中
void TcpServer::removeConnection(const TcpConnectionPtr &conn) {
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

// 先将connMap里的删掉,在调用TcpConnection的删除函数
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn) {
LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection \n",
name_.c_str(),conn->name().c_str());

connMap_.erase(conn->name()); //从map表中删除
EventLoop *ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed,conn)
);
}

  • 具体功能见注释

muduo库Thread类

Thread重要成员变量

1
2
3
4
5
6
7
8
9
10
11
std::string name_;//线程名字
bool started_;//线程是否启动
bool joined_;//是否被join回收
pid_t tid_;//是否被join回收
std::shared_ptr<std::thread> thread_; //用智能指针管理

// 线程要执行的任务
ThreadFunc func_;

// 记录线程开辟了多少用于编号
static std::atomic_int numCreated_;
  • 具体见注释

Thread重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
std::atomic_int Thread::numCreated_(0); //静态成员变量 要在类外单独进行初始化

Thread::Thread(ThreadFunc func, const std::string &name)
: name_(name)
, started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func))
{
setDefaultName();
}

Thread::~Thread() {
//线程已经运行起来了,并且没有joined_
if (started_ && !joined_)
{
//thread类提供的分离线程的方法
thread_->detach();
}
}

void Thread::setDefaultName() {
int num = ++numCreated_;
if(name_.empty())
{
char buf[32] = {0};
snprintf(buf,sizeof buf,"Thread%d",num);
name_=buf;
}
}

void Thread::start() {
started_ = true;
sem_t sem;
// 初始化一个信号量
sem_init(&sem, 0, 0);
thread_ = std::make_shared<std::thread>([this, &sem]() {
++numCreated_;
// 得到当前线程的tid
tid_ = CurrentThread::tid();

// 信号量 + 1,说明tid_已经有了
sem_post(&sem);

// 执行线程任务
func_();
});

// 等待返回线程id
sem_wait(&sem);
}

void Thread::join() {
joined_ = true;
thread_->join();
}
  • 值得注意的是线程并不是在构造函数中创建的,并且必须得创建线程后得到id才能继续运行(因为其他逻辑实现的one loop per thread必须要得到tid)
  • 补充一下sem_t变量类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    sem_init()// sem为指向信号量结构的一个指针;pshared不为0时此信号量在进程间共享,否则只能为当前进程的所有线程共享;value给出了信号量的初始值。
    extern int sem_init __P ((sem_t *__sem, int __pshared, unsigned int __value)); 

    sem_wait( sem_t *sem ) //用来阻塞当前线程直到信号量sem的值大于0,解除阻塞后将sem的值减一,表明公共资源经使用后减少。函数sem_trywait ( sem_t *sem )是函数sem_wait()的非阻塞版本,它直接将信号量sem的值减一。

    sem_post( sem_t *sem )//用来增加信号量的值。当有线程阻塞在这个信号量上时,调用这个函数会使其中的一个线程不在阻塞,选择机制同样是由线程的调度策略决定的。
    //sem_post和sem_wait函数配合使用来达到线程同步

    sem_destroy(sem_t *sem)//用来释放信号量sem。

muduo库EventLoopThread类

EventLoopThread重要成员变量

1
2
3
4
5
6
7
EventLoop *loop_;  //线程内部的eventloop*
bool exiting_; //线程是否退出
Thread thread_; //线程
std::mutex mutex_; //互斥锁
std::condition_variable cond_; //条件变量
ThreadInitCallBack callback_; //线程初始化回调函数

  • 具体见注释

EventLoopThread重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include "EventLoopThread.h"

EventLoopThread::EventLoopThread(const ThreadInitCallBack &cb, const std::string &name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this))
, mutex_()
, cond_()
, callback_(cb)
{

}

EventLoopThread::~EventLoopThread() {
exiting_ = true;
if(loop_ != nullptr) {
// 退出循环
loop_->quit();
thread_.join();
}
}

EventLoop *EventLoopThread::startLoop() {
// 创建线程, 执行任务
thread_.start();
/**
* 会调用Thread::start(),然后执行func_(); func_(std::move(func));
* 而func就是&EventLoopThread::threadFunc,this 传入的,所以会启动一个新线程
*/
EventLoop* loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock,[this]() {
return loop_ != nullptr;
});
loop = loop_;
}
return loop;
}

void EventLoopThread::threadFunc() {

EventLoop loop;

if(callback_) {
callback_(&loop);
}

{
std::lock_guard<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}

loop.loop();
std::lock_guard<std::mutex> lock(mutex_);
loop_=nullptr;
}



  • 启动LOOP的过程

    1. 首先,在startLoop中调用了Thread::start(),而这里的thread的线程函数是threadFunc,在上一篇剖析Thread类时,我们在Thread::start(),看到了一个线程函数func(),所以func就是构造函数中&EventLoopThread::threadFunc,this 传入的,所以这里会创建线程调用threadFunc()函数,并且主线程阻塞等待EventLoop对象的创建
    2. 此时有两个线程在运行 一个是调用EventLoopThread::startLoop()的线程,一个是执行EventLoopThread::threadFunc()的线程 IO线程
    3. threadFunc是在单独的新线程里面运行的,创建一个独立的Eventloop,和上面的线程是一一对应的 one loop per thread
    4. 将IO线程定义好的loop传入回调
    5. 创建好loop了唤醒主线程,并把loop给主线程,主线程返回IO线程创建的EventLoop对象
    6. 注意开始执行 loop.loop();,也就是EventLoop loop => Poller.poll,开始一个循环,知道循环结束eventloop析构,把loop设为空。
  • 总体来说,EventLoopThread提供了对应eventloop和thread的封装,意为I/O线程类,EventLoopThread可以创建一个IO线程,通过startLoop返回一个IO线程的loop,threadFunc中开启loop循环。

muduo库EventLoopThreadPool类

EventLoopThreadPool重要成员变量

1
2
3
4
5
6
7
8
EventLoop *baseLoop_;  //主线程loop
std::string name_;
bool started_; //标记当前状态 即IO线程是否开始运行
int numThreads_; //线程池中线程的数量
int next_; //负载均衡用
std::vector<std::unique_ptr<EventLoopThread>> threads_;//创建事件的线程
std::vector<EventLoop*> loops_; //事件线程里面EventLoop的指针,每个EventLoopThread线程对应的EventLoop保存在loops_中

  • 具体含义见注释

EventLoopThreadPool重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
explicit EventLoopThreadPool(EventLoop* baseloop, const std::string& nameArg = std::string{});
~EventLoopThreadPool();

// 设置线程池子的线程数量
void setThreadNums(const signed int threadNums) { threadNums_ = threadNums;}

// 开启事件循环线程
void start(const ThreadInitCallBack &cb = ThreadInitCallBack());

//若是多线程,baseLoop_默认以轮询的方式分配channel给subloop
EventLoop* getNextLoop();

// 得到所有的EventLoop
std::vector<EventLoop*> getAllLoops();

bool started() const{ return started_;}

const std::string name() const { return name_;}


//===================================对应实现=============================================
#include "EventLoopThreadPool.h"

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseloop, const std::string &nameArg)
: baseLoop_(baseloop)
, started_(false)
, name_(nameArg)
, threadNums_(std::thread::hardware_concurrency())
, next_(0)
{

}

EventLoopThreadPool::~EventLoopThreadPool() {
// 因为子线程的的loop 是栈中分配的所以不需要手动删除
}

EventLoop *EventLoopThreadPool::getNextLoop() {
EventLoop* loop = baseLoop_;
// 通过轮询,获取下一个处理事件的loop
if(!loops_.empty()) {
loop = loops_[next_];
++next_;
if(next_ >= loops_.size()) next_ = 0;
}
return loop;
}

// 空的时候是单线程
std::vector<EventLoop *> EventLoopThreadPool::getAllLoops() {
if(loops_.empty()) {
return std::vector<EventLoop*>{baseLoop_};
}else {
return loops_;
}
}

void EventLoopThreadPool::start(const ThreadInitCallBack &cb) {
started_ = true;
for(auto i = 0; i < threadNums_; i ++) {
char buf[name_.size() + 32];
snprintf(buf,sizeof buf,"%s%d",name_.c_str(),i);
auto t = std::make_unique<EventLoopThread>(cb, buf);
// 启动循环并创建线程
loops_.emplace_back(t->startLoop());
// 放入threads
threads_.emplace_back(std::move(t));
}

//如果整个循环就只有一个线程,就让这个主线程来执行回调函数
if(threadNums_ == 0 && cb) {
cb(baseLoop_);
}
}


  • 启动线程池,实际上创建numThreads个线程,并让每个eventloopthread调用startLoop()
    1. 设置当前状态为true,根据需要的线程数numThreads,创建线程
    2. _在for循环中,先创建一个EventLoopThread对象,构造线程池内线程集合
    3. 调用EventLoopThread::startLoop(),创建线程,绑定一个新的EventLoop,并返回loop地址,放入loops中,loops是一个std::vector类型
    4. 把每个EventLoopThread线程对应的EventLoop保存在loops中。
    5. 如果没有其他线程,只有主线程的话,直接调用callback
  • muduo是支持单线程和多线程的

    1
    2
    3
    4
    5
    EventLoop* loop = baseLoop_;, 

    if(threadNums_ == 0 && cb) {
    cb(baseLoop_);
    }

    这两块就实现了单线程功能

EchoServer之建立

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <stdio.h>
#include <unistd.h>
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <cstring>
#include <sys/types.h>
#include <memory>
#include "include/TimeStamp.h"
#include "include/InetAddress.h"
#include "include/TcpConnection.h"
#include "include/EventLoop.h"
#include "include/TcpServer.h"
#include "include/logger.h"

class EchoServer {
public:
EchoServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg)
: loop_(loop)
, tcpServer_(loop, listenAddr, nameArg)
{
Start();
//注册回调函数
tcpServer_.setConnectionCallback(
std::bind(&EchoServer::onConnection,this,std::placeholders::_1)
);

tcpServer_.setMessageCallback(
std::bind(&EchoServer::onMessage,this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);

//设置合适的loop线程数量 loopThread
tcpServer_.setThreadNums(std::thread::hardware_concurrency());
}

void Start() {
tcpServer_.start();
}
private:

//可读写事件回调
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
TimeStamp time)
{
std::string msg = buf->retrieveAllAsString();
LOG_INFO("%s recv %d bytes at ", conn->name(), msg.size(),time.to_string());
if (msg == "exit\n")
{
conn->send("bye\n");
conn->shutdown();
}
if (msg == "quit\n")
{
loop_->quit();
}
conn->send(msg);
}

//连接建立或者断开的回调
void onConnection(const TcpConnectionPtr &conn)
{
if(conn->connected())
{
LOG_INFO("Connection UP : %s",conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s",conn->peerAddress().toIpPort().c_str());
}
}

EventLoop* loop_;
TcpServer tcpServer_;
};

int main(int argc, char* argv[]){
EventLoop loop;
InetAddress addr(8000);
//Acceptor non-blocking listenfd create bind
EchoServer server(&loop,addr,"EchoServer-01");
//listen loopthread listenfd => acceptChannel => mainLoop => subloop
server.Start();
loop.loop(); //启动mainloop的底层pooler
return 0;
}

具体执行逻辑可以看代码或者访问https://blog.csdn.net/T_Solotov/article/details/124044175这篇大佬的文章

muduo库中的定时器类

Timer类

Timer重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 定时到期需要执行的任务
TimerCallback timerCaLLBack_;

// 何时到期
TimeStamp expiration_;

// 距离下次启动的时间间隔,如果不是重复定时器,其值为0
double interval_;

// 标志是否是重复定时器
bool repeat_;

// 定时器的唯一ID
std::int64_t sequence_;

// 定时器开辟的数量
static std::atomic_int64_t numCreated_;
  • 具体逻辑见注释

Timer重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
 using TimerCallback = std::function<void()>;
Timer(TimerCallback timercallback, TimeStamp when, double interval);
~Timer() = default;

TimeStamp expiration() const { return expiration_; }

bool repeat() const { return repeat_; }

int64_t sequence() const { return sequence_; }

void run() const{
timerCaLLBack_();
}

void restart(TimeStamp now);

static int64_t numCreated() { return numCreated_; }



// ================================其实现================================


#include "Timer.h"
#include "TimeStamp.h"

std::atomic_int64_t Timer::numCreated_ = 0;

Timer::Timer(TimerCallback timercallback, TimeStamp when, double interval)
: timerCaLLBack_(timercallback)
, expiration_(when)
, interval_(interval)
, repeat_(interval > 0.0)
, sequence_(++ numCreated_)
{

}

void Timer::restart(TimeStamp now) {
if (repeat_)
{
expiration_ = TimeStamp::addTime(now, interval_);
}
else
{
expiration_ = TimeStamp::invaild();
}
}

  • restart函数判断是否重复定时器构造到期时间,如果是重复就设置到期时间为设定的时间,否则为0

Timer设计思想

  • 首先需要到期时需要执行的任务,然后需要存储到期时间以及是否是重复定时器,以及定时器的唯一id

TimerId类

TimerId重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#pragma once
#include <cstdint>


class Timer;

class TimerId
{
public:
TimerId()
: timer_(NULL),
sequence_(0)
{
}

TimerId(Timer* timer, int64_t seq)
: timer_(timer),
sequence_(seq)
{
}

TimerId(const TimerId& timerid) = default;

TimerId& operator=(const TimerId& timerid) = default;

friend class TimerQueue;

private:
// 指向一个定时器
Timer* timer_;

// 该定时器对应编号
int64_t sequence_;
};

TimerQueue类

TimerQueue重要成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
EventLoop* loop_;

// timerfd的描述符
int timerfd_;

// 与timer描述符相对应的channel
Channel timerfdChannel_;

// 定时器任务列表,以触发时间排序,小的在前
TimerList timers_;

// 待处理的定时器列表
ActiverTimerSet activeTimers_;

// 标记是否正在执行要过期的定时器
std::atomic_bool callingExpiredTimers_;

// 暂存正要过期的定时器列表
ActiverTimerSet cancleTimers_;
  • 具体见注释

TimerQueue重要成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
public:
using TimerCallback = std::function<void()>;
explicit TimerQueue(EventLoop* loop);
~TimerQueue();

// 增加定时器任务
TimerId addTimer(TimerCallback cb,
TimeStamp when,
double interval);
// 取消定时器任务
void cancel(TimerId timerId);
private:
using ActiverTimer = std::pair<Timer *, int64_t>;
using ActiverTimerSet = std::set<ActiverTimer>;
using Entry = std::pair<TimeStamp, Timer*>;
using TimerList = std::set<Entry>;

// 取消定时器任务
void cancelInLoop(TimerId timerid);

// 处理readfd的读事件,得到过期的定时器,并且执行对应任务,并设置下一个要发生的定时器
void handleRead();

// 把过期的定时器从timers_和activeTimer_中删掉 并返回所有过期的定时器集合
std::vector<Entry> getExpired(TimeStamp now);

// 加入到容器中,加入时候要注意是否更新了最早触发任务
void addTimerInLoop(Timer* timer);

// 如果有重复的定时器就重新设置,否则就直接删除,获取过期时间最早的定时器的时间,并将它设置到timerfd中
void reset(const std::vector<Entry>& expired, TimeStamp now);

// 插入到ActiveTimeSet和timers,同时注意是否会影响一个要发生的定时器时间
bool insert(Timer* timer);



// =====================================其实现===============================

#include "TimerQueue.h"
#include "logger.h"
#include <sys/timerfd.h>

timespec howMuchTimeFromNow(TimeStamp when) {
int64_t microseconds = when.microSecondsSinceEpoch()
- TimeStamp::now().microSecondsSinceEpoch();
if (microseconds < 100)
{
microseconds = 100;
}
struct timespec ts;
ts.tv_sec = static_cast<time_t>(
microseconds / TimeStamp::kMicroSecondsPerSecond);
ts.tv_nsec = static_cast<long>(
(microseconds % TimeStamp::kMicroSecondsPerSecond) * 1000);
return ts;
}

int createTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if(timerfd < 0) {
LOG_ERROR("timerfd_create failed!");
}
return timerfd;
}

void resetTimerfd(int timerfd, TimeStamp expiration) {
itimerspec newValue{};
itimerspec oldValue{};
// 定时的间隔时间
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = timerfd_settime(timerfd, 0, &newValue, &oldValue);
if(ret < 0) {
LOG_ERROR("timerfd_settime failed");
}
}

void readTimerfd(int timerfd, TimeStamp now) {
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_INFO("TimerQueue::handleRead() %lu at %s", howmany, now.to_string().c_str());
if (n != sizeof howmany)
{
LOG_ERROR("TimerQueue::handleRead() reads %ld bytes instead of 8", n);
}
}

TimerQueue::TimerQueue(EventLoop *loop)
: loop_(loop)
, timerfd_(createTimerfd())
, timerfdChannel_(timerfd_, loop)
, timers_()
, activeTimers_()
, callingExpiredTimers_(false)
, cancleTimers_()
{
timerfdChannel_.setReadCallBack(std::bind(&TimerQueue::handleRead, this));

// 设置可读 并且让loop监听读事件
timerfdChannel_.enableReading();
}

TimerQueue::~TimerQueue() {
timerfdChannel_.disableAll();
timerfdChannel_.remove();
close(timerfd_);
for(auto& t: timers_) {
delete t.second;
}
}

TimerId TimerQueue::addTimer(TimerCallback cb, TimeStamp when, double interval) {
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(std::bind(std::bind(&TimerQueue::addTimerInLoop, this, timer)));
return TimerId(timer, timer->sequence());
}

void TimerQueue::addTimerInLoop(Timer *timer) {
bool earliestChanged = insert(timer);
if(earliestChanged) {
resetTimerfd(timerfd_, timer->expiration());
}
}

bool TimerQueue::insert(Timer *timer) {
bool earliestChanged = false;
TimeStamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if(it == timers_.end() || when < it->first) {
earliestChanged = true;
}

{
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
}

{
std::pair<ActiverTimerSet::iterator, bool> result
= activeTimers_.insert(ActiverTimer(timer, timer->sequence()));
}

return earliestChanged;

}

void TimerQueue::cancel(TimerId timerId) {
loop_->runInLoop(
std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::cancelInLoop(TimerId timerid) {
ActiverTimer atimer(timerid.timer_, timerid.sequence_);
ActiverTimerSet::iterator it = activeTimers_.find(atimer);
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
(void)n;
delete it->first; // FIXME: no delete please
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
cancleTimers_.insert(atimer);
}
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(TimeStamp now) {
std::vector<TimerQueue::Entry> expired;
// 这是一个哨兵
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// 找到第一个大于或等于的定时器
auto end = timers_.lower_bound(sentry);
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);

for (const Entry& it : expired) {
ActiverTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
}
return std::move(expired);
}

void TimerQueue::handleRead() {
TimeStamp now(TimeStamp::now());

readTimerfd(timerfd_, now);

std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancleTimers_.clear();
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

reset(expired, now);
}

void TimerQueue::reset(const std::vector<Entry> &expired, TimeStamp now) {
TimeStamp nextExpire;

for (const Entry& it : expired) {
ActiverTimer timer(it.second, it.second->sequence());
if (it.second->repeat()
&& cancleTimers_.find(timer) == cancleTimers_.end())
{
it.second->restart(now);
insert(it.second);
}
else
{
delete it.second;
}
}

if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}



取消定时器的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void TimerQueue::cancel(TimerId timerId) {
loop_->runInLoop(
std::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::cancelInLoop(TimerId timerid) {
ActiverTimer atimer(timerid.timer_, timerid.sequence_);
ActiverTimerSet::iterator it = activeTimers_.find(atimer);
if (it != activeTimers_.end())
{
size_t n = timers_.erase(Entry(it->first->expiration(), it->first));
(void)n;
delete it->first; // FIXME: no delete please
activeTimers_.erase(it);
}
else if (callingExpiredTimers_)
{
cancleTimers_.insert(atimer);
}
}
  • 该功能是提供给用户手动取消不想要的定时器任务,其实现逻辑是根据用户传入的TimerId将其从activeTimer(活跃定时器中删除)以及timers(存储定时器的列表)中删除,如果它并没有被触发就直接删除,否则就判断是否正在处理别的定时器任务,是的话现暂存在cancleTimers取消定时器列表中(因为这样可以防止别的定时器会调用这个定时器而导致未定义行为),cancleTimers这个列表的东西会在准备开始处理下次的一组过期任务的时候被删除

加入定时器的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void readTimerfd(int timerfd, TimeStamp now) {
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_INFO("TimerQueue::handleRead() %lu at %s", howmany, now.to_string().c_str());
if (n != sizeof howmany)
{
LOG_ERROR("TimerQueue::handleRead() reads %ld bytes instead of 8", n);
}
}

TimerId TimerQueue::addTimer(TimerCallback cb, TimeStamp when, double interval) {
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(std::bind(std::bind(&TimerQueue::addTimerInLoop, this, timer)));
return TimerId(timer, timer->sequence());
}

void TimerQueue::addTimerInLoop(Timer *timer) {
bool earliestChanged = insert(timer);
if(earliestChanged) {
resetTimerfd(timerfd_, timer->expiration());
}
}

// 插入到ActiveTimeSet和timers,同时注意是否会影响一个要发生的定时器时间
bool TimerQueue::insert(Timer *timer) {
bool earliestChanged = false;
TimeStamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if(it == timers_.end() || when < it->first) {
earliestChanged = true;
}

{
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
}

{
std::pair<ActiverTimerSet::iterator, bool> result
= activeTimers_.insert(ActiverTimer(timer, timer->sequence()));
}

return earliestChanged;

}
  • 加入新的定时器的时候,需要判断新加入的定时器过期时间会不会是最早的(这是通过set红黑数实现的),也就是下一次会先执行,如果是的话 要更新下一次过期时间给timerfd(调用resettimerfd),然后不管是不是都要加入到对应的存储容器中

处理定时器过期逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
void TimerQueue::handleRead() {
TimeStamp now(TimeStamp::now());

readTimerfd(timerfd_, now);

std::vector<Entry> expired = getExpired(now);

callingExpiredTimers_ = true;
cancleTimers_.clear();
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;

reset(expired, now);
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(TimeStamp now) {
std::vector<TimerQueue::Entry> expired;
// 这是一个哨兵
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
// 找到第一个大于或等于的定时器
auto end = timers_.lower_bound(sentry);
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);

for (const Entry& it : expired) {
ActiverTimer timer(it.second, it.second->sequence());
size_t n = activeTimers_.erase(timer);
}
return std::move(expired);
}

void TimerQueue::reset(const std::vector<Entry> &expired, TimeStamp now) {
TimeStamp nextExpire;

for (const Entry& it : expired) {
ActiverTimer timer(it.second, it.second->sequence());
if (it.second->repeat()
&& cancleTimers_.find(timer) == cancleTimers_.end())
{
it.second->restart(now);
insert(it.second);
}
else
{
delete it.second;
}
}

if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}

if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}

void resetTimerfd(int timerfd, TimeStamp expiration) {
itimerspec newValue{};
itimerspec oldValue{};
// 定时的间隔时间
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = timerfd_settime(timerfd, 0, &newValue, &oldValue);
if(ret < 0) {
LOG_ERROR("timerfd_settime failed");
}
}

void readTimerfd(int timerfd, TimeStamp now) {
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_INFO("TimerQueue::handleRead() %lu at %s", howmany, now.to_string().c_str());
if (n != sizeof howmany)
{
LOG_ERROR("TimerQueue::handleRead() reads %ld bytes instead of 8", n);
}
}
  • 由于使用了timerfd,所以定时器过期后会有read事件,接收到后会调用读处理函数,读处理函数中会先读取Timerfd,然后获取过期定时器列表(通过一个哨兵,使用二分找到lower_bound找到第一个大于或等于的位置,执行对应删除),获取后执行对应任务,最后重新设置下次过期时间以达到循环目的
  • 重新设置的时候需要判断过期任务列表中是否有重复定时器,有的话重新设定,并且更新下一次过期时间给timerfd

Timerfd的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int createTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if(timerfd < 0) {
LOG_ERROR("timerfd_create failed!");
}
return timerfd;
}

void resetTimerfd(int timerfd, TimeStamp expiration) {
itimerspec newValue{};
itimerspec oldValue{};
// 定时的间隔时间
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = timerfd_settime(timerfd, 0, &newValue, &oldValue);
if(ret < 0) {
LOG_ERROR("timerfd_settime failed");
}
}

void readTimerfd(int timerfd, TimeStamp now) {
uint64_t howmany;
ssize_t n = ::read(timerfd, &howmany, sizeof howmany);
LOG_INFO("TimerQueue::handleRead() %lu at %s", howmany, now.to_string().c_str());
if (n != sizeof howmany)
{
LOG_ERROR("TimerQueue::handleRead() reads %ld bytes instead of 8", n);
}
}
  • timerfd 这个名字拆开来看,就是 timer fd,所谓定时器 fd 类型,那么它的可读可写事件一定是跟时间有关系。timerfd 被 new 出来之后 ( timerfd_create ),可以设置超时时间( timerfd_setting ),超时之后,该句柄可读,读出来的是超时的次数

  • 他有三个用法

    1
    2
    3
    4
    5
    6
    // 创建一个 timerfd 句柄
    int timerfd_create(int clockid, int flags);
    // 启动或关闭 timerfd 对应的定时器,会返回上次设定时间给传入的参数
    int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
    // 获取指定 timerfd 距离下一次超时还剩的时间
    int timerfd_gettime(int fd, struct itimerspec *curr_value);
  • 两个需要用得到结构体

    • timespec
    1
    2
    3
    4
    5
    // timespec 结构体用于表示时间,其定义如下:
    struct timespec {
    time_t tv_sec; /* 秒 */
    long tv_nsec; /* 纳秒 */
    };
    • itimespec
    1
    2
    3
    4
    5
    // itimerspec 结构体用于定义定时器的设置值和剩余时间,
    struct itimerspec {
    struct timespec it_interval; /* 重复周期 */
    struct timespec it_value; /* 首次触发时间 */
    };
    • 用法

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      #include <stdio.h>
      #include <time.h>
      #include <signal.h>
      #include <stdlib.h>
      #include <string.h>

      timer_t timerid;

      void create_timer(void) {
      struct itimerspec new_value;
      struct itimerspec curr_value;

      if (timer_create(CLOCK_REALTIME, NULL, &timerid) == -1) {
      perror("timer_create");
      exit(1);
      }

      memset(&new_value, 0, sizeof(new_value));
      new_value.it_value.tv_sec = 5;
      new_value.it_interval.tv_sec = 2;
      if (timer_settime(timerid, 0, &new_value, NULL) == -1) {
      perror("timer_settime");
      exit(1);
      }
      }

      void signal_handler(int sig, siginfo_t *si, void *uc) {
      static int count = 1;
      struct itimerspec curr_value;

      printf("Signal handler called - count: %d\n", count);

      if (timer_gettime(timerid, &curr_value) == -1) {
      perror("timer_gettime");
      exit(1);
      }

      count++;
      }

      int main() {
      struct sigaction sa;

      memset(&sa, 0, sizeof(sa));
      sa.sa_flags = SA_SIGINFO;
      sa.sa_sigaction = signal_handler;
      if (sigaction(SIGRTMIN, &sa, NULL) == -1) {
      perror("sigaction");
      exit(1);
      }

      create_timer();

      while(1) {
      pause();
      }

      return 0;
      }