首页 > Nginx > Nginx memcached 模块原理分析

Nginx memcached 模块原理分析

2013年5月23日 发表评论 阅读评论 8143次阅读    

之前介绍过Nginx的FCGI模块,upstream模块的解析,今晚扫了一下mecached模块的代码,下面记录一下。

总的来说,mecached 模块是继proxy模块外的一个很简单的content模块了,非常简单,就500多行代码,也没有处理一次发送多个KEY给mecached的情况,也没有像FCGI那样做buffering处理,这样mecached返回一点点数据,nginx就发送一点数据给客户端,比较简单。

看过之前的文章后,下面就不啰嗦upstream的那坨东西了,直接上主题代码,假定我们知道upstream提供的机制。

0、ngx_http_memcached_pass配置解析:

这里看一个典型的配置指令:memcached_pass address;

nginx解析到这条指令后会调用ngx_http_memcached_pass,该函数将后面的address里面的端口,url,unix域等解析后,当做一个独立的upstream加入到当前的cmcf->upstreams数组里面。并用 mlcf->upstream.upstream 记着这个upstream。

然后clcf->handler = ngx_http_memcached_handler;注册了内容处理模块的回调,当然这个是通过find_location_config更新配置时设置到content_handler上面去的,也就是ngx_http_update_location_config函数最后几行的工作。我们来看代码:

//解析到syntax:	memcached_pass address;指令的 时候被调用,注册ngx_http_memcached_handler为内容处理模块
//并ngx_http_memcached_key的下标设置到mlcf->index上,这是i缓存的主键在变量表中的下标。
static char * ngx_http_memcached_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_memcached_loc_conf_t *mlcf = conf;

    ngx_str_t                 *value;
    ngx_url_t                  u;
    ngx_http_core_loc_conf_t  *clcf;

    if (mlcf->upstream.upstream) {
        return "is duplicate";
    }

    value = cf->args->elts;
    ngx_memzero(&u, sizeof(ngx_url_t));
    u.url = value[1];
    u.no_resolve = 1;

	//如果u代表的server已经存在,则返回句柄,否则在umcf->upstreams里面新加一个,设置初始化。
	//把这条指令当做一个新的upstream存在,放入cmcf->upstreams数组里面,记录有多少upstreams
    mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
    if (mlcf->upstream.upstream == NULL) {
        return NGX_CONF_ERROR;
    }
    clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);//设置到核心模块的loc_conf里面

	//老办法,设置这个到handler,这样在find_config_phrase上
	//设置句柄,会在ngx_http_update_location_config里面设置为content_handle的,从而在content phase中被调用
    clcf->handler = ngx_http_memcached_handler;

	//如果配置的locaton 后面最后一个字符为/路径结束符,则需要自动重定向。
    if (clcf->name.data[clcf->name.len - 1] == '/') {//指令的最后一个字符是/,那就需要重定向。
        clcf->auto_redirect = 1;
    }
	//定义了一个固定的名字,代表缓存的key: memcached_key。记住其在&cmcf->variables中的下标,这样下次可以获取key ,发送get memcached请求。
    mlcf->index = ngx_http_get_variable_index(cf, &ngx_http_memcached_key);
    if (mlcf->index == NGX_ERROR) {
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}

1、ngx_http_memcached_handler内容处理模块

ngx_http_memcached_handler函数作为内容处理模块,他启动了后端这个模块的所有动作,函数首先调用ngx_http_discard_request_body丢掉客户端发送的BODY数据,也就是不支持BODY,不过感觉我想做一个这样的类似模块:支持把表单数据等解析,方便的当做KEY进行处理。那样就很好玩了。

然后就是如果客户端没有发送content_type,那么跟据URI的后缀文件名,以及ngx_http_core_default_types查找一下设置一个。

然后就是调用ngx_http_upstream_create分配一个upstram结构,后面就是初始化create_request等函数了,老规矩,在FCGI模块也见过。

然后就是u->input_filter = ngx_http_memcached_filter;设置一个input_filter,用来在接收到数据后,调用其解析mecached协议的格式,解析出HTML数据来,放到out_bufs链表后面供发送。

最后调用ngx_http_upstream_init开始进行upstream的处理。看代码:

static ngx_int_t
ngx_http_memcached_handler(ngx_http_request_t *r)
{
    ngx_int_t                       rc;
    ngx_http_upstream_t            *u;
    ngx_http_memcached_ctx_t       *ctx;
    ngx_http_memcached_loc_conf_t  *mlcf;

    if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_HEAD))) {//memcached只能处理get/head请求。
        return NGX_HTTP_NOT_ALLOWED;
    }
	//由于这是GET和简单HEAD请求,body没用,就丢了。
	//删除客户端连接读事件,如果可以,读取客户端BODY,然后丢掉。如果读完整个BODY了,lingering_close=0.
    rc = ngx_http_discard_request_body(r);
    if (rc != NGX_OK) {
        return rc;
    }
	//自动根据后缀名,如果ngx_http_core_default_types初始化了后缀,
    if ( ngx_http_set_content_type(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }
//用来申请upstream大结构体,设置到r->upstream = u;
    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

    ngx_str_set(&u->schema, "memcached://");
    u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module;

    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);

    u->conf = &mlcf->upstream;//这是upstream的配置数据。
//设置各个回调函数。
    u->create_request = ngx_http_memcached_create_request;
    u->reinit_request = ngx_http_memcached_reinit_request;
    u->process_header = ngx_http_memcached_process_header;
    u->abort_request = ngx_http_memcached_abort_request;
    u->finalize_request = ngx_http_memcached_finalize_request;

    ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t));
    if (ctx == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    ctx->rest = NGX_HTTP_MEMCACHED_END;
    ctx->request = r;

    ngx_http_set_ctx(r, ctx, ngx_http_memcached_module);

//设置用来读取memcache的数据的回调函数。
    u->input_filter_init = ngx_http_memcached_filter_init;
    u->input_filter = ngx_http_memcached_filter;
    u->input_filter_ctx = ctx;

    r->main->count++;

//下面跟ngx_http_fastcgi_handler不一样,不需要调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
//因为上面已经ngx_http_discard_request_body了,设置了ngx_http_discarded_request_body_handler为读数据回调了,该回调直接丢弃BODY。
//所以,我们现在可以直接进入init阶段了。
    ngx_http_upstream_init(r);

    return NGX_DONE;
}

2、ngx_http_memcached_create_request创建mecached请求

ngx_http_memcached_create_request函数设置在 u->create_request上面,ngx_http_upstream_init_request的时候会调用这里,调用完成后,会connect后端mecached的。然后就是发送数据。这里就是创建mecached的请求数据的,跟
ngx_http_memcached_filter正好相反。不过nginx只支持单个键值一次获取,不能支持一次获取多个,这个如果有需求其实可以考虑优化。这个不给力。函数很简单,拼接一个命令 get <key>*\r\n 。就行了。至于这个key怎么来,参考set $memcached_key $host$uri;指令的原理,在之前的文章中介绍过nginx脚本解析:Nginx 脚本引擎解析源码注释 . 具体看代码注释:

//命令 get *\r\n 。
//nginx只支持单个键值一次获取,不能支持一次获取多个,这个如果有需求其实可以考虑优化。
static ngx_int_t ngx_http_memcached_create_request(ngx_http_request_t *r)
{//根据缓存的key,组成memcache的get 行,设置到r->upstream->request_bufs链表上面去,就一个块。然后没事了。
//上层是ngx_http_upstream_init_request调用这里,调用完成后,会connect后端mecached的。然后就是发送数据。
    size_t                          len;
    uintptr_t                       escape;
    ngx_buf_t                      *b;
    ngx_chain_t                    *cl;
    ngx_http_memcached_ctx_t       *ctx;
    ngx_http_variable_value_t      *vv;
    ngx_http_memcached_loc_conf_t  *mlcf;

    mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);
//根据代表缓存的key: memcached_key在&cmcf->variables中的下标。从而得到主键的值。
//这个key 从哪里来呢,这里: set $memcached_key $host$uri;
    vv = ngx_http_get_indexed_variable(r, mlcf->index);
    if (vv == NULL || vv->not_found || vv->len == 0) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "the \"$memcached_key\" variable is not set");
        return NGX_ERROR;
    }
//第一个参数为NULL,就不会拷贝,只是返回能不能需要转义,为啥是2倍呢,因为返回的是需要转义的字符数目。转义翻倍
    escape = 2 * ngx_escape_uri(NULL, vv->data, vv->len, NGX_ESCAPE_MEMCACHED);
    len = sizeof("get ") - 1 + vv->len + escape + sizeof(CRLF) - 1;

    b = ngx_create_temp_buf(r->pool, len);
    if (b == NULL) {
        return NGX_ERROR;
    }
	//申请一个链接节点,用来存储这个简单的get指令
    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }
    cl->buf = b;//指向这坨buffer
    cl->next = NULL;

    r->upstream->request_bufs = cl;//代表要发送给后端的数据链表结构,发送的时候会发request_bufs这里的内容
    *b->last++ = 'g'; *b->last++ = 'e'; *b->last++ = 't'; *b->last++ = ' ';

    ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);
    ctx->key.data = b->last;

    if (escape == 0) {//如果不需要做转义,就直接拷贝。
        b->last = ngx_copy(b->last, vv->data, vv->len);

    } else {//否则在"get "后面追加转义后的key.这样就组成了: get mykey
        b->last = (u_char *) ngx_escape_uri(b->last, vv->data, vv->len, NGX_ESCAPE_MEMCACHED);
    }
	//得到key 的长度,设置一下。
    ctx->key.len = b->last - ctx->key.data;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http memcached request: \"%V\"", &ctx->key);
    *b->last++ = CR; *b->last++ = LF;//\r\n
    return NGX_OK;//返回,上层会connect mecached的。
}

3、ngx_http_memcached_process_header mecached返回头部行解析

ngx_http_memcached_process_header函数负责解析nginx返回的这一行:

VALUE <key> <flags> <bytes>\r\n

mecached的指令是这样的:

取回命令

一行取回命令如下:
get <key>*\r\n
- <key>* 表示一个或多个键值,由空格隔开的字串
这行命令以后,客户端的等待0个或多个项目,每项都会收到一行文本,然后跟着数据区块。所有项目传送完毕后,服务器发送以下字串:
"END\r\n"
来指示回应完毕。
服务器用以下形式发送每项内容:
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
- <key> 是所发送的键名
- <flags> 是存储命令所设置的记号
- <bytes> 是随后数据块的长度,*不包括* 它的界定符“\r\n”
- <data block> 是发送的数据

如果在取回请求中发送了一些键名,而服务器没有送回项目列表,这意味着服务器没这些键名(可能因为它们从未被存储,
或者为给其他内容腾出空间而被删除,或者到期,或者被已客户端删除)。

这个函数的调用时机是: ngx_http_upstream_process_header函数调用ngx_unix_recv读取mecached的数据,然后会调用u->process_header,也就是这个函数,来解析mecached的格式的返回数据。函数处理了mecached返回的第一行: VALUE <key> <flags> <bytes>\r\n,设置content_length_n,status。不过HTML还没有解析或者读取。

static ngx_int_t ngx_http_memcached_process_header(ngx_http_request_t *r)
{//这个函数的调用时机是: ngx_http_upstream_process_header函数调用ngx_unix_recv读取mecached的数据,
//然后会调用u->process_header,也就是这个函数,来解析mecached的格式的返回数据。
//函数处理了mecached返回的第一行: VALUE   \r\n,设置content_length_n,status。不过HTML还没有解析或者读取。
    u_char                    *p, *len;
    ngx_str_t                  line;
    ngx_http_upstream_t       *u;
    ngx_http_memcached_ctx_t  *ctx;

    u = r->upstream;
	//下面是不是有点偷懒,如果没有得到LF换行,那么每次都会循环这个buffer,多不好呀。
    for (p = u->buffer.pos; p < u->buffer.last; p++) {
        if (*p == LF) {//碰到了\n
            goto found;
        }
    }
    return NGX_AGAIN;
found:
//得到
    *p = '\0';//从这个回车截断。
    line.len = p - u->buffer.pos - 1;
    line.data = u->buffer.pos;
    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "memcached: \"%V\"", &line);

    p = u->buffer.pos;
    ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);
    if (ngx_strncmp(p, "VALUE ", sizeof("VALUE ") - 1) == 0) {
        p += sizeof("VALUE ") - 1;
        if (ngx_strncmp(p, ctx->key.data, ctx->key.len) != 0) {//key 跟我这个请求发送的不一样,悲剧了。
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid key in response \"%V\" for key \"%V\"", &line, &ctx->key);
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }
		//返回行是这样的: VALUE   \r\n
        p += ctx->key.len;//跳过key.去看flag
        if (*p++ != ' ') {
            goto no_valid;
        }
        /* skip flags */
        while (*p) {//nginx不处理flags。没用
            if (*p++ == ' ') {
                goto length;
            }
        }
        goto no_valid;
    length:
        len = p;
        while (*p && *p++ != CR) { /* void */ }//一直扫描到\r的地方。如果mecache不发\r,就傻眼了,因为这里没有处理是不是有 \r
        //从n到p的地方就是后面的结果长度了:
        r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
        if (r->headers_out.content_length_n == -1) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid length in response \"%V\" for key \"%V\"", &line, &ctx->key);
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        u->headers_in.status_n = 200;
        u->state->status = 200;
        u->buffer.pos = p + 1;//从这后面的就是buf啦。不过buf还没有读取到。这个buf就是读取的后端mecached的数据。
        return NGX_OK;
    }
    if (ngx_strcmp(p, "END\x0d") == 0) {//结束了,那说明这一行没东西,因为nginx一次只发送一块数据。所以这里很简单。
    //这里虽然是404但是,其实我们可以做的有意思的,比如针对404做redirect到实际的后端机器。这样就是缓存失效,自动回源了。
        ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, "key: \"%V\" was not found by memcached", &ctx->key);
        u->headers_in.status_n = 404;
        u->state->status = 404;
        return NGX_OK;
    }
no_valid:
    ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid response: \"%V\"", &line);
    return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}

4、ngx_http_memcached_filter mecached返回BODY部分数据解析

参照上面的取回指令的格式,这个函数用来接收mecache的body数据,conf->upstream.buffering = 0,所以memcached模块不支持buffering. 这个函数的调用时机: ngx_http_upstream_process_non_buffered_upstream等调用ngx_unix_recv接收到upstream返回的数据后 就调用这里进行协议转换,不过目前转换不多。 注意这个函数调用的时候,u->buffer->last和pos并没有更新的,也就是什么呢,刚刚读取的bytes个字节的数据,位于u->buffer->last之后。pos目前不准。

static ngx_int_t
ngx_http_memcached_filter(void *data, ssize_t bytes)
{//这个函数用来接收mecache的body数据,conf->upstream.buffering = 0,所以memcached模块不支持buffering.
//这个函数的调用时机: ngx_http_upstream_process_non_buffered_upstream等调用ngx_unix_recv接收到upstream返回的数据后
//就调用这里进行协议转换,不过目前转换不多。
//注意这个函数调用的时候,u->buffer->last和pos并没有更新的,
//也就是什么呢,刚刚读取的bytes个字节的数据,位于u->buffer->last之后。pos目前不准。

    ngx_http_memcached_ctx_t  *ctx = data;

    u_char               *last;
    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;
    b = &u->buffer;//得到接收的数据。

    if (u->length == ctx->rest) {//rest初始化为NGX_HTTP_MEMCACHED_END。rest表示还需要读取多少"END\r\n"类型的数据。
        if (ngx_strncmp(b->last, ngx_http_memcached_end + NGX_HTTP_MEMCACHED_END - ctx->rest, bytes) != 0) {
            ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0, "memcached sent invalid trailer");
            u->length = 0;
            ctx->rest = 0;
            return NGX_OK;
        }
		//搞到末尾了。所以下面其实应该不用减了,没了的。
        u->length -= bytes;
        ctx->rest -= bytes;
        return NGX_OK;
    }

    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;//找到要输出去的数据链表的最后部分。
    }
//申请一个链接节点。
    cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }
    cl->buf->flush = 1;
    cl->buf->memory = 1;
    *ll = cl;//这个新的节点的数据挂载到out_bufs的最后面。
    last = b->last;//得到这块buf之前的尾部,可能有残留数据。
    cl->buf->pos = last;//等于尾部,因为这个之前的b->last是上一块数据的值。
    b->last += bytes;//调整尾部。
    cl->buf->last = b->last;//初始化要发送给客户端的这块的尾部,可能需要调,比如满了数据了。
    cl->buf->tag = u->output.tag;

    ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0, "memcached filter bytes:%z size:%z length:%z rest:%z", bytes, b->last - b->pos, u->length, ctx->rest);
    if (bytes <= (ssize_t) (u->length - NGX_HTTP_MEMCACHED_END)) {//读取的字节数还不是数据区的末尾。将这块数据纳入链表末尾,然后减少还剩下的数据量变量。
        u->length -= bytes;
        return NGX_OK;
    }
//否则,头部都在这里了。
    last += u->length - NGX_HTTP_MEMCACHED_END;//直接移动到后面去。
    if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
        ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                      "memcached sent invalid trailer");
    }

    ctx->rest -= b->last - last;//后面还有这么多的"END\r\n"数据,所以直接rest记录还需要读取多少这样的无用数据。
    b->last = last;//标记这块buf的结尾。
    cl->buf->last = last;//标记这块buf的结尾。
    u->length = ctx->rest;//后面只需要处理尾部的结尾了。
    return NGX_OK;
}

介绍的差不多了,mecached模块很简单:
  1. 没有合并请求查询么cached,一对一的,简单协议;
  2. 也没有处理buffering优化;
  3. 也不支持对表单数据,body数据用来做key。
Share
分类: Nginx 标签: , ,
  1. 几页星辰
    2022年3月26日16:25 | #1

    您好

    在nginx 请求中,可以发个http upstream 发给第三方服务器中,然后得到结果之后,nginx 请求再继续往下走吗?
    目前模仿memcache

  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。