首页 > Nginx > Nginx upstream原理分析【2】-带buffering读取upstream数据

Nginx upstream原理分析【2】-带buffering读取upstream数据

2013年5月19日 发表评论 阅读评论 11973次阅读    

上篇文章Nginx upstream原理分析【1】-无缓冲模式发送数据 讲到了nginx upstream在不带buffer的情况下,是如何接收upstream数据然后把它们发送给客户端的。这里讲一下带缓存的情况下nginx是怎么巧妙的完成这一任务的。

回到ngx_http_upstream_send_response函数,这个函数发送请求数据给客户端。里面会分别发送header,body,这个在上篇文章已经介绍过了,对于FCGI这种带buffering的数据处理,这里nginx是用所谓的event_pipe方式处理的,pipe为管子的意思,顾名思义就是读写事件的管道。

其所有的处理都集中在ngx_event_pipe_s这个结构体上,结构体包含指向upstream连接,指向客户端的连接的指针,以及尽心各个缓冲区管理的指针,还有input_filter、output_filter等函数回调,以及一堆标志位。这里先看一下这个结构的内容:

struct ngx_event_pipe_s {//nginx处理buffering机制的结构
ngx_connection_t  *upstream;//表示nginx和client,以及和后端的两条连接
ngx_connection_t  *downstream;//这个表示客户端的连接

ngx_chain_t       *free_raw_bufs;//保存了从upstream读取的数据(没有经过任何处理的),以及缓存的buf.
ngx_chain_t       *in;//每次读取数据后,调用input_filter对协议格式进行解析,解析完后的数据部分放到in里面形成一个链表。
/*关于p->in和shadow,多说一下,in指向一堆chain链表,每个链表指向一块实实在在的fcgi DATA数据,多个这样的html代码块共享一块大的裸FCGI数据块;
属于某个大的裸FCGI数据块的最后一个数据节点的last_shadow成员为1,表示我是这个大FCGI数据块的最后一个,并且我的shadow指针指向这个裸FCGI数据块的buf指针
	释放这些大数据块的时候,可以参考ngx_event_pipe_drain_chains进行释放。
*/
ngx_chain_t      **last_in;//上面的in结构的最后一个节点的next指针的地址,p->last_in = &cl->next;,这样就可以将新分析到的FCGI数据链接到后面了。
					//out 指放入文件中的buffer,其总是在in所指向的buf链表的前面,发送前先发送这个
ngx_chain_t       *out;//buf到tempfile的数据会放到out里面。在ngx_event_pipe_write_chain_to_temp_file函数里面设置的。
ngx_chain_t      **last_out;

ngx_chain_t       *free;//这里就是那些空闲的内存节点,从busy移动过来的。注意是节点,不是buf
ngx_chain_t       *busy;//代表经过了output_filter过的,从out移动过来的缓存,其里面可能有已经发送完成了,因为ngx_http_write_filter会更新这写buf的

/*
* the input filter i.e. that moves HTTP/1.1 chunks
* from the raw bufs to an incoming chain
*///FCGI为ngx_http_fastcgi_input_filter,其他为ngx_event_pipe_copy_input_filter 。用来解析特定格式数据
ngx_event_pipe_input_filter_pt    input_filter;//这个用来解析对应协议的数据。比如解析FCGI协议的数据。
void                             *input_ctx;

ngx_event_pipe_output_filter_pt   output_filter;//ngx_http_output_filter输出filter
void                             *output_ctx;

unsigned           read:1;//标记是否读了数据。
unsigned           cacheable:1;
unsigned           single_buf:1;//如果使用了NGX_USE_AIO_EVENT异步IO标志,则设置为1
unsigned           free_bufs:1;
unsigned           upstream_done:1;
unsigned           upstream_error:1;
unsigned           upstream_eof:1;
unsigned           upstream_blocked:1;//ngx_event_pipe用来标记是否读取了upstream的数据来决定是不是要write
unsigned           downstream_done:1;
unsigned           downstream_error:1;
unsigned           cyclic_temp_file:1;

ngx_int_t          allocated;//表示已经分配了的bufs的个数,每次会++
ngx_bufs_t         bufs;//fastcgi_buffers等指令设置的nginx用来缓存body的内存块数目以及大小。ngx_conf_set_bufs_slot函数会解析这样的配置。
//对应xxx_buffers,也就是读取后端的数据时的bufer大小以及个数
ngx_buf_tag_t      tag;

ssize_t            busy_size;//fastcgi_busy_buffers_size 指令或者其他upstream设置的大小,作用为最大的busy状态的内存总容量。
//文档介绍 : mits the total size of buffers that can be busy sending a response to the client while the response is not yet fully read.

off_t              read_length;//从upstream读取的数据长度

off_t              max_temp_file_size;
ssize_t            temp_file_write_size;

ngx_msec_t         read_timeout;
ngx_msec_t         send_timeout;
ssize_t            send_lowat;

ngx_pool_t        *pool;
ngx_log_t         *log;

ngx_chain_t       *preread_bufs;//指读取upstream的时候多读的,或者说预读的body部分数据。p->preread_bufs->buf = &u->buffer;
size_t             preread_size;
ngx_buf_t         *buf_to_file;

ngx_temp_file_t   *temp_file;

/* STUB */ int     num;
};

回到ngx_http_upstream_send_response,如果是buffering,就会进入后面的处理过程,函数竖线准备一个ngx_event_pipe_t结构的数据,这个结构可以通过upstream的u->pipe进行索引找到。首先设置p->output_filter输出过滤函数为ngx_http_output_filter,用来进行输出过滤并发送数据。然后设置busy_buffers_size,这是配置的值。将p->upstream 设置为跟后端PHP的脸颊 u->peer.connection;p->downstream设置为跟客户端的连接;
之后便是拷贝了u->buffer到p->preread_bufs,这个u->buf是读取上游返回的数据的缓冲区,也就是proxy,FCGI返回的数据。这里面有http头部,也可能有body部分。preread_bufs字段上面的ngx_event_pipe_t里面介绍过了,是之前读取header的时候多读取的数据,这里需要用来解析;
然后便是设置upstream的读回调函数read_event_handler为read_event_handler,跟客户端的连接的写回调函数write_event_handler为ngx_http_upstream_process_downstream(下一张介绍),这2个回调函数一个处理跟upstream的读取数据,一个处理跟客户端连接的发送数据,这是重点。设置完后就调用ngx_http_upstream_process_upstream尝试读取upstream的数据。ngx_http_upstream_send_response后面一部分看下面的代码:

//下面进入event_pipe过程,pipe==水泵,beng···
    p = u->pipe;
    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;//设置filter,可以看到就是http的输出filter
    p->output_ctx = r;
    p->tag = u->output.tag;
    p->bufs = u->conf->bufs;//设置bufs,它就是upstream中设置的bufs.u == &flcf->upstream;
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;//赋值跟后端upstream的连接。
    p->downstream = c;//赋值跟客户端的连接。
    p->pool = r->pool;
    p->log = c->log;

    p->cacheable = u->cacheable || u->store;
    p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    if (p->temp_file == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
    p->temp_file->file.fd = NGX_INVALID_FILE;
    p->temp_file->file.log = c->log;
    p->temp_file->path = u->conf->temp_path;
    p->temp_file->pool = r->pool;
    if (p->cacheable) {
        p->temp_file->persistent = 1;
    } else {
        p->temp_file->log_level = NGX_LOG_WARN;
        p->temp_file->warn = "an upstream response is buffered to a temporary file";
    }
    p->max_temp_file_size = u->conf->max_temp_file_size;
    p->temp_file_write_size = u->conf->temp_file_write_size;

	//下面申请一个缓冲链接节点,来存储刚才我们再读取fcgi的包,为了得到HTTP headers的时候不小心多读取到的数据。
	//其实只要FCGI发给后端的包中,有一个包的前半部分是header,后一部分是body,就会有预读数据。
    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
    p->preread_bufs->buf = &u->buffer;
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;
    p->preread_size = u->buffer.last - u->buffer.pos;
    //······
    p->read_timeout = u->conf->read_timeout;
    p->send_timeout = clcf->send_timeout;
    p->send_lowat = clcf->send_lowat;
	//下面的u->read***是这样被调用的: c->read->handler = ngx_http_upstream_handler;设置这个连接上的读写句柄是upstream_handler
	//u->read_event_handler = XXXX;//upstream自己记着当前自己有事件来的时候应该怎么读,读什么。
    u->read_event_handler = ngx_http_upstream_process_upstream;//设置读事件结构,是用来处理超时,关闭连接等用的。
    //下面的r->write***是这样被调用的:c->write->handler = ngx_http_request_handler;r->write_event_handler = XXX;//
    r->write_event_handler = ngx_http_upstream_process_downstream;//设置可写事件结构。这样就可以给客户端发送数据了。
    //发动一下数据读取吧。以后有数据可读的时候也会调用这里的。
    ngx_http_upstream_process_upstream(r, u);
}

上面的send_response只是设置了一下event_pipe的相关结构,回调等,然后调用ngx_http_upstream_process_upstream来读取后端PHP的数据,我们来看ngx_http_upstream_process_upstream函数,这个函数如上面所说,是挂在在upstream的连接上的读事件,有可读数据的时候也会调用这个函数触发对upstream的数据读取。函数首先老规矩判断一下超时,然后调用了ngx_event_pipe()函数进入事件处理的主函数,然后调用ngx_http_upstream_process_request函数处理一下缓存等东西,看看是否需要把数据写到磁盘上,这个比较简单。稍微看一下代码:

static void
ngx_http_upstream_process_upstream(ngx_http_request_t *r, ngx_http_upstream_t *u)
{//这是在有buffering的情况下使用的函数。
//ngx_http_upstream_send_response调用这里发动一下数据读取。以后有数据可读的时候也会调用这里的。设置到了u->read_event_handler了。
    ngx_connection_t  *c;
    c = u->peer.connection;
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "http upstream process upstream");
    c->log->action = "reading upstream";
    if (c->read->timedout) {//如果超时了
        u->pipe->upstream_error = 1;
        ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");
    } else {
    //请求没有超时,那么对后端,处理一下读事件。ngx_event_pipe开始处理
        if (ngx_event_pipe(u->pipe, 0) == NGX_ABORT) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    }
	//处理了一下是否需要吧数据写到磁盘上。
    ngx_http_upstream_process_request(r);
}

下面进入ngx_event_pipe,这是处理的中心地带,函数有2个参数,第一个位ngx_event_pipe_t结构的指针,也就是u->pipe。第二个参数代表是否需要立即往客户度写数据,或者说进行写数据处理,如果设置为0,则会先读取数据,读到了再写,否则会先读取数据。
ngx_event_pipe函数主体就是一个循环,循环读取upstream的数据,然后发送到客户端去。分别调用ngx_event_pipe_read_upstream和ngx_event_pipe_write_to_downstream,从函数名就可以看出来,ngx_event_pipe函数代码如下,比较简单:

ngx_int_t ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{//在有buffering的时候,使用event_pipe进行数据的转发,调用ngx_event_pipe_write_to*函数读取数据,或者发送数据给客户端。
//ngx_event_pipe将upstream响应发送回客户端。do_write代表是否要往客户端发送,写数据。
//如果设置了,那么会先发给客户端,再读upstream数据,当然,如果读取了数据,也会调用这里的。
    u_int         flags;
    ngx_int_t     rc;
    ngx_event_t  *rev, *wev;

//这个for循环是不断的用ngx_event_pipe_read_upstream读取客户端数据,然后调用ngx_event_pipe_write_to_downstream
    for ( ;; ) {
        if (do_write) {
            p->log->action = "sending to client";
            rc = ngx_event_pipe_write_to_downstream(p);
            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }
            if (rc == NGX_BUSY) {
                return NGX_OK;
            }
        }
        p->read = 0;
        p->upstream_blocked = 0;
        p->log->action = "reading upstream";
		//从upstream读取数据到chain的链表里面,然后整块整块的调用input_filter进行协议的解析,并将HTTP结果存放在p->in,p->last_in的链表里面。
        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
            return NGX_ABORT;
        }
		//upstream_blocked是在ngx_event_pipe_read_upstream里面设置的变量,代表是否有数据已经从upstream读取了。
        if (!p->read && !p->upstream_blocked) {
            break;
        }
        do_write = 1;//还要写。因为我这次读到了一些数据
    }
//下面是处理是否需要设置定时器,或者删除读写事件的epoll。
}

上面都是铺垫,介绍读写事件是怎么到达处理点ngx_event_pipe_read_upstream的,对于读取,则是这样到达的:

0.ngx_single_process_cycle循环调用ngx_process_events_and_timers,后者调用ngx_epoll_process_events处理读写事件;
1.c->read->handler = ngx_http_upstream_handler();SOCK连接最基础的读写回调handler,
2.u->read_event_handler = ngx_http_upstream_process_upstream();
3.ngx_event_pipe();
4.ngx_event_pipe_read_upstream() 进入主要读取处理函数。

ngx_event_pipe_read_upstream函数完成下面几个功能:

0.从preread_bufs,free_raw_bufs或者ngx_create_temp_buf寻找一块空闲的或部分空闲的内存;
1.调用p->upstream->recv_chain==ngx_readv_chain,用writev的方式读取FCGI的数据,填充chain。
2.对于整块buf都满了的chain节点调用input_filter(ngx_http_fastcgi_input_filter)进行upstream协议解析,比如FCGI协议,解析后的结果放入p->in里面;
3.对于没有填充满的buffer节点,放入free_raw_bufs以待下次进入时从后面进行追加。
4.当然了,如果对端发送完数据FIN了,那就直接调用input_filter处理free_raw_bufs这块数据

函数主题是一个大循环,循环进行数据读取,解析,保存。我们一点点来看其代码。循环首先第一部分就是读取数据,第二部分解析数据;

读取数据可能有2种情况,如果之前读取header的时候多读取的preread_bufs不为空,如果这里面有数据的话,那就先处理这些数据;否则需要寻找一块空闲的内存进行读取。第一种情况比较简单:

/*
1.从preread_bufs,free_raw_bufs或者ngx_create_temp_buf寻找一块空闲的或部分空闲的内存;
2.调用p->upstream->recv_chain==ngx_readv_chain,用writev的方式读取FCGI的数据,填充chain。
3.对于整块buf都满了的chain节点调用input_filter(ngx_http_fastcgi_input_filter)进行upstream协议解析,比如FCGI协议,解析后的结果放入p->in里面;
4.对于没有填充满的buffer节点,放入free_raw_bufs以待下次进入时从后面进行追加。
5.当然了,如果对端发送完数据FIN了,那就直接调用input_filter处理free_raw_bufs这块数据。
*/
static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
{//ngx_event_pipe调用这里读取后端的数据。
    ssize_t       n, size;
    ngx_int_t     rc;
    ngx_buf_t    *b;
    ngx_chain_t  *chain, *cl, *ln;

    if (p->upstream_eof || p->upstream_error || p->upstream_done) {
        return NGX_OK;
    }
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe read upstream: %d", p->upstream->read->ready);

    for ( ;; ) {
        if (p->upstream_eof || p->upstream_error || p->upstream_done) {
            break;//状态判断。
        }
		//如果没有预读数据,并且跟upstream的连接还没有read,那就可以退出了,因为没数据可读。
        if (p->preread_bufs == NULL && !p->upstream->read->ready) {
            break;
        }
		//下面这个大的if-else就干一件事情: 寻找一块空闲的内存缓冲区,用来待会存放读取进来的upstream的数据。
		//如果preread_bufs不为空,就先用之,否则看看free_raw_bufs有没有,或者申请一块
        if (p->preread_bufs) {//如果预读数据有的话,比如第一次进来,连接尚未可读,但是之前读到了一部分body。那就先处理完这个body再进行读取。
            /* use the pre-read bufs if they exist */
            chain = p->preread_bufs;//那就将这个块的数据链接起来,待会用来存放读入的数据。并清空preread_bufs,
            p->preread_bufs = NULL;
            n = p->preread_size;
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,  "pipe preread: %z", n);
            if (n) {
                p->read = 1;//读了数据。
            }
        } else {//否则,preread_bufs为空,没有了。

第二种情况就是预读数据指针为空,也就是没有预读数据了,那么我们需要找到一块内存进行读取,函数寻找的顺序为:
0.free_raw_bufs空闲内存链表是否为空;
1.然后是如果没有超过fastcgi_buffers等指令的限制,那么申请一块内存吧。因为现在没有空闲内存了;
2.否则看看如果不能cache,那么久尝试发送一些数据到客户端,用来得到空闲的内存;
3.如果可以cache,那么就将r->in的头部的数据写入到文件,然后用r->out记着写入文件的头部的数据;
4.如果还没有,那没办法了;

上面这块寻找空闲内存buf的代码比较简单,限于篇幅这里就不列了。找到空闲内存后,就调用p->upstream->recv_chain()去读取数据,这个句柄的函数为ngx_readv_chain,也就是用readv的方式进行:

//到这里,肯定是找到空闲的buf了,chain指向之了。先睡觉,电脑没电了。
//ngx_readv_chain .调用readv不断的读取连接的数据。放入chain的链表里面
//这里的chain是不是只有一块? 其next成员为空呢,不一定,如果free_raw_bufs不为空,
//上面的获取空闲buf只要没有使用AIO的话,就可能有多个buffer链表的。
            n = p->upstream->recv_chain(p->upstream, chain);

            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "pipe recv chain: %z", n);
            if (p->free_raw_bufs) {//free_raw_bufs不为空,那就将chain指向的这块放到free_raw_bufs头部。
                chain->next = p->free_raw_bufs;
            }
            p->free_raw_bufs = chain;//放入头部
            if (n == NGX_ERROR) {
                p->upstream_error = 1;
                return NGX_ERROR;
            }
            if (n == NGX_AGAIN) {
                if (p->single_buf) {
                    ngx_event_pipe_remove_shadow_links(chain->buf);
                }
                break;
            }
            p->read = 1;
            if (n == 0) {
                p->upstream_eof = 1;//没有读到数据,肯定upstream发送了FIN包,那就读取完成了。
                break;
            }
        }//从上面for循环刚开始的if (p->preread_bufs) {到这里,都在寻找一个空闲的缓冲区,然后读取数据填充chain。够长的。

经过上了面的过程后,下面肯定已经拿到了数据,或者没有数据了。如果有数据的话,我们知道,这个数据肯定是FCGI格式的数据,肯定不能直接发给客户端,需要进行FCGI协议处理,则就是下面的代码的工作了。首先cl = chain;用cl指向刚刚读取的或预读的数据,n代表数据的大小,于是循环的将cl指向的链表里面的数据(总大小为n)进行协议解析,我们知道,这个协议解析会调用FCGI协议或者其他协议的回调的,那就是:input_filter,对于FCGI是ngx_http_fastcgi_input_filter,其他可能为ngx_event_pipe_copy_input_filter。FCGI这个回调是在ngx_http_fastcgi_handler函数里面初始化时设置的。

ngx_http_fastcgi_input_filter比较复杂,他跟shadow内存相关,我们待会介绍。当FCGI解析出HTML格式的数据后,会把它们存放在ngx_event_pipe_s:in成员里面形成链表。另外如果有刷入到磁盘里面的数据,那他们是存放在out成员里面的,这个参考ngx_event_pipe_s的介绍。

另外注意一下,在readv读取到数据后,进行input_filter协议解析处理时,只会简析chain链表中填满的buf,对于没有填满的buf,nginx会考虑先放到free_raw_bufs头部,用来在下一次进行处理,下一次free_raw_bufs不为空,然后在ngx_readv_chain读取数据时,他会把数据放在free_raw_bufs最后一个字节开始的地方,拼起来,这样应该是为了性能。当然,如过PHP发送完数据后,发送FIN关闭连接,那最后一部分内存怎么办呢,这个是ngx_event_pipe_read_upstream函数最后退出之前进行判断的,他会看看是否还有残留的没有填满buf的free_raw_bufs,如果有,那么就将他们发送出去。

看看协议解析的代码:

//读取了数据,下面要进行FCGI协议解析,保存了。
        p->read_length += n;
        cl = chain;//chain已经是链表的头部了,等于free_raw_bufs所以下面可以置空先。
        p->free_raw_bufs = NULL;

        while (cl && n > 0) {//如果还有链表数据并且长度不为0,也就是这次的还没有处理完。那如果之前保留有一部分数据呢?
        //不会的,如果之前预读了数据,那么上面的大if语句else里面进不去,就是此时的n肯定等于preread_bufs的长度preread_size。
        //如果之前没有预读数据,但free_raw_bufs不为空,那也没关系,free_raw_bufs里面的数据肯定已经在下面几行处理过了。

		//下面的函数将c->buf中用shadow指针连接起来的链表中所有节点的recycled,temporary,shadow成员置空。
            ngx_event_pipe_remove_shadow_links(cl->buf);

            size = cl->buf->end - cl->buf->last;
            if (n >= size) {
                cl->buf->last = cl->buf->end;//把这坨全部用了,readv填充了数据。
                /* STUB */ cl->buf->num = p->num++;//第几块
				//FCGI为ngx_http_fastcgi_input_filter,其他为ngx_event_pipe_copy_input_filter 。用来解析特定格式数据
                if (p->input_filter(p, cl->buf) == NGX_ERROR) {//整块buffer的调用协议解析句柄
                //这里面,如果cl->buf这块数据解析出来了DATA数据,那么cl->buf->shadow成员指向一个链表,
                //通过shadow成员链接起来的链表,每个成员就是零散的fcgi data数据部分。
                    return NGX_ABORT;
                }
                n -= size;
                ln = cl;
                cl = cl->next;//继续处理下一块,并释放这个节点。
                ngx_free_chain(p->pool, ln);

            } else {//如果这个节点的空闲内存数目大于剩下要处理的,就将剩下的存放在这里。
                cl->buf->last += n;//啥意思,不用调用input_filter了吗,不是。是这样的,如果剩下的这块数据还不够塞满当前这个cl的缓存大小,
                n = 0;//那就先存起来,怎么存呢: 别释放cl了,只是移动其大小,然后n=0使循环退出。然后在下面几行的if (cl) {里面可以检测到这种情况
 //于是在下面的if里面会将这个ln处的数据放入free_raw_bufs的头部。不过这里会有多个连接吗? 可能有的。
            }
        }

        if (cl) {
	//将上面没有填满一块内存块的数据链接放到free_raw_bufs的前面。注意上面修改了cl->buf->last,后续的读入数据不会覆盖这些数据的。看ngx_readv_chain
            for (ln = cl; ln->next; ln = ln->next) { /* void */ }
            ln->next = p->free_raw_bufs;//这个不是NULL吗,上面初始化的,不对,因为input_filter可能会将那些没用data部分的fcgi数据包块放入free_raw_bufs直接进行复用。
            p->free_raw_bufs = cl;//这样在下一次循环的时候,也就是上面,会使用free_raw_bufs的。
            //并且,如果循环结束了,会在下面再处理一下这个尾部没有填满整个块的数据。
        }
    }//for循环结束。

如果upstream数据发送完毕了,那么upstream_eof会被设置为1,在函数最后会进行扫尾工作,把半满的free_raw_bufs数据进行解析。这里我们可以看到buffering的含义就在这里:nginx会尽量读取upstream的数据,直到填满一块buffer,由fastcgi_buffers等参数决定的大小,才会发送给客户端。千万别误解为读取完所有的数据才发送,而是读取了一块buffer。

    if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {//没办法了,都快到头了,或者出现错误了,所以处理一下这块不完整的buffer
        /* STUB */ p->free_raw_bufs->buf->num = p->num++;
		//如果数据读取完毕了,或者后端出现问题了,并且,free_raw_bufs不为空,后面还有一部分数据,
		//当然只可能有一块。那就调用input_filter处理它。FCGI为ngx_http_fastcgi_input_filter 在ngx_http_fastcgi_handler里面设置的

		//这里考虑一种情况: 这是最后一块数据了,没满,里面没有data数据,所以ngx_http_fastcgi_input_filter会调用ngx_event_pipe_add_free_buf函数,
		//将这块内存放入free_raw_bufs的前面,可是君不知,这最后一块不存在数据部分的内存正好等于free_raw_bufs,因为free_raw_bufs还没来得及改变。
		//所以,就把自己给替换掉了。这种情况会发生吗?
        if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
            return NGX_ABORT;
        }
        p->free_raw_bufs = p->free_raw_bufs->next;
        if (p->free_bufs && p->buf_to_file == NULL) {
            for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                if (cl->buf->shadow == NULL) 
			//这个shadow成员指向由我这块buf产生的小FCGI数据块buf的指针列表。如果为NULL,就说明这块buf没有data,可以释放了。
                    ngx_pfree(p->pool, cl->buf->start);
                }
            }
        }
    }
    if (p->cacheable && p->in) {
        if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
            return NGX_ABORT;
        }
    }
    return NGX_OK;
}//ngx_event_pipe_read_upstream函数结束

经过上面的数据读取,解析,基本算是完成了upstream的数据处理,处理的结果放在p->in,p->out指针链表上面,这些链表指向HTML数据的buffer,后面就只需要往客户端发送了。


上面有个关键的函数:input_filter回调,对于FCGI是ngx_http_fastcgi_input_filter,这个函数负责进行FCGI协议解析,将解析后的数据挂载到:p->in链表上。我们来看看这个函数。

这个函数利用ngx_http_fastcgi_process_record解析FCGI的头部格式,返回解析后的状态,如果解析到了ngx_http_fastcgi_st_data的话,不断的接收数据,这里面的数据就是HTTP格式的数据了。这里比较简单,下面大概看一下代码:

static ngx_int_t
ngx_http_fastcgi_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
{//这个函数在ngx_http_fastcgi_handler里面设置为p->input_filter,在FCGI给nginx发送数据的时候调用,解析FCGI的数据。
//ngx_event_pipe_read_upstream调用这里,来把已经读取的数据进行FCGI协议解析。
//这个函数处理一块FCGI数据buf,外层会循环调用的。
    u_char                  *m, *msg;
    ngx_int_t                rc;
    ngx_buf_t               *b, **prev;
    ngx_chain_t             *cl;
    ngx_http_request_t      *r;
    ngx_http_fastcgi_ctx_t  *f;

    if (buf->pos == buf->last) {
        return NGX_OK;
    }
    r = p->input_ctx;
	//得到这个请求的协议上下文,比如我们这个包是第一个预读的包,那么现在的pos肯定不为0,而是位于中间部分。
    f = ngx_http_get_module_ctx(r, ngx_http_fastcgi_module);

    b = NULL;
    prev = &buf->shadow;//当前这个buf
    f->pos = buf->pos;
    f->last = buf->last;

    for ( ;; ) {
		//小于ngx_http_fastcgi_st_data状态的比较好处理,读,解析吧。后面就只有data,padding 2个状态了。
        if (f->state < ngx_http_fastcgi_st_data) {//还不是在处理数据的过程中。前面还有协议头部
        //下面简单处理一下FCGI的头部,将信息赋值到f的type,length,padding成员上。
            rc = ngx_http_fastcgi_process_record(r, f);
            if (rc == NGX_AGAIN) {
                break;//没数据了,等待读取
            }
            if (rc == NGX_ERROR) {
                return NGX_ERROR;
            }
            if (f->type == NGX_HTTP_FASTCGI_STDOUT && f->length == 0) {//如果协议头表示是标准输出,并且长度为0,那就是说明没有内容
                f->state = ngx_http_fastcgi_st_version;//又从下一个包头开始,也就是版本号。
                p->upstream_done = 1;
                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0, "http fastcgi closed stdout");
                continue;
            }

            if (f->type == NGX_HTTP_FASTCGI_END_REQUEST) {//FCGI发送了关闭连接的请求。
                f->state = ngx_http_fastcgi_st_version;
                p->upstream_done = 1;
                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, p->log, 0, "http fastcgi sent end request");
                break;
            }
        }

        if (f->state == ngx_http_fastcgi_st_padding) {//下面是读取padding的阶段,
            if (f->pos + f->padding < f->last) {//而正好当前缓冲区后面有足够的padding长度,那就直接用它,然后标记到下一个状态,继续处理吧
                f->state = ngx_http_fastcgi_st_version;
                f->pos += f->padding;
                continue;
            }
            if (f->pos + f->padding == f->last) {//刚好结束,那就退出循环,完成一块数据的解析。
                f->state = ngx_http_fastcgi_st_version;
                break;
            }
            f->padding -= f->last - f->pos;
            break;
        }
//到这里,就只有读取数据部分了。
        /* f->state == ngx_http_fastcgi_st_data */
        if (f->type == NGX_HTTP_FASTCGI_STDERR) {//这是标准错误输出,nginx会怎么处理呢,打印一条日志就行了。
            if (f->length) {//代表数据长度
                if (f->pos == f->last) {//后面没东西了,还需要下次再读取一点数据才能继续了
                    break;
                }
                msg = f->pos;
                if (f->pos + f->length <= f->last) {//错误信息已经全部读取到了,
                    f->pos += f->length;
                    f->length = 0;
                    f->state = ngx_http_fastcgi_st_padding;//下一步去处理padding
                } else {
                    f->length -= f->last - f->pos;
                    f->pos = f->last;
                }
                for (m = f->pos - 1; msg < m; m--) {//从错误信息的后面往前面扫,直到找到一个部位\r,\n . 空格 的字符为止,也就是过滤后面的这些字符吧。
                    if (*m != LF && *m != CR && *m != '.' && *m != ' ') {
                        break;
                    }
                }//就用来打印个日志。没其他的。
                ngx_log_error(NGX_LOG_ERR, p->log, 0, "FastCGI sent in stderr: \"%*s\"", m + 1 - msg, msg);
                if (f->pos == f->last) {
                    break;
                }
            } else {
                f->state = ngx_http_fastcgi_st_version;
            }
            continue;
        }

经过上面的处理,只有STDOUT标准输出的ngx_http_fastcgi_st_data状态数据才会运行后面的代码,后面的代码会将这块数据部分放入到p->in链表的最后位置,也就是p->last_in指向的位置。对于数据的处理,有如下几种情况:

0. 如果f->pos + f->length < f->last,当前这块内存足够容纳这个FCGI包,也就是length+padding都位于这一块FCGI buf,那就将数据部分放入p->in;

1.如果f->pos + f->length == f->last,那就说明正好读取的FCGI数据buf刚好容纳,也就是数据部分正好位于末尾,也可以正好组成一块buf。剩下的看看是否需要处理padding了;

2.否则,说明这个数据data部分跨越了这个FCGI buf和下一个,那没办法了,从这里切断,这前面的部分作为一块ngx_buf_t链接到p->in后面;下次循环处理的时候会将剩下的部分挂在后面的。

这个函数最容易被忽略的地方是shadow。nginx 中的shadow的意思是什么呢,我们知道,ngx_http_fastcgi_input_filter中第二个参数ngx_buf_t *buf是指向刚刚用readv从upstream的连接中读取的裸数据,我们称之为FCGI裸数据,这块络数据可能包含多个FCGI包,也就是可能含有多个FCGI的data部分,这里的data就是HTML格式的数据。正常情况下我们会怎么做呢?简单的话,我们会这样做:

简单做法:从FCGI buf里面解析到了data后,根据beg-end将data数据拷贝出来,新建一块内存用来保存HTML格式的数据,放入p->in,后续用来发送。

但是一向以性能著称的nginx为了节省这次拷贝,不惜引入shadow的概念来避免拷贝,那怎么办呢?答案是:共享,或者说影子;p->in里面只是保存了buf的描述信息,具体内容还是根FCGI buf共享。这样引入的复杂性不少,而且不太直观。不知道我理解对了没有,错了麻烦告诉我一下;下面看一下函数后一部分的代码:

		//到这里就是标准的输出啦,也就是网页内容。
        /* f->type == NGX_HTTP_FASTCGI_STDOUT */
        if (f->pos == f->last) {
            break;//正好没有数据,返回
        }
        if (p->free) {//从free空闲ngx_buf_t结构中取一个
            b = p->free->buf;
            p->free = p->free->next;
        } else {
            b = ngx_alloc_buf(p->pool);
            if (b == NULL) {
                return NGX_ERROR;
            }
        }
		//用这个新的缓存描述结构,指向buf这块内存里面的标准输出数据部分,注意这里并没有拷贝数据,而是用b指向了f->pos也就是buf的某个数据地方。
        ngx_memzero(b, sizeof(ngx_buf_t));
        b->pos = f->pos;//从pos到end
        b->start = buf->start;//b 跟buf共享一块客户端发送过来的数据。这就是shadow的地方, 类似影子?
        b->end = buf->end;
        b->tag = p->tag;
        b->temporary = 1;
        b->recycled = 1;
	//在函数开始处,prev = &buf->shadow;下面就用buf->shadow指向了这块新分配的b描述结构,其实数据是分开的,只是2个描述结构指向同一个buffer
        *prev = b;//实际上,这里第一次是将&buf->shadow指向b,没什么用,因为没人指向&buf->shadow自己。而对于所有的shadow,我们可以通过p->in组成链表的。不断追加在后面
        prev = &b->shadow;//这里用最开始的buf,也就是客户端接收到数据的那块数据buf的shadow成员,形成一个链表,里面每个元素都是FCGI的一个包的data部分数据。
//下面将当前分析得到的FCGI数据data部分放入p->in的链表里面。
        cl = ngx_alloc_chain_link(p->pool);
        if (cl == NULL) {
            return NGX_ERROR;
        }
        cl->buf = b;
        cl->next = NULL;
        if (p->in) {
            *p->last_in = cl;
        } else {
            p->in = cl;
        }
        p->last_in = &cl->next;//记住最后一块

		//同样,拷贝一下数据块序号。不过这里注意,buf可能包含好几个FCGI协议数据块,
		//那就可能存在多个in里面的b->num等于一个相同的buf->num.不要认为是一一映射。
        /* STUB */ b->num = buf->num;
        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d %p", b->num, b->pos);
        if (f->pos + f->length < f->last) {//如果数据足够长,那修改一下f->pos,和f->state从而进入下一个数据包的处理。数据已经放入了p->in了的。
            if (f->padding) {
                f->state = ngx_http_fastcgi_st_padding;
            } else {
                f->state = ngx_http_fastcgi_st_version;
            }
            f->pos += f->length;
            b->last = f->pos;
            continue;//接收这块数据,继续下一块
        }
        if (f->pos + f->length == f->last) {//正好等于。下面可能需要读取padding,否则进入下一个数据包处理。
            if (f->padding) {
                f->state = ngx_http_fastcgi_st_padding;
            } else {
                f->state = ngx_http_fastcgi_st_version;
            }
            b->last = f->last;
            break;
        }
		//到这里,表示当前读取到的数据还少了,不够一个完整包的,那就用完这一点,然后返回,
		//等待下次event_pipe的时候再次read_upstream来读取一些数据再处理了。
        f->length -= f->last - f->pos;
        b->last = f->last;
        break;
    }//for循环结束。这个循环结束的条件为当前的buf数据已经全部处理完毕。

    if (b) {//刚才已经解析到了数据部分。
        b->shadow = buf;//将最后一块数据的shadow指向这块用来存放读入的裸FCGI数据块。干嘛用的呢,只是指向一下吗?
        //不是,这里的shadow成员正好用来存储: 我这个b所在的大buf 的指针,这样通过不断遍历p->in我是可以找出这些小data部分的b所在的大数据块的。
        //具体看ngx_event_pipe_drain_chains里面.
        b->last_shadow = 1;//标记这是最后一块。
        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf %p %z", b->pos, b->last - b->pos);
        return NGX_OK;
    }
    /* there is no data record in the buf, add it to free chain */
    if (ngx_event_pipe_add_free_buf(p, buf) != NGX_OK) {//将buf挂入free_raw_bufs头部或者第二个位置,如果第一个位置有数据的话。
        return NGX_ERROR;
    }
    return NGX_OK;
}

在这个ngx_http_fastcgi_input_filter函数中我们能看出,函数开始用prev = &buf->shadow;也就是prev开始的时候指向这块FCGI格式的buf的shadow成员,然后在下面开始的部分,用*prev = b;b在这里是刚刚申请的ngx_buf_t结构,也就是,整块FCGI格式buf的shadow指向了其第一个数据部分,我们称为数据节点,然后prev = &b->shadow;改变prev指向为这个数据节点的shadow的地址,这样下次循环时,这个数据节点的shadow成员就指向第二个数据节点的起始位置。

那结束呢?看函数最后倒数第二个b,如果刚才已经找到数据节点了,b当然肯定是最后一个了,那就b->shadow = buf;还节的吗,buf就是这块大的FCGI格式的buf,也就是让最后一个数据节点的shadow指向所属的大裸FCGI格式buf,然后用b->last_shadow = 1;来标注:我这个节点的shadow不是指向下一个节点,而是指向我所属的大FCGI格式buf的地址。有点难解,下面用Graphviz画个图就清楚了,麻烦点击新窗口查看,图比较大不好显示,这里是下图Graphviz的代码ngx_event_pipe_t

ngx_event_pipe_t

图中注意别混淆了,带有“共享数据”的边实际上不是指向FCGI数据块buf的data部分,而是跟这个data部分指向同一个地方,也就是真实的BUF,但这不影响我们的理解。

上图中属于同一块裸FCGI 数据中的不同FCGI包,如不同FCGI_STDOUT的data节点通过shadow指针组成一个链表,最后一个节点比较特殊:通过last_shadow=1来标志,其shadow指针是指向所属的这块大FCIG裸buffer的。

在回收内存到free_raw_bufs的时候,需要注意比如如果回收,我们不能对途中的第1,2,3个buf节点回收,因为其实际上没有指向真实的buf,而是指向了裸FCGI数据的buf的data的地方,也就是共享了。如果要释放,必须找到last_shadow=1的节点,然后取得其shadow指针,其就指向了一块buf。


到这里读取FCGI数据的ngx_event_pipe_read_upstream终于介绍完了。

下面还有往客户端发送数据的ngx_event_pipe_write_to_downstream。篇幅太长,移步这里:Nginx upstream原理分析【3】-带buffering给客户端返回数据

Share
分类: Nginx 标签: , ,
  1. 本文目前尚无任何评论.
  1. 本文目前尚无任何 trackbacks 和 pingbacks.

注意: 评论者允许使用'@user空格'的方式将自己的评论通知另外评论者。例如, ABC是本文的评论者之一,则使用'@ABC '(不包括单引号)将会自动将您的评论发送给ABC。使用'@all ',将会将评论发送给之前所有其它评论者。请务必注意user必须和评论者名相匹配(大小写一致)。