grpc-go底层实现一:gRPC Client初始化与连接建立准备工作

基于grpc-go 的v1.72.0

gRPC 客户端创建过程分析

分析 clientconn.go 中的 NewClient 方法及其调用的方法,详细阐述 gRPC 在创建客户端时的工作流程。

1. 客户端创建流程概述

gRPC 客户端创建主要通过 NewClient 方法完成,该方法创建一个 ClientConn 对象,这是 gRPC 客户端的核心组件。整个创建过程可以分为以下几个主要阶段:

  1. 初始化 ClientConn 结构体
  2. 应用拨号选项(Dial Options)
  3. 解析目标地址并获取解析器构建器
  4. 设置拦截器链
  5. 验证传输凭证
  6. 解析服务配置
  7. 初始化权限信息
  8. 注册 Channelz
  9. 初始化连接状态管理器和选择器

客户端创建流程概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

## 2. `NewClient` 方法详细分析

### 2.1 初始化 ClientConn 结构体

```go
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(), // 默认使用dns解析器
}

cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())

这一步创建了 ClientConn 结构体,并初始化了基本字段:

  • target: 目标服务地址
  • conns: 存储所有子连接的映射
  • dopts: 默认拨号选项,包括默认使用 DNS 解析器
  • 初始化重试限流器、配置选择器和上下文

2.2 应用拨号选项

1
2
3
4
5
6
7
8
9
10
11
// 应用全局拨号选项
if !disableGlobalOpts {
for _, opt := range globalDialOptions {
opt.apply(&cc.dopts)
}
}

// 应用用户提供的拨号选项
for _, opt := range opts {
opt.apply(&cc.dopts)
}

gRPC 使用选项模式来配置客户端连接。这一步应用了两类选项:

  • 全局拨号选项:适用于所有客户端连接的默认选项
  • 用户提供的拨号选项:用户在创建连接时指定的特定选项

2.3 解析目标地址并获取解析器构建器

1
2
3
if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
return nil, err
}

initParsedTargetAndResolverBuilder 方法解析目标地址并获取相应的解析器构建器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
logger.Infof("original dial target is: %q", cc.target)

var rb resolver.Builder
parsedTarget, err := parseTarget(cc.target)
if err == nil {
rb = cc.getResolver(parsedTarget.URL.Scheme) // 获取具体resolverBuilder, 如自定义等
if rb != nil {
cc.parsedTarget = parsedTarget
cc.resolverBuilder = rb
return nil
}
}

// 如果目标地址没有包含方案或指定了未注册的方案,则使用默认方案
defScheme := cc.dopts.defaultScheme // 默认使用dns
if internal.UserSetDefaultScheme {
defScheme = resolver.GetDefaultScheme()
}

canonicalTarget := defScheme + "://" + "/" + cc.target

parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
return err
}
rb = cc.getResolver(parsedTarget.URL.Scheme) // 获取默认的dns resolverBuilder
if rb == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
}
cc.parsedTarget = parsedTarget
cc.resolverBuilder = rb
return nil
}

这个方法的主要功能是:

  1. 尝试解析用户提供的目标地址,获取其中的方案(scheme)
  2. 如果解析成功且能找到对应的解析器构建器,则使用它
  3. 如果解析失败或找不到对应的解析器构建器,则使用默认方案(通常是 DNS)
  4. 最终将解析后的目标地址和解析器构建器存储在 ClientConn

2.4 设置拦截器链

1
2
chainUnaryClientInterceptors(cc)  // 设置一元拦截器
chainStreamClientInterceptors(cc) // 设置流式拦截器

这一步设置了两种类型的拦截器链:

  • 一元拦截器:用于拦截普通 RPC 调用
  • 流式拦截器:用于拦截流式 RPC 调用

拦截器允许在 RPC 调用前后执行自定义逻辑,如日志记录、认证、监控等。

2.5 验证传输凭证

1
2
3
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}

validateTransportCredentials 方法验证传输凭证的有效性,确保:

  1. 至少配置了传输凭证或凭证包
  2. 不同时配置传输凭证和凭证包
  3. 如果配置了凭证包,其中包含有效的传输凭证
  4. 如果使用不安全传输,确保没有需要传输层安全的调用凭证

2.6 解析服务配置

1
2
3
4
5
6
7
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}

如果用户提供了默认服务配置(如超时设置、重试策略、负载均衡策略等),这一步会解析该配置并存储在 ClientConn 中。

2.7 初始化权限信息

1
2
3
if err = cc.initAuthority(); err != nil {
return nil, err
}

initAuthority 方法确定通道的权限信息,按以下优先级:

  1. 用户通过 WithAuthority 拨号选项指定的权限覆盖
  2. 凭证对认证握手的服务器名称的概念
  3. 拨号目标中的端点形式 “scheme://[authority]/endpoint”

2.8 注册 Channelz 和初始化连接状态管理器

1
2
3
cc.channelzRegistration(target)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

这些步骤完成了:

  • 向 Channelz 注册 ClientConn,用于监控和调试
  • 创建连接状态管理器,管理连接的状态变化
  • 创建选择器包装器,用于选择子连接进行 RPC 调用

2.9 初始化空闲状态

1
2
cc.initIdleStateLocked()
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

这一步初始化了空闲状态相关的组件,包括:

  • 初始化解析器包装器、负载均衡器包装器和首次解析事件
  • 创建空闲管理器,管理连接的空闲状态

3. 连接建立过程

NewClient 方法只创建了 ClientConn 对象,但并未实际建立连接。实际连接是在需要时(如进行 RPC 调用时,这里以SayHello方法为例)通过 Connect 方法建立的:

1
2
3
4
5
6
7
8
9
10
func (cc *ClientConn) Connect() {
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
cc.addTraceEvent(err.Error())
return
}
// 如果 ClientConn 不处于空闲模式,需要调用负载均衡策略的 ExitIdle 方法来创建连接
cc.mu.Lock()
cc.balancerWrapper.exitIdle()
cc.mu.Unlock()
}

Connect 方法的主要功能是:

  1. 退出空闲模式,这会重新创建名称解析器和负载均衡器
  2. 调用负载均衡器的 exitIdle 方法,触发连接创建

3.1 退出空闲模式

1
2
3
4
5
6
7
8
func (cc *ClientConn) exitIdleMode() (err error) {
// ...
if err := cc.resolverWrapper.start(); err != nil {
return err
}
cc.addTraceEvent("exiting idle mode")
return nil
}

退出空闲模式的关键是启动解析器包装器,这会触发名称解析过程。

3.2 解析器包装器启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (ccr *ccResolverWrapper) start() error {
errCh := make(chan error)
ccr.serializer.TrySchedule(func(ctx context.Context) {
// ...
opts := resolver.BuildOptions{...}
var err error
if ccr.cc.dopts.copts.Dialer != nil || !ccr.cc.dopts.useProxy {
ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
} else {
ccr.resolver, err = delegatingresolver.New(ccr.cc.parsedTarget, ccr, opts, ccr.cc.resolverBuilder, ccr.cc.dopts.enableLocalDNSResolution)
}
errCh <- err
})
return <-errCh
}

解析器包装器启动时,会根据配置创建适当的解析器:

  1. 如果配置了自定义拨号器或禁用代理,则直接使用解析器构建器创建解析器
  2. 否则,创建委托解析器,支持代理功能

3.3 解析器更新状态

解析器解析目标地址后,会通过 UpdateState 方法将解析结果通知给 ClientConn

1
2
3
4
5
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
// ...
ccr.curState = s
return ccr.cc.updateResolverStateAndUnlock(s, nil)
}

3.4 更新解析器状态

1
2
3
4
5
6
7
8
9
10
11
12
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
// ...
var ret error
var balCfg serviceconfig.LoadBalancingConfig
if sc != nil {
balCfg, ret = cc.applyServiceConfigAndBalancer(sc, s, err)
}
bw := cc.balancerWrapper
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
// ...
return ret
}

这个方法的主要功能是:

  1. 应用服务配置和负载均衡器
  2. 更新负载均衡器的客户端连接状态

3.5 负载均衡器更新状态

1
2
3
4
5
6
7
8
9
10
11
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
// ...
name := gracefulswitch.ChildName(ccs.BalancerConfig)
if ccb.curBalancerName != name {
ccb.curBalancerName = name
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}
err := ccb.balancer.UpdateClientConnState(*ccs)
// ...
return err
}

负载均衡器更新状态时,会:

  1. 确定要使用的负载均衡策略名称
  2. 如果需要,切换到新的负载均衡策略
  3. 更新负载均衡器的客户端连接状态

4. 负载均衡器创建子连接

负载均衡器根据解析器提供的地址创建子连接:

1
2
3
4
5
6
7
8
9
10
11
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
// ...
ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
if err != nil {
// ...
return nil, err
}
acbw := &acBalancerWrapper{...}
ac.acbw = acbw
return acbw, nil
}

4.1 创建地址连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
// ...
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// ...
cc.conns[ac] = struct{}{}
return ac, nil
}

创建地址连接时:

  1. 创建 addrConn 对象,初始状态为 Idle
  2. 设置地址、选项和上下文
  3. 注册子通道到 Channelz
  4. 将地址连接添加到 ClientConn 的连接映射中

4.2 连接地址

当负载均衡器调用 Connect 方法时,地址连接开始实际建立连接:

1
2
3
func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (ac *addrConn) connect() {
// ...
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// 设置状态为 Connecting
ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()

// 尝试所有地址
err := ac.tryAllAddrs(addrs, connectDeadline)
// ...
}

4.3 尝试所有地址

1
2
3
4
5
6
7
8
9
10
11
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
// ...
for _, addr := range addrs {
err := ac.createTransport(addr, connectDeadline, true)
if err == nil {
return nil
}
// ...
}
// ...
}

这个方法尝试连接所有解析到的地址,直到成功或全部失败。

4.4 创建传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (ac *addrConn) createTransport(addr resolver.Address, connectDeadline time.Time, skipReset bool) error {
// ...
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, ac.cc.dopts, ac)
if err != nil {
// ...
return err
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
newTr.Close()
return errConnClosing
}
ac.curAddr = addr
ac.transport = newTr
ac.updateConnectivityState(connectivity.Ready, nil)
ac.mu.Unlock()
// ...
return nil
}

创建传输时:

  1. 调用 transport.NewClientTransport 创建客户端传输
  2. 如果成功,更新当前地址和传输
  3. 将连接状态更新为 Ready

5. pickFirst 负载均衡策略

pickFirst 是 gRPC 的默认负载均衡策略,它的实现位于 balancer/pickfirst/pickfirst.go

1
2
3
4
5
6
7
type pickfirstBalancer struct {
cc balancer.ClientConn
subConn balancer.SubConn

resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
}

5.1 更新客户端连接状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
// ...
if len(state.ResolverState.Endpoints) == 0 {
return balancer.ErrBadResolverState
}

if b.subConn != nil {
// 更新现有子连接的地址
b.cc.UpdateAddresses(b.subConn, state.ResolverState.Endpoints[0].Addresses)
return nil
}

// 创建新的子连接
sc, err := b.cc.NewSubConn(state.ResolverState.Endpoints[0].Addresses, balancer.NewSubConnOptions{})
if err != nil {
return err
}
b.subConn = sc
b.subConn.Connect()
return nil
}

pickFirst 策略的特点是:

  1. 只使用解析器提供的第一个端点
  2. 只创建一个子连接
  3. 如果已有子连接,则只更新其地址

5.2 更新子连接状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (b *pickfirstBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
// ...
switch state.ConnectivityState {
case connectivity.Ready:
b.resolverErr = nil
b.connErr = nil
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{sc: sc},
})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.Idle:
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{sc: sc},
})
case connectivity.TransientFailure:
b.connErr = state.ConnectionError
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: state.ConnectionError},
})
}
}

当子连接状态变化时,pickFirst 策略会:

  1. 更新连接错误状态
  2. 根据子连接状态创建相应的选择器
  3. 更新客户端连接状态

5.3 选择器实现

1
2
3
4
5
6
7
8
9
10
11
type picker struct {
sc balancer.SubConn
err error
}

func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
if p.err != nil {
return balancer.PickResult{}, p.err
}
return balancer.PickResult{SubConn: p.sc}, nil
}

pickFirst 策略的选择器非常简单:

  1. 如果有错误,返回错误
  2. 否则,始终返回同一个子连接

6. 完整的客户端创建和连接流程

完整客户端创建和连接流程

7. 总结

gRPC 客户端创建过程是一个复杂而精心设计的流程,主要包括以下几个关键部分:

  1. ClientConn 初始化:创建并配置 ClientConn 对象,应用各种拨号选项。

  2. 名称解析:通过解析器将目标地址解析为具体的网络地址。

  3. 负载均衡:使用负载均衡策略(默认为 pickFirst)选择要连接的地址。

  4. 连接建立:创建子连接并建立实际的网络连接。

  5. 状态管理:管理连接的各种状态变化,如 Idle、Connecting、Ready、TransientFailure 等。

这种设计使得 gRPC 能够支持各种复杂的连接场景,如名称解析、负载均衡、故障恢复等,同时保持了良好的可扩展性和可配置性。