memcached源码学习-线程框架
看了看memcached, memcached 主要的线程框架是master-slave的主线程-工作线程模式,单进程,多线程,之间通过管道和链表通信,基本就是这样。
下面具体看下代码。
worker工作线程
memcached服务器使用libevent库进行网络事件的监听等,在main函数的开头,解析完所有的配置参数后,主线程会先创建一个struct event_base *main_base 全局结构,代表这是主线程的event结构,用来进行循环等的。接下来做了一些简单的hash等初始化,然后调用了一个比较关键的函数: thread_init:
/* initialize main thread libevent instance */ main_base = event_init(); /* initialize other stuff */ stats_init(); assoc_init(settings.hashpower_init); conn_init();//默认分配200个连接结构 slabs_init(settings.maxbytes, settings.factor, preallocate); /* * ignore SIGPIPE signals; we can use errno == EPIPE if we * need that information */ //当服务器close一个连接时,若client端接着发数据。根据TCP协议的规定,会收到一个RST响应, //client再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不要再写了。 if (sigignore(SIGPIPE) == -1) { perror("failed to ignore SIGPIPE; sigaction"); exit(EX_OSERR); } /* start up worker threads if MT mode */ thread_init(settings.num_threads, main_base);//默认4个线程,启动其libevet结构。设置监听管道,管道用来让主线程告诉自己,有新事> 件了 //上面工作线程已经准备好了,下面主线程终于可以设置自己的东西了,比如打开LISTEN socket,进行accepted监听。 //memcached的模式为:主线程负责accept,完了管道告诉工作线程去处理实际的请求。2着不同的event_base if (start_assoc_maintenance_thread() == -1) {//启动维护线程 exit(EXIT_FAILURE); }
上面thread_init 就是创建了配置的工作线程,来看看其内容。memcached线程之间对各个数据的访问都是用互斥锁,条件变量等。 然后为每个线程申请一个LIBEVENT_THREAD 结构, 来看看这个结构的内容:
typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ uint8_t item_lock_type; /* use fine-grained or global item lock */ } LIBEVENT_THREAD;
很简单,线程ID, 属于这个线程的event_base, new_conn_queue, 这些后面再介绍。上面变量中,最重要的就是notify_receive_fd 和notify_send_fd, 他们是用来进行管道通信的。一个专门读,一个专门写。然后调用setup_thread 将notify_receive_fd 加入到event里面,设置相关的回调。
void thread_init(int nthreads, struct event_base *main_base) { //初始化线程之间的锁,初始化工作线程,为每个线程分配里边event结构, 并且启动event_base_loop()循环 //工作线程监听notify_receive_fd, 可读回调为thread_libevent_process //````` //为每个线程申请一个LIBEVENT_THREAD线程结构,存放event等 threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); if (! threads) { perror("Can't allocate thread descriptors"); exit(1); } // dispatcher_thread.base = main_base; dispatcher_thread.thread_id = pthread_self(); for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) {//新建一个双向管道 perror("Can't create notify pipe"); exit(1); } threads[i].notify_receive_fd = fds[0];//这是接收的,工作线程监听这个fd的可读事件即可 threads[i].notify_send_fd = fds[1];//这是发送的 //工作线程初始化,初始化里边event, 将notify_receive_fd加入event里面回调为thread_libevent_process setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ for (i = 0; i < nthreads; i++) {//真正创建工作线程,执行函数为worker_libevent create_worker(worker_libevent, &threads[i]); }
上面setup_thread 用来初始化事件的。他负责将notify_receive_fd 放到线程自有的event_base, 里面。并初始化me->new_conn_queue, 前者用来通知对方,有新连接了,后者用来记录新连接的具体内容是什么。具体怎么通知后面写下。
static void setup_thread(LIBEVENT_THREAD *me) { //工作线程初始化,初始化里边event, 将notify_receive_fd加入event里面回调为thread_libevent_process me->base = event_init();//创建一个属于线程自己的event结构,这里也是一个循环哈 if (! me->base) { fprintf(stderr, "Can't allocate event base\n"); exit(1); } //下面将我自己的me->notify_receive_fd管道加入到事件循环里面,监听可读事件,回调为thread_libevent_process /* Listen for notifications from other threads */ event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me); event_base_set(me->base, &me->notify_event);///设置关系 if (event_add(&me->notify_event, 0) == -1) {//加入 fprintf(stderr, "Can't monitor libevent notify pipe\n"); exit(1); } me->new_conn_queue = malloc(sizeof(struct conn_queue));//上面me->notify_receive_fd用来告诉对方,有新东西了,实际上东西放在new _conn_queue里面的
上面me->base = event_init() 指令为当线程分配一个属于线程自己的I/O位置了。具体里边event怎么使用旧不多说了。
注意后面的event_set 函数的参数,设置可读写的执行函数为thread_libevent_process,这样等有事件的时候,线程独立的event_base就会触发返回,比如epoll_wait返回了。
thread_init 最后调用create_worker 函数,真正的创建线程,设置线程函数等。这个时候由于event_base已经设置好了各项参数,将事件加入epoll等事件监听函数,所以可以真正创建线程了,这个由create_worker 完成,其实就是个普通的pthread_create 创建线程。
master线程
master线程也就是主线程, 在创建好工作线程后,会调用server_sockets等函数,打开监听句柄等操作。 调用关系为: main-> server_sockets->server_socket; 后者负责创建一TCP连接,进行监听事件。 这里注意memcache支持TCP 和UDP ,这里只讲TCP的情况;
static int server_socket(const char *interface, int port, enum network_transport transport, FILE *portnumber_file) { //创建socket,然后根据其是TCP还是UDP,设置相关的结构,到目前为止就可以监听了</pre> //····· if (IS_UDP(transport)) {//如果是UDP协议,那么直接将这个监听端口分破给num_threads_per_udp个线程,让他们一起处理 //因为是数据报文,接收的时候是一个报文一个报文接收,所以随便某个线程接收了都行,不需要accept。 } else {//tcp模式 //将这个TCP连接放到main_base事件循环中,设置为EV_PERSIST模式,回调为:event_handler if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } listen_conn_add->next = listen_conn; listen_conn = listen_conn_add;//挂载到链表头部 }
conn_new函数负责初始化一个连接的conn *c; 结构,这里memcache对于连接的查找方法为直接用fd大小去索引数组,这样速度快很多倍,而且这个数目不会太大的。conn_new里面大部分是数据字段初始化操作。主要就是讲参数里面的fd句柄加入到主线程的 event_base中:
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base) { //thread_libevent_process等调用这里,设置一个连接的客户端结构,然后将其加入到event里面,执行函数统一为event_handler c = conns[sfd];//直接用fd进行数组索引 //加入到事件循环里面。,不管是LISTEN SOCK,还是客户端的SOCK,都是这个执行函数。只有工作线程的管道不是这个 //工作线程的管道执行函数是thread_libevent_process,也就是本函数的调用者之一 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);//用参数,填充&c->event event_base_set(base, &c->event);//ev->ev_base = base, 就让event记住了所属的base c->ev_flags = event_flags;//其实 c->event->ev_events也记录了这个事件集合的。 if (event_add(&c->event, 0) == -1) {//加到epoll里面去 perror("event_add"); return NULL; } }
这样,一个监听SOCKET就建立起来了,并且设置了可读事件的毁掉函数为event_handler, 这个函数及其重要,是主线程的线程循环。
主线程回到main函数后,就进入了事件循环了:
/* Drop privileges no longer needed */ drop_privileges(); //进入事件循环,监听sock的回调为 event_handler,实际上等于drive_machine, 工作线程有自己的event_base_loop,各自循环自己的,由一> 个管道沟通。 //主线程accept一个连接后,调用dispatch_conn_new分配给一个工作线程,轮训 /* enter the event loop */ if (event_base_loop(main_base, 0) != 0) { retval = EXIT_FAILURE; }
此时这个事件循环里面只有监听SOCK的事件的,于是如果有新连接到来的话,event_handler 就会被调用,实际上基本等于drive_machine 函数被调用。
主线程事件循环
drive_machine 函数相当于一个自动机,由连接的不同状态进行循环处理,直到完毕。
其大体结构为:
static void drive_machine(conn *c) { //···· while (!stop) { switch(c->state) {//根据这个SOCK的状态进行处理, case conn_listening://对于监听SOCK, 在server_socket里面设置为conn_listening状态了的 case conn_waiting: case conn_read: case conn_write: } //````
这里只关注conn_listening 新连接的事件,也就是如果这个连接的状态是conn_listening,那么说明它是LISTEN socket了。这个是在conn_new的参数里面指定的。 来看看董自动机的新连接处理方法:
while (!stop) { switch(c->state) {//根据这个SOCK的状态进行处理, case conn_listening://对于监听SOCK, 在server_socket里面设置为conn_listening状态了的 addrlen = sizeof(addr); #ifdef HAVE_ACCEPT4 if (use_accept4) { sfd = accept4(c->sfd, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK);//快速设置SOCK_NONBLOCK状态 } else { sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); } #else sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen); #endif if (settings.maxconns_fast && //····· } else {//OK,将这个连接分配给线程去处理吧 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, tcp_transport); } stop = true; break;
可以看出上面的新连接也还算挺简单的,就accept一个新连接,然后调用dispatch_conn_new 函数,进行分发,分发给其他地方的线程去处理真正的请求,这里只是简单的accept了连接。调用dispatch_conn_new 的时候,设置连接状态为
啊conn_new_cmd, 也就是等待命令状态.
主线程接收这个连接后,怎么分配给其他工作线程处理呢?是通过dispatch_conn_new 函数完成的。
方法为轮训选一个线程,然后将当前连接的状态, 缓存区等放到一个CQ_ITEM 结构里面,然后将其加入到线程的thread->new_conn_queue 链表里面的,注意cq_push 函数是得加锁的。 然后调用write(thread->notify_send_fd, buf, 1) 简单的将一个"c" 字符写入到该线程的管道中,这样该线程一定会被换起来,然后就能检测到这个新连接,然后就可以为其提供服务了。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport) { //分配给不同的线程去处理,通过管道 CQ_ITEM *item = cqi_new(); char buf[1]; if (item == NULL) { close(sfd); /* given that malloc failed this may also fail, but let's try */ fprintf(stderr, "Failed to allocate memory for connection object\n"); return ; } int tid = (last_thread + 1) % settings.num_threads;//轮训分配 LIBEVENT_THREAD *thread = threads + tid;//找到那个线程,然后将这个fd加入到这个线程的连接队列里面 last_thread = tid; item->sfd = sfd;//填充结构 item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; cq_push(thread->new_conn_queue, item);//加锁,放到队列尾部 MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id); buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) {//写一个字节,告诉他有新东西了 perror("Writing to thread notify pipe"); } }
稍微回顾一下:
- memcached的线程框架为:单进程,多线程,master-worker,二者用管道通信,链表传递数据。
- 使用libevent 开发框架, master-worker分别拥有不同的event_base,因此他们能各自进行事件监听, 相对还是挺清晰的;
- master负责建立客户端连接,worker负责跟实际客户端进行数据交互,处理请求。
所以这里master只是一个分发器而已,具体请求由工作线程去处理,解析等。这部分后续记录吧。
近期评论