首页 > Memcached > memcached 简单get操作处理代码学习

memcached 简单get操作处理代码学习

2014年10月8日 发表评论 阅读评论 12450次阅读    

好久没写博客了,这里简单记录一下memcached的get操作处理过程,之前写了些memcached的主体流程/线程框架。get命令相对来说是最简单的了, 主要分为这几步: 读取命令; 解析命令;查找key, 拼接返回数据格式;发送结果给客户端;

一、读取命令

memcached的所有事件回调都是drive_machine, 该函数就是一个大switch-case, 对客户端状态c->state的分发, 当客户端连接上来后,其状态就设置为了conn_read ,也就是读取数据状态,因此就会进入如下case:

static void drive_machine(conn *c) {
        case conn_read:
            //异步读取数据,返回是否出错
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);//
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);//下一步要去解析这条命令了
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            }
            //注意这里没有设置stop=true, 所以如果数据还没有读完,还会循环一次到上面的conn_waiting
            break;
}

这里指讨论TCP协议,根据try_read_network的返回结果不同,可能有以下几种状态:

  1. READ_NO_DATA_RECEIVED 没收到数据,没有读完一条指令,就设置为conn_waiting,接下来还是会马上进入conn_read继续读取;
  2. READ_DATA_RECEIVED 成功读取了一些数据, 下一步进入conn_parse_cmd去尝试解析指令,注意此时不一定保证有一条完整命令;
  3. READ_ERROR 出错,关闭连接,进入conn_closing ;

下面看看try_read_network函数,标准的异步数据读取方式,读取的数据放在c->rbuf上面。期间可能会有自动扩容等操作。代码比较简单:

static enum try_read_result try_read_network(conn *c) {
    //老办法, 异步读取数据,直到读满或者没东西了
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;
    assert(c != NULL);

    if (c->rcurr != c->rbuf) {//为了处理方便,将后面的未处理的数据拷贝到前面
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

    while (1) {
        if (c->rbytes >= c->rsize) {//没空间了,realloc一个
            if (num_allocs == 4) {
                return gotdata;
            }
            ++num_allocs;//每次翻倍接收缓冲区
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
                STATS_LOCK();
                stats.malloc_fails++;
                STATS_UNLOCK();
                if (settings.verbose > 0) {
                    fprintf(stderr, "Couldn't realloc input buffer\n");
                }
                c->rbytes = 0; /* ignore what we read */
                out_of_memory(c, "SERVER_ERROR out of memory reading request");//发送错误码,这样会设置连接状态为: conn_write, 写完后,设置为下面的状态
                c->write_and_go = conn_closing;//关闭连接,在上层的时候下一次while会干掉的
                return READ_MEMORY_ERROR;
            }
            c->rcurr = c->rbuf = new_rbuf;//rcurr因为上面的保证,总是指向最开头
            c->rsize *= 2;
        }

        int avail = c->rsize - c->rbytes;//剩余的空间
        res = read(c->sfd, c->rbuf + c->rbytes, avail);
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);//靠,大量的锁啊
            c->thread->stats.bytes_read += res;
            pthread_mutex_unlock(&c->thread->stats.mutex);
            gotdata = READ_DATA_RECEIVED;
            c->rbytes += res;
            if (res == avail) {//满了,while一下去realloc
                continue;
            } else {//没给我那么多,应该是没有了
                break;
            }
        }
        if (res == 0) {//返回0为连接断开了
            return READ_ERROR;
        }
        if (res == -1) {//出错了,可能是没东西了
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }
            return READ_ERROR;
        }
    }
    return gotdata;
}

二、解析指令

c->state ==conn_parse_cmd 就进入了指令解析截断。在读取数据后,会进入这个阶段。 如果城管解析了指令,则处理之,否则会将连接继续设置为conn_waiting状态,从而继续读取数据。

memcached的协议非常简单,这里抛去二进制协议不说,文本协议就是回车换行分开的。函数是try_read_command,如果解析到了一条指令后,就会调用process_command去进行处理。这块代码不啰嗦了,比较简单。

三、处理指令

解析到指令后,就需要进行指令的分发处理。memcached采用的是一个大if-else来进行指令分发的,具体在process_command里面。对于get指令,其实际上分发到的函数为process_get_command(),来看看其代码。

static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
    char *key;
    size_t nkey;
    int i = 0;
    item *it;
    token_t *key_token = &tokens[KEY_TOKEN];
    char *suffix;
    assert(c != NULL);

    do {
        while(key_token->length != 0) {

            key = key_token->value;
            nkey = key_token->length;

            if(nkey > KEY_MAX_LENGTH) {//key最长250个字符
                out_string(c, "CLIENT_ERROR bad command line format");
                while (i-- > 0) {
                    item_remove(*(c->ilist + i));
                }
                return;
            }

            it = item_get(key, nkey);//获取值,里面自动增加引用计数了的。refcount_incr(&it->refcount);
            if (settings.detail_enabled) {
                stats_prefix_record_get(key, nkey, NULL != it);
            }
            if (it) {
                if (i >= c->isize) {
                    item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                    if (new_list) {
                        c->isize *= 2;
                        c->ilist = new_list;
                    } else {
                        STATS_LOCK();
                        stats.malloc_fails++;
                        STATS_UNLOCK();
                        item_remove(it);
                        break;
                    }
                }

上面得到一个key后,调用item_get(key, nkey); 去获取需要的缓存数据,如果命中了,那么检查一下c->ilist数组是否足够,不够就realloc。item_get函数就查找哈希表等操作,重要的还有它会调用assoc_find来查找key是否命中缓存,然后会对这个缓存项增加引用计数,以备上层能够服用这个key的value所占用的内存。支持需要在处理完指令后,减少引用计数。

拿到缓存项后,下面就是拼接客户端需要的返回数据:

                  /* 拼接如下结构给客户端
                   VALUE aaaa 0 5
                   12345
                   END
                   */
                  if (add_iov(c, "VALUE ", 6) != 0 || //"VALUE "
                      add_iov(c, ITEM_key(it), it->nkey) != 0 || //"aaaa"
                      add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0) // " 0 5\r\n12345\r\n"
                      {//第一行: VALUE aaaa 0 5
                          item_remove(it);
                          break;
                      }

          /* item_get() has incremented it->refcount for us */
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
                c->thread->stats.get_cmds++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                item_update(it);//更新访问时间等
                *(c->ilist + i) = it;//it的数据被指向了,不要轻易删除, 在上面已经增加了引用计数丶,因此将其指针挂载到ilist。发送完数据后就减少引用
                i++;//item_get 自动增加引用计数了的

add_iov函数的功能在上一篇文章里面(memcached里面一段神奇,危险,暂且无bug的code: add_iov)已经介绍过,这里不多说了。值得注意的是,add_iov会将返回给客户端的数据放在c->msglist数组中,一块一块的采用struct iovec的分块写方式,并且真正的数据部分还是指向原来的数据位置,没用进行数据的拷贝工作。这样节省了数据的拷贝工作,增加了其性能。只是在发送完数据后,需要降低其引用技术。
拼接好后,需要记录当前的msglist里面,涉及到的key链表,以备在发送完数据后,知道要降低哪些key的引用技术。

拼接完成后,设置状态:        conn_set_state(c, conn_mwrite);//数据已经准备好了,该写数据给对方了,后面进入写数据过程。 这样就进入了发送数据的状态。

四、发送结果给客户端

前面可知,拼接返回数据后,将连接状态设置为conn_mwrite,这样就在drive_machine里面的分发器里面进入到case conn_mwrite。


        case conn_mwrite://写数据给客户端。一次写多份
          if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
            if (settings.verbose > 0)
              fprintf(stderr, "Failed to build UDP headers\n");
            conn_set_state(c, conn_closing);
            break;
          }
            switch (transmit(c)) {///发送数据
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {//高级送方式
                    conn_release_items(c);//将一个连接所涉及到的item给解除引用,因为数据已经发送完成了
                    /* XXX:  I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);//继续读取数据
                    }
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);//设置为写数据后要设置的那个状态,这个一般在设置IFA送数据的时候会设置write_and_go
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

            case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR:
                stop = true;
                break;
            }
            break;

transmit()函数负责数据发送,根据其返回结果不同,做响应的处理,如果返回TRANSMIT_COMPLETE,代表发送完成了。那么久根据当前的c->state做处理,如果是刚才的get 指令,那么c->state == conn_mwrite, 于是需要调用conn_release_items函数去减少所有这次涉及到的item的引用计数。
transmit函数本身就是对&c->msglist[c->msgcurr];数组的iov 结构进行发送,不够就先返回,否则继续。具体不多赘述了。

至此get指令就处理完毕了。

后续记录一下memcache的过期机制,以及其内存管理方式等;

Share
分类: Memcached 标签:

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。