memcached 简单get操作处理代码学习
好久没写博客了,这里简单记录一下memcached的get操作处理过程,之前写了些memcached的主体流程/线程框架。get命令相对来说是最简单的了, 主要分为这几步: 读取命令; 解析命令;查找key, 拼接返回数据格式;发送结果给客户端;
一、读取命令
memcached的所有事件回调都是drive_machine, 该函数就是一个大switch-case, 对客户端状态c->state的分发, 当客户端连接上来后,其状态就设置为了conn_read ,也就是读取数据状态,因此就会进入如下case:
static void drive_machine(conn *c) {
case conn_read:
//异步读取数据,返回是否出错
res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);//
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);//下一步要去解析这条命令了
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
//注意这里没有设置stop=true, 所以如果数据还没有读完,还会循环一次到上面的conn_waiting
break;
}
这里指讨论TCP协议,根据try_read_network的返回结果不同,可能有以下几种状态:
- READ_NO_DATA_RECEIVED 没收到数据,没有读完一条指令,就设置为conn_waiting,接下来还是会马上进入conn_read继续读取;
- READ_DATA_RECEIVED 成功读取了一些数据, 下一步进入conn_parse_cmd去尝试解析指令,注意此时不一定保证有一条完整命令;
- READ_ERROR 出错,关闭连接,进入conn_closing ;
下面看看try_read_network函数,标准的异步数据读取方式,读取的数据放在c->rbuf上面。期间可能会有自动扩容等操作。代码比较简单:
static enum try_read_result try_read_network(conn *c) {
//老办法, 异步读取数据,直到读满或者没东西了
enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
int res;
int num_allocs = 0;
assert(c != NULL);
if (c->rcurr != c->rbuf) {//为了处理方便,将后面的未处理的数据拷贝到前面
if (c->rbytes != 0) /* otherwise there's nothing to copy */
memmove(c->rbuf, c->rcurr, c->rbytes);
c->rcurr = c->rbuf;
}
while (1) {
if (c->rbytes >= c->rsize) {//没空间了,realloc一个
if (num_allocs == 4) {
return gotdata;
}
++num_allocs;//每次翻倍接收缓冲区
char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
if (!new_rbuf) {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
if (settings.verbose > 0) {
fprintf(stderr, "Couldn't realloc input buffer\n");
}
c->rbytes = 0; /* ignore what we read */
out_of_memory(c, "SERVER_ERROR out of memory reading request");//发送错误码,这样会设置连接状态为: conn_write, 写完后,设置为下面的状态
c->write_and_go = conn_closing;//关闭连接,在上层的时候下一次while会干掉的
return READ_MEMORY_ERROR;
}
c->rcurr = c->rbuf = new_rbuf;//rcurr因为上面的保证,总是指向最开头
c->rsize *= 2;
}
int avail = c->rsize - c->rbytes;//剩余的空间
res = read(c->sfd, c->rbuf + c->rbytes, avail);
if (res > 0) {
pthread_mutex_lock(&c->thread->stats.mutex);//靠,大量的锁啊
c->thread->stats.bytes_read += res;
pthread_mutex_unlock(&c->thread->stats.mutex);
gotdata = READ_DATA_RECEIVED;
c->rbytes += res;
if (res == avail) {//满了,while一下去realloc
continue;
} else {//没给我那么多,应该是没有了
break;
}
}
if (res == 0) {//返回0为连接断开了
return READ_ERROR;
}
if (res == -1) {//出错了,可能是没东西了
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break;
}
return READ_ERROR;
}
}
return gotdata;
}
二、解析指令
c->state ==conn_parse_cmd 就进入了指令解析截断。在读取数据后,会进入这个阶段。 如果城管解析了指令,则处理之,否则会将连接继续设置为conn_waiting状态,从而继续读取数据。
memcached的协议非常简单,这里抛去二进制协议不说,文本协议就是回车换行分开的。函数是try_read_command,如果解析到了一条指令后,就会调用process_command去进行处理。这块代码不啰嗦了,比较简单。
三、处理指令
解析到指令后,就需要进行指令的分发处理。memcached采用的是一个大if-else来进行指令分发的,具体在process_command里面。对于get指令,其实际上分发到的函数为process_get_command(),来看看其代码。
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
char *key;
size_t nkey;
int i = 0;
item *it;
token_t *key_token = &tokens[KEY_TOKEN];
char *suffix;
assert(c != NULL);
do {
while(key_token->length != 0) {
key = key_token->value;
nkey = key_token->length;
if(nkey > KEY_MAX_LENGTH) {//key最长250个字符
out_string(c, "CLIENT_ERROR bad command line format");
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}
it = item_get(key, nkey);//获取值,里面自动增加引用计数了的。refcount_incr(&it->refcount);
if (settings.detail_enabled) {
stats_prefix_record_get(key, nkey, NULL != it);
}
if (it) {
if (i >= c->isize) {
item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
if (new_list) {
c->isize *= 2;
c->ilist = new_list;
} else {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
item_remove(it);
break;
}
}
上面得到一个key后,调用item_get(key, nkey); 去获取需要的缓存数据,如果命中了,那么检查一下c->ilist数组是否足够,不够就realloc。item_get函数就查找哈希表等操作,重要的还有它会调用assoc_find来查找key是否命中缓存,然后会对这个缓存项增加引用计数,以备上层能够服用这个key的value所占用的内存。支持需要在处理完指令后,减少引用计数。
拿到缓存项后,下面就是拼接客户端需要的返回数据:
/* 拼接如下结构给客户端
VALUE aaaa 0 5
12345
END
*/
if (add_iov(c, "VALUE ", 6) != 0 || //"VALUE "
add_iov(c, ITEM_key(it), it->nkey) != 0 || //"aaaa"
add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) // " 0 5\r\n12345\r\n"
{//第一行: VALUE aaaa 0 5
item_remove(it);
break;
}
/* item_get() has incremented it->refcount for us */
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
c->thread->stats.get_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_update(it);//更新访问时间等
*(c->ilist + i) = it;//it的数据被指向了,不要轻易删除, 在上面已经增加了引用计数丶,因此将其指针挂载到ilist。发送完数据后就减少引用
i++;//item_get 自动增加引用计数了的
add_iov函数的功能在上一篇文章里面(memcached里面一段神奇,危险,暂且无bug的code: add_iov)已经介绍过,这里不多说了。值得注意的是,add_iov会将返回给客户端的数据放在c->msglist数组中,一块一块的采用struct iovec的分块写方式,并且真正的数据部分还是指向原来的数据位置,没用进行数据的拷贝工作。这样节省了数据的拷贝工作,增加了其性能。只是在发送完数据后,需要降低其引用技术。
拼接好后,需要记录当前的msglist里面,涉及到的key链表,以备在发送完数据后,知道要降低哪些key的引用技术。
拼接完成后,设置状态: conn_set_state(c, conn_mwrite);//数据已经准备好了,该写数据给对方了,后面进入写数据过程。 这样就进入了发送数据的状态。
四、发送结果给客户端
前面可知,拼接返回数据后,将连接状态设置为conn_mwrite,这样就在drive_machine里面的分发器里面进入到case conn_mwrite。
case conn_mwrite://写数据给客户端。一次写多份
if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
if (settings.verbose > 0)
fprintf(stderr, "Failed to build UDP headers\n");
conn_set_state(c, conn_closing);
break;
}
switch (transmit(c)) {///发送数据
case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) {//高级送方式
conn_release_items(c);//将一个连接所涉及到的item给解除引用,因为数据已经发送完成了
/* XXX: I don't know why this wasn't the general case */
if(c->protocol == binary_prot) {
conn_set_state(c, c->write_and_go);
} else {
conn_set_state(c, conn_new_cmd);//继续读取数据
}
} else if (c->state == conn_write) {
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
conn_set_state(c, c->write_and_go);//设置为写数据后要设置的那个状态,这个一般在设置IFA送数据的时候会设置write_and_go
} else {
if (settings.verbose > 0)
fprintf(stderr, "Unexpected state %d\n", c->state);
conn_set_state(c, conn_closing);
}
break;
case TRANSMIT_INCOMPLETE:
case TRANSMIT_HARD_ERROR:
break; /* Continue in state machine. */
case TRANSMIT_SOFT_ERROR:
stop = true;
break;
}
break;
transmit()函数负责数据发送,根据其返回结果不同,做响应的处理,如果返回TRANSMIT_COMPLETE,代表发送完成了。那么久根据当前的c->state做处理,如果是刚才的get 指令,那么c->state == conn_mwrite, 于是需要调用conn_release_items函数去减少所有这次涉及到的item的引用计数。
transmit函数本身就是对&c->msglist[c->msgcurr];数组的iov 结构进行发送,不够就先返回,否则继续。具体不多赘述了。
至此get指令就处理完毕了。
后续记录一下memcache的过期机制,以及其内存管理方式等;

近期评论