利志分享
fast_forward
view_headline
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
开发工具箱
go教程
clickhouse教程
kafka教程
python教程
shell教程
原创杂文
打赏
go基础知识
go的环境搭建
go变量
go常量
go字符串
go数组和切片
go的map和range的使用
go的struct的使用
go的函数使用
go的interface的使用
go channel使用
go的routine使用
go的panic和recover使用
go实现http请求
go 复杂的http请求
go实现表单提交
go实现表单验证
go上传附件
go实现mysql连接
go实现redis操作
go对xml操作
go的json操作
go的base64使用
go实现websocket功能
go的单元测试
go的文件操作
go的web服务基础
golang url解析和包介绍使用
go的正则表达式-MatchString,FindString等的使用
golang实现从byte和文件中读取csv格式数据
go进阶
go的类型转换
go的map的多维应用
go的多维数组和slice使用
go的select使用
go的原子性atomic类库使用
go给图片添加水印
go给图片添加文字
go实现http的rpc服务
go实现tcp的rpc服务
go实现json格式的rpc服务
多个defer的执行问题
golang的队列机制实现同步主线程接受子协程的结果
go的值传递和引用传递以及引用类型的问题
go中的make和new的使用问题
golang读文件分析1
golang读文件分析2
golang实现自然周计算
golang实现读写excel
go实战
beego的安装和使用
beego聊天室的基本配置
beego聊天室的生成
Go 写一个类似 cron 的定时任务管理器
Go 调度器 M, P 和 G
AES对称加密算法如何用golang语言实现?
非对称加密的RSA算法如何通过golang来实现?
golang实现http2.0服务端,客户端完整案例
go实战总结
go的日期操作类使用-日常使用类库no.1
go的字符串的连接讲解-日常实战总结no.1
golang实现队列服务-日常实战总结no.2
深入理解golang的channel的使用-日常实战总结no.3
go的sync.pool在实际应用中的讲解和性能分析比较-日常实战总结no.4
go语言中一个典型的引用类型的数据使用案例的注意点-日常实战总结no.5
go的sync包的使用详解1-日常实战总结6
go的sync包的使用详解2-日常实战总结7
深度学习go判断各个类型相等-日常实战总结8
go的排序类使用讲解-日常实战总结9
go的context使用讲解
golang 网络爬虫框架gocolly
golang实现桶排序
golang处理gb2312转utf-8编码的问题
golang实现单链的添加,删除以及翻转
golang的一个err不判断引起的血案(json.Marshal的error到底要不要判断?)
如何控制golang协程的并发数量问题-panic: too many concurrent operations on a single file or socket (max 1048575)
你所要知道的redis客户端返回值知识点
golang实现连续的时间,比如连续的天,月,年等。
go深入
由引用类型引发的概念的深入理解
sync.WaitGroup深入源码理解
golang如何创建动态的struct类型以及如何转换成slice类型
深入理解go的管道数据读写
关于go的只读管道只写管道以及单向管道的理解
深入理解go的slice深入,slice扩容机制
深入理解go的函数参数传递
golang实现动态调用不同struct中不同的方法
如何配置sqlx.DB的SetMaxOpenConns SetMaxIdleConns 和 SetConnMaxLifetime来保证更好的性能
深入理解go的select原理
深入理解golang的GPM模型
精通golang的项目管理go modules
深入理解golang的GC回收机制
超级肝文-深入剖析客户端出现connect reset by peer报错相关的技术知识
Golang源码深入-Go1.15.6发起http请求流程-1
Golang源码深入-Go1.15.6发起http请求流程-2
Golang源码深入-Go1.15.6发起http请求流程-3(http2)
go应用
需求整理-手把手带大家用go开发一个匿名在线聊天室
第二篇-手把手带大家用go开发一个匿名在线聊天室
第三篇-手把手带大家用go开发一个匿名在线聊天室
go面试
【建议收藏】吐血整理Golang面试干货21问-吊打面试官-1
【建议收藏】整理Golang面试第二篇干货13问
【建议收藏】Redis知识干货汇总
【建议收藏】Mysql知识干货(mysql八股文)汇总
目录
go基础知识
go的环境搭建
go变量
go常量
go字符串
go数组和切片
go的map和range的使用
go的struct的使用
go的函数使用
go的interface的使用
go channel使用
go的routine使用
go的panic和recover使用
go实现http请求
go 复杂的http请求
go实现表单提交
go实现表单验证
go上传附件
go实现mysql连接
go实现redis操作
go对xml操作
go的json操作
go的base64使用
go实现websocket功能
go的单元测试
go的文件操作
go的web服务基础
golang url解析和包介绍使用
go的正则表达式-MatchString,FindString等的使用
golang实现从byte和文件中读取csv格式数据
go进阶
go的类型转换
go的map的多维应用
go的多维数组和slice使用
go的select使用
go的原子性atomic类库使用
go给图片添加水印
go给图片添加文字
go实现http的rpc服务
go实现tcp的rpc服务
go实现json格式的rpc服务
多个defer的执行问题
golang的队列机制实现同步主线程接受子协程的结果
go的值传递和引用传递以及引用类型的问题
go中的make和new的使用问题
golang读文件分析1
golang读文件分析2
golang实现自然周计算
golang实现读写excel
go实战
beego的安装和使用
beego聊天室的基本配置
beego聊天室的生成
Go 写一个类似 cron 的定时任务管理器
Go 调度器 M, P 和 G
AES对称加密算法如何用golang语言实现?
非对称加密的RSA算法如何通过golang来实现?
golang实现http2.0服务端,客户端完整案例
go实战总结
go的日期操作类使用-日常使用类库no.1
go的字符串的连接讲解-日常实战总结no.1
golang实现队列服务-日常实战总结no.2
深入理解golang的channel的使用-日常实战总结no.3
go的sync.pool在实际应用中的讲解和性能分析比较-日常实战总结no.4
go语言中一个典型的引用类型的数据使用案例的注意点-日常实战总结no.5
go的sync包的使用详解1-日常实战总结6
go的sync包的使用详解2-日常实战总结7
深度学习go判断各个类型相等-日常实战总结8
go的排序类使用讲解-日常实战总结9
go的context使用讲解
golang 网络爬虫框架gocolly
golang实现桶排序
golang处理gb2312转utf-8编码的问题
golang实现单链的添加,删除以及翻转
golang的一个err不判断引起的血案(json.Marshal的error到底要不要判断?)
如何控制golang协程的并发数量问题-panic: too many concurrent operations on a single file or socket (max 1048575)
你所要知道的redis客户端返回值知识点
golang实现连续的时间,比如连续的天,月,年等。
go深入
由引用类型引发的概念的深入理解
sync.WaitGroup深入源码理解
golang如何创建动态的struct类型以及如何转换成slice类型
深入理解go的管道数据读写
关于go的只读管道只写管道以及单向管道的理解
深入理解go的slice深入,slice扩容机制
深入理解go的函数参数传递
golang实现动态调用不同struct中不同的方法
如何配置sqlx.DB的SetMaxOpenConns SetMaxIdleConns 和 SetConnMaxLifetime来保证更好的性能
深入理解go的select原理
深入理解golang的GPM模型
精通golang的项目管理go modules
深入理解golang的GC回收机制
超级肝文-深入剖析客户端出现connect reset by peer报错相关的技术知识
Golang源码深入-Go1.15.6发起http请求流程-1
Golang源码深入-Go1.15.6发起http请求流程-2
Golang源码深入-Go1.15.6发起http请求流程-3(http2)
go应用
需求整理-手把手带大家用go开发一个匿名在线聊天室
第二篇-手把手带大家用go开发一个匿名在线聊天室
第三篇-手把手带大家用go开发一个匿名在线聊天室
go面试
【建议收藏】吐血整理Golang面试干货21问-吊打面试官-1
【建议收藏】整理Golang面试第二篇干货13问
【建议收藏】Redis知识干货汇总
【建议收藏】Mysql知识干货(mysql八股文)汇总
Golang源码深入-Go1.15.6发起http请求流程-3(http2)
阅读:51
分享次数:0
这一篇文章是分享http2的文章,前两篇主要是http1的源码实现,前两篇有兴趣的可以再阅读一下。详情可点击:[Golang源码深入-Go1.15.6发起http请求流程-1](https://zengzhihai.com/study/wiki/type/Z29fc3R1ZHlfaW5mbw==/id/624b0ab3469aea5bbc9a7437 "Golang源码深入-Go1.15.6发起http请求流程-1")和[Golang源码深入-Go1.15.6发起http请求流程-2](https://zengzhihai.com/study/wiki/type/Z29fc3R1ZHlfaW5mbw==/id/6250a2f7469aea1db651b5ea "Golang源码深入-Go1.15.6发起http请求流程-2")。 目前http2协议在很多的web网站中有应用,http2通过多路复用,二进制流,Header压缩等等技术,极大地提高了性能。 http2的源码核心流程图解如下: https://www.processon.com/diagraming/6250fe4963768946f277cde9 下面我们来分别看下不同的模块的源码: 1:RoundTrip函数 ``` // http2的入口函数 func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { return t.RoundTripOpt(req, RoundTripOpt{}) } ``` 2:RoundTripOpt函数,实现处理请求 ``` // 实现函数:RoundTripOpt func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) { // 判断是否是https,或者客户端手动设置是允许HTTP if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { return nil, errors.New("http2: unsupported scheme") } // 鉴权scheme和host,返回ip:port的结构 addr := authorityAddr(req.URL.Scheme, req.URL.Host) // http2协议发送请求有重试机制,这个版本默认是重试6次。只有在获取不到连接或者重试次数6次的时候进行重试。 for retry := 0; ; retry++ { // 这个是获取连接的方法,从连接池拿或者新建连接。 cc, err := t.connPool().GetClientConn(req, addr) if err != nil { t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) return nil, err } reused := !atomic.CompareAndSwapUint32(&cc.reused, 0, 1) // 这个是打印traceLog日志的 traceGotConn(req, cc, reused) // 这个方法是最后处理请求或者响应的方法 res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req) if err != nil && retry <= 6 { // 判断哪些err能重试,并且 if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil { // 第一次直接重试,不进行暂停逻辑。 if retry == 0 { continue } // 生成一个随机值来,判断过多久之后进行重试。 backoff := float64(uint(1) << (uint(retry) - 1)) backoff += backoff * (0.1 * mathrand.Float64()) // 通过select 来监听重试时间和当前请求执行超时,重试是继续,请求超时则直接返回。 select { case <-time.After(time.Second * time.Duration(backoff)): continue case <-req.Context().Done(): return nil, req.Context().Err() } } } // 这里是重试次数超过了还是报错,则返回错误。 if err != nil { t.vlogf("RoundTrip failure: %v", err) return nil, err } // 这里是正常请求返回 return res, nil } } ``` 3:函数:GetClientConn,获取处理请求连接。 ``` func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) { return p.getClientConn(req, addr, dialOnMiss) } ``` 4:函数:getClientConn,实现获取处理请求连接。 ``` func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) { // 先判断请求是否关闭 if isConnectionCloseRequest(req) && dialOnMiss { // It gets its own connection. traceGetConn(req, addr) const singleUse = true cc, err := p.t.dialClientConn(addr, singleUse) if err != nil { return nil, err } return cc, nil } // 获取请求加锁,全局锁 p.mu.Lock() // 循环连接池,key是host+port生成的 for _, cc := range p.conns[addr] { // 判断当前的连接状态,是否能获取到新请求 if st := cc.idleState(); st.canTakeNewRequest { if p.shouldTraceGetConn(st) { traceGetConn(req, addr) } // 如果能获取到连接,则解锁,返回连接 p.mu.Unlock() return cc, nil } } // 这个判断,反正我是没看懂为啥,感觉是多余 if !dialOnMiss { p.mu.Unlock() return nil, ErrNoCachedConn } traceGetConn(req, addr) // 这里是第一次获取连接。 call := p.getStartDialLocked(addr) // 请求解锁 p.mu.Unlock() <-call.done return call.res, call.err } ``` 5:getStartDialLocked 获取开始的监听之后生成的第一个链接。 ``` // requires p.mu is held. func (p *clientConnPool) getStartDialLocked(addr string) *dialCall { // 判断正在连接,如果存在则返回这个连接dialCall if call, ok := p.dialing[addr]; ok { // A dial is already in-flight. Don't start another. return call } // 生成新的连接对象,进行返回 call := &dialCall{p: p, done: make(chan struct{})} if p.dialing == nil { p.dialing = make(map[string]*dialCall) } p.dialing[addr] = call // 这里是开启协程去建立新连接,给call对象赋值 go call.dial(addr) // 直接返回本身的连接对象 return call } ``` 6:dial 建立新连接 ``` // run in its own goroutine. func (c *dialCall) dial(addr string) { const singleUse = false // shared conn // 建立客户端的tcp连接 c.res, c.err = c.p.t.dialClientConn(addr, singleUse) close(c.done) // 加锁 c.p.mu.Lock() // 这个目前没看懂为什么要删除,目的何在? delete(c.p.dialing, addr) if c.err == nil { // 初始化p.conns和p.keys 并把当前对象连接存进去 c.p.addConnLocked(addr, c.res) } c.p.mu.Unlock() } ``` 7:dialClientConn 建立tcp链接 ``` func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { // 获取host host, _, err := net.SplitHostPort(addr) if err != nil { return nil, err } // 建立tls安全协议的tcp连接。 tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) if err != nil { return nil, err } // 返回建立的新连接 return t.newClientConn(tconn, singleUse) } ``` 8:newClientConn 建立基于tls的安全tcp协议的http 2的通信 ``` func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { cc := &ClientConn{ t: t, tconn: c, readerDone: make(chan struct{}), nextStreamID: 1, maxFrameSize: 16 << 10, // spec default initialWindowSize: 65535, // spec default maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. streams: make(map[uint32]*clientStream), singleUse: singleUse, wantSettingsAck: true, pings: make(map[[8]byte]chan struct{}), } // 把设置的空闲时间赋值给每个新建的连接,并设置空闲定时器 if d := t.idleConnTimeout(); d != 0 { cc.idleTimeout = d cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) } // 打印日志 if VerboseLogs { t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) } // 初始化ClientConn相关的结构 cc.cond = sync.NewCond(&cc.mu) cc.flow.add(int32(initialWindowSize)) // TODO: adjust this writer size to account for frame size + // MTU + crypto/tls record padding. cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr}) cc.br = bufio.NewReader(c) cc.fr = NewFramer(cc.bw, cc.br) cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) cc.fr.MaxHeaderListSize = t.maxHeaderListSize() // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on // henc in response to SETTINGS frames? cc.henc = hpack.NewEncoder(&cc.hbuf) if t.AllowHTTP { cc.nextStreamID = 3 } if cs, ok := c.(connectionStater); ok { state := cs.ConnectionState() cc.tlsState = &state } initialSettings := []Setting{ {ID: SettingEnablePush, Val: 0}, {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, } if max := t.maxHeaderListSize(); max != 0 { initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max}) } cc.bw.Write(clientPreface) cc.fr.WriteSettings(initialSettings...) cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) cc.inflow.add(transportDefaultConnFlow + initialWindowSize) cc.bw.Flush() if cc.werr != nil { cc.Close() return nil, cc.werr } // 起一个处理response的协程处理。 go cc.readLoop() return cc, nil } ``` 9:readLoop ``` // readLoop运行在一个一个协程中,处理读数据 func (cc *ClientConn) readLoop() { // 初始化读的对象 rl := &clientConnReadLoop{cc: cc} defer rl.cleanup() // 读对象执行run方法 cc.readerErr = rl.run() if ce, ok := cc.readerErr.(ConnectionError); ok { cc.wmu.Lock() cc.fr.WriteGoAway(0, ErrCode(ce), nil) cc.wmu.Unlock() } } ``` 10:clientConnReadLoop.run ``` func (rl *clientConnReadLoop) run() error { cc := rl.cc // 如果禁用了长连接,或者开启单个http请求,则设置空闲开关关闭 rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse gotReply := false // ever saw a HEADERS reply gotSettings := false readIdleTimeout := cc.t.ReadIdleTimeout var t *time.Timer if readIdleTimeout != 0 { t = time.AfterFunc(readIdleTimeout, cc.healthCheck) defer t.Stop() } // 死循环读取帧数据,为了尽快读完http2协议和返回的数据 for { // 这里是读帧数据 f, err := cc.fr.ReadFrame() // 如果设置了读数据的超时时间,则要等读完数据之后再进行心跳检测包。所以需要重置定时器 if t != nil { t.Reset(readIdleTimeout) } if err != nil { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } // 如果是流数据读取错误,则会重置流数据进行读取 if se, ok := err.(StreamError); ok { if cs := cc.streamByID(se.StreamID, false); cs != nil { cs.cc.writeStreamReset(cs.ID, se.Code, err) cs.cc.forgetStreamID(cs.ID) if se.Cause == nil { se.Cause = cc.fr.errDetail } rl.endStreamError(cs, se) } continue // 如果是其他错误,则直接返回错误 } else if err != nil { return err } if VerboseLogs { cc.vlogf("http2: Transport received %s", summarizeFrame(f)) } if !gotSettings { // 协议参数配置错误,则会报PROTOCOL_ERROR if _, ok := f.(*SettingsFrame); !ok { cc.logf("protocol error: received %T before a SETTINGS frame", f) return ConnectionError(ErrCodeProtocol) } gotSettings = true } maybeIdle := false // whether frame might transition us to idle switch f := f.(type) { // 处理http2头部数据流 case *MetaHeadersFrame: err = rl.processHeaders(f) maybeIdle = true gotReply = true // 处理http数据流 case *DataFrame: err = rl.processData(f) maybeIdle = true // 告诉远端停止在当前连接上创建流 case *GoAwayFrame: err = rl.processGoAway(f) maybeIdle = true // 协议数据被重置RST的流 case *RSTStreamFrame: err = rl.processResetStream(f) maybeIdle = true // 检验点对点的设置参数配置数据流 case *SettingsFrame: err = rl.processSettings(f) // 用于初始化服务器流 case *PushPromiseFrame: err = rl.processPushPromise(f) // 用于实现流控制 case *WindowUpdateFrame: err = rl.processWindowUpdate(f) // 心跳检测流 case *PingFrame: err = rl.processPing(f) default: cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { if VerboseLogs { cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) } return err } // 如果获取完数据,并且这个连接空闲,并且空闲开关是关闭,则会直接关闭当前连接。 if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } } } ``` 11:processData:处理http2的数据流函数 ``` func (rl *clientConnReadLoop) processData(f *DataFrame) error { cc := rl.cc cs := cc.streamByID(f.StreamID, f.StreamEnded()) data := f.Data() // 如果当前连接不存在 if cs == nil { cc.mu.Lock() neverSent := cc.nextStreamID cc.mu.Unlock() // 如果当前的流数据Id大于等于下一个,则表示结束或者协议有异常 if f.StreamID >= neverSent { // We never asked for this. cc.logf("http2: Transport received unsolicited DATA frame; closing connection") return ConnectionError(ErrCodeProtocol) } // We probably did ask for this, but canceled. Just ignore it. // TODO: be stricter here? only silently ignore things which // we canceled, but not things which were closed normally // by the peer? Tough without accumulating too much state. // But at least return their flow control: if f.Length > 0 { cc.mu.Lock() cc.inflow.add(int32(f.Length)) cc.mu.Unlock() cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(f.Length)) cc.bw.Flush() cc.wmu.Unlock() } return nil } if !cs.firstByte { cc.logf("protocol error: received DATA before a HEADERS frame") rl.endStreamError(cs, StreamError{ StreamID: f.StreamID, Code: ErrCodeProtocol, }) return nil } if f.Length > 0 { // 判断不是头协议 if cs.req.Method == "HEAD" && len(data) > 0 { cc.logf("protocol error: received DATA on a HEAD request") rl.endStreamError(cs, StreamError{ StreamID: f.StreamID, Code: ErrCodeProtocol, }) return nil } // Check connection-level flow control. // 从数据流读data的数据 cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { cs.inflow.take(int32(f.Length)) } else { cc.mu.Unlock() return ConnectionError(ErrCodeFlowControl) } // Return any padded flow control now, since we won't // refund it later on body reads. var refund int if pad := int(f.Length) - len(data); pad > 0 { refund += pad } // Return len(data) now if the stream is already closed, // since data will never be read. didReset := cs.didReset if didReset { refund += len(data) } // 这里是读取到数据之后,更新读取当前的tcp流数据 if refund > 0 { cc.inflow.add(int32(refund)) cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(refund)) if !didReset { cs.inflow.add(int32(refund)) cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) } cc.bw.Flush() cc.wmu.Unlock() } cc.mu.Unlock() // 这里是读取数据,然后最后读到的数据写到当前连接的read buf中 if len(data) > 0 && !didReset { if _, err := cs.bufPipe.Write(data); err != nil { rl.endStreamError(cs, err) return err } } } // 判断当前读出流数据是不是结束的帧,如果是则会把最后的响应结果写到cs.resc, if f.StreamEnded() { rl.endStream(cs) } return nil } ``` 12:roundTrip函数,处理请求头,和响应的函数 ``` func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) { // 检测连接协议头是否有异常 if err := checkConnHeaders(req); err != nil { return nil, false, err } // 停止空闲连接的定时器。 if cc.idleTimer != nil { cc.idleTimer.Stop() } // 处理请求头信息 trailers, err := commaSeparatedTrailers(req) if err != nil { return nil, false, err } hasTrailers := trailers != "" cc.mu.Lock() if err := cc.awaitOpenSlotForRequest(req); err != nil { cc.mu.Unlock() return nil, false, err } // 计算请求的长度 body := req.Body contentLen := actualContentLength(req) hasBody := contentLen != 0 // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? // 请求未压缩,则设置gzip压缩 var requestedGzip bool if !cc.t.disableCompression() && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && req.Method != "HEAD" { // Request gzip only, not deflate. Deflate is ambiguous and // not as universally supported anyway. // See: https://zlib.net/zlib_faq.html#faq39 // // Note that we don't request this for HEAD requests, // due to a bug in nginx: // http://trac.nginx.org/nginx/ticket/358 // https://golang.org/issue/5522 // // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail // anyway. See https://golang.org/issue/8923 requestedGzip = true } // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is // sent by writeRequestBody below, along with any Trailers, // again in form HEADERS{1}, CONTINUATION{0,}) // 处理请求头信息encode hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) if err != nil { cc.mu.Unlock() return nil, false, err } cs := cc.newStream() cs.req = req cs.trace = httptrace.ContextClientTrace(req.Context()) cs.requestedGzip = requestedGzip bodyWriter := cc.t.getBodyWriterState(cs, body) cs.on100 = bodyWriter.on100 defer func() { cc.wmu.Lock() werr := cc.werr cc.wmu.Unlock() if werr != nil { cc.Close() } }() cc.wmu.Lock() endStream := !hasBody && !hasTrailers // 发送头部请求 werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) cc.wmu.Unlock() traceWroteHeaders(cs.trace) cc.mu.Unlock() if werr != nil { if hasBody { req.Body.Close() // per RoundTripper contract bodyWriter.cancel() } cc.forgetStreamID(cs.ID) // Don't bother sending a RST_STREAM (our write already failed; // no need to keep writing) traceWroteRequest(cs.trace, werr) return nil, false, werr } var respHeaderTimer <-chan time.Time if hasBody { bodyWriter.scheduleBodyWrite() } else { traceWroteRequest(cs.trace, nil) if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } } readLoopResCh := cs.resc bodyWritten := false ctx := req.Context() handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) { res := re.res if re.err != nil || res.StatusCode > 299 { // On error or status code 3xx, 4xx, 5xx, etc abort any // ongoing write, assuming that the server doesn't care // about our request body. If the server replied with 1xx or // 2xx, however, then assume the server DOES potentially // want our body (e.g. full-duplex streaming: // golang.org/issue/13444). If it turns out the server // doesn't, they'll RST_STREAM us soon enough. This is a // heuristic to avoid adding knobs to Transport. Hopefully // we can keep it. bodyWriter.cancel() cs.abortRequestBodyWrite(errStopReqBodyWrite) if hasBody && !bodyWritten { <-bodyWriter.resc } } if re.err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), re.err } res.Request = req res.TLS = cc.tlsState return res, false, nil } // 通过for select 来监听读请求详情结果,之前起的读协程读取response的响应结果会写回readLoopResCh := cs.resc for { select { // 处理响应结果 case re := <-readLoopResCh: return handleReadLoopResponse(re) // 响应头超时 case <-respHeaderTimer: if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) <-bodyWriter.resc } cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), errTimeout // 这个是客户端请求的超时处理 case <-ctx.Done(): if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) <-bodyWriter.resc } cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), ctx.Err() // 请求主动取消 case <-req.Cancel: if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) <-bodyWriter.resc } cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), errRequestCanceled // 处理报文被重置 case <-cs.peerReset: // processResetStream already removed the // stream from the streams map; no need for // forgetStreamID. return nil, cs.getStartedWrite(), cs.resetErr // body写入的状态监听 case err := <-bodyWriter.resc: bodyWritten = true // Prefer the read loop's response, if available. Issue 16102. select { case re := <-readLoopResCh: return handleReadLoopResponse(re) default: } if err != nil { cc.forgetStreamID(cs.ID) return nil, cs.getStartedWrite(), err } if d := cc.responseHeaderTimeout(); d != 0 { timer := time.NewTimer(d) defer timer.Stop() respHeaderTimer = timer.C } } } } ``` 相关学习的知识整理: 1:http2的帧,FrameHeader基础协议是9个字节,结构如下: ``` type FrameHeader struct { //标志位,默认是1个位 valid bool // 帧类型,默认是1个字节,8位 Type FrameType // 帧类型标记,默认1个字节 8位 Flags Flags // 帧的长度 默认三个字节,24位,最大表示16MB Length uint32 // 帧的唯一ID,标识符,0x00值被保留给与连接相关联的帧作为一个整体,而不是单独的流。是31位 StreamID uint32 } ``` 2:http2的数据协议帧结构如下: ``` type DataFrame struct { FrameHeader data []byte } ``` 3:http2的头部协议实现结构是HeadersFrame,这个主要是实现http2的协议响应。 4:关于如何通过本地实现http2的服务实现,并且通过代码debug来查看http2的请求过程,笔者也是通过先实现功能,然后通过debug,单步调试查看源码的执行流程,我这里整理一份代码,有兴趣可以参考如下链接:[golang实现http2.0服务端,客户端完整案例](https://zengzhihai.com/study/wiki/type/Z29fc3R1ZHlfaW5mbw==/id/625a23df469aea5437eb0c8f "golang实现http2.0服务端,客户端完整案例")。 总结: 1:http2其实是tcp的长连接,并在tcp协议上封装了一层http2协议。 2:http2的FrameHeader的数据结构的定义解决了tcp的粘包问题。 3:http2默认底层实现重试为6次 4:http2通过多路复用,二进制流,Header压缩等等技术,极大地提高了性能。 参考文献: https://httpwg.org/http2-spec/draft-ietf-httpbis-http2bis.html#name-data 注意: 笔者本着严谨的态度,http2流程中的很多细节并未详细提及或讲述,请读者酌情参考。
感觉本站内容不错,读后有收获?
attach_money
我要小额打赏,鼓励作者写出更好的教程
扫码关注公众号:talk_lizhi