Introduction
This paper addresses the issue of enhancing server performance by leveraging thread pools and asynchronous I/O. It was motivated by an ISV's need to port a Windows NT application that uses I/O completion ports (IOCP) to Solaris 8. The discussion centers on one kind of server application, in which a large number of clients is serviced by a much smaller number of server threads. It describes all the necessary APIs and presents working example code for a server (and its matching client) to illustrate how the APIs can be used. There are also pointers to other Solaris APIs that could be used to attack the problem, and a discussion of situations in which AIO might not be the best choice.
This paper assumes that the reader is already familiar with
Solaris synchronous input/output facilities (
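The Usenet newsgroups comp.unix.programmer and comp.programming.threads have pointers to more information on all these topics, and are frequented by recognized experts.
The Concept of Thread Pools
The designer of a server application can choose from a variety of architectural models. Two common ones are:
The serial model, in which a single thread accepts a request, services it completely, and only then accepts the next one.
The concurrent model, in which one thread waits for incoming requests and creates a new thread to service each one.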
The serial model does not handle multiple, simultaneous requests well. If two clients make a request at the same time, one has to wait until the other has been serviced. For this reason, the serial model is suitable only for very simple server applications.

Because of the limitations of the serial model, the concurrent model is quite popular. The thread that waits for incoming requests has very little work to do and, in most cases, is asleep most of the time. Incoming client requests are handled promptly, because each request gets its own thread. The concurrent model also scales well on multiprocessor machines.

However, the performance of the concurrent model is often lower than developers would like. A large number of clients connected to the server simultaneously means an equally large number of threads on the server. When many of these threads are runnable (not suspended waiting for something to happen), the kernel spends much of its time context switching between them and comparatively little time actually performing the service.

In such a situation, an elegant solution is a thread pool: a collection of N threads that are used to run tasks. In this model, one of the fundamental issues is "multiplexing" a large number of clients onto a much smaller number of worker threads. Disparities approaching three orders of magnitude (for example, 200 threads serving 100,000 clients, a ratio of 500 to 1) are not uncommon, and managing this dispatch efficiently is essential to the application's performance.

In the thread pool model, no thread is bound to a particular client or service. Every time a job needs to be done, it is handed off to any thread from the pool. The threads can run any task, and at most N tasks run at a time. If there are more tasks than threads, the extra tasks wait in a queue until a thread becomes available.
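The pool just described can be sketched with POSIX threads: a mutex-protected FIFO of tasks, a condition variable that idle workers sleep on, and N workers that each dequeue and run one task at a time. This is a minimal illustration, not the tpool code used by the sample server (which appears in the Code section); all names here (pool_t, pool_submit() and so on) are hypothetical, and pool_demo() exists only to show usage.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

typedef struct task {
    void (*fn)(void *);
    void *arg;
    struct task *next;
} task_t;

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t not_empty;   /* signalled on enqueue and on shutdown */
    task_t *head, *tail;        /* FIFO of pending tasks */
    int shutting_down;
    int nthreads;               /* workers actually started */
    pthread_t *threads;
} pool_t;

static void *pool_worker(void *p)
{
    pool_t *pool = p;
    for (;;) {
        pthread_mutex_lock(&pool->lock);
        while (pool->head == NULL && !pool->shutting_down)
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        task_t *t = pool->head;
        if (t != NULL) {
            pool->head = t->next;
            if (pool->head == NULL)
                pool->tail = NULL;
        }
        pthread_mutex_unlock(&pool->lock);
        if (t == NULL)
            return NULL;                 /* shutting down and queue drained */
        t->fn(t->arg);                   /* run the task without the lock */
        free(t);
    }
}

int pool_start(pool_t *pool, int nthreads)
{
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    pool->head = pool->tail = NULL;
    pool->shutting_down = 0;
    pool->nthreads = 0;
    pool->threads = malloc(sizeof(pthread_t) * (size_t) nthreads);
    if (pool->threads == NULL)
        return -1;
    for (int i = 0; i < nthreads; i++, pool->nthreads++)
        if (pthread_create(&pool->threads[i], NULL, pool_worker, pool) != 0)
            return -1;                   /* pool_stop() joins the ones started */
    return 0;
}

int pool_submit(pool_t *pool, void (*fn)(void *), void *arg)
{
    task_t *t = malloc(sizeof *t);
    if (t == NULL)
        return -1;
    t->fn = fn; t->arg = arg; t->next = NULL;
    pthread_mutex_lock(&pool->lock);
    if (pool->tail != NULL)
        pool->tail->next = t;
    else
        pool->head = t;
    pool->tail = t;
    pthread_cond_signal(&pool->not_empty);   /* wake one idle worker */
    pthread_mutex_unlock(&pool->lock);
    return 0;
}

/* Let the workers finish every queued task, then join them. */
void pool_stop(pool_t *pool)
{
    pthread_mutex_lock(&pool->lock);
    pool->shutting_down = 1;
    pthread_cond_broadcast(&pool->not_empty);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < pool->nthreads; i++)
        pthread_join(pool->threads[i], NULL);
    free(pool->threads);
}

/* Usage demo: ntasks increments of a shared counter on nthreads workers. */
static void count_task(void *arg) { atomic_fetch_add((atomic_int *) arg, 1); }

int pool_demo(int ntasks, int nthreads)
{
    pool_t pool;
    atomic_int counter = 0;
    int ok = (pool_start(&pool, nthreads) == 0);
    for (int i = 0; ok && i < ntasks; i++)
        ok = (pool_submit(&pool, count_task, &counter) == 0);
    pool_stop(&pool);                    /* returns after all queued tasks ran */
    return ok ? atomic_load(&counter) : -1;
}
```

Signalling one worker per enqueued task is enough here because every waiter re-checks the queue under the lock; the broadcast is reserved for shutdown, when all workers must wake and exit.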
Why Asynchronous I/O?
When an application executes I/O system calls like read() and write(), the calling thread blocks inside the call until the operation completes. In other words, the program sleeps until the I/O operation completes and cannot perform computation during that time. For performance-sensitive applications, which most client/server applications are, sleeping on I/O wastes time that could be spent doing useful work. Asynchronous I/O (AIO) avoids this problem. The asynchronous
The
In the context of a single file descriptor, this isn't too
exciting. Yes, it enables a program to engage in fancy
double-buffering schemes, but Solaris typically manages that sort of
thing even with synchronous
One important point to observe here is that the I/O operations
will not necessarily complete in the same order they were started;
when the program does an
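Out-of-order completion can be observed with the portable POSIX AIO interfaces. The sketch uses aio_read(), aio_suspend() and aio_error() on two pipes instead of the Solaris aioread()/aiowait() pair used by the sample server, so that it builds on other POSIX systems; first_completed() and the pipe setup are illustrative, and older glibc versions need -lrt at link time. It also shows a difference from aiowait(): aio_suspend() must be handed the list of requests to wait for, and the caller must scan that list afterwards to find the one that finished.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Block until one request is no longer in progress, then reap it. */
static void reap(struct aiocb *cb)
{
    const struct aiocb *one[1] = { cb };
    while (aio_error(cb) == EINPROGRESS)
        aio_suspend(one, 1, NULL);
    (void) aio_return(cb);
}

/*
 * Start asynchronous reads on two pipes, make data available on the
 * SECOND pipe only, and return the index of the request that finished
 * first (expected: 1), or -1 on failure.
 */
int first_completed(void)
{
    int p[2][2];
    char buf[2][16];
    struct aiocb cb[2];
    const struct aiocb *list[2] = { &cb[0], &cb[1] };
    int done = -1, started = 0;

    if (pipe(p[0]) == -1)
        return -1;
    if (pipe(p[1]) == -1) {
        close(p[0][0]);
        close(p[0][1]);
        return -1;
    }
    memset(cb, 0, sizeof cb);
    for (int i = 0; i < 2; i++) {
        cb[i].aio_fildes = p[i][0];
        cb[i].aio_buf = buf[i];
        cb[i].aio_nbytes = sizeof buf[i];
        cb[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        if (aio_read(&cb[i]) == -1)
            break;
        started++;
    }

    if (started == 2 && write(p[1][1], "hi", 2) == 2) {
        /* aio_suspend() wakes when ANY listed request finishes; the caller
           supplies the list and must then scan it to find which one. */
        while (done == -1) {
            aio_suspend(list, 2, NULL);
            for (int i = 0; i < 2 && done == -1; i++)
                if (aio_error(&cb[i]) != EINPROGRESS)
                    done = i;
        }
    }

    /* Closing the write ends turns any still-pending read into EOF, so
       every started request completes before buf goes out of scope. */
    close(p[0][1]);
    close(p[1][1]);
    for (int i = 0; i < started; i++)
        reap(&cb[i]);
    close(p[0][0]);
    close(p[1][0]);
    return done;
}
```

Request 0 was started first, but request 1 is the first to complete, because only its pipe receives data.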
Another thing to note is that it is perfectly all right for an I/O
to finish while the program is not waiting for it; all that happens
is that the program's next
In the server design presented in this paper, you will see that
the computational activities in the above diagram are spread across
several threads of execution: one thread will initiate an
The Sample Application
The application developed to demonstrate the use of asynchronous I/O with thread pools is a simple multithreaded server that handles just one kind of request from its clients. The server is implemented in the boss-worker model, using a pool of worker threads. Upon initialization, the server
The poll thread performs
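The decision the poll thread makes for each completed request, as coded in aiopoll() in the Code section, amounts to a small state machine keyed on the aioflag field (1 while a read is outstanding, 2 while a write is outstanding, -1 once the connection is closed). The following is a portable restatement of that logic as a pure function; the enum and function names are hypothetical, and it assumes that a read returning 0 (end of file) or -1 (error) should close the connection. As in aiopoll(), a completed write always leads to the next read.

```c
/* Hypothetical names for the aioflag values used by the sample server. */
enum conn_state { CONN_CLOSED = -1, READ_PENDING = 1, WRITE_PENDING = 2 };

/* What the poll thread should do next for a connection. */
enum next_action { QUEUE_READ, QUEUE_WRITE, CLOSE_CONN };

/*
 * state:      aioflag when the request completed
 * aio_return: byte count from the completed request (0 = EOF, -1 = error)
 */
enum next_action on_completion(int state, long aio_return)
{
    if (state == WRITE_PENDING)
        return QUEUE_READ;          /* echo sent: wait for the next request */
    if (state == READ_PENDING && aio_return > 0)
        return QUEUE_WRITE;         /* data arrived: echo it back */
    return CLOSE_CONN;              /* EOF, read error, or unknown state */
}
```

A poll loop would call on_completion() with the recovered client structure's flag and aio_return value, then queue TCPechodread() or TCPechodwrite(), or close the socket.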
In this server, the problem is solved in a simple way: every
typedef struct client_data { /* server-defined structure */
aio_result_t aiores; /* AIO result structure; must be the first member */
... /* other server-specific data */
} client_data_t;
Since the
aio_result_t *aioptr = aiowait(...);
client_data_t *cdtptr = (client_data_t*)aioptr;
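A portable sketch of the same pointer recovery, with a stand-in for aio_result_t so that it compiles without <sys/asynch.h>; fake_aio_result_t, the fd field and both helper names are hypothetical. The plain cast works because C guarantees that a pointer to a structure, suitably converted, points to its first member. The offsetof() variant (often called the "container of" idiom) is an alternative technique that does not require aiores to come first.

```c
#include <stddef.h>

/* Hypothetical stand-in for Solaris aio_result_t (from <sys/asynch.h>). */
typedef struct {
    long aio_return;
    int  aio_errno;
} fake_aio_result_t;

typedef struct client_data {
    fake_aio_result_t aiores;   /* first member, so the plain cast is valid */
    int fd;                     /* other per-client server data */
} client_data_t;

/* Plain cast: a pointer to the first member is a pointer to the struct. */
client_data_t *from_result_cast(fake_aio_result_t *r)
{
    return (client_data_t *) r;
}

/* offsetof()-based variant: works wherever aiores sits in the struct. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

client_data_t *from_result_offsetof(fake_aio_result_t *r)
{
    return CONTAINER_OF(r, client_data_t, aiores);
}
```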
Having located the Among other things....
However, the POSIX implementations are
not suitable for this application, as they do not give good blocking
wait semantics: there is no
Things to Watch Out For
AIO is not a cure-all. If the application uses a third-party
library that implements asynchronous operations, this may not be the
best solution. This is because of the "anonymous" nature of
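In such a case, it would be better to use another notification technique, such as signals or doors. Signals are usually best avoided, since they are notoriously difficult to use reliably. It is very easy to lose signals: on most systems, standard (non-real-time) signals are not queued, so if a signal is generated again while an earlier instance is still pending, the two are merged into one. The risk grows as the number of signals that need to be handled increases.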
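A small demonstration of how signals get lost, assuming a single-threaded POSIX process (sigprocmask() is only well defined there): SIGUSR1 is generated several times while blocked, then unblocked. POSIX leaves it implementation-defined whether repeated instances of a pending standard signal are delivered more than once; common implementations such as Linux deliver it once. count_deliveries() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>

static volatile sig_atomic_t deliveries = 0;

static void on_usr1(int sig)
{
    (void) sig;
    deliveries++;
}

/*
 * Generate SIGUSR1 n times while it is blocked, then unblock it and
 * return how many times the handler actually ran.
 */
int count_deliveries(int n)
{
    struct sigaction sa;
    sigset_t block, old;

    memset(&sa, 0, sizeof sa);
    sa.sa_handler = on_usr1;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGUSR1, &sa, NULL) == -1)
        return -1;

    sigemptyset(&block);
    sigaddset(&block, SIGUSR1);
    sigprocmask(SIG_BLOCK, &block, &old);     /* keep SIGUSR1 pending */

    deliveries = 0;
    for (int i = 0; i < n; i++)
        raise(SIGUSR1);       /* on Linux, repeats merge with the pending one */

    /* POSIX: at least one pending unblocked signal is delivered before
       sigprocmask() returns */
    sigprocmask(SIG_SETMASK, &old, NULL);
    return deliveries;
}
```

Five notifications go in, but the handler observes only one, which is exactly the failure mode that makes signals a poor carrier for per-request completion events.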
Doors are an IPC (inter-process communication) mechanism used for
fast, secure control transfer between local processes and threads.
A door descriptor is used to describe a procedure and optionally some
additional state associated with the procedure (e.g. a data structure
containing information about AIO completion). For more information on
doors, refer to the
The efficiency of the AIO approach also depends on which Solaris
version is used. Older Solaris versions implement asynchronous I/O
directly in the kernel only for certain classes of I/O devices, not
including sockets. For other devices, the
As of Kernel Update 4 to Solaris 8, the kernel supports asynchronous I/O for sockets without needing user-level threads. A network server application built on the thread pool model with asynchronous I/O can see substantial performance gains, without code changes, when running on these newer Solaris versions.
Code
The code for the application discussed above can all be downloaded:
/* static char sccsid [ ] = "@(#)thread_pools.html 1.7 01/03/27 15:21:40 SCCS ID";*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <errno.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/asynch.h>
#include <pthread.h>
#include "tpool.h"
#include <tnf/probe.h>
tpool_t srv_thread_pool;
int no_of_act_conn = 0;
pthread_mutex_t global_data_mutex =
PTHREAD_MUTEX_INITIALIZER;
/* Master socket */
int msock;
/* connections handled */
slavesocket_t clientreq[MAXSOCKS];
void reaper(int);
int errexit(const char *format, ...);
int passiveTCP(const char *service, int qlen);
void server_init(void);
/*----------------------------------------------------
* main - Concurrent TCP server for ECHO service
*----------------------------------------------------
*/
int
main(int argc, char *argv[])
{
char *service = "echo"; /* service name or port number */
pthread_t pollthreadid; /* ID of the AIO poll thread */
switch (argc) {
case 1:
break;
case 2:
service = argv[1];
break;
default:
errexit("usage: TCPechod [port]\n");
}
/* start the server and worker threads */
server_init();
msock = passiveTCP(service, QLEN);
/* the poll thread reaps AIO completions and queues follow-up work */
if (pthread_create(&pollthreadid, NULL, aiopoll, NULL) != 0)
errexit("pthread_create failed\n");
while (1) {
if (no_of_act_conn < MAXSOCKS) {
slavesocket_t *conn = &clientreq[no_of_act_conn];
/* accept() needs alen set to the size of the fsin buffer */
conn->alen = sizeof(conn->fsin);
conn->ssock = accept(msock,
(struct sockaddr *) &conn->fsin,
&conn->alen);
if (conn->ssock < 0)
continue; /* accept failed; try again */
conn->offsetp = 0;
conn->aioflag = 1;
memset(conn->aiobuf, 0, BUFSIZE);
/* issue the first aioread() from the main thread */
TCPechodread(conn);
no_of_act_conn++;
}
}
}
/*****************************
* Server Initialization *
*****************************/
void server_init() {
tpool_init(&srv_thread_pool, MAX_THREADS,
MAX_THREADS );
}
/* tpool.c --
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <string.h>
#include <pthread.h>
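#include <sys/time.h> /* gethrtime(), hrtime_t */
#include <sys/asynch.h> /* aioread(), aiowrite(), aiowait(), aiocancel() */
#include "tpool.h"
#include <signal.h>
#include <poll.h>
#include <tnf/probe.h>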
/* pool synchronization */
pthread_mutex_t queue_lock =
PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queue_not_empty =
PTHREAD_COND_INITIALIZER;
pthread_cond_t queue_not_full =
PTHREAD_COND_INITIALIZER;
pthread_cond_t queue_empty =
PTHREAD_COND_INITIALIZER;
pthread_mutex_t conn_pool_lock =
PTHREAD_MUTEX_INITIALIZER;
extern tpool_t srv_thread_pool;
extern slavesocket_t clientreq[MAXSOCKS];
extern int no_of_act_conn;
void *tpool_thread(void *);
void sigpipedo(int);
int errexit(const char *format, ...);
long long r_diff, w_diff;
char temp[BUFSIZE];
void tpool_init(tpool_t *tpoolp,
int num_worker_threads,
int max_queue_size)
{
int i, rtn;
tpool_t tpool;
TNF_PROBE_0_DEBUG(tpool_init, "tpool_probes", "");
for (i=0; i <MAXSOCKS; i++)
clientreq[i].aioflag = -1;
/* Initializing the pool and allocating the data structures */
/* allocate a pool data structure */
if ((tpool = (tpool_t )malloc(sizeof(struct tpool)))
== NULL)
perror("malloc"), exit(1);
/* initialize the fields */
if ((tpool->threads = (pthread_t *)
malloc(sizeof(pthread_t)*num_worker_threads)) == NULL)
perror("malloc"), exit(1);
tpool->cur_queue_size = 0;
tpool->queue_head = NULL;
tpool->queue_tail = NULL;
tpool->queue_closed = 0;
/* create threads */
for (i = 0; i != num_worker_threads; i++) {
if ((rtn = pthread_create( &(tpool->threads[i]),
NULL,
tpool_thread,
(void *)tpool)) != 0)
fprintf(stderr, "pthread_create %d",rtn), exit(1);
}
*tpoolp = tpool;
}
/* routine to add work items to the queue */
int tpool_add_work(
tpool_t tpool,
void (*routine)(),
slavesocket_t *workorder)
{
int rtn;
TNF_PROBE_0_DEBUG(tpool_add_work, "tpool_probes", "");
/* obtain a lock on the queue */
if( (rtn = pthread_mutex_lock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
/* fill in the caller's work order; no allocation is done here */
workorder->routine = routine;
workorder->next = NULL;
/* adding an item to the queue */
if (tpool->cur_queue_size == 0) {
tpool->queue_tail = tpool->queue_head = workorder;
/* waking all workers */
if ((rtn = pthread_cond_broadcast(&queue_not_empty))
!= 0)
fprintf(stderr,"pthread_cond_broadcast %d",rtn),
exit(1);
} else {
tpool->queue_tail->next = workorder;
tpool->queue_tail = workorder;
}
tpool->cur_queue_size++;
/* unlock the queue */
if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d" ,rtn),
exit(1);
return 1;
}
/****************************
* Worker thread process *
****************************/
void *tpool_thread(void *arg)
{
tpool_t tpool = (tpool_t)arg;
int rtn;
slavesocket_t *my_workp;
TNF_PROBE_0_DEBUG(tpool_thread, "tpool_probes", "");
for(;;) {
/* Check queue for work */
/* lock queue */
if ((rtn = pthread_mutex_lock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
/* no work on the queue? go back to sleep */
while (tpool->cur_queue_size == 0) {
/* wait on the condition queue_not_empty */
if ((rtn = pthread_cond_wait
(&queue_not_empty, &queue_lock)) != 0)
fprintf(stderr,"pthread_cond_wait %d\n",rtn),
exit(1);
}
/* Get to work, dequeue the next item */
my_workp = tpool->queue_head;
tpool->cur_queue_size--;
if (tpool->cur_queue_size == 0)
tpool->queue_head = tpool->queue_tail = NULL;
else
tpool->queue_head = my_workp->next;
if ((rtn = pthread_mutex_unlock(&queue_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
/* Do this work item */
(*(my_workp->routine))(my_workp);
}
return(NULL);
}
/*******************************
* Polling function *
*******************************/
void *aiopoll(void *arg) {
int i;
int rtn;
int cc;
aio_result_t *res;
slavesocket_t *out;
hrtime_t temp_t;
struct timeval timeout = {0,0};
TNF_PROBE_0_DEBUG(aiopoll, "tpool_probes", "");
while (1) {
if (no_of_act_conn > 0) {
/* poll for a completed request: aiowait() returns
(aio_result_t *) -1 on error and 0 when the timeout expires */
if ((res = aiowait(&timeout)) == (aio_result_t *) -1) {
continue;
}
temp_t = gethrtime();
if (res == 0) continue;
/* result of aiowait */
out = (slavesocket_t *) res;
/* if aioflag == 2, set aioflag = 1,*/
/*perform aioread */
if (out->aioflag == 2) {
out->aft_write = temp_t;
w_diff = out->aft_write - out->bef_write;
sprintf(temp, "%lld", w_diff);
strncat(out->aiobuf, "AFTER_WRITE",
strlen("AFTER_WRITE"));
strncat(out->aiobuf, temp, strlen(temp));
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
out->aioflag = 1;
memset(out->aiobuf, 0, BUFSIZE);
/* adding item to work queue; calling aioread */
tpool_add_work(srv_thread_pool,
TCPechodread, out );
if ((rtn = pthread_mutex_unlock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
continue;
}
/* if aioflag == 1, call aiowrite */
if (out->aioflag == 1) {
if (out->aiores.aio_return > 0) { /* data read; 0 = EOF, -1 = error */
out->aft_read = temp_t;
r_diff = out->aft_read - out->bef_read;
sprintf(temp, "%lld", r_diff);
strncat(out->aiobuf, "AFTER_READ",
strlen("AFTER_READ"));
strncat(out->aiobuf, temp, strlen(temp));
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
TNF_PROBE_0_DEBUG(in_aiopoll_calling_TCPechodwrite,
"tpool_probes", "");
/* adding item to work queue, calling aiowrite */
tpool_add_work(srv_thread_pool, TCPechodwrite, out );
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
continue;
}
}
/* the read returned no data, or aioflag is neither 1 nor 2: close the connection */
printf("closing connection %d\n", out->ssock);
if ((rtn = pthread_mutex_lock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn),
exit(1);
out->aioflag = -1;
aiocancel(&(out->aiores));
close(out->ssock);
if ((rtn = pthread_mutex_unlock(&conn_pool_lock))
!= 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn),
exit(1);
}
}
}
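/* no-op SIGPIPE handler: with SIGPIPE caught, a write to a closed
 * socket fails with EPIPE instead of terminating the process */
void sigpipedo (int sig) {
}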
void TCPechodwrite(slavesocket_t *connreq) {
int tmpoffset = connreq->aiores.aio_return;
int rtn, rtnaio;
signal(SIGPIPE, sigpipedo);
if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
connreq->aioflag = 2;
connreq->bef_write = gethrtime();
/* the reply is a fixed 55 bytes, matching the client's read() size */
if( aiowrite(connreq->ssock, connreq->aiobuf, 55, 0,
SEEK_SET,
&(connreq->aiores)) == -1) {
errexit("aiowrite error\n");
}
TNF_PROBE_0_DEBUG(after_write, "tpool_probes", "");
connreq->offsetp += tmpoffset;
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}
void TCPechodread(slavesocket_t *connreq) {
int rtn;
signal(SIGPIPE, sigpipedo);
if ((rtn = pthread_mutex_lock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_lock %d",rtn), exit(1);
connreq->aioflag = 1;
connreq->bef_read = gethrtime();
if ( aioread(connreq->ssock, connreq->aiobuf, BUFSIZE, 0,
SEEK_SET, &(connreq->aiores)) == -1) {
connreq->aioflag = -1;
}
if ((rtn = pthread_mutex_unlock(&conn_pool_lock)) != 0)
fprintf(stderr,"pthread_mutex_unlock %d",rtn), exit(1);
}
/* TCPecho.c - main, TCPecho */
#include <sys/time.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <pthread.h>
#define MAXTHREADS 10000
#define LINELEN 82
typedef struct threadstruct {
char *host;
char *service ;
int threadno;
pthread_t threadidt;
pthread_attr_t threadattr;
char buf[LINELEN+1];
int s, n;
int outchars, inchars;
int count;
double timetaken, mintime, avgtime, sum;
struct timeval pbef_time, paft_time;
} threadstruct_t;
struct timeval iter_bef_time, iter_aft_time;
double iter, time_stamp;
int t1=0, t2=0, t3=0, t4=0, t5=0, t6=0, t7=0,
t8=0, t9=0, t10=0, t11=0;
int a1 = 0, a2 = 0, a4 = 0, a3 = 0, a5= 0, a6 = 0;
int TCPecho( char *host, char *service,
int noofthreads);
int errexit(const char *format, ...);
int connectTCP(const char *host,
const char *service);
threadstruct_t clients[MAXTHREADS];
void *doroutine(void *);
/*-----------------------------------------------
* main - TCP client for ECHO service
*-----------------------------------------------
*/
int
main(int argc, char *argv[])
{
char *hostt = "localhost"; /* defaults used when arguments are omitted: */
char *servicet = "echo"; /* the server's default service */
int noofthreads = 1; /* and a single client thread */
switch (argc) {
case 1:
break;
case 4:
noofthreads = atoi(argv[3]);
/* FALL THROUGH */
case 3:
servicet = argv[2];
/* FALL THROUGH */
case 2:
hostt = argv[1];
break;
default:
fprintf(stderr,
"usage: client host port threads\n");
exit(1);
}
if (noofthreads < 1 || noofthreads > MAXTHREADS)
errexit("Number of threads must be between 1 and %d\n",
MAXTHREADS);
TCPecho(hostt, servicet, noofthreads);
exit(0);
}
/*------------------------------------------------------------------------
* TCPecho - send input to ECHO service on specified host and print reply
*------------------------------------------------------------------------
*/
int
TCPecho( char *host, char *service, int noofthreads)
{
int i;
for ( i =0; i < noofthreads; i++) {
clients[i].host = host;
clients[i].service = service;
clients[i].count = 0;
clients[i].timetaken = 0;
clients[i].threadno = i + 1;
pthread_attr_init(&clients[i].threadattr);
pthread_attr_setscope(&(clients[i].threadattr),
PTHREAD_SCOPE_SYSTEM);
/* pass the attributes just set up, so each client thread is system-scope */
pthread_create(
&clients[i].threadidt, &clients[i].threadattr, doroutine,
(void *) &clients[i]);
}
for (i=0; i < noofthreads; i++) {
pthread_join(clients[i].threadidt, NULL);
}
return 0;
}
void *doroutine(void *argt) {
char *tmp1, *tmp2, *tmp3;
double r_diff;
size_t sz = 147;
threadstruct_t *arg = (threadstruct_t *) argt;
arg->s = connectTCP(arg->host, arg->service);
gettimeofday(&iter_bef_time, NULL);
while(1){
memset(arg->buf, 0, LINELEN);
strncat(arg->buf, "CLIENT", strlen("CLIENT"));
arg->outchars = strlen(arg->buf);
gettimeofday(&(arg->pbef_time), NULL);
(void) write(arg->s, arg->buf, arg->outchars);
memset(arg->buf, 0, LINELEN);
/* read it back */
arg->n = read(arg->s, arg->buf, 55);
if (arg->n < 0)
perror("socket read failed"), exit(1);
gettimeofday(&(arg->paft_time), NULL);
/* the reply begins with "CLIENT" and "AFTER_READ" (16 characters),
   followed by the server's aioread() time in nanoseconds */
tmp1 = &(arg->buf[16]);
r_diff = (double) strtoll(tmp1, NULL, 10);
r_diff = r_diff / 1000000000; /* nanoseconds to seconds */
/*tmp1 = &(arg->buf[37]);
tmp3 = (char *)malloc(17);
strncpy(tmp3, tmp1, 16);
tmp3[16] = '0';
ar = atof(tmp3); */
/*tmp1 = &(arg->buf[11]);
tmp4 = (char *)malloc(17);
strncpy(tmp4, tmp1, 16);
tmp4[16] = '0';
bw = atof(tmp4);
tmp1 = &(arg->buf[39]);
tmp5 = (char *)malloc(17);
strncpy(tmp5, tmp1, 16);
tmp5[16] = '0';
aw = atof(tmp5);*/
/*CLIENT_READ969994591.142932AFTER_READ969994594.181865*/
/* fputs(arg->buf, stdout); */
arg->timetaken = (arg->paft_time.tv_sec - arg->pbef_time.tv_sec) +
(arg->paft_time.tv_usec - arg->pbef_time.tv_usec) * 0.000001;
{
/* the histogram counters are shared by all client threads */
static pthread_mutex_t hist_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&hist_lock);
/* else-if chains leave no gaps between buckets for fractional times */
if (arg->timetaken <= 10) t1++;
else if (arg->timetaken <= 20) t2++;
else if (arg->timetaken <= 30) t3++;
else if (arg->timetaken <= 40) t4++;
else if (arg->timetaken <= 50) t5++;
else if (arg->timetaken <= 60) t6++;
else if (arg->timetaken <= 70) t7++;
else if (arg->timetaken <= 80) t8++;
else if (arg->timetaken <= 90) t9++;
else if (arg->timetaken < 100) t10++;
else t11++;
if (r_diff <= 1) a1++;
else if (r_diff <= 2) a2++;
else if (r_diff <= 4) a3++;
else if (r_diff <= 6) a4++;
else if (r_diff <= 10) a5++;
else a6++;
pthread_mutex_unlock(&hist_lock);
}
/* set the minimum time */
if (arg->count == 0) arg->mintime =
arg->timetaken;
if (arg->timetaken < arg->mintime)
arg->mintime = arg->timetaken;
arg->sum += arg->timetaken;
arg->count ++;
arg->avgtime = arg->sum/arg->count;
printf("%d\t %d\t %10.6f\t %10.10f\t %10.6f\t %10.6f\n",
arg->threadno, arg->count, arg->timetaken,
r_diff, arg->mintime,
arg->avgtime);
/*fclose(arg->fp);*/
sleep(2);
gettimeofday(&iter_aft_time, NULL);
iter = (iter_aft_time.tv_sec - iter_bef_time.tv_sec) +
(iter_aft_time.tv_usec - iter_bef_time.tv_usec) * 0.000001;
if(iter > 10) {
printf("Execution time = %10.6f\n", iter);
printf("Number of iterations = %d\n", arg->count);
printf("Response time ranges : \n");
printf("0-10\t 11-20\t 21-30\t 31-40\t 41-50\n\n");
printf("%d\t %d\t %d\t %d\t %d\t\n\n",
t1, t2, t3, t4, t5);
printf("51-60\t 61-70\t 71-80\t 81-90\t 91-100\t 100+\n\n");
printf("%d\t %d\t %d\t %d\t %d\t %d\t\n\n",
t6, t7, t8, t9, t10, t11);
printf("Aioread time ranges : \n");
printf("0-1\t 1-2\t 2-4\t 4-6\t 6-10\t 10+\n");
printf("%d\t %d\t %d\t %d\t %d\t %d\n",
a1, a2, a3, a4, a5, a6);
exit(0); }
}
}
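The elapsed-time arithmetic in doroutine() can be factored into a small helper. In this sketch (elapsed_seconds() is a hypothetical name), subtracting the microsecond fields directly handles the borrow when after->tv_usec is smaller than before->tv_usec, so no correction term is needed.

```c
#include <sys/time.h>

/* Elapsed seconds between two gettimeofday() samples. */
double elapsed_seconds(const struct timeval *before, const struct timeval *after)
{
    return (double) (after->tv_sec - before->tv_sec)
         + (double) (after->tv_usec - before->tv_usec) / 1e6;
}
```

For example, samples taken at 10.900000 s and 12.100000 s are 1.2 s apart.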
November 2000