memcached 简单get操作处理代码学习
好久没写博客了,这里简单记录一下memcached的get操作处理过程,之前写了些memcached的主体流程/线程框架。get命令相对来说是最简单的了, 主要分为这几步: 读取命令; 解析命令;查找key, 拼接返回数据格式;发送结果给客户端;
一、读取命令
memcached的所有事件回调都是drive_machine, 该函数就是一个大switch-case, 对客户端状态c->state的分发, 当客户端连接上来后,其状态就设置为了conn_read ,也就是读取数据状态,因此就会进入如下case:
static void drive_machine(conn *c) { case conn_read: //异步读取数据,返回是否出错 res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); switch (res) { case READ_NO_DATA_RECEIVED: conn_set_state(c, conn_waiting);// break; case READ_DATA_RECEIVED: conn_set_state(c, conn_parse_cmd);//下一步要去解析这条命令了 break; case READ_ERROR: conn_set_state(c, conn_closing); break; case READ_MEMORY_ERROR: /* Failed to allocate more memory */ /* State already set by try_read_network */ break; } //注意这里没有设置stop=true, 所以如果数据还没有读完,还会循环一次到上面的conn_waiting break; }
这里指讨论TCP协议,根据try_read_network的返回结果不同,可能有以下几种状态:
- READ_NO_DATA_RECEIVED 没收到数据,没有读完一条指令,就设置为conn_waiting,接下来还是会马上进入conn_read继续读取;
- READ_DATA_RECEIVED 成功读取了一些数据, 下一步进入conn_parse_cmd去尝试解析指令,注意此时不一定保证有一条完整命令;
- READ_ERROR 出错,关闭连接,进入conn_closing ;
下面看看try_read_network函数,标准的异步数据读取方式,读取的数据放在c->rbuf上面。期间可能会有自动扩容等操作。代码比较简单:
static enum try_read_result try_read_network(conn *c) { //老办法, 异步读取数据,直到读满或者没东西了 enum try_read_result gotdata = READ_NO_DATA_RECEIVED; int res; int num_allocs = 0; assert(c != NULL); if (c->rcurr != c->rbuf) {//为了处理方便,将后面的未处理的数据拷贝到前面 if (c->rbytes != 0) /* otherwise there's nothing to copy */ memmove(c->rbuf, c->rcurr, c->rbytes); c->rcurr = c->rbuf; } while (1) { if (c->rbytes >= c->rsize) {//没空间了,realloc一个 if (num_allocs == 4) { return gotdata; } ++num_allocs;//每次翻倍接收缓冲区 char *new_rbuf = realloc(c->rbuf, c->rsize * 2); if (!new_rbuf) { STATS_LOCK(); stats.malloc_fails++; STATS_UNLOCK(); if (settings.verbose > 0) { fprintf(stderr, "Couldn't realloc input buffer\n"); } c->rbytes = 0; /* ignore what we read */ out_of_memory(c, "SERVER_ERROR out of memory reading request");//发送错误码,这样会设置连接状态为: conn_write, 写完后,设置为下面的状态 c->write_and_go = conn_closing;//关闭连接,在上层的时候下一次while会干掉的 return READ_MEMORY_ERROR; } c->rcurr = c->rbuf = new_rbuf;//rcurr因为上面的保证,总是指向最开头 c->rsize *= 2; } int avail = c->rsize - c->rbytes;//剩余的空间 res = read(c->sfd, c->rbuf + c->rbytes, avail); if (res > 0) { pthread_mutex_lock(&c->thread->stats.mutex);//靠,大量的锁啊 c->thread->stats.bytes_read += res; pthread_mutex_unlock(&c->thread->stats.mutex); gotdata = READ_DATA_RECEIVED; c->rbytes += res; if (res == avail) {//满了,while一下去realloc continue; } else {//没给我那么多,应该是没有了 break; } } if (res == 0) {//返回0为连接断开了 return READ_ERROR; } if (res == -1) {//出错了,可能是没东西了 if (errno == EAGAIN || errno == EWOULDBLOCK) { break; } return READ_ERROR; } } return gotdata; }
二、解析指令
c->state ==conn_parse_cmd 就进入了指令解析截断。在读取数据后,会进入这个阶段。 如果城管解析了指令,则处理之,否则会将连接继续设置为conn_waiting状态,从而继续读取数据。
memcached的协议非常简单,这里抛去二进制协议不说,文本协议就是回车换行分开的。函数是try_read_command,如果解析到了一条指令后,就会调用process_command去进行处理。这块代码不啰嗦了,比较简单。
三、处理指令
解析到指令后,就需要进行指令的分发处理。memcached采用的是一个大if-else来进行指令分发的,具体在process_command里面。对于get指令,其实际上分发到的函数为process_get_command(),来看看其代码。
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) { char *key; size_t nkey; int i = 0; item *it; token_t *key_token = &tokens[KEY_TOKEN]; char *suffix; assert(c != NULL); do { while(key_token->length != 0) { key = key_token->value; nkey = key_token->length; if(nkey > KEY_MAX_LENGTH) {//key最长250个字符 out_string(c, "CLIENT_ERROR bad command line format"); while (i-- > 0) { item_remove(*(c->ilist + i)); } return; } it = item_get(key, nkey);//获取值,里面自动增加引用计数了的。refcount_incr(&it->refcount); if (settings.detail_enabled) { stats_prefix_record_get(key, nkey, NULL != it); } if (it) { if (i >= c->isize) { item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2); if (new_list) { c->isize *= 2; c->ilist = new_list; } else { STATS_LOCK(); stats.malloc_fails++; STATS_UNLOCK(); item_remove(it); break; } }
上面得到一个key后,调用item_get(key, nkey); 去获取需要的缓存数据,如果命中了,那么检查一下c->ilist数组是否足够,不够就realloc。item_get函数就查找哈希表等操作,重要的还有它会调用assoc_find来查找key是否命中缓存,然后会对这个缓存项增加引用计数,以备上层能够服用这个key的value所占用的内存。支持需要在处理完指令后,减少引用计数。
拿到缓存项后,下面就是拼接客户端需要的返回数据:
/* 拼接如下结构给客户端 VALUE aaaa 0 5 12345 END */ if (add_iov(c, "VALUE ", 6) != 0 || //"VALUE " add_iov(c, ITEM_key(it), it->nkey) != 0 || //"aaaa" add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) // " 0 5\r\n12345\r\n" {//第一行: VALUE aaaa 0 5 item_remove(it); break; } /* item_get() has incremented it->refcount for us */ pthread_mutex_lock(&c->thread->stats.mutex); c->thread->stats.slab_stats[it->slabs_clsid].get_hits++; c->thread->stats.get_cmds++; pthread_mutex_unlock(&c->thread->stats.mutex); item_update(it);//更新访问时间等 *(c->ilist + i) = it;//it的数据被指向了,不要轻易删除, 在上面已经增加了引用计数丶,因此将其指针挂载到ilist。发送完数据后就减少引用 i++;//item_get 自动增加引用计数了的
add_iov函数的功能在上一篇文章里面(memcached里面一段神奇,危险,暂且无bug的code: add_iov)已经介绍过,这里不多说了。值得注意的是,add_iov会将返回给客户端的数据放在c->msglist数组中,一块一块的采用struct iovec的分块写方式,并且真正的数据部分还是指向原来的数据位置,没用进行数据的拷贝工作。这样节省了数据的拷贝工作,增加了其性能。只是在发送完数据后,需要降低其引用技术。
拼接好后,需要记录当前的msglist里面,涉及到的key链表,以备在发送完数据后,知道要降低哪些key的引用技术。
拼接完成后,设置状态: conn_set_state(c, conn_mwrite);//数据已经准备好了,该写数据给对方了,后面进入写数据过程。 这样就进入了发送数据的状态。
四、发送结果给客户端
前面可知,拼接返回数据后,将连接状态设置为conn_mwrite,这样就在drive_machine里面的分发器里面进入到case conn_mwrite。
case conn_mwrite://写数据给客户端。一次写多份 if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) { if (settings.verbose > 0) fprintf(stderr, "Failed to build UDP headers\n"); conn_set_state(c, conn_closing); break; } switch (transmit(c)) {///发送数据 case TRANSMIT_COMPLETE: if (c->state == conn_mwrite) {//高级送方式 conn_release_items(c);//将一个连接所涉及到的item给解除引用,因为数据已经发送完成了 /* XXX: I don't know why this wasn't the general case */ if(c->protocol == binary_prot) { conn_set_state(c, c->write_and_go); } else { conn_set_state(c, conn_new_cmd);//继续读取数据 } } else if (c->state == conn_write) { if (c->write_and_free) { free(c->write_and_free); c->write_and_free = 0; } conn_set_state(c, c->write_and_go);//设置为写数据后要设置的那个状态,这个一般在设置IFA送数据的时候会设置write_and_go } else { if (settings.verbose > 0) fprintf(stderr, "Unexpected state %d\n", c->state); conn_set_state(c, conn_closing); } break; case TRANSMIT_INCOMPLETE: case TRANSMIT_HARD_ERROR: break; /* Continue in state machine. */ case TRANSMIT_SOFT_ERROR: stop = true; break; } break;
transmit()函数负责数据发送,根据其返回结果不同,做响应的处理,如果返回TRANSMIT_COMPLETE,代表发送完成了。那么久根据当前的c->state做处理,如果是刚才的get 指令,那么c->state == conn_mwrite, 于是需要调用conn_release_items函数去减少所有这次涉及到的item的引用计数。
transmit函数本身就是对&c->msglist[c->msgcurr];数组的iov 结构进行发送,不够就先返回,否则继续。具体不多赘述了。
至此get指令就处理完毕了。
后续记录一下memcache的过期机制,以及其内存管理方式等;
近期评论