Transport

Field

1
 Proxy func(*Request) (*url.URL, error)

Proxy接受一个request,返回一个url.URL的对象。Proxy和ProxyEnvironment的函数签名是一样的,他们有着同样的类型。

1
2
DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
Dial func(network, addr string) (net.Conn, error)

上面两个Field都是函数类型的对象,调用之后可以生成一个未加密的tcp链接。但是文档中明确提示我们,应该使用DialContext来代替Dail,因为前者可以在我们不需要这个链接的时候主动将它结束。如果一个Transport类型对象中这两个属性都赋值了,那么DialContext的优先级是要比Dial高的。

1
DialTLS func(network, addr string) (net.Conn, error)

DialTLS的调用将会建立一个TLS的链接。如果DialTLS成员被初始化,那么对于https的request就会被DialTLS成员给Hook住,否则就由Dial和TLSClientConfig两个成员配合起来做这件事。DialTLS调用之后返回的链接默认是Tls握手已经完成的状态。

1
2
TLSClientConfig *tls.Config
TLSHandshakeTimeout time.Duration

客户端上https的相关配置以及https握手的超时时间。但是在DialTLS成员被赋值了之后,这两个成员的值就将会被忽略了。

1
DisableKeepAlives bool

如果这个属性被置为true,那么在两次不同的请求之间就不会对同一条tcp链接进行重用了,即关闭了Keep-Alive功能。

1
2
3
MaxIdleConns int
MaxIdleConnsPerHost int
IdleConnTimeout time.Duration

MaxIdleConns属性为一个Transport对所有的host开启的最大空闲连接数,是一个总量,MaxIdleConnsPerHost为对每一个host所能开启的最大空闲连接数。IdleConnTimeout为空闲链接保持多久将会关闭它。

1
ResponseHeaderTimeout time.Duration

ResponseHeaderTimeout指定了一个从发送了所有的request到得到服务端发送的response的Header的超时时间。

1
MaxResponseHeaderBytes int64

MaxResponseHeaderBytes指定了客户端所能接受服务端发送response的最大字节数。

Method

1
2
3
func (t *Transport) CancelRequest(req *Request) {
	t.cancelRequest(req, errRequestCanceled)
}

cancelRequest函数内部读取了一个map[*Request]func(error)类型的map,key为request,value是request对应的cancel方法。先把request对应的cancel method拷贝出来,然后从map中删除这一条记录,最后调用这个cancel方法,关闭连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (t *Transport) CloseIdleConnections() {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	t.idleMu.Lock()
	m := t.idleConn
	t.idleConn = nil
	t.idleConnCh = nil
	t.wantIdle = true
	t.idleLRU = connLRU{}
	t.idleMu.Unlock()
	for _, conns := range m {
		for _, pconn := range conns {
			pconn.close(errCloseIdleConns)
		}
	}
	if t2 := t.h2transport; t2 != nil {
		t2.CloseIdleConnections()
	}
}

CloseIdleConnections函数主要用来关闭因之前的请求建立并且其目前处于空闲的链接。Transport内部有一个idleConn成员,他是一个map类型的变量,其value是persistConn类型,表示了一个已经建立的链接。CloseIdleConnections刚一执行,就将Transport内和空闲链接相关的field都清空,并且将idleConn成员copy了一份。之后它就遍历这个存有很多空闲链接的map,将他们一一关闭。在关闭每一个空闲链接的时候,最终调用了persistConn类型的closeLocked方法,在其内部调用了net包内Conn类型的close方法以真正关闭这条tcp链接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
	t.altMu.Lock()
	defer t.altMu.Unlock()
	oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
	if _, exists := oldMap[scheme]; exists {
		panic("protocol " + scheme + " already registered")
	}
	newMap := make(map[string]RoundTripper)
	for k, v := range oldMap {
		newMap[k] = v
	}
	newMap[scheme] = rt
	t.altProto.Store(newMap)
}

RegisterProtocol函数将会注册一个scheme和处理使用这个scheme请求的RoundTripper方法到map中。在处理请求的时候,会根据请求使用的scheme对map中的roundtripe方法进行匹配。如果没有根据scheme注册特殊的RoundTripper方法,那么会调用默认的RoundTripper

RoundTrip

RoundTrip其实是RoundTriper这个Interface的一个方法,你给它一个请求,它吐给你一个响应。在golang的http框架中,Client发送请求接受响应的逻辑主要就是依赖于对RoundTrip函数的实现,Client定义中有一个名为Transport的成员,它的类型是RoundTripper,一般来说都是调用这个成员的RoundTrip方法。在我们创建一个Client对象的时候,如果没为其特意指定一个实现了RoundTrip方法的对象,那么Client会使用一个预定义的Transport类型的变量。所以说,Transport应该才是Client的核心实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		for k, vv := range req.Header {
			if !httplex.ValidHeaderFieldName(k) {
				return nil, fmt.Errorf("net/http: invalid header field name %q", k)
			}
			for _, v := range vv {
				if !httplex.ValidHeaderFieldValue(v) {
					return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
				}
			}
		}
	}

在RoundTrip前半部分的代码中我们可以看到,它主要做了了些参数检查工作,如url,header等字段值的合法性。

1
2
3
4
5
6
	altProto, _ := t.altProto.Load().(map[string]RoundTripper)
	if altRT := altProto[scheme]; altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
	}

接下来RoundTrip方法将在altProto这个atmoic.Value类型的变量中取出一个值,并且将它转换为一个map。其中这个map的value就是一个实现了RoundTrip方法的对象。altProto看起来是Transport这个类型对象中的一个成员,它是atomic.Value类型,里面存储了一个map,注册和请求协议相对应的RoundTrip方法。如果之前已经对某个协议指定了处理使用该协议发送请求的方法,那么在这里就会直接调用RoundTrip方法进行处理。

如果没有已经事先准备好的RoundTrip方法,那么就需要客户端先与服务端建立tcp连接,以便之后的通信。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
		// Get the cached or newly-created connection to either the
		// host (for http or https), the http proxy, or the http proxy
		// pre-CONNECTed to https server. In any case, we'll be ready
		// to send it requests.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

RoundTrip方法中,transport对象调用了getConn方法,这个方法获取一条连接,这条连接可能是之前请求处理完空闲等待使用的,也有可能是新建立的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}
	if pc, idleSince := t.getIdleConn(cm); pc != nil {
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(idleSince))
		}
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		}
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}
	
	...
}

getConn函数一进来就先调用getIdleConn去获取已经建立但是目前闲置的链接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
	key := cm.key()
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	for {
		pconns, ok := t.idleConn[key]
		if !ok {
			return nil, time.Time{}
		}
		if len(pconns) == 1 {
			pconn = pconns[0]
			delete(t.idleConn, key)
		} else {
			// 2 or more cached connections; use the most
			// recently used one at the end.
			pconn = pconns[len(pconns)-1]
			t.idleConn[key] = pconns[:len(pconns)-1]
		}
		t.idleLRU.remove(pconn)
		if pconn.isBroken() {
			// There is a tiny window where this is
			// possible, between the connecting dying and
			// the persistConn readLoop calling
			// Transport.removeIdleConn. Just skip it and
			// carry on.
			continue
		}
		if pconn.idleTimer != nil && !pconn.idleTimer.Stop() {
			// We picked this conn at the ~same time it
			// was expiring and it's trying to close
			// itself in another goroutine. Don't use it.
			continue
		}
		return pconn, pconn.idleAt
	}
}

在getIdleConn函数内将运行一个死循环,循环内部会在idleConn这个map内根据connectMethod类型内的一些和请求有关的信息,找到一条满足条件的链接返回给外部使用。

  1. 在只有一条空闲链接的时候没有选择,只有使用这条
  2. 超过2条及以上的空闲链接,选取最近最久未使用的那一条

在选到链接的之后,在返回给外面使用之前还要检查这个链接是否已经中断了以及这条空闲链接是否已经超时了,如果已经超时了,那么很有可能在别的地方就会被关闭了。不管是中断还是超时,都应该执行continue逻辑重新再空闲连接池内再找一条。

无论是在调用getIdleConn之前还是之后,都会看到一个ClientTrace类型的变量trace来调用GetConn和GotConn。其实ClientTrace这个类型里面的成员以及方法都是对一个请求具有钩子功能的,其内部的成员和方法可以用来时刻跟踪一个请求在真正发送给服务端之前的各种情况。如果成功获取到空闲的网络连接之后,还将调用setReqCanceler函数,它将设置和本次请求有关的一个取消函数以便在之后可以有效检查一个req是否已经被取消处理。

如果getIdleConn函数成功获取到一个空闲链接,那么getConn函数将返回这个链接以供外面使用。如果没有获取到合适的空闲链接,那么接下来的逻辑将会创建一条新的链接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	go func() {
		pc, err := t.dialConn(ctx, cm)
		dialc <- dialRes{pc, err}
	}()

	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		// Our dial finished.
		if v.pc != nil {
			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
			}
			return v.pc, nil
		}
		// Our dial failed. See why to return a nicer error
		// value.
		select {
		case <-req.Cancel:
			// It was an error due to cancelation, so prioritize that
			// error value. (Issue 16049)
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		default:
			// It wasn't an error due to cancelation, so
			// return the original error message:
			return nil, v.err
		}
	case pc := <-idleConnCh:
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		handlePendingDial()
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	case <-req.Cancel:
		handlePendingDial()
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		handlePendingDial()
		return nil, req.Context().Err()
	case err := <-cancelc:
		handlePendingDial()
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}

可以看出,再创建新的链接时候,主要是靠dialConn这个函数,在一个goroutine里面调用它之后,利用select语句来等待处理建立链接过程当中的各种情况。这里是阻塞的,因为select的case都是阻塞的channel在等待接受到来的值。当链接顺利建立完成的之后,dialc这个channel将会接收到一个值,因此将会命中select的第一个case。如果其接收到的值当中的persistConn类型的成员部位nil那么就应该返回它,一个新的网络链接就已经建立好了。

那么接下来,我们就来看一下dialConn函数都做了些什么:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
pconn := &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	tlsDial := t.DialTLS != nil && cm.targetScheme == "https" && cm.proxyURL == nil
	if tlsDial {
		...
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			if cm.proxyURL != nil {
				// Return a typed error, per Issue 16997:
				err = &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
			}
			return nil, err
		}
		pconn.conn = conn
	}

	...
	
	pconn.br = bufio.NewReader(pconn)
	pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil

在不考虑https和代理的情况下,dialConn函数首先会通过t.dial函数建立一个网络链接。之后,通过bufio.NewReader和bufio.NewWriter分别初始化了persistConn类型对象内的br和bw成员。这两个成员一个负责从连接中读取数据,另外一个负责向链接中写入数据。接下来启动两个goroutine,运行读循环和写循环。

暂时不关心读写循环中做的事情,链接建立之后,回到RoundTrip函数中.

如果新的链接已经创建好了,那么我们就会拿到一个persistConn类型的连接对象。如果此次http通信的请求还是建立在1.1的版本上的话,就会调用persistConn类型对象的roundTrip函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.setReqCanceler(req, nil) // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			

到目前为止,尽管这一路看下来其中的逻辑比较复杂,又是空闲链接,又是读写循环的。但是我们在看这部分源码的时候,要始终记住一条线。客户端与服务端建立链接,在次连接上发送请求,从此链接上获取请求响应。那么到目前为止,客户端已经与服务端建立了一个连接了,接下来就是在这条连接上进行Http的通信。那么发送请求和接收响应的的逻辑主要就是在roundTrip这个方法内了。因为我们可以清楚的看到,这个方法传进去一个request,返回一个response.

ps: 个人觉得看源码最忌讳的就是被源码绕晕了。任何一个开源项目的远吗都是很复杂的,如果你一头扎进去,深度优先遍历得看,那老铁你很快就从入门到放弃了。所以说,看源码第一遍的时候,尽量不要在意太多细节。把整体的脉络先屡清楚,然后一点点的去追各种细节。

roundTrip方法内具体是怎么做的,我们跟进去看看。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	....
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var re responseAndError
	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
WaitResponse:
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if err != nil {
				if cerr := pc.canceled(); cerr != nil {
					err = cerr
				}
				re = responseAndError{err: err}
				pc.close(fmt.Errorf("write error: %v", err))
				break WaitResponse
			}
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pc.closech:
			re = responseAndError{err: pc.mapRoundTripErrorAfterClosed(req.Request, startBytesWritten)}
			break WaitResponse
		case <-respHeaderTimer:
			pc.close(errTimeout)
			re = responseAndError{err: errTimeout}
			break WaitResponse
		case re = <-resc:
			re.err = pc.mapRoundTripErrorFromReadLoop(req.Request, startBytesWritten, re.err)
			break WaitResponse
		case <-cancelChan:
			pc.t.CancelRequest(req.Request)
			cancelChan = nil
		case <-ctxDoneChan:
			pc.t.cancelRequest(req.Request, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}

	if re.err != nil {
		pc.t.setReqCanceler(req.Request, nil)
	}
	if (re.res == nil) == (re.err == nil) {
		panic("internal error: exactly one of res or err should be set")
	}
	return re.res, re.err
}

roundTrip方法前面做了很多检查参数,以及我根本看不到为啥要这么做的逻辑。暂时先忽略,我们要关注的还是核心的发送请求和接收响应的逻辑。所以,我们需要额外注意两行代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

上面这段代码看起来很普通,但是确实开启发请求和读相应的重要部分。代码对pc.writech和pc.reqch两个channel进行了赋值操作。在go语言中,对阻塞性的channel赋值,往往都意味着gorountine之间的通信。那么这两个管道的两段究竟链接了哪些goroutine呢?

通过查看这两个channel的定义我们就一目了然了:

1
2
	reqch     chan requestAndChan // written by roundTrip; read by readLoop
	writech   chan writeRequest   // written by roundTrip; read by writeLoop

这两个channel,正是连接了之前在dialConn函数内开启的两个读写goroutine以及我们目前所看到的roundTrip方法所在的goroutine。这么一看,貌似和之前看到的东西一下子就串起来了。所以说再看源码的时候,不要死磕一个地方,往往你顺着一条线接着向后看看,自然而然的就能够领会到作者这么实现的精妙之处。

看起来,这一个读goroutine和一个写goroutine都是运行在这条前面所建立的连接上的。猜测一下,读循环负责从连接上读取服务端返回的resp,写循环负责将req传递给服务端。带着这个猜想,我们可以来看看读写goroutine的实现了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech:
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				wr.req.Request.closeBody()
				if pc.nwrite == startBytesWritten && wr.req.outgoingLength() == 0 {
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // to the body reader, which might recycle us
			wr.ch <- err         // to the roundTrip function
			if err != nil {
				pc.close(err)
				return
			}
		case <-pc.closech:
			return
		}
	}
}

从上面的代码中我们可以看出,pc.writech这个用于写循环的管道在接收了roundtrip方法对其的赋值操作后,有三个关键的操作。

  1. err = pc.bw.Flush()
  2. wr.ch <- err // to the roundTrip function
  3. pc.writeErrCh <- err

第一个操作是调用了bw的Flush方法,跳转bw成员的定义,可以看到它就是为了向链接以及链接另一边链接的服务端写数据用的,他是一个bufio.Writer类型的对象。也就是说,当经过前期的一些处理之后,我们就可以针对使用的链接写入数据。写入数据之后,无论写入的有没有问题,都会把结果通过writech内的ch成员传递给外层的roundTrip,也会传给writeErrCh这个channel。

既然writech.ch这个成员是为了通知外部的roundTrip方法的,那么writeErrCh这个channel又是用来做什么的呢?首先,在看channel作用的时候,还是要遵守一个原则,channel肯定是连接了两个goroutine。那么这两个goroutine是哪两个呢?查看这个channel的引用我们就可以看到,在readLoop中,我们会调用一个wroteRequest的方法,这个方法内会接受writeErrch这个channel所发送过来的消息。也就是说这个channel是负责链接writeloop和readloop这两个gorountine的。

其次,我们要看下这个channel的定义,它连接读写循环的gorountine具体起了什么作用。

1
2
3
4
	// writeErrCh passes the request write error (usually nil)
	// from the writeLoop goroutine to the readLoop which passes
	// it off to the res.Body reader, which then uses it to decide
	// whether or not a connection can be reused. Issue 7569.

源码中的注释说,writeErrCh这个channel,是为了能将请求写入的错误从写循环的goroutine中传递到读循环的goroutine中,以此来判定目前的这条连接是否可用,如果在这条连接上写入数据的时候都出错了,那么想在这条连接读数据也会有问题。 看完了写循环中的几个关键操作,那么是时候先跟着wr.ch的脚步,回到外层的roundTrip中了。

roundTrip中的后半部分,起了一个死循环外加一个select,用来监控这条连接上的请求和响应的情况。可以看出在监控writeErrCh这个channel的case上,框架先检查在连接上写入数据有误错误,如果没有,那么就要设置定时器,后面应该会等待响应的传输。 基本上writeloop所做的事情就是这样。下面让我们来看下readloop。

在看readLoop的时候,我们可以换一个思路,这次先看roundTrip方法对读循环channel的监控情况。roundTrip在select中监控了传递给readloop的pc.reqch这个变量里面的ch成员,也就是resc这个channel。可以看出,在成功接收了这个channel返回回来的response之后,roundTrip的任务就完成了,如果期间没有发生任何错误的话,那么这个response就会返回给这个客户端的调用者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}
	...
}

读循环中的关键逻辑如上,在readLoop中起了循环,起循环的前提条件是通信的这条连接的是活跃的。也就是通过alive来判断的。运行到rc := <- pc.reqch这一句的时候,如果pc.reqch这个channel如果还没有接受到数据的时候,会在这里卡住。当外层的roundTrip方法向这个channel发送数据的时候,rc接收到数据,开始进入pc.readResponse的逻辑,这个逻辑就是从连接中读取响应数据的关键。

在pc.readResponse这个方法里,会调用一个同名的大写方法resp, err = ReadResponse(pc.br, rc.req)。这个方法将会从pc.br这个*bufio.Reader类型的对象内,不断的读取response的内容。先逐行的将Http响应的头部的基本信息都解析出来,最后通过readTransfer函数将body以及其他的响应信息都解析出来。

解析完服务端发回来的响应之后,

1
2
3
4
5
6
7
	if resp.StatusCode == 100 {
		pc.readLimit = pc.maxHeaderResponseSize() // reset the limit
		resp, err = ReadResponse(pc.br, rc.req)
		if err != nil {
			return
		}
	}

go还会检查一下服务端吐出来的statusCode是否是100,如果是100就要继续去读取真正的响应,并且把读取数据的长度限制重置。这里要请求一点的时候,为什么接收到100的状态码之后,客户端仅仅只是继续去读接下来的响应而不是再发一次请求,是因为我们的写循环已经一次性把请求的内容都传递给服务端了。不存在先发送100-continue的请求测试服务端然后再处理的情况。

获取并解析完body之后,readLoop中会发分几种情况来处理这个响应。

  1. 接收这个响应的时候有错误,那么直接将错误返回给外层的roundTrip
  2. 接收的响应没有Body,直接将解析出来的response返回
  3. 如果有body,那么对body要做一些处理。最终返回给外部。

至此,Transport类型中的RoundTrip方法就已经介绍完毕了。其中还是有很多细节没有去探究的,但是这并不影响我们领略go语言这套http框架实现的精妙之处。这么一边看下来,我觉得实现比较巧妙的就是对几个channel的运用。我目前觉得,能否把go用好,一个是看channel的运用,还有一个就是interface的使用。