P
Parahat Melayev
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.
But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?
thanks,
parahat
------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/
int listenfd;
typedef struct {
pthread_t tid;
int client_count;
int clients[MAX_CLIENT_PER_THREAD];
} Thread;
pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;
Thread threads[MAX_THREAD];
void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
void *thread_init_func(void *arg)
{
int tid = (int) arg;
int readsocks;
int i;
char buffer[MAX_CLIENT_BUFFER];
char c;
int n;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset((int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));
memset((char *) &buffer, 0, sizeof(buffer));
while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1); /* <-- it works ??? :-| */
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients != 0)
{
n = recv(threads[tid].clients, buffer, MAX_CLIENT_BUFFER, 0);
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset((char *) &buffer, 0, strlen(buffer));
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients);
#endif
}
}
else {
#ifdef DEBUG
printf("%d bytes received from %d - %s\n", n,
threads[tid].clients, buffer);
#endif
send(threads[tid].clients, buffer, strlen(buffer), 0);
memset((char *) &buffer, 0, strlen(buffer));
}
}
}
}
}
int choose_thread()
{
int i=MAX_THREAD-1;
int min = 0;
while(i > -1)
{
if(threads.client_count < threads[i-1].client_count)
{
min = i;
break;
}
i--;
}
return min;
}
int main()
{
char c;
struct sockaddr_in srv, cli;
int clifd;
int tid;
int i;
int choosen;
signal(SIGPIPE, SIG_IGN);
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("sockfd\n");
exit(1);
}
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_addr.s_addr = INADDR_ANY;
srv.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
perror("bind\n");
exit(1);
}
listen(listenfd, 1024);
/* create threads */
for(i = 0; i < MAX_THREAD; i++)
{
pthread_create(&threads.tid, NULL, &thread_init_func, (void *)
i);
threads.client_count = 0;
}
for( ; ; )
{
clifd = accept(listenfd, NULL, NULL);
nonblock(clifd);
pthread_mutex_lock(&new_connection_mutex);
choosen = choose_thread();
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[choosen].clients == 0)
{
#ifdef DEBUG
printf("before threads clifd\n");
#endif
threads[choosen].clients = clifd;
#ifdef DEBUG
printf("after threads clifd\n");
#endif
threads[choosen].client_count++;
break;
}
}
#ifdef DEBUG
printf("choosen: %d\n", choosen);
for(i = 0; i < MAX_THREAD; i++)
{
printf("threads[%d].client_count:%d\n", i,
threads.client_count);
}
#endif
pthread_mutex_unlock(&new_connection_mutex);
}
if(errno)
{
printf("errno: %d", errno);
}
return 0;
}
===========================================================
--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define MAX_CLIENT 1000
/*#define DEBUG*/
int PORT = 3355;
char *IP = "192.168.222.22";
enum {
U_INT_SIZE = sizeof(unsigned int)
};
int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;
void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
void cli_con()
{
struct sockaddr_in srv;
struct hostent *h_name;
int clifd;
int n;
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &srv.sin_addr);
if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("ERROR clifd\n");
exit(1);
}
if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
printf("ERROR connect\n");
exit(1);
}
nonblock(clifd);
for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
{
if(list[n] == 0) {
#ifdef DEBUG
printf("connected %d\n", clifd);
#endif
list[n] = clifd;
clifd = -1;
}
}
if(clifd != -1) {
printf("list FULL");
}
}
void set_list() {
int n;
FD_ZERO(&fdset);
for(n = 0; n < MAX_CLIENT; n++)
{
if(list[n] != 0) {
FD_SET(list[n], &fdset);
if(list[n] > highfd)
highfd = list[n];
}
}
}
void recv_data(int num)
{
int n;
char buffer[1024];
n = recv(list[num], buffer, 1023, 0);
if(n > 0)
{
#ifdef DEBUG
printf("%d bytes from %d\n", n, list[num]);
printf("%s", buffer);
#endif
}
else if(n < 0)
{
if(errno != EAGAIN)
printf("client %d disconnected\n", list[num]);
}
else if(n == 0)
{
printf("client %d disconnected\n", list[num]);
}
}
void send_data(int num)
{
int n;
char buffer[1024];
sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
{
#ifdef DEBUG
printf("%d bytes sent from %d\n", n, list[num]);
#endif
}
}
void scan_clients()
{
#ifdef DEBUG
printf("scan_clients\n");
#endif
int num;
for(num = 0; num < MAX_CLIENT; num++) {
if(FD_ISSET(list[num], &fdset))
recv_data(num);
}
for(num = 0; num < MAX_CLIENT; num++)
send_data(num);
}
int main()
{
int readsocks;
int i=0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10;
while(i < MAX_CLIENT)
{
cli_con();
i++;
}
i=0;
while(i < MAX_CLIENT)
{
printf("list[%d] = %d\n",i,list);
i++;
}
i = 0;
while(1)
{
set_list();
readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
if(readsocks < 0) {
perror("select\n");
exit(1);
}
/*printf("else scan_clients\n");*/
scan_clients();
#ifdef DEBUG
sleep(4);
#endif
}
return 0;
}
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.
But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?
thanks,
parahat
------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/
int listenfd;
typedef struct {
pthread_t tid;
int client_count;
int clients[MAX_CLIENT_PER_THREAD];
} Thread;
pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;
Thread threads[MAX_THREAD];
void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
void *thread_init_func(void *arg)
{
int tid = (int) arg;
int readsocks;
int i;
char buffer[MAX_CLIENT_BUFFER];
char c;
int n;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset((int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));
memset((char *) &buffer, 0, sizeof(buffer));
while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1); /* <-- it works ??? :-| */
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients != 0)
{
n = recv(threads[tid].clients, buffer, MAX_CLIENT_BUFFER, 0);
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset((char *) &buffer, 0, strlen(buffer));
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients);
#endif
}
}
else {
#ifdef DEBUG
printf("%d bytes received from %d - %s\n", n,
threads[tid].clients, buffer);
#endif
send(threads[tid].clients, buffer, strlen(buffer), 0);
memset((char *) &buffer, 0, strlen(buffer));
}
}
}
}
}
int choose_thread()
{
int i=MAX_THREAD-1;
int min = 0;
while(i > -1)
{
if(threads.client_count < threads[i-1].client_count)
{
min = i;
break;
}
i--;
}
return min;
}
int main()
{
char c;
struct sockaddr_in srv, cli;
int clifd;
int tid;
int i;
int choosen;
signal(SIGPIPE, SIG_IGN);
if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("sockfd\n");
exit(1);
}
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_addr.s_addr = INADDR_ANY;
srv.sin_port = htons(PORT);
if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
perror("bind\n");
exit(1);
}
listen(listenfd, 1024);
/* create threads */
for(i = 0; i < MAX_THREAD; i++)
{
pthread_create(&threads.tid, NULL, &thread_init_func, (void *)
i);
threads.client_count = 0;
}
for( ; ; )
{
clifd = accept(listenfd, NULL, NULL);
nonblock(clifd);
pthread_mutex_lock(&new_connection_mutex);
choosen = choose_thread();
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[choosen].clients == 0)
{
#ifdef DEBUG
printf("before threads clifd\n");
#endif
threads[choosen].clients = clifd;
#ifdef DEBUG
printf("after threads clifd\n");
#endif
threads[choosen].client_count++;
break;
}
}
#ifdef DEBUG
printf("choosen: %d\n", choosen);
for(i = 0; i < MAX_THREAD; i++)
{
printf("threads[%d].client_count:%d\n", i,
threads.client_count);
}
#endif
pthread_mutex_unlock(&new_connection_mutex);
}
if(errno)
{
printf("errno: %d", errno);
}
return 0;
}
===========================================================
--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>
#define MAX_CLIENT 1000
/*#define DEBUG*/
int PORT = 3355;
char *IP = "192.168.222.22";
enum {
U_INT_SIZE = sizeof(unsigned int)
};
int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;
void nonblock(int sockfd)
{
int opts;
opts = fcntl(sockfd, F_GETFL);
if(opts < 0)
{
perror("fcntl(F_GETFL)\n");
exit(1);
}
opts = (opts | O_NONBLOCK);
if(fcntl(sockfd, F_SETFL, opts) < 0)
{
perror("fcntl(F_SETFL)\n");
exit(1);
}
}
void cli_con()
{
struct sockaddr_in srv;
struct hostent *h_name;
int clifd;
int n;
bzero(&srv, sizeof(srv));
srv.sin_family = AF_INET;
srv.sin_port = htons(PORT);
inet_pton(AF_INET, IP, &srv.sin_addr);
if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("ERROR clifd\n");
exit(1);
}
if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
{
printf("ERROR connect\n");
exit(1);
}
nonblock(clifd);
for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
{
if(list[n] == 0) {
#ifdef DEBUG
printf("connected %d\n", clifd);
#endif
list[n] = clifd;
clifd = -1;
}
}
if(clifd != -1) {
printf("list FULL");
}
}
void set_list() {
int n;
FD_ZERO(&fdset);
for(n = 0; n < MAX_CLIENT; n++)
{
if(list[n] != 0) {
FD_SET(list[n], &fdset);
if(list[n] > highfd)
highfd = list[n];
}
}
}
void recv_data(int num)
{
int n;
char buffer[1024];
n = recv(list[num], buffer, 1023, 0);
if(n > 0)
{
#ifdef DEBUG
printf("%d bytes from %d\n", n, list[num]);
printf("%s", buffer);
#endif
}
else if(n < 0)
{
if(errno != EAGAIN)
printf("client %d disconnected\n", list[num]);
}
else if(n == 0)
{
printf("client %d disconnected\n", list[num]);
}
}
void send_data(int num)
{
int n;
char buffer[1024];
sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
{
#ifdef DEBUG
printf("%d bytes sent from %d\n", n, list[num]);
#endif
}
}
void scan_clients()
{
#ifdef DEBUG
printf("scan_clients\n");
#endif
int num;
for(num = 0; num < MAX_CLIENT; num++) {
if(FD_ISSET(list[num], &fdset))
recv_data(num);
}
for(num = 0; num < MAX_CLIENT; num++)
send_data(num);
}
int main()
{
int readsocks;
int i=0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10;
while(i < MAX_CLIENT)
{
cli_con();
i++;
}
i=0;
while(i < MAX_CLIENT)
{
printf("list[%d] = %d\n",i,list);
i++;
}
i = 0;
while(1)
{
set_list();
readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
if(readsocks < 0) {
perror("select\n");
exit(1);
}
/*printf("else scan_clients\n");*/
scan_clients();
#ifdef DEBUG
sleep(4);
#endif
}
return 0;
}