通用的 socket 服务器处理端,也就是进程所在的主线程每次接收一个连接后,
处理完毕后,在接收一个连接,周而复始,完成和客户端的通讯过程。
缺点是显而意见的,特别是处理客户端的长连接的时候,服务器和客户端的通讯一直没
有结束,不能再处理已经在排队的连接请求。一般会导致客户端长时间的等待。这就是
单进程的服务器通讯模式。
对应与单线程模型,我们发现一个连接(accept)效率太低,
我们要是建立多个Accept 去处理连接,这样效率就高了;同时对
接入的连接创建多个线程去处理,这样就实现了多接入,多处理。
这个非常简单,关键在于每个时刻只能有一个连接(accept)被接入。
很显然,多线程同步中的互斥体正是这种用途。对比单线程模型,
我们在侦听后,建立了多个互斥的接收(accept)去处理客户端发来的连接(connect),
形成了多接收的线程池(thread pool),有效的增加了处理连接的对
象,并发的处理了客户连接。在处理新连接的过程中,完成了和整个
客户端的通讯过程。
对于以上的模型,我们在 linux 使用 c/c++ 下实现,建立了实现模型.
Log::getLog() 是自写的日志类。
//****************************************************************/
// 文件名 : pthreadaccept.h
// 作者 : hzh
// 版本号 : 1.1
// 生成日期 : 2005.09.26
// 描述 : 实现了多线程接收客户端请求,start 函数开始接收
// getsocket 函数得到新接收到未处理的的文件描述符,
// 可实现多线程接收客户端连接请求
// 备注 : 可参考 unix 网络编程(第三版)
/****************************************************************/
#if !defined(_PTHREADACCEPT_H__)
#define _PTHREADACCEPT_H__
#include <pthread.h>
#include <sys/types.h>
#include <queue>
typedef struct sockets_t
{
//存储未处理的 sockets
std::queue<unsigned int> socketqueqe;
//操作 socket 互斥体
pthread_mutex_t mutexsocket;
};
typedef struct threads_t
{
//accept 互斥体
pthread_mutex_t mutexaccept;
//服务器 socket
unsigned int servsock;
std::vector<pthread_t> tid;
std::map<pthread_t,unsigned long> count;
};
//线程处理函数
void * thread_accept(void * index);
//开始 accept
bool Start(int port,int listennum = 24,
int threads = 6,
int delaysecond = 5);
//创建线程
void thread_make(int index,int delaysecond = 5);
//得到待处理的 socket
int GetSocket();
//处理为后台进程
InitProcess();
//线程延时,毫秒级
void pthread_delay(int millisecond)
#endif
//****************************************************************/
// 文件名 : pthreadaccept.cpp
// 作者 : hzh
// 版本号 : 1.0
// 生成日期 : 2005.09.26
// 描述 : 实现了功能函数
/*****************************************************************/
#include "pthreadaccept.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include "../util/log.h"
//当前容许的最大的连接线程数
threads_t m_threads;
//当前未处理的 socket 数目
sockets_t m_sockets;
//***************************************************************//
// 函数名称 : Start
// 描述 : 绑定端口,建立 listen,启动线程开始 accept
// 访问 : public
// 参数 :
// 1. : int port
// : 侦听端口
// 2. : int listennum
// : 侦听队列数目
// 3. : int threads
// : accept 线程数
// 4. : int delaysecond
// : socket 接受超时
// 返回值 : true 是成功,false 是失败
// 作者 : hzh
// 日期 : 2005.09.26
// 说明 :
// 修改 :
//***************************************************************//
bool Start(int port,int listennum,int threads,int delaysecond)
{
//服务器 socket
m_threads.servsock = 0;
pthread_mutex_init(&m_threads.mutexaccept,0);
pthread_mutex_init(&m_sockets.mutexsocket,0);
if(port < 1024)
{
Log::getLog().error("socket port less than 1024,fail");
return false;
}
m_threads.servsock = socket(AF_INET,SOCK_STREAM,0);
if(m_threads.servsock < 0)
{
Log::getLog().error("socket port less than 1024,fail");
return false;
}
//容许端口释放后重复绑定
const int on = 1;
setsockopt(m_threads.servsock,SOL_SOCKET,
SO_REUSEADDR,&on,sizeof(on));
struct sockaddr_in servaddr;
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if(bind(m_threads.servsock,(const struct sockaddr*)&servaddr,sizeof(servaddr)) != 0)
{
close(m_threads.servsock);
Log::getLog().error("bind socket port fail");
return false;
}
if(listen(m_threads.servsock,listennum) < 0)
{
close(m_threads.servsock);
Log::getLog().error("listen fail");
return false;
}
//创建 accept 线程
for (int i = 0; i < threads; i++)
{
m_threads.tid.push_back((pthread_t)0);
thread_make(i,delaysecond);
}
return true;
}
//***************************************************************//
// 函数名称 : thread_make
// 描述 : 构建线程
// 访问 : public
// 参数 :
// 1. : int index
// : 线程索引
// 2. : int delaysecond
// : socket 接受超时
// 返回值 : NONE
// 作者 : hzh
// 日期 : 2005.09.26
// 说明 :
// 修改 :
//***************************************************************//
void thread_make(int index,int delaysecond)
{
if(0 != pthread_create(&m_threads.tid[index],NULL,
thread_accept,(void * )&delaysecond))
{
Log::getLog().error("create accept thread fail");
}
else
{
m_threads.count[m_threads.tid[index]] = 0;
//m_threads.count.push_back(make_pair(m_threads.tid[index],0));
}
return;
}
//***************************************************************//
// 函数名称 : Start
// 描述 : 绑定端口,建立 listen,启动线程开始 accept
// 访问 : public
// 参数 :
// 返回值 : true 是成功,false 是失败
// 作者 : hzh
// 日期 : 2005.09.26
// 说明 :
// 修改 :
//***************************************************************//
void * thread_accept(void * index)
{
int second = *((int *)index);
for(;;)
{
try
{
pthread_mutex_lock(&m_threads.mutexaccept);
int connfd = accept(m_threads.servsock,NULL,0);
if(errno == EINTR)
{
pthread_mutex_unlock(&m_threads.mutexaccept);
continue;
}
if(connfd == -1)
{
pthread_mutex_unlock(&m_threads.mutexaccept);
continue;
}
//设定接收和发送超时(单位: 秒)
struct timeval tv;
tv.tv_sec = second;
tv.tv_usec = 0;
setsockopt(connfd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv));
setsockopt(connfd,SOL_SOCKET,SO_SNDTIMEO,&tv,sizeof(tv));
//记录该线程接收了多少连接,最多2 << 30
const unsiged long threadconnfd = 1073741824;
if(m_threads.count[pthread_self()] > threadconnfd)
{
m_threads.count[pthread_self()] = 0;
}
m_threads.count[pthread_self()]++;
pthread_mutex_unlock(&m_threads.mutexaccept);
//加入待处理文件描述符列表
int status = pthread_mutex_lock(&m_sockets.mutexsocket);
if(status == 0)
{
m_sockets.socketqueqe.push(connfd);
pthread_mutex_unlock(&m_sockets.mutexsocket);
}
}
catch(...)
{
//
}
}
return NULL;
}
//***************************************************************//
// 函数名称 : GetSocket
// 描述 : 得到新的未处理的 socket
// 访问 : public
// 参数 :
// 返回值 : 返回新的未处理的文件描述符
// 作者 : hzh
// 日期 : 2005.09.26
// 说明 :
// 修改 :
//***************************************************************//
int GetSocket()
{
//加入待处理文件描述符列表
int connfd = -1;
int status = pthread_mutex_lock(&m_sockets.mutexsocket);
if(status == 0)
{
if(m_sockets.socketqueqe.size() > 0)
{
connfd = m_sockets.socketqueqe.front();
m_sockets.socketqueqe.pop();
}
pthread_mutex_unlock(&m_sockets.mutexsocket);
}
return connfd;
}
//线程延时,毫秒级
void pthread_delay(int millisecond)
{
pthread_mutex_t mutex;
pthread_cond_t cond;
struct timeval tv;
gettimeofday(&tv, 0);
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
ts.tv_sec += (millisecond / 1000);
ts.tv_nsec += (millisecond % 1000) * 1000000;
pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0);
pthread_mutex_lock(&mutex);
pthread_cond_timedwait(&cond, &mutex, &ts);
pthread_mutex_unlock(&mutex);
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
}
InitProcess()
{
signal(SIGINT,SIG_IGN);
signal(SIGTERM,SIG_IGN);
signal(SIGPIPE,SIG_IGN);
pid_t sid;
pid_t pid = fork();
if (pid == 0)
{
//printf("children process work\n");
}
else if(pid < 0)
{
exit(1);
}
else if (pid > 0)
{
exit(0);
}
if((sid = setsid()) < 0)
{
exit(1);
}
for(int i = 0; i < getdtablesize();i++)
{
close(i);
}
if(chdir("/") < 0)
{
exit(1);
}
umask(022);
}
int main(int argc, char* argv[])
{
InitProcess();
if(!Start(5000,10,6,5))
{
return -1;
}
for(;;)
{
pause();
}
}