利志分享
fast_forward
view_headline
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
go基础知识
go的环境搭建
go变量
go常量
go字符串
go数组和切片
go的map和range的使用
go的struct的使用
go的函数使用
go的interface的使用
go channel使用
go的routine使用
go的panic和recover使用
go实现http请求
go 复杂的http请求
go实现表单提交
go实现表单验证
go上传附件
go实现mysql连接
go实现redis操作
go对xml操作
go的json操作
go的base64使用
go实现websocket功能
go的单元测试
go的文件操作
go的web服务基础
golang url解析和包介绍使用
go的正则表达式-MatchString,FindString等的使用
golang实现从byte和文件中读取csv格式数据
go进阶
go的类型转换
go的map的多维应用
go的多维数组和slice使用
go的select使用
go的原子性atomic类库使用
go给图片添加水印
go给图片添加文字
go实现http的rpc服务
go实现tcp的rpc服务
go实现json格式的rpc服务
多个defer的执行问题
golang的队列机制实现同步主线程接受子协程的结果
go的值传递和引用传递以及引用类型的问题
go中的make和new的使用问题
golang读文件分析1
golang读文件分析2
golang实现自然周计算
golang实现读写excel
go实战
beego的安装和使用
beego聊天室的基本配置
beego聊天室的生成
Go 写一个类似 cron 的定时任务管理器
Go 调度器 M, P 和 G
AES对称加密算法如何用golang语言实现?
非对称加密的RSA算法如何通过golang来实现?
golang实现http2.0服务端,客户端完整案例
go实战总结
go的日期操作类使用-日常使用类库no.1
go的字符串的连接讲解-日常实战总结no.1
golang实现队列服务-日常实战总结no.2
深入理解golang的channel的使用-日常实战总结no.3
go的sync.pool在实际应用中的讲解和性能分析比较-日常实战总结no.4
go语言中一个典型的引用类型的数据使用案例的注意点-日常实战总结no.5
go的sync包的使用详解1-日常实战总结6
go的sync包的使用详解2-日常实战总结7
深度学习go判断各个类型相等-日常实战总结8
go的排序类使用讲解-日常实战总结9
go的context使用讲解
golang 网络爬虫框架gocolly
golang实现桶排序
golang处理gb2312转utf-8编码的问题
golang实现单链的添加,删除以及翻转
golang的一个err不判断引起的血案(json.Marshal的error到底要不要判断?)
如何控制golang协程的并发数量问题-panic: too many concurrent operations on a single file or socket (max 1048575)
你所要知道的redis客户端返回值知识点
golang实现连续的时间,比如连续的天,月,年等。
go深入
由引用类型引发的概念的深入理解
sync.WaitGroup深入源码理解
golang如何创建动态的struct类型以及如何转换成slice类型
深入理解go的管道数据读写
关于go的只读管道只写管道以及单向管道的理解
深入理解go的slice深入,slice扩容机制
深入理解go的函数参数传递
golang实现动态调用不同struct中不同的方法
如何配置sqlx.DB的SetMaxOpenConns SetMaxIdleConns 和 SetConnMaxLifetime来保证更好的性能
深入理解go的select原理
深入理解golang的GPM模型
精通golang的项目管理go modules
深入理解golang的GC回收机制
超级肝文-深入剖析客户端出现connect reset by peer报错相关的技术知识
Golang源码深入-Go1.15.6发起http请求流程-1
Golang源码深入-Go1.15.6发起http请求流程-2
Golang源码深入-Go1.15.6发起http请求流程-3(http2)
go应用
需求整理-手把手带大家用go开发一个匿名在线聊天室
第二篇-手把手带大家用go开发一个匿名在线聊天室
第三篇-手把手带大家用go开发一个匿名在线聊天室
go面试
【建议收藏】吐血整理Golang面试干货21问-吊打面试官-1
【建议收藏】整理Golang面试第二篇干货13问
【建议收藏】Redis知识干货汇总
【建议收藏】Mysql知识干货(mysql八股文)汇总
目录
go基础知识
go的环境搭建
go变量
go常量
go字符串
go数组和切片
go的map和range的使用
go的struct的使用
go的函数使用
go的interface的使用
go channel使用
go的routine使用
go的panic和recover使用
go实现http请求
go 复杂的http请求
go实现表单提交
go实现表单验证
go上传附件
go实现mysql连接
go实现redis操作
go对xml操作
go的json操作
go的base64使用
go实现websocket功能
go的单元测试
go的文件操作
go的web服务基础
golang url解析和包介绍使用
go的正则表达式-MatchString,FindString等的使用
golang实现从byte和文件中读取csv格式数据
go进阶
go的类型转换
go的map的多维应用
go的多维数组和slice使用
go的select使用
go的原子性atomic类库使用
go给图片添加水印
go给图片添加文字
go实现http的rpc服务
go实现tcp的rpc服务
go实现json格式的rpc服务
多个defer的执行问题
golang的队列机制实现同步主线程接受子协程的结果
go的值传递和引用传递以及引用类型的问题
go中的make和new的使用问题
golang读文件分析1
golang读文件分析2
golang实现自然周计算
golang实现读写excel
go实战
beego的安装和使用
beego聊天室的基本配置
beego聊天室的生成
Go 写一个类似 cron 的定时任务管理器
Go 调度器 M, P 和 G
AES对称加密算法如何用golang语言实现?
非对称加密的RSA算法如何通过golang来实现?
golang实现http2.0服务端,客户端完整案例
go实战总结
go的日期操作类使用-日常使用类库no.1
go的字符串的连接讲解-日常实战总结no.1
golang实现队列服务-日常实战总结no.2
深入理解golang的channel的使用-日常实战总结no.3
go的sync.pool在实际应用中的讲解和性能分析比较-日常实战总结no.4
go语言中一个典型的引用类型的数据使用案例的注意点-日常实战总结no.5
go的sync包的使用详解1-日常实战总结6
go的sync包的使用详解2-日常实战总结7
深度学习go判断各个类型相等-日常实战总结8
go的排序类使用讲解-日常实战总结9
go的context使用讲解
golang 网络爬虫框架gocolly
golang实现桶排序
golang处理gb2312转utf-8编码的问题
golang实现单链的添加,删除以及翻转
golang的一个err不判断引起的血案(json.Marshal的error到底要不要判断?)
如何控制golang协程的并发数量问题-panic: too many concurrent operations on a single file or socket (max 1048575)
你所要知道的redis客户端返回值知识点
golang实现连续的时间,比如连续的天,月,年等。
go深入
由引用类型引发的概念的深入理解
sync.WaitGroup深入源码理解
golang如何创建动态的struct类型以及如何转换成slice类型
深入理解go的管道数据读写
关于go的只读管道只写管道以及单向管道的理解
深入理解go的slice深入,slice扩容机制
深入理解go的函数参数传递
golang实现动态调用不同struct中不同的方法
如何配置sqlx.DB的SetMaxOpenConns SetMaxIdleConns 和 SetConnMaxLifetime来保证更好的性能
深入理解go的select原理
深入理解golang的GPM模型
精通golang的项目管理go modules
深入理解golang的GC回收机制
超级肝文-深入剖析客户端出现connect reset by peer报错相关的技术知识
Golang源码深入-Go1.15.6发起http请求流程-1
Golang源码深入-Go1.15.6发起http请求流程-2
Golang源码深入-Go1.15.6发起http请求流程-3(http2)
go应用
需求整理-手把手带大家用go开发一个匿名在线聊天室
第二篇-手把手带大家用go开发一个匿名在线聊天室
第三篇-手把手带大家用go开发一个匿名在线聊天室
go面试
【建议收藏】吐血整理Golang面试干货21问-吊打面试官-1
【建议收藏】整理Golang面试第二篇干货13问
【建议收藏】Redis知识干货汇总
【建议收藏】Mysql知识干货(mysql八股文)汇总
Golang源码深入-Go1.15.6发起http请求流程-2
阅读:58
分享次数:0
上一篇文章我们讲到go client的大概实现的大概思路,整理了相关client.go的核心源码,详情请翻阅:[Golang源码深入-Go1.15.6发起http请求流程-1](https://zengzhihai.com/study/wiki/type/Z29fc3R1ZHlfaW5mbw==/id/624b0ab3469aea5bbc9a7437 "Golang源码深入-Go1.15.6发起http请求流程-1")。笔者这一篇分享一下transport.go相关核心的代码,整理相关核心的技术点,希望读者多交流学习。 go client的整理流程,主要函数调用和流程如下:NewRequestWithContext->client.Do->client.do->client.send->send->rt.RoundTrip->Transport.roundTrip->Transport.getConn->Transport.queueForDial->Transport.dialConnFor->Transport.dialConn-> Transport.readLoop()和Transport.writeLoop()->persistConn.roundTrip。 http.Client对象保存着Transport连接对象,Transport里面是一个最核心的是tcp连接池,连接池是处理http的请求,相对一个服务来说是全局的。在不同的函数中实例化这个对象处理不同的请求,在不重写Transport对象时,一个服务的连接都是默认复用。为什么是复用呢?是因为transport有个全局变量DefaultTransport,默认都是使用DefaultTransport这个全局对象。 http.NewRequest针对于每个请求都是独立的,每个请求request都是从http.Client里面获取连接,每个请求request都开启一个写协程处理发送请求,一个读协程处理响应请求,这个request本身则调用roundTrip函数启动for select 来监听读协程的结果,到此则请求完成。 下面我们来看核心模块代码翻译: 1:Transport.RoundTrip实现RoundTripper的方法 ``` func (t *Transport) RoundTrip(req *Request) (*Response, error) { return t.roundTrip(req) } ``` 2:Transport.roundTrip是主入口 ``` func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx) if req.URL == nil { req.closeBody() return nil, errors.New("http: nil Request.URL") } if req.Header == nil { req.closeBody() return nil, errors.New("http: nil Request.Header") } scheme := req.URL.Scheme isHTTP := scheme == "http" || scheme == "https" // 下面判断request首部的有效性 if isHTTP { for k, vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { req.closeBody() return nil, fmt.Errorf("net/http: invalid header field name %q", k) } for _, v := range vv { if !httpguts.ValidHeaderFieldValue(v) { req.closeBody() return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k) } } } } origReq := req cancelKey := cancelKey{origReq} req = setupRewindBody(req) if altRT := t.alternateRoundTripper(req); altRT != nil { if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp, err } var err error req, err = rewindBody(req) if err != nil { return nil, err } } if !isHTTP { req.closeBody() return nil, badStringError("unsupported protocol scheme", scheme) } if req.Method != "" && !validMethod(req.Method) { req.closeBody() return nil, fmt.Errorf("net/http: invalid method %q", req.Method) } if req.URL.Host == "" { req.closeBody() return nil, errors.New("http: no Host in request URL") } // 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled) 的情况是不会进行重试的。具体参见shouldRetryRequest函数 for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey} cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil, err } // 获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,使用其自注册的RoundTrip处理,从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。 if err != nil { // 每个request都会在getConn中设置reqCanceler,获取连接失败,清空设置 t.setReqCanceler(cancelKey, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP2处理,使用HTTP2时,由于不缓存HTTP2连接,不对其做限制 t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { // pconn.roundTrip中做了比较复杂的处理,该函数用于发送request并返回response。通过writeLoop发送request,通过readLoop返回response resp, err = pconn.roundTrip(treq) } if err == nil { resp.Request = origReq return resp, nil } // Failed. Clean up and determine whether to retry. if http2isNoCachedConnError(err) { if t.removeIdleConn(pconn) { t.decConnsPerHost(pconn.cacheKey) } } else if !pconn.shouldRetryRequest(req, err) { // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. if e, ok := err.(transportReadFromServerError); ok { err = e.err } return nil, err } testHookRoundTripRetried() // 用于重定向场景 req, err = rewindBody(req) if err != nil { return nil, err } } } ``` 3:getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接。 ``` func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func() { if err != nil { w.cancel(t, err) } }() // 从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接 if delivered := t.queueForIdleConn(w); delivered { pc := w.pc // Trace only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(treq.cancelKey, func(error) {}) return pc, nil } cancelc := make(chan error, 1) t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err }) // 排队等待获取连接 t.queueForDial(w) // 通过select监听获取连接完成或者取消 select { case <-w.ready: // Trace success but only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) } if w.err != nil { // If the request has been cancelled, that's probably // what caused w.err; if so, prefer to return the // cancellation error (see golang.org/issue/16049). select { case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err default: // return below } } return w.pc, w.err case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } } ``` 4:排队等待新建连接 ``` func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() // 如果没有限制最大连接数,直接建立连接 if t.MaxConnsPerHost <= 0 { go t.dialConnFor(w) return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() // 如果没超过连接数限制,直接建立连接 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } // 排队等待连接建立 q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) t.connsPerHostWait[w.key] = q } ``` 5:调用t.dialConn获取一个真正的*persistConn ``` func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() // 执行新建连接,拨号功能,如果新建连接成功,则添加当前连接到连接池 pc, err := t.dialConn(w.ctx, w.cm) delivered := w.tryDeliver(pc, err) if err == nil && (!delivered || pc.alt != nil) { // pconn was not passed to w, // or it is HTTP/2 and can be shared. // Add to the idle connection pool. t.putOrCloseIdleConn(pc) } // 如果建立连接或者获取连接失败,则删除连接池中的连接。 if err != nil { t.decConnsPerHost(w.key) } } ``` 6:dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop ``` func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err } // 调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout if cm.scheme() == "https" && t.hasCustomTLSDialer() { var err error pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } // 如果连接类型是TLS的,则需要处理TLS协商 if tc, ok := pconn.conn.(*tls.Conn); ok { // Handshake here, in case DialTLS didn't. TLSNextProto below // depends on it for knowing the connection state. if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() } // 启动TLS协商,如果协商失败需要 关闭连接 if err := tc.Handshake(); err != nil { go pconn.conn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{}, err) } return nil, err } cs := tc.ConnectionState() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(cs, nil) } pconn.tlsState = &cs } } else { // 使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr() conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // 处理proxy的情况 switch { case cm.proxyURL == nil: // Do nothing. Not using a proxy. case cm.proxyURL.Scheme == "socks5": conn := pconn.conn d := socksNewDialer("tcp", conn.RemoteAddr().String()) if u := cm.proxyURL.User; u != nil { auth := &socksUsernamePassword{ Username: u.Username(), } auth.Password, _ = u.Password() d.AuthMethods = []socksAuthMethod{ socksAuthMethodNotRequired, socksAuthMethodUsernamePassword, } d.Authenticate = auth.Authenticate } if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil { conn.Close() return nil, err } case cm.targetScheme == "http": pconn.isProxy = true if pa := cm.proxyAuth(); pa != "" { pconn.mutateHeaderFunc = func(h Header) { h.Set("Proxy-Authorization", pa) } } case cm.targetScheme == "https": conn := pconn.conn hdr := t.ProxyConnectHeader if hdr == nil { hdr = make(Header) } if pa := cm.proxyAuth(); pa != "" { hdr = hdr.Clone() hdr.Set("Proxy-Authorization", pa) } connectReq := &Request{ Method: "CONNECT", URL: &url.URL{Opaque: cm.targetAddr}, Host: cm.targetAddr, Header: hdr, } // If there's no done channel (no deadline or cancellation // from the caller possible), at least set some (long) // timeout here. This will make sure we don't block forever // and leak a goroutine if the connection stops replying // after the TCP connect. connectCtx := ctx if ctx.Done() == nil { newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() connectCtx = newCtx } didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails var ( resp *Response err error // write or read error ) // Write the CONNECT request & read the response. go func() { defer close(didReadResponse) err = connectReq.Write(conn) if err != nil { return } // Okay to use and discard buffered reader here, because // TLS server will not speak until spoken to. br := bufio.NewReader(conn) resp, err = ReadResponse(br, connectReq) }() select { case <-connectCtx.Done(): conn.Close() <-didReadResponse return nil, connectCtx.Err() case <-didReadResponse: // resp or err now set } if err != nil { conn.Close() return nil, err } if resp.StatusCode != 200 { f := strings.SplitN(resp.Status, " ", 2) conn.Close() if len(f) < 2 { return nil, errors.New("unknown status code") } return nil, errors.New(f[1]) } } if cm.proxyURL != nil && cm.targetScheme == "https" { if err := pconn.addTLS(cm.tlsHost(), trace); err != nil { return nil, err } } if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { alt := next(cm.targetAddr, pconn.conn.(*tls.Conn)) if e, ok := alt.(http2erringRoundTripper); ok { // pconn.conn was closed by next (http2configureTransport.upgradeFn). return nil, e.err } return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil } } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) // 处理请求response go pconn.readLoop() // 开启协程处理请求 go pconn.writeLoop() return pconn, nil } ``` 7:readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。 ``` func (pc *persistConn) readLoop() { // 当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个处理,任何一个loop退出(协议升级除外)则该连接不可用,readLoop跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源 closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() // 尝试将连接放回连接池 tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } return true } // 变量主要用于阻塞调用者协程读取EOF的resp.body,直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,便于连接快速重用 eofc := make(chan struct{}) defer close(eofc) // unblock reader on errors // Read this once, before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock() alive := true for alive { // 获取允许的response首部的最大字节数 pc.readLimit = pc.maxHeaderResponseSize() _, err := pc.br.Peek(1) pc.mu.Lock() if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response // 如果有response数据,则读取并解析为Response格式 if err == nil { resp, err = pc.readResponse(rc, trace) } else { // 可能的错误如server端关闭,发送EOF err = transportReadFromServerError{err} closeErr = err } if err != nil { if pc.readLimit <= 0 { err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize()) } select { case rc.ch <- responseAndError{err: err}: case <-rc.callerGone: return } return } pc.readLimit = maxInt64 // effectively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock() bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.cancelKey, nil) // Put the idle conn back into the pool before we send the response // so if they process it quickly and make another request, they'll // get this same conn. But we use the unbuffered channel 'rc' // to guarantee that persistConn.roundTrip got out of its select // potentially waiting for this persistConn to close. // but after alive = alive && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyWritable { closeErr = errCallerOwnsConn } select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) testHookReadLoopBeforeNextRead() continue } waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } // 返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中 resp.Body = body if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") { resp.Body = &gzipReader{body: body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength = -1 resp.Uncompressed = true } // 此处与处理不带resp.body的场景相同 select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { case bodyEOF := <-waitForBodyRead: // 如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(http response.Body)关闭,一个是底层通道(TCP)关闭。 pc.t.setReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) // 释放阻塞的读操作 if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } } ``` 8:writeLoop用于发送request请求 ``` func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) // writeLoop会阻塞等待两个IO case 循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;如果底层连接关闭,则退出writeLoop for { select { case wr := <-pc.writech: startBytesWritten := pc.nwrite // 构造request并发送request请求。waitForContinue用于处理首部含"Expect: 100-continue"的request err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's // Request.Body are high priority. // Set it here before sending on the // channels below or calling // pc.close() which tears town // connections and causes other // errors. wr.req.setError(err) } // 请求失败时,需要关闭request和底层连接 if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } // 将结果发送给readLoop的pc.wroteRequest()函数处理 pc.writeErrCh <- err // 将结果返回给roundTrip处理,防止响应超时 wr.ch <- err // 如果发送request失败,需要关闭连接。writeLoop退出时会关闭pc.conn和pc.closech,同时会导致readLoop退出 if err != nil { pc.close(err) return } case <-pc.closech: return } } } ``` 9:一个roundTrip用于处理一个request,通过for select来监听结果。 ``` func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { testHookEnterRoundTrip() // 此处与getConn中的"t.setReqCanceler(req, func(error) {})"相对应,用于判断request是否被取消, 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误 if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) { pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.mu.Lock() // 与readLoop配合使用,表示期望的响应的个数 pc.numExpectedResponses++ headerFn := pc.mutateHeaderFunc pc.mu.Unlock() if headerFn != nil { headerFn(req.extraHeaders()) } // Ask for a compressed version if the caller didn't set their // own value for Accept-Encoding. We only attempt to // uncompress the gzip stream if we were the layer that // requested it. requestedGzip := false // 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且仅在调用者没有设置这些首部时设置 if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && req.Method != "HEAD" { // Request gzip only, not deflate. Deflate is ambiguous and // not as universally supported anyway. // See: https://zlib.net/zlib_faq.html#faq39 // // Note that we don't request this for HEAD requests, // due to a bug in nginx: // https://trac.nginx.org/nginx/ticket/358 // https://golang.org/issue/5522 // // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail // anyway. See https://golang.org/issue/8923 requestedGzip = true req.extraHeaders().Set("Accept-Encoding", "gzip") } var continueCh chan struct{} if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() { continueCh = make(chan struct{}, 1) } // HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置 if pc.t.DisableKeepAlives && !req.wantsClose() { req.extraHeaders().Set("Connection", "close") } // 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞 gone := make(chan struct{}) defer close(gone) defer func() { if err != nil { pc.t.setReqCanceler(req.cancelKey, nil) } }() const debugRoundTrip = false // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. startBytesWritten := pc.nwrite // 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, cancelKey: req.cancelKey, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() // 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse, 退出roundtrip函数 for { testHookWaitResLoop() select { // writeLoop返回发送request后的结果 case err := <-writeErrCh: if debugRoundTrip { req.logf("writeErrCh resv: %T/%#v", err, err) } if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } if d := pc.t.ResponseHeaderTimeout; d > 0 { if debugRoundTrip { req.logf("starting timer for %v", d) } timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } // 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。 case <-pc.closech: if debugRoundTrip { req.logf("closech recv: %T %#v", pc.closed, pc.closed) } return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) // 等待获取response超时,关闭连接 case <-respHeaderTimer: if debugRoundTrip { req.logf("timeout waiting for response headers.") } pc.close(errTimeout) return nil, errTimeout // 接收到readLoop返回的response结果 case re := <-resc: // 极异常情况,直接程序panic if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } // 到这里是最终的成功返回的结果。 return re.res, nil // request取消 case <-cancelChan: pc.t.cancelRequest(req.cancelKey, errRequestCanceled) // 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0) cancelChan = nil case <-ctxDoneChan: pc.t.cancelRequest(req.cancelKey, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } } ``` 关于源码就分享到这里。 主要核心流程以及功能梳理图解如下:  **总结:** 1:go发起http1.1请求,遇到不关心的请求,请务必完整读取响应内容以保障连接复用性。 2:如果在http.client 中没有设置transport熟悉,则会使用文章开头说的DefaultTransport,这里设置的默认最大空闲连接数MaxIdleConns,每个host最大空闲连接数MaxIdleConnsPerHost是2,每个host的最大连接数MaxConnsPerHost是0。在大量并发情况下,默认配置会造成很多链接,进而性能急剧下降。如果需要控制合适的连接数,就需要使用自定义的client和transport。配置方式如下: ``` t := http.DefaultTransport.(*http.Transport).Clone() t.MaxIdleConns = 100 t.MaxConnsPerHost = 100 t.MaxIdleConnsPerHost = 100 httpClient = &http.Client{ Timeout: 10 * time.Second, Transport: t, } ``` 3:http1.1线头阻塞:http中一个连接上的请求,需要等这个请求处理完了才能继续下一个请求。 参照文献: https://www.jb51.net/article/193675.htm https://www.cnblogs.com/charlieroro/p/11409153.html
感觉本站内容不错,读后有收获?
attach_money
我要小额打赏,鼓励作者写出更好的教程
扫码关注公众号:talk_lizhi