multi-threaded server, pthreads & sleep

P

Parahat Melayev

I am trying to write a multi-client, multi-threaded TCP server.
There is a thread pool, and each thread in the pool handles the requests
of multiple clients.

But I have a problem. I found a workaround, but I don't think it is how
it should be done. When the threads run without sleep(1), I get no
response from the server; when I put sleep(1) in the thread function, as
you will see in the code, everything works fine and the server can echo
to nearly 40000 clients within 1 or 2 seconds. What am I doing wrong
here?

thanks,
parahat


------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/timeb.h>
#include <sys/wait.h>
#include <netinet/in.h>

/* Number of client slots each worker owns (size of Thread.clients). */
#define MAX_CLIENT_PER_THREAD 300
/* Number of worker threads created by main() (size of threads[]). */
#define MAX_THREAD 200
/* TCP port the server listens on. */
#define PORT 3355
/* Size in bytes of the per-worker receive buffer. */
#define MAX_CLIENT_BUFFER 256
/* Uncomment to enable the printf() tracing guarded by #ifdef DEBUG. */
/*#define DEBUG*/

/* Listening socket, created in main(). */
int listenfd;

/*
 * Per-worker state.
 *
 * tid           thread handle filled in by pthread_create().
 * client_count  number of occupied entries in clients[]; incremented by
 *               main() when it assigns a client, decremented by the worker
 *               when a client goes away.
 * clients       socket descriptors served by this worker; 0 marks a free
 *               slot.
 */
typedef struct {
pthread_t tid;
int client_count;
int clients[MAX_CLIENT_PER_THREAD];
} Thread;

/* Declared but not referenced by any code in this listing. */
pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
/* Held by main() while it picks a worker and stores a new client in it. */
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


/* The worker pool; indexed by the worker number passed to each thread. */
Thread threads[MAX_THREAD];

/*
 * Put a descriptor into non-blocking mode.
 *
 * sockfd  an open descriptor.
 *
 * The current file status flags are read with F_GETFL, O_NONBLOCK is ORed
 * in, and the result is written back with F_SETFL so every other flag is
 * preserved.  If either fcntl() call fails the error is reported with
 * perror() and the process exits with status 1.
 */
void nonblock(int sockfd)
{
    int flags = fcntl(sockfd, F_GETFL);

    if (flags < 0) {
        /* perror() appends ": <reason>\n" itself, so no newline here. */
        perror("fcntl(F_GETFL)");
        exit(1);
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl(F_SETFL)");
        exit(1);
    }
}

void *thread_init_func(void *arg)
{
int tid = (int) arg;

int readsocks;
int i;
char buffer[MAX_CLIENT_BUFFER];
char c;
int n;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset((int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));
memset((char *) &buffer, 0, sizeof(buffer));
while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1); /* <-- it works ??? :-| */

for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients != 0)
{
n = recv(threads[tid].clients, buffer, MAX_CLIENT_BUFFER, 0);
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset((char *) &buffer, 0, strlen(buffer));
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
threads[tid].clients = 0;
threads[tid].client_count--;
memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients);
#endif
}
}
else {
#ifdef DEBUG
printf("%d bytes received from %d - %s\n", n,
threads[tid].clients, buffer);
#endif

send(threads[tid].clients, buffer, strlen(buffer), 0);
memset((char *) &buffer, 0, strlen(buffer));
}
}
}
}
}

/*
 * Pick the worker that should receive the next client.
 *
 * Returns the index into threads[] of the worker with the smallest
 * client_count; ties go to the lowest index.  If the returned worker is
 * full, every worker is full.
 *
 * The previous backwards scan compared each entry only with its
 * predecessor and read threads[-1] once it reached index 0.
 */
int choose_thread(void)
{
    int least = 0;
    int i;

    for (i = 1; i < MAX_THREAD; i++) {
        if (threads[i].client_count < threads[least].client_count)
            least = i;
    }
    return least;
}

/*
 * Echo server entry point.
 *
 * Creates the listening socket on PORT, starts MAX_THREAD workers running
 * thread_init_func(), then accepts connections forever.  Each accepted
 * socket is made non-blocking and stored, under new_connection_mutex, in
 * the first free slot of the worker returned by choose_thread().
 *
 * Exits with status 1 if the socket cannot be set up, a worker cannot be
 * created, or accept() fails for a reason other than EINTR/ECONNABORTED.
 * A connection for which no slot is free is closed instead of being leaked.
 */
int main()
{
    struct sockaddr_in srv;
    int i;

    /* A peer that disappears must not kill the process on send(). */
    signal(SIGPIPE, SIG_IGN);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&srv, 0, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_addr.s_addr = INADDR_ANY;
    srv.sin_port = htons(PORT);

    if (bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listenfd, 1024) < 0) {
        perror("listen");
        exit(1);
    }

    /* create threads */
    for (i = 0; i < MAX_THREAD; i++) {
        int err;

        /* Initialise the slot before the thread that reads it exists. */
        threads[i].client_count = 0;
        err = pthread_create(&threads[i].tid, NULL, thread_init_func,
                             (void *) (intptr_t) i);
        if (err != 0) {
            /* pthread_create() returns the error number; errno is not set. */
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            exit(1);
        }
    }

    for ( ; ; ) {
        int clifd = accept(listenfd, NULL, NULL);
        int placed = 0;
        int chosen;

        if (clifd < 0) {
            if (errno == EINTR || errno == ECONNABORTED)
                continue;               /* transient: just try again */
            perror("accept");
            exit(1);
        }
        nonblock(clifd);

        pthread_mutex_lock(&new_connection_mutex);

        chosen = choose_thread();

        for (i = 0; i < MAX_CLIENT_PER_THREAD; i++) {
            if (threads[chosen].clients[i] == 0) {
#ifdef DEBUG
                printf("before threads clifd\n");
#endif
                threads[chosen].clients[i] = clifd;
#ifdef DEBUG
                printf("after threads clifd\n");
#endif
                threads[chosen].client_count++;
                placed = 1;
                break;
            }
        }

#ifdef DEBUG
        printf("choosen: %d\n", chosen);

        for (i = 0; i < MAX_THREAD; i++) {
            printf("threads[%d].client_count:%d\n", i,
                   threads[i].client_count);
        }
#endif

        pthread_mutex_unlock(&new_connection_mutex);

        if (!placed) {
            /* Nobody would ever read from it, so do not keep it open. */
            fprintf(stderr, "no free client slot, closing connection %d\n",
                    clifd);
            close(clifd);
        }
    }
}


===========================================================

--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>


/* Number of connections main() opens; also the size of list[]. */
#define MAX_CLIENT 1000
/* Uncomment to enable the printf() tracing guarded by #ifdef DEBUG. */
/*#define DEBUG*/

/* Server port and IPv4 address that cli_con() connects to. */
int PORT = 3355;
char *IP = "192.168.222.22";

/* Not referenced by any code in this listing. */
enum {
U_INT_SIZE = sizeof(unsigned int)
};

/* Connected socket descriptors; 0 marks an unused slot. */
int list[MAX_CLIENT];
/* Read set rebuilt by set_list() and tested by scan_clients(). */
fd_set fdset;
/* Passed (plus one) as nfds to select(); only ever raised by set_list(). */
int highfd = MAX_CLIENT;

/*
 * Put a descriptor into non-blocking mode.
 *
 * sockfd  an open descriptor.
 *
 * The current file status flags are read with F_GETFL, O_NONBLOCK is ORed
 * in, and the result is written back with F_SETFL so every other flag is
 * preserved.  If either fcntl() call fails the error is reported with
 * perror() and the process exits with status 1.
 */
void nonblock(int sockfd)
{
    int flags = fcntl(sockfd, F_GETFL);

    if (flags < 0) {
        /* perror() appends ": <reason>\n" itself, so no newline here. */
        perror("fcntl(F_GETFL)");
        exit(1);
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl(F_SETFL)");
        exit(1);
    }
}


/*
 * Open one TCP connection to IP:PORT and remember it in list[].
 *
 * The socket is switched to non-blocking mode after connect() succeeds and
 * stored in the first slot of list[] that holds 0.  If no slot is free,
 * "list FULL" is printed and the descriptor is not recorded.
 *
 * Exits with status 1 if IP is not a valid IPv4 address or if socket() or
 * connect() fails.
 */
void cli_con()
{
    struct sockaddr_in srv;
    int clifd;
    int n;

    memset(&srv, 0, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_port = htons(PORT);
    /* inet_pton() returns 1 only when the text was a valid address. */
    if (inet_pton(AF_INET, IP, &srv.sin_addr) != 1) {
        fprintf(stderr, "ERROR bad server address %s\n", IP);
        exit(1);
    }

    clifd = socket(AF_INET, SOCK_STREAM, 0);
    if (clifd < 0) {
        perror("ERROR clifd");
        exit(1);
    }

    if (connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0) {
        perror("ERROR connect");
        exit(1);
    }

    nonblock(clifd);

    for (n = 0; n < MAX_CLIENT; n++) {
        if (list[n] == 0) {
#ifdef DEBUG
            printf("connected %d\n", clifd);
#endif
            list[n] = clifd;
            return;
        }
    }

    printf("list FULL\n");
}

/*
 * Rebuild the global read set from list[].
 *
 * fdset is emptied, every non-zero descriptor in list[] is added to it,
 * and highfd is raised to the largest descriptor seen (it is never
 * lowered).
 */
void set_list() {
    int *slot;

    FD_ZERO(&fdset);

    for (slot = list; slot != list + MAX_CLIENT; slot++) {
        int fd = *slot;

        if (fd == 0)
            continue;               /* unused slot */

        FD_SET(fd, &fdset);
        if (fd > highfd)
            highfd = fd;
    }
}

/*
 * Read whatever is pending on the connection stored in list[num].
 *
 * num  index into list[].
 *
 * Received data is discarded (printed only when DEBUG is defined).  A read
 * that would block is ignored; end-of-file or any other error is reported
 * as "client <fd> disconnected".
 */
void recv_data(int num)
{
    char buffer[1024];
    ssize_t n = recv(list[num], buffer, sizeof(buffer) - 1, 0);

    if (n > 0) {
        /* recv() does not terminate the data; do it before using %s. */
        buffer[n] = '\0';
#ifdef DEBUG
        printf("%d bytes from %d\n", (int) n, list[num]);
        printf("%s", buffer);
#endif
        return;
    }

    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
        return;                     /* nothing to read right now */

    /* n == 0 (peer closed) or a hard error */
    printf("client %d disconnected\n", list[num]);
}


/*
 * Send one test line on the connection stored in list[num].
 *
 * num  index into list[].
 *
 * The line is "TEST list[<num>]=<fd> \r\n".  Empty slots (descriptor 0)
 * are skipped, and the result of send() is only reported when DEBUG is
 * defined.
 */
void send_data(int num)
{
    char buffer[1024];
    int len;
    int n;

    if (list[num] == 0)
        return;                     /* unused slot: 0 is not one of our sockets */

    len = snprintf(buffer, sizeof(buffer), "TEST list[%d]=%d \r\n", num,
                   list[num]);
    if (len < 0 || (size_t) len >= sizeof(buffer))
        return;                     /* formatting failed or was truncated */

    n = send(list[num], buffer, (size_t) len, 0);
#ifdef DEBUG
    if (n > 0)
        printf("%d bytes sent from %d\n", n, list[num]);
#else
    (void) n;
#endif
}

/*
 * One round of traffic over every slot of list[].
 *
 * First calls recv_data() for each slot whose descriptor is set in fdset,
 * then calls send_data() for every slot, both in ascending index order.
 */
void scan_clients()
{
    int idx = 0;

#ifdef DEBUG
    printf("scan_clients\n");
#endif

    /* pass 1: drain the sockets select() reported */
    while (idx < MAX_CLIENT) {
        if (FD_ISSET(list[idx], &fdset)) {
            recv_data(idx);
        }
        idx++;
    }

    /* pass 2: send a fresh line on every slot */
    for (idx = 0; idx != MAX_CLIENT; ++idx) {
        send_data(idx);
    }
}

/*
 * Load-test client entry point.
 *
 * Opens MAX_CLIENT connections with cli_con(), prints the resulting
 * descriptor table, then loops forever: rebuild the read set, wait in
 * select() with a 10 microsecond timeout, and run scan_clients().
 *
 * Exits with status 1 if select() fails for a reason other than EINTR.
 */
int main()
{
    int readsocks;
    int i;

    for (i = 0; i < MAX_CLIENT; i++)
        cli_con();

    for (i = 0; i < MAX_CLIENT; i++)
        printf("list[%d] = %d\n", i, list[i]);

    for ( ; ; ) {
        struct timeval tv;

        /*
         * select() is allowed to overwrite the timeout with the time left
         * (Linux does), so it has to be re-armed before every call.
         */
        tv.tv_sec = 0;
        tv.tv_usec = 10;

        set_list();
        readsocks = select(highfd + 1, &fdset, NULL, NULL, &tv);
        if (readsocks < 0) {
            if (errno == EINTR)
                continue;           /* interrupted by a signal: retry */
            perror("select");
            exit(1);
        }

        /*printf("else scan_clients\n");*/
        scan_clients();
#ifdef DEBUG
        sleep(4);
#endif
    }
}
 
P

Pascal Bourguignon

I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?


I don't know, but instead of:
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients[i] != 0)
{
n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);


I'd use select(2) or poll(2). That would probably avoid the problem
you're hiding with the sleep call.
 
P

Parahat Melayev

Pascal Bourguignon said:
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

But here I have a problem. I find a solution but it is not how it must
be... i think. When threads working without sleep(1) I can't get
response from server but when I put sleep(1) in thread function as you
will see in code, everything works fine and server can make echo
nearly for 40000 clients between 1 or 2 seconds. What am I doing wrong
here?


I don't know, but instead of:
for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
{
if(threads[tid].clients[i] != 0)
{
n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);


I'd use select(2) or poll(2). That would probably avoid the problem
you're hiding with the sleep call.



Yes, I actually wrote it using select first, but it does not do
what it is supposed to do when I connect 1000 clients at once.

here is the code.

------ selectserver.c -----------------------
$ cc selectserver.c -o selectserver -lpthread
---------------------------------------------

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

/* Number of client slots each worker owns (size of Thread.clients). */
#define MAX_CLIENT_PER_THREAD 100
/* Number of worker threads created by main() (size of threads[]). */
#define MAX_THREAD 100
/* TCP port the server listens on. */
#define PORT 3355
/* Uncomment to enable the printf() tracing guarded by #ifdef DEBUG. */
/*#define DEBUG*/
/*
 * Total number of client slots across all workers.  Parenthesised so that
 * the macro expands as a single value inside larger expressions (for
 * example x / HIGHFD).
 * NOTE(review): this is larger than FD_SETSIZE on common systems, so it
 * must not be relied on as a valid nfds argument for select().
 */
#define HIGHFD (MAX_CLIENT_PER_THREAD * MAX_THREAD)
/* Listening socket, created in main(). */
int listenfd;
/* Not referenced by any code in this listing. */
int which_thread = 0;


/*
* FD_ZERO()
*
*
*
*/

/*
 * Per-worker state.
 *
 * tid           thread handle filled in by pthread_create().
 * tnumber       not referenced by any code in this listing.
 * client_count  number of occupied entries in clients[]; incremented by
 *               main() when it assigns a client, decremented by the worker
 *               when a client goes away.
 * clients       socket descriptors served by this worker; 0 marks a free
 *               slot.
 * clientset     descriptor set mirroring clients[]: main() adds a
 *               descriptor with FD_SET, the worker removes it with FD_CLR.
 */
typedef struct {
pthread_t tid;
int tnumber;
long client_count;
int clients[MAX_CLIENT_PER_THREAD];
fd_set clientset;
} Thread;

/* Declared but not referenced by any code in this listing. */
pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
/* Held by main() while it picks a worker and stores a new client in it. */
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


/* The worker pool; indexed by the worker number passed to each thread. */
Thread threads[MAX_THREAD];

/*
 * Put a descriptor into non-blocking mode.
 *
 * sockfd  an open descriptor.
 *
 * The current file status flags are read with F_GETFL, O_NONBLOCK is ORed
 * in, and the result is written back with F_SETFL so every other flag is
 * preserved.  If either fcntl() call fails the error is reported with
 * perror() and the process exits with status 1.
 */
void nonblock(int sockfd)
{
    int flags = fcntl(sockfd, F_GETFL);

    if (flags < 0) {
        /* perror() appends ": <reason>\n" itself, so no newline here. */
        perror("fcntl(F_GETFL)");
        exit(1);
    }
    if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl(F_SETFL)");
        exit(1);
    }
}

void *thread_init_func(void *arg)
{
int tid = (int) arg;
int readsocks;
int i;
char buffer[1024];
int n;
fd_set readset;
int clifd;

struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 1;
#ifdef DEBUG
printf("thread %d created\n", tid);
printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
memset( (int *) &threads[tid].clients, 0,
sizeof(threads[tid].clients));

while(1)
{
#ifdef DEBUG
printf("thread %d running, client count: %d\n", tid,
threads[tid].client_count);
sleep(3);
#endif
sleep(1);
readset = threads[tid].clientset;
#ifdef DEBUG
printf("before select\n");
#endif
readsocks = select(HIGHFD + 1, &readset, NULL, NULL, &tv);
#ifdef DEBUG
printf("after select\n");
#endif


for(i = 0; i < threads[tid].client_count; i++)
{
if( threads[tid].clients == 0)
continue;
if(FD_ISSET(threads[tid].clients, &readset))
{
n = Readline(threads[tid].clients, buffer, sizeof(buffer));
if(n == 0)
{
#ifdef DEBUG
printf("client %d closed connection 0\n",
threads[tid].clients);
#endif
FD_CLR(threads[tid].clients, &threads[tid].clientset);
threads[tid].clients = 0;
threads[tid].client_count--;
}
else if(n < 0)
{
if(errno == EAGAIN)
{
#ifdef DEBUG
printf("errno: EAGAIN\n");
#endif
}
else {
#ifdef DEBUG
printf("errno: %d\n", errno);
#endif
FD_CLR(threads[tid].clients, &threads[tid].clientset);
threads[tid].clients = 0;
threads[tid].client_count--;
#ifdef DEBUG
printf("client %d closed connection -1\n",
threads[tid].clients);
#endif
}
}
else {
#ifdef DEBUG
printf("\n %d bytes received from %d - %s\n", n,
threads[tid].clients, buffer);
#endif
send(threads[tid].clients, buffer, strlen(buffer), 0);
}

}
}
}
}

/*
 * Pick the worker that should receive the next client.
 *
 * Returns the index into threads[] of the worker with the smallest
 * client_count; ties go to the lowest index.  If the returned worker is
 * full, every worker is full.
 *
 * The previous backwards scan compared each entry only with its
 * predecessor and read threads[-1] once it reached index 0.
 */
int choose_thread(void)
{
    int least = 0;
    int i;

    for (i = 1; i < MAX_THREAD; i++) {
        if (threads[i].client_count < threads[least].client_count)
            least = i;
    }
    return least;
}

/*
 * select()-based echo server entry point.
 *
 * Creates the listening socket on PORT, starts MAX_THREAD workers running
 * thread_init_func(), then accepts connections forever.  Each accepted
 * socket is made non-blocking and, under new_connection_mutex, stored in
 * the first free slot of the worker returned by choose_thread() and added
 * to that worker's clientset.
 *
 * Exits with status 1 if the socket cannot be set up, a worker cannot be
 * created, or accept() fails for a reason other than EINTR/ECONNABORTED.
 *
 * A connection that cannot be served (no free slot, or a descriptor that
 * does not fit in an fd_set) is shut down so the peer sees end-of-file.
 * NOTE(review): that descriptor should also be close()d; this program does
 * not include <unistd.h>, so it currently stays open.
 */
int main()
{
    struct sockaddr_in srv;
    int i;

    /* A peer that disappears must not kill the process on send(). */
    signal(SIGPIPE, SIG_IGN);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&srv, 0, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_addr.s_addr = INADDR_ANY;
    srv.sin_port = htons(PORT);

    if (bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0) {
        perror("bind");
        exit(1);
    }

    if (listen(listenfd, 1024) < 0) {
        perror("listen");
        exit(1);
    }

    /* create threads */
    for (i = 0; i < MAX_THREAD; i++) {
        int err;

        /* Initialise the slot before the thread that reads it exists. */
        threads[i].client_count = 0;
        err = pthread_create(&threads[i].tid, NULL, thread_init_func,
                             (void *) (long) i);
        if (err != 0) {
            /* pthread_create() returns the error number; errno is not set. */
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            exit(1);
        }
    }

    for ( ; ; ) {
        int clifd = accept(listenfd, NULL, NULL);
        int placed = 0;
        int chosen;

        if (clifd < 0) {
            if (errno == EINTR || errno == ECONNABORTED)
                continue;               /* transient: just try again */
            perror("accept");
            exit(1);
        }
        nonblock(clifd);

        /* FD_SET() on a descriptor >= FD_SETSIZE writes outside the set. */
        if (clifd < FD_SETSIZE) {
            pthread_mutex_lock(&new_connection_mutex);

            chosen = choose_thread();

            for (i = 0; i < MAX_CLIENT_PER_THREAD; i++) {
                if (threads[chosen].clients[i] == 0) {
                    threads[chosen].clients[i] = clifd;
                    threads[chosen].client_count++;
                    FD_SET(clifd, &threads[chosen].clientset);
                    placed = 1;
                    break;
                }
            }

#ifdef DEBUG
            printf("choosen: %d\n", chosen);

            for (i = 0; i < MAX_THREAD; i++) {
                printf("threads[%d].client_count:%ld\n", i,
                       threads[i].client_count);
            }
#endif

            pthread_mutex_unlock(&new_connection_mutex);
        }

        if (!placed) {
            fprintf(stderr, "cannot serve connection %d "
                    "(no free slot or descriptor >= FD_SETSIZE)\n", clifd);
            shutdown(clifd, SHUT_RDWR);
        }
    }
}
 
D

David Schwartz

yes I programmed using select firstly, actually. but it is not doing
what it is supposed to do, when I connect 1000 client at once.

You're going to have to get more sophisticated if you want to handle
more than a few hundred concurrent connections. You'll at minimum need to
use 'poll' instead of 'select'. You may need to raise resource limits also.
It's hard to know exactly what your issue is because "it is not doing what
it is supposed to do" is a very vague description of the problem.

DS
 
J

Joona I Palaste

Parahat Melayev <[email protected]> scribbled the following
I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle requests
of multiple clients.

Your code depends inherently on system-specific extensions and is thus
completely off-topic on comp.lang.c. Please stop crossposting to
comp.lang.c, thanks.
 
C

CBFalconer

Parahat said:
.... snip ...

here is the code.

------ selectserver.c -----------------------
$ cc selectserver.c -o selectserver -lpthread
---------------------------------------------

#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

Of these, only stdio.h, string.h, signal.h and errno.h are
legitimate standard C headers, at least in this newsgroup. So why
are you cluttering c.l.c up with nearly 400 lines of non-standard
and off topic code? Find a newsgroup that deals with your system.
F'ups set.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only takes a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,769
Messages
2,569,580
Members
45,055
Latest member
SlimSparkKetoACVReview

Latest Threads

Top