Missing something or do I just not understand epoll?
Full disclosure, I'm a student and this is an assignment. I've been working on it for over a week almost non-stop (in addition to previous time spent) and I can't figure out what I'm doing wrong. My server keeps hanging on epoll_wait after only a "few" recvs are done ("few" because I'm anticipating several GB of data and I'm getting only a few dozen MB). I don't think there's anything wrong with how my client works, because it's working just fine with my select and multi-threaded servers. Please take a quick look and let me know if there's anything that jumps out at you as being the cause of my problem.
The basic idea of the client/server is to bombard the server with connections (10k+) and transfer a given amount of data across several times. This epoll server is having trouble with 2000, when my multi-threaded server handled just shy of the 10k goal.
I am NOT asking for you to do my assignment for me (I'm nearly done) I just need help figuring out what I'm doing wrong here. Thanks in advance for any help you can offer :)
1 #include "common.h"
2 #include <sys/epoll.h>
4 uint16_t ready[MAX_CONNS];
5 uint16_t next;
6 pthread_mutex_t mutex;
8 void *worker_thread(void *param) {
9 int my_sock, pos;
10 struct conn_params *conn_ps = (struct conn_params *)param;
12 while (1) {
13 pthread_mutex_lock(&mutex);
15 while (1) {
16 if (next == MAX_CONNS) {
17 printf("ballsn");
18 next = 4;
19 }
21 if (ready[next] != 0) {
22 pos = next;
23 my_sock = ready[pos];
24 next++;
25 break;
26 }
27 }
29 pthread_mutex_unlock(&mutex);
30 /* handle recv/send */
31 if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */
32 shutdown(my_sock, SHUT_RDWR);
33 close(my_sock);
34 serv_stats.active_connections--;
35 }
36 ready[pos] = 0;
37 /* print_conn_stats(&conn_ps[my_sock]);*/
38 }
39 }
41 void *add_client_thread(void *param) {
42 struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param;
43 struct sockaddr client;
44 struct epoll_event event;
45 socklen_t client_len;
46 int new_sock, ret;
47 char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV];
49 bzero(&client, sizeof(struct sockaddr));
52 while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) {
53 set_nonblock(new_sock);
54 event.data.fd = new_sock;
55 if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) {
56 perror("epoll_ctl");
57 printf("%un", new_sock);
58 continue;
59 }
61 bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params));
62 eat->conn_ps[new_sock].sock = new_sock;
63 if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) {
64 gai_strerror(ret);
65 }
67 update_server_stats();
68 printf("added clientn");
69 }
71 if (errno != EAGAIN) {
72 perror("Couldn't accept connection");
73 }
75 pthread_exit(NULL);
76 }
78 int main(int argc, char **argv) {
79 char opt, *port = NULL;
80 struct addrinfo hints, *results, *p;
81 int listen_sock = new_tcp_sock(), nfds, i, ret;
82 int fd_epoll, next_avail = 4;
83 struct conn_params conn_ps[MAX_CONNS];
84 struct epoll_event evs[MAX_CONNS];
85 struct epoll_event event;
86 struct epoll_accept_thread eat;
87 pthread_t thread;
89 while ((opt = getopt(argc, argv, ":l:")) != -1) {
90 switch (opt) {
91 case 'l': /* port to listen on */
92 port = optarg;
93 break;
94 case '?': /* unknown option */
95 fprintf(stderr, "The option -%c is not supported.n", opt);
96 exit(1);
97 case ':': /* required arg not supplied for option */
98 fprintf(stderr, "The option -%c requires an argument.n", opt);
99 exit(1);
100 }
101 } /* command line arg processing done */
103 if (port == NULL) {
104 fprintf(stderr, "You must provide the port to listen on (-l).n");
105 exit(1);
106 }
108 signal(SIGINT, handle_interrupt);
110 bzero(&hints, sizeof(struct addrinfo));
111 hints.ai_family = AF_INET;
112 hints.ai_socktype = SOCK_STREAM;
113 hints.ai_flags = AI_PASSIVE;
115 set_nonblock(listen_sock);
116 set_reuseaddr(listen_sock);
118 if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) {
119 gai_strerror(ret);
120 exit(1);
121 }
123 for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */
124 if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) {
125 perror("Bind failed");
126 } else {
127 break;
128 }
129 }
131 if (p == NULL) { /* we were unable to connect to anything */
132 fprintf(stderr, "Unable to bind to the specified port. Exiting...n");
133 exit(1);
134 }
136 freeaddrinfo(results);
138 if (listen(listen_sock, 5) == -1) {
139 perror("Listen failed");
140 exit(1);
141 }
143 /* everything is set up. method-specific code goes below */
145 start_server_stats();
146 next = 4;
148 if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) {
149 perror("epoll_create");
150 exit(1);
151 }
154 event.data.fd = listen_sock;
155 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) {
156 perror("epoll_ctl");
157 exit(1);
158 }
160 signal(SIGPIPE, SIG_IGN);
161 bzero(ready, MAX_CONNS * sizeof(uint16_t));
162 pthread_mutex_init(&mutex, NULL);
164 for (i = 0; i < 5; i++) { /* five workers should be enough */
165 pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps);
166 }
168 while (1) {
169 if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) {
170 continue;
171 }
172 for (i = 0; i < nfds; i++) { /* loop through all FDs */
173 if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */
174 /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/
175 close(evs[i].data.fd);
176 continue;
177 } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */
178 eat.listen_sock = listen_sock;
179 eat.fd_epoll = fd_epoll;
180 eat.conn_ps = conn_ps;
181 pthread_create(&thread, NULL, add_client_thread, (void *)&eat);
182 } else { /* inbound data */
183 while (ready[next_avail] != 0) {
184 next_avail++;
186 if (next_avail == MAX_CONNS) {
187 next_avail = 4;
188 }
189 }
190 ready[next_avail] = evs[i].data.fd;
191 } /* end inbound data */
192 } /* end iterating through FDs */
193 } /* end epoll_wait loop */
195 perror("epoll_wait");
197 return 0;
198 }
And here's the echo_recv function, as I assume someone's going to want to see that as well:
14 int echo_recv(struct conn_params *conn_p, int single) {
15 char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE];
16 int nread, nwrite, nsent = 0, i;
18 while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) {
19 /* create buffer of MULTIPLIER(int) times what was received */
20 for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) {
21 memcpy(buffer+(nread*i), client_buf, nread);
22 }
24 /* send the created buffer */
25 while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) {
26 nsent += nwrite;
27 }
29 conn_p->total_recvd += nread; /* update our stats for this conn */
30 conn_p->total_sent += nsent; /* update our status for this conn */
31 serv_stats.total_recvd += nread;
32 serv_stats.total_sent += nsent;
33 nsent = 0;
35 if (single) {
36 return 1;
37 }
38 }
40 if (nread == -1 && (errno & EAGAIN)) {
41 return 1;
42 }
44 if (nread == -1) {
45 perror("wtf?");
46 }
48 shutdown(conn_p->sock, SHUT_RDWR);
49 close(conn_p->sock);
51 return 0; /* recv failed */
52 }
Here are some thoughts:
is accessed. In your worker thread, you acquire a mutex to read it, however there are times when you modify this outside of the lock, additionally, you don't acquire this lock in your polling loop (main thread), you simply write to the array - this is plain wrong. pthread_join
) epoll_accept_thread
structure in this thread - and there is no locking around it. I would fix all the synchronization issues first, and that may then reveal other issues.
I wanted to post this in a comment above but it got much longer than it would allow:
Try implementing a simple epoll based server that is entirely asynchronous (steps)
This should remove any complexity you've added from having threading that could cause issues. This moves epoll back into the same sort of domain as select()
except that it is generally much faster. The whole idea of using an event library is to know when you can read/write instead of setting a socket to non-blocking, and trying to read from it/write to it.
You also never seem to check the return value from write()
which may have failed due to receiving a SIGPIPE (I know you ignored the signal, but you will still get an EAGAIN/EINTR errno).
The other thing I see is that you are doing a busy loop inside of your thread that is waiting for sockets to be ready. When you use select()
or epoll
in this case it is so that you are notified that there is something new, so you don't have to do a busy loop...
I am not exactly sure what you are attempting to accomplish, but your code is extremely inefficient.
What you could do, after implementing just a simple asynchronous example using the steps above is start up multiple worker threads that all listen (using epoll) for read
events on the listener
/ accept
socket and have each of the threads handle various connections (still using what I posted above).
上一篇: 哪种协议(FTP或HTTP)更适合下载/上传小文件或大文件?
下一篇: 缺少某些东西或者我只是不理解epoll?