Nginx memcached 模块原理分析
之前介绍过Nginx的FCGI模块,upstream模块的解析,今晚扫了一下mecached模块的代码,下面记录一下。
总的来说,mecached 模块是继proxy模块外的一个很简单的content模块了,非常简单,就500多行代码,也没有处理一次发送多个KEY给mecached的情况,也没有像FCGI那样做buffering处理,这样mecached返回一点点数据,nginx就发送一点数据给客户端,比较简单。
看过之前的文章后,下面就不啰嗦upstream的那坨东西了,直接上主题代码,假定我们知道upstream提供的机制。
0、ngx_http_memcached_pass配置解析:
这里看一个典型的配置指令:memcached_pass address;
nginx解析到这条指令后会调用ngx_http_memcached_pass,该函数将后面的address里面的端口,url,unix域等解析后,当做一个独立的upstream加入到当前的cmcf->upstreams数组里面。并用 mlcf->upstream.upstream 记着这个upstream。
然后clcf->handler = ngx_http_memcached_handler;注册了内容处理模块的回调,当然这个是通过find_location_config更新配置时设置到content_handler上面去的,也就是ngx_http_update_location_config函数最后几行的工作。我们来看代码:
//解析到syntax: memcached_pass address;指令的 时候被调用,注册ngx_http_memcached_handler为内容处理模块
//并ngx_http_memcached_key的下标设置到mlcf->index上,这是i缓存的主键在变量表中的下标。
static char * ngx_http_memcached_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_http_memcached_loc_conf_t *mlcf = conf;
ngx_str_t *value;
ngx_url_t u;
ngx_http_core_loc_conf_t *clcf;
if (mlcf->upstream.upstream) {
return "is duplicate";
}
value = cf->args->elts;
ngx_memzero(&u, sizeof(ngx_url_t));
u.url = value[1];
u.no_resolve = 1;
//如果u代表的server已经存在,则返回句柄,否则在umcf->upstreams里面新加一个,设置初始化。
//把这条指令当做一个新的upstream存在,放入cmcf->upstreams数组里面,记录有多少upstreams
mlcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0);
if (mlcf->upstream.upstream == NULL) {
return NGX_CONF_ERROR;
}
clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);//设置到核心模块的loc_conf里面
//老办法,设置这个到handler,这样在find_config_phrase上
//设置句柄,会在ngx_http_update_location_config里面设置为content_handle的,从而在content phase中被调用
clcf->handler = ngx_http_memcached_handler;
//如果配置的locaton 后面最后一个字符为/路径结束符,则需要自动重定向。
if (clcf->name.data[clcf->name.len - 1] == '/') {//指令的最后一个字符是/,那就需要重定向。
clcf->auto_redirect = 1;
}
//定义了一个固定的名字,代表缓存的key: memcached_key。记住其在&cmcf->variables中的下标,这样下次可以获取key ,发送get memcached请求。
mlcf->index = ngx_http_get_variable_index(cf, &ngx_http_memcached_key);
if (mlcf->index == NGX_ERROR) {
return NGX_CONF_ERROR;
}
return NGX_CONF_OK;
}
1、ngx_http_memcached_handler内容处理模块:
ngx_http_memcached_handler函数作为内容处理模块,他启动了后端这个模块的所有动作,函数首先调用ngx_http_discard_request_body丢掉客户端发送的BODY数据,也就是不支持BODY,不过感觉我想做一个这样的类似模块:支持把表单数据等解析,方便的当做KEY进行处理。那样就很好玩了。
然后就是如果客户端没有发送content_type,那么跟据URI的后缀文件名,以及ngx_http_core_default_types查找一下设置一个。
然后就是调用ngx_http_upstream_create分配一个upstram结构,后面就是初始化create_request等函数了,老规矩,在FCGI模块也见过。
然后就是u->input_filter = ngx_http_memcached_filter;设置一个input_filter,用来在接收到数据后,调用其解析mecached协议的格式,解析出HTML数据来,放到out_bufs链表后面供发送。
最后调用ngx_http_upstream_init开始进行upstream的处理。看代码:
static ngx_int_t
ngx_http_memcached_handler(ngx_http_request_t *r)
{
ngx_int_t rc;
ngx_http_upstream_t *u;
ngx_http_memcached_ctx_t *ctx;
ngx_http_memcached_loc_conf_t *mlcf;
if (!(r->method & (NGX_HTTP_GET|NGX_HTTP_HEAD))) {//memcached只能处理get/head请求。
return NGX_HTTP_NOT_ALLOWED;
}
//由于这是GET和简单HEAD请求,body没用,就丢了。
//删除客户端连接读事件,如果可以,读取客户端BODY,然后丢掉。如果读完整个BODY了,lingering_close=0.
rc = ngx_http_discard_request_body(r);
if (rc != NGX_OK) {
return rc;
}
//自动根据后缀名,如果ngx_http_core_default_types初始化了后缀,
if ( ngx_http_set_content_type(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
//用来申请upstream大结构体,设置到r->upstream = u;
if (ngx_http_upstream_create(r) != NGX_OK) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
u = r->upstream;
ngx_str_set(&u->schema, "memcached://");
u->output.tag = (ngx_buf_tag_t) &ngx_http_memcached_module;
mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);
u->conf = &mlcf->upstream;//这是upstream的配置数据。
//设置各个回调函数。
u->create_request = ngx_http_memcached_create_request;
u->reinit_request = ngx_http_memcached_reinit_request;
u->process_header = ngx_http_memcached_process_header;
u->abort_request = ngx_http_memcached_abort_request;
u->finalize_request = ngx_http_memcached_finalize_request;
ctx = ngx_palloc(r->pool, sizeof(ngx_http_memcached_ctx_t));
if (ctx == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
ctx->rest = NGX_HTTP_MEMCACHED_END;
ctx->request = r;
ngx_http_set_ctx(r, ctx, ngx_http_memcached_module);
//设置用来读取memcache的数据的回调函数。
u->input_filter_init = ngx_http_memcached_filter_init;
u->input_filter = ngx_http_memcached_filter;
u->input_filter_ctx = ctx;
r->main->count++;
//下面跟ngx_http_fastcgi_handler不一样,不需要调用ngx_http_read_client_request_body(r, ngx_http_upstream_init);
//因为上面已经ngx_http_discard_request_body了,设置了ngx_http_discarded_request_body_handler为读数据回调了,该回调直接丢弃BODY。
//所以,我们现在可以直接进入init阶段了。
ngx_http_upstream_init(r);
return NGX_DONE;
}
2、ngx_http_memcached_create_request创建mecached请求:
ngx_http_memcached_create_request函数设置在 u->create_request上面,ngx_http_upstream_init_request的时候会调用这里,调用完成后,会connect后端mecached的。然后就是发送数据。这里就是创建mecached的请求数据的,跟
ngx_http_memcached_filter正好相反。不过nginx只支持单个键值一次获取,不能支持一次获取多个,这个如果有需求其实可以考虑优化。这个不给力。函数很简单,拼接一个命令 get <key>*\r\n 。就行了。至于这个key怎么来,参考set $memcached_key $host$uri;指令的原理,在之前的文章中介绍过nginx脚本解析:Nginx 脚本引擎解析源码注释 . 具体看代码注释:
//命令 get *\r\n 。
//nginx只支持单个键值一次获取,不能支持一次获取多个,这个如果有需求其实可以考虑优化。
static ngx_int_t ngx_http_memcached_create_request(ngx_http_request_t *r)
{//根据缓存的key,组成memcache的get 行,设置到r->upstream->request_bufs链表上面去,就一个块。然后没事了。
//上层是ngx_http_upstream_init_request调用这里,调用完成后,会connect后端mecached的。然后就是发送数据。
size_t len;
uintptr_t escape;
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_memcached_ctx_t *ctx;
ngx_http_variable_value_t *vv;
ngx_http_memcached_loc_conf_t *mlcf;
mlcf = ngx_http_get_module_loc_conf(r, ngx_http_memcached_module);
//根据代表缓存的key: memcached_key在&cmcf->variables中的下标。从而得到主键的值。
//这个key 从哪里来呢,这里: set $memcached_key $host$uri;
vv = ngx_http_get_indexed_variable(r, mlcf->index);
if (vv == NULL || vv->not_found || vv->len == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "the \"$memcached_key\" variable is not set");
return NGX_ERROR;
}
//第一个参数为NULL,就不会拷贝,只是返回能不能需要转义,为啥是2倍呢,因为返回的是需要转义的字符数目。转义翻倍
escape = 2 * ngx_escape_uri(NULL, vv->data, vv->len, NGX_ESCAPE_MEMCACHED);
len = sizeof("get ") - 1 + vv->len + escape + sizeof(CRLF) - 1;
b = ngx_create_temp_buf(r->pool, len);
if (b == NULL) {
return NGX_ERROR;
}
//申请一个链接节点,用来存储这个简单的get指令
cl = ngx_alloc_chain_link(r->pool);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf = b;//指向这坨buffer
cl->next = NULL;
r->upstream->request_bufs = cl;//代表要发送给后端的数据链表结构,发送的时候会发request_bufs这里的内容
*b->last++ = 'g'; *b->last++ = 'e'; *b->last++ = 't'; *b->last++ = ' ';
ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);
ctx->key.data = b->last;
if (escape == 0) {//如果不需要做转义,就直接拷贝。
b->last = ngx_copy(b->last, vv->data, vv->len);
} else {//否则在"get "后面追加转义后的key.这样就组成了: get mykey
b->last = (u_char *) ngx_escape_uri(b->last, vv->data, vv->len, NGX_ESCAPE_MEMCACHED);
}
//得到key 的长度,设置一下。
ctx->key.len = b->last - ctx->key.data;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http memcached request: \"%V\"", &ctx->key);
*b->last++ = CR; *b->last++ = LF;//\r\n
return NGX_OK;//返回,上层会connect mecached的。
}
3、ngx_http_memcached_process_header mecached返回头部行解析:
ngx_http_memcached_process_header函数负责解析nginx返回的这一行:
VALUE <key> <flags> <bytes>\r\n
mecached的指令是这样的:
取回命令
一行取回命令如下:
get <key>*\r\n
- <key>* 表示一个或多个键值,由空格隔开的字串
这行命令以后,客户端的等待0个或多个项目,每项都会收到一行文本,然后跟着数据区块。所有项目传送完毕后,服务器发送以下字串:
"END\r\n"
来指示回应完毕。
服务器用以下形式发送每项内容:
VALUE <key> <flags> <bytes>\r\n
<data block>\r\n
- <key> 是所发送的键名
- <flags> 是存储命令所设置的记号
- <bytes> 是随后数据块的长度,*不包括* 它的界定符“\r\n”
- <data block> 是发送的数据如果在取回请求中发送了一些键名,而服务器没有送回项目列表,这意味着服务器没这些键名(可能因为它们从未被存储,
或者为给其他内容腾出空间而被删除,或者到期,或者被已客户端删除)。
这个函数的调用时机是: ngx_http_upstream_process_header函数调用ngx_unix_recv读取mecached的数据,然后会调用u->process_header,也就是这个函数,来解析mecached的格式的返回数据。函数处理了mecached返回的第一行: VALUE <key> <flags> <bytes>\r\n,设置content_length_n,status。不过HTML还没有解析或者读取。
static ngx_int_t ngx_http_memcached_process_header(ngx_http_request_t *r)
{//这个函数的调用时机是: ngx_http_upstream_process_header函数调用ngx_unix_recv读取mecached的数据,
//然后会调用u->process_header,也就是这个函数,来解析mecached的格式的返回数据。
//函数处理了mecached返回的第一行: VALUE \r\n,设置content_length_n,status。不过HTML还没有解析或者读取。
u_char *p, *len;
ngx_str_t line;
ngx_http_upstream_t *u;
ngx_http_memcached_ctx_t *ctx;
u = r->upstream;
//下面是不是有点偷懒,如果没有得到LF换行,那么每次都会循环这个buffer,多不好呀。
for (p = u->buffer.pos; p < u->buffer.last; p++) {
if (*p == LF) {//碰到了\n
goto found;
}
}
return NGX_AGAIN;
found:
//得到
*p = '\0';//从这个回车截断。
line.len = p - u->buffer.pos - 1;
line.data = u->buffer.pos;
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "memcached: \"%V\"", &line);
p = u->buffer.pos;
ctx = ngx_http_get_module_ctx(r, ngx_http_memcached_module);
if (ngx_strncmp(p, "VALUE ", sizeof("VALUE ") - 1) == 0) {
p += sizeof("VALUE ") - 1;
if (ngx_strncmp(p, ctx->key.data, ctx->key.len) != 0) {//key 跟我这个请求发送的不一样,悲剧了。
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid key in response \"%V\" for key \"%V\"", &line, &ctx->key);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
//返回行是这样的: VALUE \r\n
p += ctx->key.len;//跳过key.去看flag
if (*p++ != ' ') {
goto no_valid;
}
/* skip flags */
while (*p) {//nginx不处理flags。没用
if (*p++ == ' ') {
goto length;
}
}
goto no_valid;
length:
len = p;
while (*p && *p++ != CR) { /* void */ }//一直扫描到\r的地方。如果mecache不发\r,就傻眼了,因为这里没有处理是不是有 \r
//从n到p的地方就是后面的结果长度了:
r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
if (r->headers_out.content_length_n == -1) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid length in response \"%V\" for key \"%V\"", &line, &ctx->key);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
u->headers_in.status_n = 200;
u->state->status = 200;
u->buffer.pos = p + 1;//从这后面的就是buf啦。不过buf还没有读取到。这个buf就是读取的后端mecached的数据。
return NGX_OK;
}
if (ngx_strcmp(p, "END\x0d") == 0) {//结束了,那说明这一行没东西,因为nginx一次只发送一块数据。所以这里很简单。
//这里虽然是404但是,其实我们可以做的有意思的,比如针对404做redirect到实际的后端机器。这样就是缓存失效,自动回源了。
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0, "key: \"%V\" was not found by memcached", &ctx->key);
u->headers_in.status_n = 404;
u->state->status = 404;
return NGX_OK;
}
no_valid:
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "memcached sent invalid response: \"%V\"", &line);
return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}
4、ngx_http_memcached_filter mecached返回BODY部分数据解析:
参照上面的取回指令的格式,这个函数用来接收mecache的body数据,conf->upstream.buffering = 0,所以memcached模块不支持buffering. 这个函数的调用时机: ngx_http_upstream_process_non_buffered_upstream等调用ngx_unix_recv接收到upstream返回的数据后 就调用这里进行协议转换,不过目前转换不多。 注意这个函数调用的时候,u->buffer->last和pos并没有更新的,也就是什么呢,刚刚读取的bytes个字节的数据,位于u->buffer->last之后。pos目前不准。
static ngx_int_t
ngx_http_memcached_filter(void *data, ssize_t bytes)
{//这个函数用来接收mecache的body数据,conf->upstream.buffering = 0,所以memcached模块不支持buffering.
//这个函数的调用时机: ngx_http_upstream_process_non_buffered_upstream等调用ngx_unix_recv接收到upstream返回的数据后
//就调用这里进行协议转换,不过目前转换不多。
//注意这个函数调用的时候,u->buffer->last和pos并没有更新的,
//也就是什么呢,刚刚读取的bytes个字节的数据,位于u->buffer->last之后。pos目前不准。
ngx_http_memcached_ctx_t *ctx = data;
u_char *last;
ngx_buf_t *b;
ngx_chain_t *cl, **ll;
ngx_http_upstream_t *u;
u = ctx->request->upstream;
b = &u->buffer;//得到接收的数据。
if (u->length == ctx->rest) {//rest初始化为NGX_HTTP_MEMCACHED_END。rest表示还需要读取多少"END\r\n"类型的数据。
if (ngx_strncmp(b->last, ngx_http_memcached_end + NGX_HTTP_MEMCACHED_END - ctx->rest, bytes) != 0) {
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0, "memcached sent invalid trailer");
u->length = 0;
ctx->rest = 0;
return NGX_OK;
}
//搞到末尾了。所以下面其实应该不用减了,没了的。
u->length -= bytes;
ctx->rest -= bytes;
return NGX_OK;
}
for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
ll = &cl->next;//找到要输出去的数据链表的最后部分。
}
//申请一个链接节点。
cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
if (cl == NULL) {
return NGX_ERROR;
}
cl->buf->flush = 1;
cl->buf->memory = 1;
*ll = cl;//这个新的节点的数据挂载到out_bufs的最后面。
last = b->last;//得到这块buf之前的尾部,可能有残留数据。
cl->buf->pos = last;//等于尾部,因为这个之前的b->last是上一块数据的值。
b->last += bytes;//调整尾部。
cl->buf->last = b->last;//初始化要发送给客户端的这块的尾部,可能需要调,比如满了数据了。
cl->buf->tag = u->output.tag;
ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0, "memcached filter bytes:%z size:%z length:%z rest:%z", bytes, b->last - b->pos, u->length, ctx->rest);
if (bytes <= (ssize_t) (u->length - NGX_HTTP_MEMCACHED_END)) {//读取的字节数还不是数据区的末尾。将这块数据纳入链表末尾,然后减少还剩下的数据量变量。
u->length -= bytes;
return NGX_OK;
}
//否则,头部都在这里了。
last += u->length - NGX_HTTP_MEMCACHED_END;//直接移动到后面去。
if (ngx_strncmp(last, ngx_http_memcached_end, b->last - last) != 0) {
ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
"memcached sent invalid trailer");
}
ctx->rest -= b->last - last;//后面还有这么多的"END\r\n"数据,所以直接rest记录还需要读取多少这样的无用数据。
b->last = last;//标记这块buf的结尾。
cl->buf->last = last;//标记这块buf的结尾。
u->length = ctx->rest;//后面只需要处理尾部的结尾了。
return NGX_OK;
}
介绍的差不多了,mecached模块很简单:
- 没有合并请求查询么cached,一对一的,简单协议;
- 也没有处理buffering优化;
- 也不支持对表单数据,body数据用来做key。

您好
在nginx 请求中,可以发个http upstream 发给第三方服务器中,然后得到结果之后,nginx 请求再继续往下走吗?
目前模仿memcache