grpc-go底层实现二:客户端与服务端建立连接过程

NewHTTP2Client 方法详细分析

NewHTTP2Client 方法是 gRPC 客户端创建 HTTP/2 传输层连接的核心方法,负责建立与服务器的 HTTP/2 连接,并设置相关参数。下面我将详细分析这个方法的功能、流程以及头帧发送与连接建立的过程。

方法功能概述

NewHTTP2Client 方法位于 internal/transport/http2_client.go 文件中,其主要功能是:

  1. 创建与服务器的网络连接
  2. 配置 HTTP/2 连接参数
  3. 执行 TLS 握手(如果启用了安全连接)
  4. 发送 HTTP/2 客户端前言和设置帧
  5. 启动读写协程处理数据交换
  6. 返回可用的 HTTP/2 客户端传输对象

详细流程分析

1. 上下文和取消处理

1
2
3
4
5
6
ctx, cancel := context.WithCancel(ctx)
defer func() {
if err != nil {
cancel()
}
}()

这段代码创建了一个可取消的上下文,如果函数返回错误,则取消上下文,确保资源被正确释放。

2. 建立网络连接

1
2
connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UserAgent)

这段代码使用提供的拨号器建立与服务器的网络连接,并将地址属性添加到连接上下文中。

3. 上下文监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ctxMonitorDone := grpcsync.NewEvent()
newClientCtx, newClientDone := context.WithCancel(connectCtx)
defer func() {
newClientDone() // 如果 connectCtx 未过期,唤醒下面的协程
<-ctxMonitorDone.Done() // 等待下面的协程退出
}()
go func(conn net.Conn) {
defer ctxMonitorDone.Fire() // 发出信号表示协程已退出
<-newClientCtx.Done() // 阻塞直到 connectCtx 过期或上面的 defer 执行
if err := connectCtx.Err(); err != nil {
// connectCtx 在函数退出前过期,强制关闭连接
conn.Close()
}
}(conn)

这段代码启动一个协程监控连接上下文,如果上下文过期,则强制关闭连接。

4. 配置保活参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
kp := opts.KeepaliveParams
if kp.Time == 0 {
kp.Time = defaultClientKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultClientKeepaliveTimeout
}
keepaliveEnabled := false
if kp.Time != infinity {
if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
keepaliveEnabled = true
}

这段代码配置了 TCP 保活参数,确保长连接的稳定性。

5. 安全连接处理

1
2
3
4
5
6
7
8
9
10
11
if transportCreds != nil {
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
// 检查凭证安全级别
isSecure = true
if transportCreds.Info().SecurityProtocol == "tls" {
scheme = "https"
}
}

这段代码执行 TLS 握手(如果启用了安全连接),并验证安全级别是否满足要求。

6. 创建 HTTP/2 客户端对象

1
2
3
4
5
6
7
8
9
10
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // 缓存 Done 通道
cancel: cancel,
userAgent: opts.UserAgent,
registeredCompressors: grpcutil.RegisteredCompressors(),
address: addr,
conn: conn,
// ... 其他字段初始化
}

这段代码创建了 HTTP/2 客户端对象,并初始化各种字段。

7. 启动读取协程

1
2
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)

这段代码启动了一个协程读取来自服务器的 HTTP/2 帧,并将错误发送到 readerErrCh 通道。

8. 发送客户端前言和设置帧(关键部分)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 发送客户端前言到服务器
n, err := t.conn.Write(clientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
return nil, err
}
if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, err
}
var ss []http2.Setting

if t.initialWindowSize != defaultWindowSize {
ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
}
if opts.MaxHeaderListSize != nil {
ss = append(ss, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *opts.MaxHeaderListSize,
})
}
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
return nil, err
}
// 如果需要,调整连接流控窗口
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, err
}
}

这段代码是建立 HTTP/2 连接的核心部分,包括:

  1. 发送客户端前言(clientPreface):这是 HTTP/2 连接的起始标记,固定为 “PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n”,表示这是一个 HTTP/2 连接。

  2. 发送 SETTINGS 帧:配置连接参数,包括:

    • 初始窗口大小(如果与默认值不同)
    • 最大头部列表大小(如果指定了)
  3. 发送 WINDOW_UPDATE 帧:如果需要,调整连接流控窗口大小。

9. 等待服务器前言

1
2
3
4
5
6
7
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// 阻塞直到成功接收到服务器前言或发生错误
if err = <-readerErrCh; err != nil {
return nil, err
}

这段代码刷新写入缓冲区,并等待服务器前言的接收。服务器前言包括 SETTINGS 帧,表示服务器接受了连接。

10. 启动写入协程

1
2
3
4
5
6
7
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
if err := t.loopy.run(); !isIOError(err) {
t.conn.Close()
}
close(t.writerDone)
}()

这段代码启动了一个协程处理数据写入,使用 loopyWriter 管理帧的发送。

HTTP/2 头帧发送与连接建立详解

HTTP/2 连接建立过程包括以下几个关键步骤:

1. 客户端前言

客户端首先发送一个固定的字节序列(客户端前言),表明这是一个 HTTP/2 连接:

1
PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n

这个前言不是一个 HTTP/2 帧,而是一个特殊的字节序列,用于区分 HTTP/2 和 HTTP/1.x 连接。

2. SETTINGS 帧

紧接着客户端前言,客户端发送一个 SETTINGS 帧,包含客户端的连接参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var ss []http2.Setting

if t.initialWindowSize != defaultWindowSize {
ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
}
if opts.MaxHeaderListSize != nil {
ss = append(ss, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *opts.MaxHeaderListSize,
})
}
err = t.framer.fr.WriteSettings(ss...)

这个 SETTINGS 帧可能包含以下参数:

  • SETTINGS_INITIAL_WINDOW_SIZE:流控窗口的初始大小
  • SETTINGS_MAX_HEADER_LIST_SIZE:头部列表的最大大小

3. WINDOW_UPDATE 帧

如果需要调整连接级别的流控窗口,客户端会发送一个 WINDOW_UPDATE 帧:

1
2
3
4
5
6
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, err
}
}

这个帧用于增加连接级别(流 ID 为 0)的流控窗口大小。

4. 等待服务器响应

客户端发送完前言和设置帧后,会等待服务器的响应:

1
2
3
if err = <-readerErrCh; err != nil {
return nil, err
}

服务器应该响应一个 SETTINGS 帧,表示接受了连接。这个响应由 reader 协程处理,如果成功接收到服务器前言,readerErrCh 通道会被关闭;如果发生错误,错误会被发送到通道。

流程图

整体流程

整体流程

HTTP/2 连接建立详细流程

HTTP/2 连接建立详细流程

头帧发送流程

头帧发送流程

关键代码解释

1. 客户端前言

1
2
3
4
5
// 客户端前言是一个固定的字节序列
const clientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

// 发送客户端前言到服务器
n, err := t.conn.Write(clientPreface)

这段代码发送 HTTP/2 客户端前言,这是 HTTP/2 连接的起始标记。

2. SETTINGS 帧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var ss []http2.Setting

if t.initialWindowSize != defaultWindowSize {
ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
}
if opts.MaxHeaderListSize != nil {
ss = append(ss, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *opts.MaxHeaderListSize,
})
}
err = t.framer.fr.WriteSettings(ss...)

这段代码创建并发送 SETTINGS 帧,配置 HTTP/2 连接参数。

3. WINDOW_UPDATE 帧

1
2
3
4
5
6
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, err
}
}

这段代码发送 WINDOW_UPDATE 帧,调整连接级别的流控窗口大小。

4. 读取协程

1
go t.reader(readerErrCh)

这段代码启动一个协程读取来自服务器的 HTTP/2 帧,包括服务器的 SETTINGS 帧。

5. 写入协程

1
2
3
4
5
6
7
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
if err := t.loopy.run(); !isIOError(err) {
t.conn.Close()
}
close(t.writerDone)
}()

这段代码启动一个协程处理数据写入,使用 loopyWriter 管理帧的发送。

总结

NewHTTP2Client 方法是 gRPC 客户端创建 HTTP/2 传输层连接的核心方法,它通过以下步骤建立与服务器的连接:

  1. 建立网络连接(TCP 或 TLS)
  2. 发送 HTTP/2 客户端前言
  3. 发送 SETTINGS 帧配置连接参数
  4. 发送 WINDOW_UPDATE 帧调整流控窗口(如果需要)
  5. 等待服务器响应前言
  6. 启动读写协程处理数据交换

这个过程遵循 HTTP/2 协议规范,确保客户端和服务器之间建立了正确的 HTTP/2 连接,为后续的 gRPC 调用提供基础。