从零到负一

Socket编程基础5 - epoll实战

2020/05/15

这篇文章主要用于复习epoll()的使用,通过一个简单的聊天室例子来学习、总结epoll()。在前面两章,我们学习了多路复用的概念以及函数API的使用。这一章,我们将通过实例来进一步搞懂epoll()的使用以及可能遇到的坑。

程序的基本结构

这个例子采用简单的C/S模型,支持一个服务器,多个客户端,单进程、多路复用。服务器端有1个listen的socket,每连接一个客户端生成一个客户端socket;在客户端,有1个socket、1个stdin和1个stdout。客户端发信息给服务器后,服务器将该信息转发给所有客户端。

客户端代码分析

下面的代码在必要的地方都添加了注释,我这里只做一些简单的解释,下面是一些之前已经知道的知识点:

  1. 在这个例子中,我们使用的是epoll的ET(边缘触发)模式。相对于默认的LT(水平触发)模式,ET模式只在fd状态发生变化时通知一次,减少了epoll_wait()的重复唤醒,因此效率更高;
  2. 在多路复用的设计中,一般需要将socket设置为非阻塞。这样程序就不会阻塞在某一个socket的accept()、connect()、read()、write()等调用上,从而导致epoll无法及时处理其他socket的事件;
  3. 对于非阻塞的socket,如果读时内核buffer为空、或写时内核buffer已满,read()/write()会返回-1,并将errno设置为EAGAIN或EWOULDBLOCK,据此我们就知道本次读(或写)已经完成。注意:一次调用可能只读出/写入部分数据,因此需要循环调用直到出现EAGAIN,或者记录剩余的数据,等下一次事件到来时再处理;
  4. 对于ET模式,我们往往需要在read()、accept()等函数外添加while(1)。对于read(),如果一次操作没有读完buffer,那么在新数据到达之前,epoll_wait()不会再触发EPOLLIN事件。因此我们需要在一次事件中循环读取,直到返回EAGAIN,读完buffer中的所有数据。accept()的道理类似:需要循环调用accept()直到返回EAGAIN,把已经排队等待的连接请求全部接受。

除了上面这些知道的,下面也有一些坑:

  1. sockAddr.sin_port = htons(port);将主机字节序的端口号转换成网络字节序。端口号是16位的,所以一定要用htons,而不能用htonl(处理32位整数)等函数;
  2. 只有将EPOLLRDHUP添加到epoll_event的events字段中,epoll_wait()才会报告对端关闭(或半关闭)连接的事件;同时,我们需要在文件最前面添加#define _GNU_SOURCE 1;
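  3. stdin是阻塞的,因此读取stdin时不能在外层添加while(1)(没有数据时read()会一直阻塞),也不能通过是否收到EAGAIN或者EWOULDBLOCK来判断是否读完。由于每次事件只读一次,如果一次输入超过了buffer大小,在ET模式下剩余的数据要等到下一次输入时才会被通知;用LT模式监听stdin可以避免这个问题;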
  4. epoll_wait()返回的事件顺序和添加的事件顺序无关,我们不能假设事件以某种顺序返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#define _GNU_SOURCE 1
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

// Maximum number of ready events fetched by one epoll_wait() call
// (the client only ever watches stdin and its server socket).
enum { MAX_EVENT_NUM = 2 };
// Capacity, in bytes, of each of the I/O scratch buffers below.
enum { BUF_SIZE = 128 };

// Scratch buffer for data read from stdin before it is sent to the server.
char rdBuf0[BUF_SIZE];
// Scratch buffer for data read from the server socket before it is printed.
char rdBuf1[BUF_SIZE];

/*
 * Switch fd to non-blocking mode by adding O_NONBLOCK to its file status flags.
 *
 * Returns the flags fd had before the change, or -1 if they could not be read
 * or updated (errno is left as set by fcntl()).
 */
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    if (old_option < 0) {
        // Without this check, -1 | O_NONBLOCK would ask F_SETFL to set every flag bit.
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) < 0) {
        return -1;
    }
    return old_option;
}

int main(int argc, char **argv)
{
int32_t status;

if (argc <= 1) {
printf("Please enter 2 arguments...\n");
return argc;
}

// configure socket
const char *ipAddr = "127.0.0.1";
uint32_t port = atoi(argv[1]);
struct sockaddr_in sockAddr; // netinet/in.h
bzero(&sockAddr, sizeof(sockAddr)); // string.h
sockAddr.sin_family = AF_INET;
sockAddr.sin_port = htons(port);
inet_pton(AF_INET, ipAddr, &sockAddr.sin_addr); // arpa/inet.h
int32_t sockFd = socket(AF_INET, SOCK_STREAM, 0);
if (sockFd < 0) {
printf("sockFd < 0...\n");
return sockFd;
}
status = connect(sockFd, (sockaddr *)&sockAddr, sizeof(sockAddr));
if (status < 0) {
printf("connect error...\n");
return status;
}
setnonblocking(sockFd);

// configure epoll
struct epoll_event inEvents, outEvents, rdyEvents[2];
int32_t efFd = epoll_create(1);
inEvents.data.fd = STDIN_FILENO;
// To receive EPOLLRDHUP event, we need to add it to epoll_event.
inEvents.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET;
outEvents.data.fd = sockFd;
outEvents.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET;
epoll_ctl(efFd, EPOLL_CTL_ADD, STDIN_FILENO, &inEvents);
epoll_ctl(efFd, EPOLL_CTL_ADD, sockFd, &outEvents);

while (1) {
int32_t rdyFdCnt = epoll_wait(efFd, rdyEvents, MAX_EVENT_NUM, -1);
int32_t stdinClosed = 0;
if (rdyFdCnt < 0) {
printf("epoll_wait fail...\n");
return rdyFdCnt;
}

for (int i = 0; i < rdyFdCnt; i++) {
// case 1: error
if (rdyEvents[i].events & EPOLLERR) {
printf("epoll_wait error in fds = %d...\n", rdyEvents[i].data.fd);
return EPOLLERR;
// case 2: server is closed
} else if (rdyEvents[i].events & EPOLLRDHUP) {
close(rdyEvents[i].data.fd);
return 0;
// case 3: receive data from stdin or server
} else if (rdyEvents[i].events & EPOLLIN) {
// data from STDIN_FILENO
if (rdyEvents[i].data.fd == STDIN_FILENO) {
// When we read from stdin, there is no need to add while(1) loop(the data from
// stdin is limited). Because stdin is not non-blocking, even the buffer is empty
// it will not return EAGAIN or EWOULDBLOCK error code.
int rdCnt = read(STDIN_FILENO, rdBuf0, BUF_SIZE);
// read error
if (rdCnt < 0) {
printf("read from stdin error...\n");
// This flag shows stdin is closed.
stdinClosed = 1;
// We only close the write side of socket, so the socket can still
// read data. When the socket receives 0(server close the socket),
// the communication is done and we close the socket.
shutdown(sockFd, SHUT_WR);
// epoll_wait will not be triggered when read data from stdin.
epoll_ctl(efFd, EPOLL_CTL_DEL, STDIN_FILENO, &inEvents);
break;
// read EOF
} else if (rdCnt == 0) {
// same as error case
stdinClosed = 1;
shutdown(sockFd, SHUT_WR);
epoll_ctl(efFd, EPOLL_CTL_DEL, STDIN_FILENO, &inEvents);
break;
// read data
} else {
// To make things simple, I directly write data to sockFd.
write(sockFd, rdBuf0, rdCnt);
break;
}
}

// data from socket which is connected to server
if (rdyEvents[i].data.fd == sockFd) {
// In epoll ET mode, read from socket should in while(1) loop.
while (1) {
int rdCnt = read(sockFd, rdBuf1, BUF_SIZE);
// read error
if (rdCnt < 0) {
if ((errno != EAGAIN) && (errno != EPOLLWRNORM)) {
printf("read from sockFd error...\n");
close(sockFd);
return rdCnt;
// Read buffer is empty.
} else {
break;
}
// read 0, disconnect from server
} else if (rdCnt == 0) {
close(sockFd);
epoll_ctl(efFd, EPOLL_CTL_DEL, sockFd, &outEvents);
return ((stdinClosed == 1) ? 0 : -1);
} else {
// To make things simple, I directly write data to sockFd.
write(STDOUT_FILENO, rdBuf1, rdCnt);
}
}
}
}
}
}

return 0;
}

服务器端代码分析

服务器端的代码要比客户端多一些,但其中最重要的一些知识点我已经在上一节讲了。这里讲讲上一节没有的:

  1. 对于write()操作,在本项目中每次发送的数据量很小,内核中的发送buffer通常不会被写满,所以写完数据后一般也不会收到EAGAIN或者EWOULDBLOCK。因此不能像读那样依靠EAGAIN来判断是否写完,而应该比较要写的字节数和已经写入的字节数来判断;

  2. users[i]和connectedUser[i]的基本操作(见图1)。users以fd为下标,保存每个连接的地址和读写buffer;connectedUser紧凑地保存当前在线客户端的fd,前connectedUserNum项有效。

    图1. users[i]和connectedUser[i]的基本操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#define _GNU_SOURCE 1
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>

// Compile-time limits of the chat server.
enum {
    MAX_USER_NUM = 10,    // clients that may be connected at the same time
    MAX_FDS_NUM = 1024,   // number of entries in the fd-indexed users[] table
    MAX_BUF_SIZE = 512,   // size of each per-connection read/write buffer
    MAX_EVENT_NUM = 10    // ready events fetched per epoll_wait() call
};

// Per-connection bookkeeping; main() stores one entry per fd in users[fd].
typedef struct userInfo {
    struct sockaddr_in addr;    // socket address (peer's, or the listen address for listenFd)
    char wrBuf[MAX_BUF_SIZE];   // data waiting to be sent to this connection
    char rdBuf[MAX_BUF_SIZE];   // data read from this connection
} userData;

/*
 * Switch fd to non-blocking mode by adding O_NONBLOCK to its file status flags.
 *
 * Returns the flags fd had before the change, or -1 if they could not be read
 * or updated (errno is left as set by fcntl()).
 */
int setnonblocking(int fd) {
    int old_option = fcntl(fd, F_GETFL);
    if (old_option < 0) {
        // Without this check, -1 | O_NONBLOCK would ask F_SETFL to set every flag bit.
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) < 0) {
        return -1;
    }
    return old_option;
}

int main(int argc, char **argv)
{
int32_t status;

if (argc <= 1) {
printf("Please enter 2 arguments...\n");
return argc;
}

// configure listen socket
const char *ipAddr = "127.0.0.1";
uint32_t port = atoi(argv[1]);
struct sockaddr_in listenSockAddr; // netinet/in.h
bzero(&listenSockAddr, sizeof(listenSockAddr)); // string.h
listenSockAddr.sin_family = AF_INET;
listenSockAddr.sin_port = htons(port);
inet_pton(AF_INET, ipAddr, &listenSockAddr.sin_addr); // arpa/inet.h
int32_t listenFd = socket(AF_INET, SOCK_STREAM, 0);
if (listenFd < 0) {
printf("listenFd = %d...\n", listenFd);
return listenFd;
}
status = bind(listenFd, (sockaddr *)&listenSockAddr, sizeof(listenSockAddr));
if (status < 0) {
printf("bind status = %d...\n", status);
return status;
}
status = listen(listenFd, 20);
if (status < 0) {
printf("listen status = %d...\n", status);
return status;
}
setnonblocking(listenFd);

// configure epoll
struct epoll_event eplistenEvents, recEvents[MAX_EVENT_NUM];
int32_t epFd = epoll_create(1);
eplistenEvents.data.fd = listenFd;
eplistenEvents.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET;
status = epoll_ctl(epFd, EPOLL_CTL_ADD, listenFd, &eplistenEvents);
if (status < 0) {
printf("epoll_ctl status = %d...\n", status);
return status;
}

userData *users = new userData[MAX_FDS_NUM];
users[listenFd].addr = listenSockAddr;
int connectedUser[MAX_USER_NUM];
int connectedUserNum = 0;
for (int i = 0; i < MAX_USER_NUM; i++) {
connectedUser[i] = -1;
}

while (1) {
int32_t rdyFdCnt = epoll_wait(epFd, recEvents, MAX_EVENT_NUM, -1);
if (rdyFdCnt < 0) {
printf("epoll_wait fail...\n");
return rdyFdCnt;
}

for (int i = 0; i < rdyFdCnt; i++) {
uint32_t events = recEvents[i].events;
// case 1: error
if (events & EPOLLERR) {
printf("error in recEvents...\n");
return EPOLLERR;
// case 2: listen socket
} else if (recEvents[i].data.fd == listenFd) {
while (1) {
struct sockaddr cliSockAddr;
socklen_t addrLen = sizeof(cliSockAddr);
bzero(&cliSockAddr, addrLen);
int cliFd = accept(listenFd, &cliSockAddr, &addrLen);
if (cliFd < 0) {
if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
printf("accept error...\n");
return cliFd;
// We set socket descriptor to non-blocking. So if there is no
// incoming connect request, EAGAIN or EWOULDBLOCK error code will return.
} else {
printf("all requests have been accepted...\n");
break;
}
}
// We need to first connects with client and then send message to it.
// Finally, we close the socket to disconnect the communication.
if (connectedUserNum == MAX_USER_NUM) {
const char *msg = "MAX_USER_NUM is reached...\n";
printf("%s", msg);
send(cliFd ,msg, strlen(msg), 0);
close(cliFd);
break;
}
// Set newly connected socket descriptor to non-blocking.
setnonblocking(cliFd);
// configure epoll events
struct epoll_event epCliEvents;
epCliEvents.data.fd = cliFd;
epCliEvents.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET;
status = epoll_ctl(epFd, EPOLL_CTL_ADD, cliFd, &epCliEvents);
if (status < 0) {
printf("epoll_ctl for new clients error...\n");
return status;
}
// update users and connectedUser
users[cliFd].addr = *(struct sockaddr_in *)&cliSockAddr;
connectedUser[connectedUserNum] = cliFd;
connectedUserNum++;
}
// case 3: client disconnects socket communication
} else if (events & EPOLLRDHUP) {
int closeFd = recEvents[i].data.fd;
printf("fd = %d left...\n", closeFd);
close(closeFd);
bzero(&users[closeFd], sizeof(userData));
connectedUser[closeFd] = connectedUser[connectedUserNum - 1];
connectedUser[connectedUserNum - 1] = -1;
connectedUserNum--;
// case 4: ready to read
} else if (events & EPOLLIN) {
int cliFd = recEvents[i].data.fd;
bzero(users[cliFd].rdBuf, MAX_BUF_SIZE);
while (1) {
ssize_t rdLen = read(cliFd, users[cliFd].rdBuf, MAX_BUF_SIZE);
if (rdLen < 0) {
if ((errno != EAGAIN) && (errno != EWOULDBLOCK)) {
printf("socket read error...\n");
close(cliFd);
// Update users and connectedUser information.
bzero(&users[cliFd], sizeof(users[0]));
for (int i = 0; i < MAX_USER_NUM; i++) {
if (connectedUser[i] == cliFd) {
connectedUser[i] = connectedUser[connectedUserNum - 1];
connectedUser[connectedUserNum - 1] = -1;
break;
}
}
connectedUserNum--;
// read buffer is empty
} else {
break;
}
} else if (rdLen > 0) {
// Send received data to all clients.
for (int i = 0; i < connectedUserNum; i++) {
struct epoll_event epCliEvents;
epCliEvents.data.fd = connectedUser[i];
// To allow epoll_wait() be triggered by write ready, we need to
// reconfigure the epoll.
epCliEvents.events = EPOLLOUT | EPOLLERR | EPOLLET;
status = epoll_ctl(epFd, EPOLL_CTL_MOD, connectedUser[i], &epCliEvents);
if (status < 0) {
printf("epoll_ctl for existing clients error...\n");
close(connectedUser[i]);
// Update users and connectedUser information.
bzero(&users[cliFd], sizeof(users[0]));
connectedUser[i] = connectedUser[connectedUserNum - 1];
connectedUser[connectedUserNum - 1] = -1;
connectedUserNum--;
// ith item has been updated, we need to check it again.
i--;
continue;
}
// Copy read data to all connected users' write buffer.
memcpy(users[connectedUser[i]].wrBuf,
users[cliFd].rdBuf,
strlen(users[cliFd].rdBuf));
}
} else {
// should not go there...
printf("read 0, disconnect from client...\n");
}
}
// case 5: ready to write
} else if (events & EPOLLOUT) {
int outputFd = recEvents[i].data.fd;
size_t wrLen = MAX_BUF_SIZE;
ssize_t sentCnt;
// For write operation, when write buffer is full, send() will return EAGAIN or
// EWOULDBLOCK error code. In this case, we will never make the write buffer
// full. So we need to add code to handle the case that all data has been written.
while (1) {
if (wrLen == 0) {
struct epoll_event epCliEvents;
epCliEvents.data.fd = outputFd;
// When write is done, we need to change EPOLLOUT to EPOLLIN for next read.
epCliEvents.events = EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLET;
int status = epoll_ctl(epFd, EPOLL_CTL_MOD, outputFd, &epCliEvents);
if (status < 0) {
printf("epoll_ctl error...\n");
return status;
}
bzero(users[outputFd].wrBuf, MAX_BUF_SIZE);
break;
}
sentCnt = send(outputFd, users[outputFd].wrBuf, wrLen, 0);
if (sentCnt < 0) {
printf("send data error...\n");
continue;
}
wrLen -= sentCnt;
}
}
}
}

return 0;
}
CATALOG
  1. 程序的基本结构
  2. 客户端代码分析
  3. 服务器端代码分析