2.1.1 网络IO与IO多路复用

2.1.1.1 思考以下操作,与网络IO有什么关系呢?

  1. 在生活中,我们使用微信,发送文字、视频、语音
  2. 刷抖音时,打开一个视频,视频资源怎么到达我们的 APP 的?
  3. GitHub/GitLab,git clone 为什么代码能到达本地?
  4. 扫描共享单车二维码,能够打开车锁
  5. 通过 APP 操纵空调
  6. 王者荣耀释放技能造成了伤害

💡 核心: Server ↔ Client

以上流程中都有着 Server ↔ Client 之间的交互,体现了网络 IO 在日常生活中数据传输和交互中的重要作用。

2.1.1.2 客户端和服务端进行通信

2.1.1.2.1 简易的客户端

1
nc 192.168.133.128 3263

2.1.1.2.2 仅链接一次的简易服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in servaddr;

servaddr.sin_family = AF_INET; // 指定IP地址版本为IPV4
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定本地0.0.0.0
servaddr.sin_port = htons(3264); // 0~1023系统默认,需要给一个大于1024的地址,小于65535

if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
std::cout << "bind failed" << std::endl;
return -1;
}

listen(sockfd, 10);
printf("listen finished\n");

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr *)&servaddr, &len); // 会阻塞,直至第一次来连接

while (1) {
printf("accept finished\n");
char buffer[1024] = {0};
int count = recv(clientfd, buffer, 1024, 0); // 会阻塞,直至收到消息

if (count == 0) {
std::cout << "client closed!" << std::endl;
close(clientfd);
return 0;
}

printf("RECV:%s\n", buffer);
}

printf("exit\n");
return 0;
}

关键知识点:

  • 端口被绑定以后,不可再次绑定
  • 执行 listen 以后,可以看到 IO 的状态
  • 进入 listen 可以被连接,并且会产生新的连接状态 LISTEN,并且客户端可以发送数据
  • fd 是 IO,TCP 链接是 accept 建立连接

fd(文件描述符)和 TCP(传输控制协议)

  • 在 C 或 C++ 中,当使用 socket() 创建网络套接字时,返回的就是一个文件描述符(即 int 类型),它可以被用在 read()write()recv()send() 等系统调用中与网络通信
  • 文件描述符是一个操作系统的抽象,是一个非负整数,用于引用已打开的文件。可以是常规文件、目录、设备文件,也可以是网络套接字,用于管理打开的文件或套接字
  • 而 TCP 是一个协议,定义了如何在网络上传输数据

2.1.1.2.3 sockfd

sockfd 起始值为 3,0、1、2 是系统默认已经封装好的 fd,可以在 /dev/fd 下看到:

  • sockfd 0: stdin
  • sockfd 1: stdout
  • sockfd 2: stderr

sockfd 系统是逐渐递增的,若前面有完全断开的 clientfd,则会从最开始的 clientfd 接着向后创建。

类似于:3 4 5 3(close) 6 7 3(TIME_WAIT后再次可用)

系统中的 fd 限制(Linux 一切皆文件)

在 close 以后,一个 fd 要等一段时间才能再度使用。在 close 以后,需要等待 IO 回收的时间(TIME_WAIT),一般是 60 秒,可以通过以下方式修改:

1
2
3
4
5
6
# 查看当前设置
sysctl net.ipv4.tcp_fin_timeout

# 修改 tcp_fin_timeout
# tcp_fin_timeout 参数控制 TIME_WAIT 状态的持续时间(以秒为单位)
sudo sysctl -w net.ipv4.tcp_fin_timeout=30

这会将 TIME_WAIT 的持续时间设置为 30 秒。

2.1.1.2.4 一请求一线程的处理方案

如何做到一个服务器处理同时多个客户端的链接呢?我们可以考虑在循环内为每个请求创建一个线程,对之进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#define THREAD_CREATE_FUNC 0

#if THREAD_CREATE_FUNC
void *client_thread(void *clientfd) {
int client_fd = *(int *)clientfd;
while (1) {
printf("accept finished\n");
char buffer[1024] = {0};
int count = recv(client_fd, buffer, 1024, 0);

if (count == 0) {
close(clientfd);
std::cout << "client closed!" << std::endl;
return 0;
}

printf("RECV:%s\n", buffer);
}
}
#else
void client_thread(int clientfd) {
while (1) {
printf("accept finished\n");
char buffer[1024] = {0};
int count = recv(clientfd, buffer, 1024, 0);

if (count == 0) {
close(clientfd);
std::cout << "client closed!" << std::endl;
return;
}

printf("RECV:%s\n", buffer);
}
}
#endif

int main() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建一个socket
struct sockaddr_in servaddr;

servaddr.sin_family = AF_INET; // 指定IP地址版本为IPV4
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定本地0.0.0.0
servaddr.sin_port = htons(3264); // 0~1023系统默认,需要给一个大于1024的地址,小于65535

if (-1 == bind(sockfd, (struct sockaddr *)&servaddr, sizeof(struct sockaddr))) {
std::cout << "bind failed" << std::endl;
return -1;
}

listen(sockfd, 10);
printf("listen finished\n");

// 创建线程方法1
#if THREAD_CREATE_FUNC
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

while (1) {
int clientfd = accept(sockfd, (struct sockaddr *)&clientaddr, &len);

if (clientfd >= 0) {
pthread_t thread_id;
pthread_create(&thread_id, NULL, client_thread, (void *)&clientfd);
}
}
#else
// 创建线程方法2
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);

while (1) {
int clientfd = accept(sockfd, (struct sockaddr *)&clientaddr, &len);

if (clientfd >= 0) {
std::thread t1(client_thread, clientfd);
t1.detach();
}
}
#endif

printf("exit\n");
return 0;
}

一请求一线程的处理方案分析

优点:

  1. 代码逻辑简单

存在的问题:

  1. 资源消耗高

    • 线程开销: 每个线程都需要一定的系统资源,包括线程栈、线程控制块等。创建大量线程会消耗更多的内存和 CPU 资源,可能导致系统性能下降
    • 上下文切换: 线程过多会导致频繁的上下文切换,这会消耗 CPU 时间,影响应用程序的整体性能
  2. 可扩展性差

    • 当并发连接数目增加时,系统可能会达到最大线程数的限制,无法再创建新线程。这种情况下,新的请求可能被拒绝,降低了系统的可用性
    • 在高并发环境下,创建和管理大量线程的复杂性增加,会增加开发和维护成本
  3. 负载均衡问题

    • 在某些情况下,同一时间处理的请求可能会导致某些线程长时间处于活动状态,而其他线程却处于空闲状态,从而造成资源的不均匀分配
  4. 设计复杂性

    • 需要处理多线程带来的同步和共享数据的问题,这会增加代码的复杂性和潜在的错误风险
  5. 延迟

    • 对于短暂的小请求,线程的创建和销毁可能比请求本身的处理时间还要长,因此会引入不必要的延迟

⚠️ 总结: 一请求一线程是非常不利于大并发的,只能做到并发量 1000 左右(而且会很卡),这就引入了 IO 多路复用。

2.1.1.2.5 IO多路复用

一请求一线程的处理方案在高并发、大量请求的情况下,会消耗大量的资源,也会有负载均衡的问题和重复多次创建线程的过程,这很浪费 CPU 时间。

网络 IO 处理有两类:

  1. accept -> listenfd
  2. recv/send -> clientfd

① select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
fd_set rfds, rset, wset, errset;  // fd的集合  
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds); // 设置
int max_fd = sockfd; // 用于对fd_set做遍历的最大个数

while (1) {
rset = rfds;
// 需要传入 是否可读 / 是否可写?
int nready = select(max_fd + 1, &rset, &wset, &errset, NULL);
// 返回值为就绪的总数量
// 该函数有五个参数:
// 1. 最大的fd
// 2. 可读集合
// 3. 可写集合(一般为NULL)
// 4. 错误集合(一般为NULL)
// 5. 超时时间(一般为NULL)

// 例如select返回 rset 俩个 wset 3个,nready = 5
// 每次调用都会将fd的集合拷贝到内存中去,循环判断IO有没有就绪,判断三个集合

char buffer[1024] = {0};

if (FD_ISSET(sockfd, &rset)) { // sockfd有没有被设置?
sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr *)&clientaddr, &len);

std::cout << "clientfd: " << clientfd
<< " ip:" << inet_ntoa(clientaddr.sin_addr)
<< " port:" << ntohs(clientaddr.sin_port) << std::endl;

FD_SET(clientfd, &rfds);
max_fd = clientfd > max_fd ? clientfd : max_fd; // 可能会回收所以取他们的最大值
}

// recv
for (int i = sockfd + 1; i <= max_fd; i++) {
if (FD_ISSET(i, &rset)) {
int ret = recv(i, buffer, 1024, 0);

if (ret == 0) {
printf("client disconnect:%d \n", i);
close(i); // close后应该要在fd_set中清空的
FD_CLR(i, &rset);
continue;
}

std::cout << string(buffer) << std::endl;
}
}
}

fd_set 是什么?

  • 是一个比特位集合,中间默认定义为 1024 位

Select 的特点:

  1. 每次调用需要把所有 fd_set 集合从用户空间 copy 到内核空间,如果 fd 的量很大的话,拷贝的消耗会很大
  2. max_fd,遍历每个 fd,如果 fd 的量很大的话,遍历的消耗会很大
1
for (i = 0; i < max_fd; i++);  // 类似于这样

💡 总结: 实现了 IO 多路复用,但缺点也很明显——参数太多!

② poll

在实现了 IO 多路复用后,还简化了 select 的流程。

在 poll 里面用到的结构体:

1
2
3
4
5
6
/* Data structure describing a polling request. */
struct pollfd {
int fd; // 对应的fd值
short int events; // 关心的events
short int revents; // 实际上发生了可读的
};

简单的 poll 服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// poll:
struct pollfd fds[1024] = {0}; // 创建poll_set
fds[sockfd].fd = sockfd;
fds[sockfd].events = POLLIN;

int max_fd = sockfd; // 也可以用1024,但底层每次就会循环1024个了

while (1) {
int nready = poll(fds, max_fd + 1, -1); // -1是一直阻塞,将fds拷贝到内核,内核通过循环判断poll是否已经就绪

if (fds[sockfd].revents & POLLIN) { // 只要有一位为1:POLLIN:位 0000 0001
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
int clientfd = accept(sockfd, (struct sockaddr *)&clientaddr, &len);

std::cout << "clientfd: " << clientfd
<< " ip:" << inet_ntoa(clientaddr.sin_addr)
<< " port:" << ntohs(clientaddr.sin_port) << std::endl;

max_fd = max_fd > clientfd ? max_fd : clientfd;
fds[clientfd].fd = clientfd;
fds[clientfd].events = POLLIN;
}

for (int i = 0; i < max_fd + 1; i++) {
if (fds[i].revents & POLLIN) {
// RECV:
char buffer[1024] = {0};
int count = recv(i, buffer, 1024, 0);

if (count == 0) {
printf("client disconnect:%d\n", i);
fds[i].fd = -1;
fds[i].events = 0;
close(i);
}

printf("%s\n", buffer);
}
}
}

poll 有没有独特的使用场景?

  1. 简单性: poll 的接口相对简单,适合用于小范围的文件描述符监视。对于不需要处理大量并发连接的简单应用,poll 更容易理解和实现
  2. 非特定操作系统支持: poll 是 POSIX 标准的一部分,因此在许多 UNIX 和类 UNIX 系统上都能使用,具有更好的跨平台兼容性
  3. 适用于文件描述符数量较少的场景: 当需要监视的文件描述符数量相对较少时(比如少于 100 个),poll 的性能可以与 epoll 相当,且实现起来更为简单
  4. 动态添加和移除观察文件描述符: 尽管 epoll 允许更高效地处理大量文件描述符的动态添加和删除,但在某些情况下,比如需要频繁变化的文件描述符集,使用 poll 的灵活性可能更好
  5. 支持从未就绪状态开始: poll 可以在任何时候返回未准备好的文件描述符,epoll 在某些情况下必须提前注册

③ epoll

epoll 的重要程度:在 Linux 2.4 以前,没有 Linux 做服务器的,也没有云主机。当时的 server 都是用 Windows/Unix。而现在云主机基本都用的 Linux 系统。核心原因是因为 Linux 在 2.6 时引入了 epoll,使得 server 端做到能将 IO 的数量做到更多。但 select 和 poll 不行,因为他们会将对应的 set 拷贝进去,并必须逐个遍历,极度地消耗内存和性能。

接口:

1
2
3
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event * _Nullable event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

结构体:

1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data {
void *ptr;
int fd; // 对应的fd,貌似只用到了这个
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */ // 用于规定模式,EPOLLIN?
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;

代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// int epoll_create(int size);
// size:最开始是一次性最多就绪的数量,但后来从数组改成了列表,size没有了作用。只要不等于0,就效果相同,目的是兼容过去
int epfd = epoll_create(1);

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockfd;

sockaddr client_addr;
socklen_t len = sizeof(client_addr);

epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);

while (1) {
struct epoll_event events[1024] = {0};
int nready = epoll_wait(epfd, events, 1024, -1);

for (int i = 0; i < nready; i++) {
int confd = events[i].data.fd;

if (confd == sockfd) { // 有新的链接来了
int clientfd = accept(sockfd, (sockaddr*)&client_addr, &len);
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);

printf("accept finished:%d\n", clientfd);
}
else if (events[i].events & EPOLLIN) { // 有新的消息来了
char buffer[1024] = {0};
int count = recv(confd, buffer, 1024, 0);

if (count == 0) {
printf("client disconnect!\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, NULL);
close(confd);
continue;
}

printf("clientfd: %d msg=%s", confd, buffer);
send(confd, buffer, count, 0);
printf("send\n");
}
}
}

epoll 为什么没有遍历?

  1. 事件驱动模型:

    • epoll 使用事件通知机制,只返回那些状态发生变化的文件描述符。这样,程序只需处理实际有事件发生的描述符,而不是遍历所有可能的描述符
  2. 高效的内核实现:

    • epoll 在内核中维护一个红黑树来跟踪注册的文件描述符,以及一个双向链表来管理就绪的文件描述符。这种数据结构使得插入、删除和查找操作都非常高效,不需要遍历所有描述符
  3. 减少系统调用开销:

    • 通过 epoll_wait,用户空间程序只需一次系统调用就能获取所有就绪的文件描述符,避免了像 selectpoll 那样需要多次调用来检查每个描述符的状态

相比较于 select 而言,epoll 的大并发优势在哪里?

  • 在 100w 并发时,100w IO 是慢慢积累起来的,而不是一次性全部拷贝进去。有 IO 事件只需要处理就绪事件
  • 而 select 是每次都将全部的 IO 都拷贝进去
  • 微信:100w 同时在线,并不代表是 100w 人同时发消息
  • 就绪是我们需要处理的事件,而不是有多少人在线就得一直处理多少事件,而是谁发消息就处理谁的事件