Nginx upstream原理分析【1】给后端FastCGI发送数据
上篇文章“Nginx upstream原理分析【1】给后端FastCGI发送数据”讲到了给PHP等FCGI程序发送数据了,也就是ngx_http_upstream_send_request函数,下面接着写。
ngx_http_upstream_send_request调用输出的过滤器,发送数据到后端,前面已经介绍过,ngx_http_proxy_create_request函数会将客户端发送过来的HEADER,以及body部分的数据组成一块块的FCGI协议的buffer,放到u->request_bufs成员上面,因此在发送数据的时候,就需要吧这块数据发送给后端的PHP或者其他模块。send_request函数完成的任务有如下几个:
- 连接状态诊断,调用ngx_http_upstream_test_connect();
- 调用ngx_output_chain函数将需要发送的数据发送出去(不一定真的发送出了,可能留在缓冲链表里面);
- 设置定时器send_timeout,ngx_tcp_push标志位等;
- 如果连接可读,则调用ngx_http_upstream_process_header()尝试读取FCGI的返回数据。
下面分2部分介绍这个函数:
static void ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u) {//调用输出的过滤器,发送数据到后端 ngx_int_t rc; ngx_connection_t *c; c = u->peer.connection;//拿到这个peer的连接 ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream send request"); //测试一个连接状态,如果连接损坏,则重试 if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } c->log->action = "sending request to upstream"; //下面开始过滤模块的过程。对请求的FCGI数据进行过滤,里面会调用ngx_chain_writer,将数据用writev发送出去 //ngx_http_proxy_create_request将客户端发送的数据拷贝到这里,如果是从读写事件回调进入的,则这里的request_sent应该为1, //表示数据已经拷贝到输出链了。这份数据是在ngx_http_upstream_init_request里面调用处理模块比如FCGI的create_request处理的,解析为FCGI的结构数据。 rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs); u->request_sent = 1;//标志位数据已经发送完毕,指的是放入输出列表里面,不一定发送出去了。
上面这部分很简单,主要就是调用了ngx_output_chain,将数据发送出去,这部分比较复杂,我们留在后面介绍,下面继续看看ngx_http_upstream_send_request的后半部分,其后半部分设置了定时器,然后是tcp_nopush标志,如果需要的话。然后如果跟upstream的连接可读的话就调用ngx_http_upstream_process_header处理一下可读事件。
if (c->write->timer_set) {//已经不需要写数据了。 ngx_del_timer(c->write); } if (rc == NGX_AGAIN) {//数据还没有发送完毕,待会还需要发送。 ngx_add_timer(c->write, u->conf->send_timeout); if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {//注册一下读写事件。 ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } /* rc == NGX_OK */ if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) { if (ngx_tcp_push(c->fd) == NGX_ERROR) {//设置PUSH标志位,尽快发送数据。 ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno, ngx_tcp_push_n " failed"); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } c->tcp_nopush = NGX_TCP_NOPUSH_UNSET; } ngx_add_timer(c->read, u->conf->read_timeout);//这回数据已经发送了,可以准备接收了,设置接收超时定时器。 #if 1 if (c->read->ready) {//如果读已经ready了,那么,你懂的,去读个头啊 /* post aio operation */ /* TODO comment * although we can post aio operation just in the end * of ngx_http_upstream_connect() CHECK IT !!! * it's better to do here because we postpone header buffer allocation */ ngx_http_upstream_process_header(r, u);///处理上游发送的响应头。 return; } #endif u->write_event_handler = ngx_http_upstream_dummy_handler;//不用写了,只需要读 if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //下面,由于这个句柄还没数据可以读取,我们可以先回去干其他事情了,因为在ngx_http_upstream_connect里面已经设置了读数据的回调函数了的。 }
1、读取FCGI头部数据ngx_http_upstream_process_header:
读取FCGI头部数据,或者proxy头部数据。ngx_http_upstream_send_request发送完数据后,会调用这里,或者有可写事件的时候会调用这里。ngx_http_upstream_connect函数连接fastcgi后,会设置这个回调函数为fcgi连接的可读事件回调。
函数首先进行一下超时判断,然后就诊断upstream的连接状态,如果失败了就重试upstream的其他server。然后如果u->buffer为空,就初始化它,申请一块内存并初始化u->headers_in.headers数组,以备待会读取header。这个u->buffer是读取上游返回的数据的缓冲区,也就是proxy,FCGI返回的数据。这里面有http头部,也可能有body部分。其body部分会跟event_pipe_t的preread_bufs结构对应起来。就是预读的buf,其实是i不小心读到的。
static void ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u) {//读取FCGI头部数据,或者proxy头部数据。ngx_http_upstream_send_request发送完数据后, //会调用这里,或者有可写事件的时候会调用这里。 //ngx_http_upstream_connect函数连接fastcgi后,会设置这个回调函数为fcgi连接的可读事件回调。 ssize_t n; ngx_int_t rc; ngx_connection_t *c; c = u->peer.connection; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream process header"); c->log->action = "reading response header from upstream"; if (c->read->timedout) {//读超时了,轮询下一个。错误信息应该已经打印了 ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT); return; } //我已发送请求,但连接出问题了 if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } if (u->buffer.start == NULL) {//分配一块缓存,用来存放接受回来的数据。 u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size); if (u->buffer.start == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->buffer.pos = u->buffer.start; u->buffer.last = u->buffer.start; u->buffer.end = u->buffer.start + u->conf->buffer_size; u->buffer.temporary = 1; u->buffer.tag = u->output.tag; //初始化headers_in存放头部信息,后端FCGI,proxy解析后的HTTP头部将放入这里 if (ngx_list_init(&u->headers_in.headers, r->pool, 8, sizeof(ngx_table_elt_t)) != NGX_OK){ ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } }
接下来是一个主要的循环,其循环读取FCGI返回的数据,然后调用u->process_header进行解析,如果读取完毕了所有的HTTP HEADERS则退出循环,如果暂时没有可读数据,则先返回。
循环退出后,如果已经读取完了所有的header部分,那就调用ngx_http_upstream_process_headers函数,拷贝一下头部数据,之后如果没有子请求,那就调用ngx_http_upstream_send_response去发送数据给客户端,并且考虑读取BODY数据。
ngx_http_upstream_process_header函数在之前的一篇文章:“Nginx upstream原理分析【1】-无缓冲模式发送数据”中有介绍,这里就不多说了。
2、发送数据给FCGI:
上面我讲到,ngx_http_upstream_send_request发送数据是通过ngx_output_chain进行的,这里我们详细看一下,其调用的形式是这样的:
ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
request_sent为0表示还没有调用这个函数发送过数据,否则表示调用过了。如果之前没有调用过,那么第二个参数就是u->request_bufs,也就是ngx_http_proxy_create_request组合成的FCGI数据,代表这次调用ngx_output_chain需要把u->request_bufs的数据发送出去。
ngx_output_chain函数开头判断一下ctx->in 和ctx->busy 是不是空,如果是,则表示之前没有什么数据要发送的,所以这次我们可以直接调用ctx->output_filter()函数进行发送,这个函数在ngx_http_upstream_init_request初始化的时候设置的输出数据函数,为ngx_chain_writer。
ngx_int_t ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in) {//ctx为&u->output, in为u->request_bufs //这里nginx filter的主要逻辑都在这个函数里面,将in参数链表的缓冲块拷贝到 //ctx->in,然后将ctx->in的数据拷贝到out,然后调用output_filter发送出去。 off_t bsize; ngx_int_t rc, last; ngx_chain_t *cl, *out, **last_out; if (ctx->in == NULL && ctx->busy == NULL) { //看下面的注释,in是待发送的数据,busy是已经调用ngx_chain_writer但还没有发送完毕。 /* the short path for the case when the ctx->in and ctx->busy chains * are empty, the incoming chain is empty too or has the single buf that does not require the copy */ if (in == NULL) {//如果要发送的数据为空,也就是啥也不用发送。那就直接调用output_filter的了。 //调用的是ngx_chain_writer,在ngx_http_upstream_init_request初始化的时候设置的输出数据。 return ctx->output_filter(ctx->filter_ctx, in); } if (in->next == NULL //如果输出缓冲不为空,但是只有一块数据,那也可以直接发送不用拷贝了 #if (NGX_SENDFILE_LIMIT) && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT) #endif && ngx_output_chain_as_is(ctx, in->buf))//这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝 { return ctx->output_filter(ctx->filter_ctx, in); } }
下面看一下ngx_chain_writer函数,其首先将第二个参数代表的要发送的缓冲区链表整理,计算器总大小,并且将其buf结构拷贝挂载到data参数,比如ctx->filter_ctx上面。这个指针在初始化的时候设置为:u->output.filter_ctx = &u->writer;,也就是upstream的write成员上面。
ngx_int_t ngx_chain_writer(void *data, ngx_chain_t *in) {//ngx_output_chain调用这里,将数据发送出去。数据已经拷贝到in参数里面了。嗲用方式;(ctx->filter_ctx, out);,out为要发送的buf链表头部。 ngx_chain_writer_ctx_t *ctx = data;//ctx永远指向链表头部。其next指针指向下一个节点。但是last指针却指向 off_t size; ngx_chain_t *cl; ngx_connection_t *c; c = ctx->connection; /*下面的循环,将in里面的每一个链接节点,添加到ctx->filter_ctx所指的链表中。并记录这些in的链表的大小。*/ for (size = 0; in; in = in->next) {//遍历整个输入缓冲链表。将输入缓冲挂接到ctx里面。 if (ngx_buf_size(in->buf) == 0 && !ngx_buf_special(in->buf)) { ngx_debug_point(); } size += ngx_buf_size(in->buf);//计算这个节点的数据大小。进行累加。这个大小目前其实没什么用,只是为了判断是否还有数据没有发送完毕。 ngx_log_debug2(NGX_LOG_DEBUG_CORE, c->log, 0, "chain writer buf fl:%d s:%uO", in->buf->flush, ngx_buf_size(in->buf)); cl = ngx_alloc_chain_link(ctx->pool);//分配一个链接节点,待会挂到ctx里面 if (cl == NULL) { return NGX_ERROR; } cl->buf = in->buf;//指向输入的缓冲 cl->next = NULL; *ctx->last = cl;//挂到ctx的last处,也就是挂载到链表的最后面。 //初始化的时候是u->writer.last = &u->writer.out;,也就是初始时指向自己的头部地址。 ctx->last = &cl->next;//向后移动last指针,指向新的最后一个节点的next变量地址。 }
然后遍历ctx->out,也就是&u->writer.out链表上面的数据,计算其总大小。
for (cl = ctx->out; cl; cl = cl->next) {//遍历刚刚准备的链表,并统计其大小,这是啥意思?ctx->out为链表头,所以这里遍历的是所有的。 if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) { ngx_debug_point(); } size += ngx_buf_size(cl->buf); } if (size == 0 && !c->buffered) {//啥数据都么有,不用发了都 return NGX_OK; }
然后就是调用c->send_chain(),也就是ngx_writev_chain函数,用writev将ctx->out的数据全部发送出去。如果没法送完,则返回没发送完毕的部分。记录到ctx->out里面,其实就是u->writer上面。
//调用writev将ctx->out的数据全部发送出去。如果没法送完,则返回没发送完毕的部分。记录到out里面 //在ngx_event_connect_peer连接上游服务器的时候设置的发送链接函数ngx_send_chain=ngx_writev_chain。 ctx->out = c->send_chain(c, ctx->out, ctx->limit); ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0, "chain writer out: %p", ctx->out); if (ctx->out == NGX_CHAIN_ERROR) { return NGX_ERROR; } if (ctx->out == NULL) {//如果没有输出的,则怎么办呢 ctx->last = &ctx->out;//标记结尾。没东西了。 if (!c->buffered) { return NGX_OK; } } return NGX_AGAIN; }
下面继续分析ngx_chain_writer函数。这个函数首先将in参数链表的数据放到u->writer.out的最后,然后尝试用ngx_writev_chain将数据用writev的方式发送出去,没有发送完毕的部分挂入u->writer.out上面,以备后续进来时进行发送。
这个函数首先根据参数Limit指定的大小,以及in参数代表的要发送的内存,不断的遍历,将尽可能多的数据用struct iovec指向,以备待会调用writev函数。
ngx_chain_t * ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) {//调用writev一次发送多个缓冲区,如果没有发送完毕,则返回剩下的链接结构头部。 //ngx_chain_writer调用这里,调用方式为 ctx->out = c->send_chain(c, ctx->out, ctx->limit); //第二个参数为要发送的数据 u_char *prev; ssize_t n, size, sent; off_t send, prev_send; ngx_uint_t eintr, complete; ngx_err_t err; ngx_array_t vec; ngx_chain_t *cl; ngx_event_t *wev; struct iovec *iov, iovs[NGX_IOVS]; wev = c->write;//拿到这个连接的写事件结构 if (!wev->ready) {//连接还没准备好,返回当前的节点。 return in; } send = 0; complete = 0; vec.elts = iovs;//数组 vec.size = sizeof(struct iovec); vec.nalloc = NGX_IOVS;//申请了这么多。 vec.pool = c->pool; for ( ;; ) { prev = NULL; iov = NULL; eintr = 0; prev_send = send;//之前已经发送了这么多 vec.nelts = 0; /* create the iovec and coalesce the neighbouring bufs */ //循环发送数据,一次一块IOV_MAX数目的缓冲区。 for (cl = in; cl && vec.nelts < IOV_MAX && send < limit; cl = cl->next) { if (ngx_buf_special(cl->buf)) { continue; } #if 1 if (!ngx_buf_in_memory(cl->buf)) { ngx_debug_point(); } #endif size = cl->buf->last - cl->buf->pos;//计算这个节点的大小 if (send + size > limit) {//超过最大发送大小。截断,这次只发送这么多 size = (ssize_t) (limit - send); } if (prev == cl->buf->pos) {//如果还是等于刚才的位置,那就复用 iov->iov_len += size; } else {//否则要新增一个节点。返回之 iov = ngx_array_push(&vec); if (iov == NULL) { return NGX_CHAIN_ERROR; } iov->iov_base = (void *) cl->buf->pos;//从这里开始 iov->iov_len = size;//有这么多我要发送 } prev = cl->buf->pos + size;//记录刚才发到了这个位置,为指针哈。 send += size;//增加已经记录的数据长度。 }
经过上面的步骤,vec变量已经设置好了,其数组每个元素都指向参数in,也就是ctx->filter_ctx,也就是u->writer.out上面的数据。然后就是简单的调用writev函数发送。
n = writev(c->fd, vec.elts, vec.nelts);//调用writev发送这些数据,返回发送的数据大小 if (n == -1) { err = ngx_errno; switch (err) { case NGX_EAGAIN: break; case NGX_EINTR: eintr = 1; break; default: wev->error = 1; (void) ngx_connection_error(c, err, "writev() failed"); return NGX_CHAIN_ERROR; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, err, "writev() not ready"); } sent = n > 0 ? n : 0;//记录发送的数据大小。 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "writev: %z", sent); if (send - prev_send == sent) {//啥意思?之前没有发送任何数据吗 complete = 1; } c->sent += sent;//递增统计数据,这个链接上发送的数据大小
ngx_writev_chain剩下的部分就是要跟进刚才调用writev函数发送时返回的发送出去的字节数,找到现在发送到了哪一块数据的什么位置。
for (cl = in; cl; cl = cl->next) { //又遍历一次这个链接,为了找到那块只成功发送了一部分数据的内存块,从它继续开始发送。 if (ngx_buf_special(cl->buf)) { continue; } if (sent == 0) { break; } size = cl->buf->last - cl->buf->pos; if (sent >= size) { sent -= size;//标记后面还有多少数据是我发送过的 cl->buf->pos = cl->buf->last;//清空这段内存。继续找下一个 continue; } cl->buf->pos += sent;//这块内存没有完全发送完毕,悲剧,下回得从这里开始。 break; } if (eintr) { continue; } if (!complete) { wev->ready = 0; return cl; } if (send >= limit || cl == NULL) { return cl; } in = cl;//继续刚才没有发送完毕的内存。继续发送 } }
由此可知,其实调用了ngx_chain_writer,数据不一定都调用了writev发送出去,可能由于连接比较忙,不可写,数据都放在u->writer.out链表上面了。
现在回到之前讲的ngx_output_chain函数,如果in参数有数据的话,首先需要吧这部分数据拷贝一下到ctx->in上面,业绩上&u->output->in上面。
/* add the incoming buf to the chain ctx->in */ if (in) {//拷贝一份数据到ctx->in里面,需要老老实实的进行数据拷贝了。将in参数里面的数据拷贝到ctx->in里面。换了个in if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) { return NGX_ERROR; } }
拷贝完后,要发送的数据就放在ctx->in上面了,剩下的就需要准备发送了。后面就是一个大的for循环,其不断循环发送数据。
首先是一个循环,不断遍历刚刚准备的ctx->in链表,找出要发送的链表,放到ctx->out上面去。
//到现在了,in参数的缓冲链表已经放在了ctx->in里面了。下面准备发送吧。 out = NULL; last_out = &out; last = NGX_NONE; for ( ;; ) { #if (NGX_HAVE_FILE_AIO) if (ctx->aio) { return NGX_AGAIN; } #endif while (ctx->in) {//遍历所有待发送的数据。将他们一个个拷贝到out指向的链表中,为什么要拷贝呢,不知道 /* cycle while there are the ctx->in bufs * and there are the free output bufs to copy in */ bsize = ngx_buf_size(ctx->in->buf); //这块内存大小为0,可能有问题。 if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) { ngx_log_error(NGX_LOG_ALERT, ctx->pool->log, 0, "zero size buf in output t:%d r:%d f:%d %p %p-%p %p %O-%O", ctx->in->buf->temporary, ctx->in->buf->recycled, ctx->in->buf->in_file, ctx->in->buf->start, ctx->in->buf->pos, ctx->in->buf->last, ctx->in->buf->file, ctx->in->buf->file_pos, ctx->in->buf->file_last); ngx_debug_point(); ctx->in = ctx->in->next; continue; } //这块数据不能拷贝的话,就只能改变一下指向进行共享了,不能拷贝实际数据了。 if (ngx_output_chain_as_is(ctx, ctx->in->buf)) { /* move the chain link to the output chain */ cl = ctx->in;//buf不需要拷贝,改变一下指向就行了。 ctx->in = cl->next; *last_out = cl;//初始化时,last_out = &out;这里将cl这个头部的节点用out指向。out指向这种处理过的节点头部。 last_out = &cl->next;//last_out指向下一个节点。下面继续。 cl->next = NULL;//截断这个几点与后面还未处理的节点,然后继续。下次循环会继续往后添加的。 continue; } //后面的是需要实际拷贝内存的。 if (ctx->buf == NULL) { rc = ngx_output_chain_align_file_buf(ctx, bsize); if (rc == NGX_ERROR) { return NGX_ERROR; } if (rc != NGX_OK) { if (ctx->free) { /* get the free buf */ cl = ctx->free; ctx->buf = cl->buf; ctx->free = cl->next; ngx_free_chain(ctx->pool, cl); } else if (out || ctx->allocated == ctx->bufs.num) { break; } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) { return NGX_ERROR; } } } rc = ngx_output_chain_copy_buf(ctx);//将ctx->in->buf的缓冲拷贝到ctx->buf上面去。会创建一个新的节点。 if (rc == NGX_ERROR) { return rc; } if (rc == NGX_AGAIN) { if (out) { break; } return rc; } /* delete the completed buf from the ctx->in chain */ if (ngx_buf_size(ctx->in->buf) == 0) { ctx->in = ctx->in->next;//这个节点大小为0,移动到下一个节点。 } cl = ngx_alloc_chain_link(ctx->pool); if (cl == NULL) { return NGX_ERROR; } cl->buf = ctx->buf; cl->next = NULL; *last_out = cl;//将这个节点拷贝到last_out所指向的节点,也就是out指针指向的头部的链表后面。 last_out = &cl->next; ctx->buf = NULL; }//while (ctx->in) 结束,遍历ctx->in完毕,也就是处理完了所有待发送节点,将他们
剩下的部分就是把刚刚准备的ctx->out,也就是u->writer.out 调用ctx->output_filter发送出去。然后再调用ngx_chain_update_chains把链表更新一下,这个update函数完成2个功能:
- 1. 将out_bufs的缓冲区放入busy_bufs链表的尾部,注意顺序;
- 2.如果busy_bufs里面的数据没有了,发送完毕了,那就将这块buffer缓冲区移动到free_bufs链表里面。
//out为刚刚处理,拷贝过的缓冲区链表头部,下面进行发送了吧应该。调用的是ngx_chain_writer last = ctx->output_filter(ctx->filter_ctx, out); if (last == NGX_ERROR || last == NGX_DONE) { return last; } ngx_chain_update_chains(&ctx->free, &ctx->busy, &out, ctx->tag); last_out = &out;// }//for ( ;; ) 这是个死循环,也就是不断的处理out所指向的数据。 }
从上面可知,如果我们这次没有发送完所有的数据,那剩下的会存储在u->writer.out链表里面的,当upstream的连接可读的时候,就好调用ngx_http_upstream_send_request_handler函数,这个是在ngx_http_upstream_connect连接完fcgi、upstream之后设置的写事件回调。这个回调判断了一下超时,然后就是调用ngx_http_upstream_send_request去发送数据到后端。
这种第二次调用进入ngx_http_upstream_send_request函数的情况时,u->request_sent = 1;因此,其调用的ngx_output_chain第二个参数就是NULL,也就是没有新的数据要发送,你看看有没有旧的没法完的,就发送出去。
到这里nginx如何把数据发送给FCGI讲的完了。
近期评论