基于grpc-go 的v1.72.0
gRPC 客户端创建过程分析 分析 clientconn.go
中的 NewClient
方法及其调用的方法,详细阐述 gRPC 在创建客户端时的工作流程。
1. 客户端创建流程概述 gRPC 客户端创建主要通过 NewClient
方法完成,该方法创建一个 ClientConn
对象,这是 gRPC 客户端的核心组件。整个创建过程可以分为以下几个主要阶段:
初始化 ClientConn
结构体
应用拨号选项(Dial Options)
解析目标地址并获取解析器构建器
设置拦截器链
验证传输凭证
解析服务配置
初始化权限信息
注册 Channelz
初始化连接状态管理器和选择器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ```go cc := &ClientConn{ target : target, conns: make(map[*addrConn]struct{}), dopts: defaultDialOptions(), // 默认使用dns解析器 } cc.retryThrottler .Store ((*retryThrottler)(nil)) cc.safeConfigSelector .UpdateConfigSelector (&defaultConfigSelector{nil}) cc.ctx , cc.cancel = context.WithCancel (context.Background ())
这一步创建了 ClientConn
结构体,并初始化了基本字段:
target
: 目标服务地址
conns
: 存储所有子连接的映射
dopts
: 默认拨号选项,包括默认使用 DNS 解析器
初始化重试限流器、配置选择器和上下文
2.2 应用拨号选项 1 2 3 4 5 6 7 8 9 10 11 if !disableGlobalOpts { for _, opt := range globalDialOptions { opt.apply(&cc.dopts) } }for _, opt := range opts { opt.apply(&cc.dopts) }
gRPC 使用选项模式来配置客户端连接。这一步应用了两类选项:
全局拨号选项:适用于所有客户端连接的默认选项
用户提供的拨号选项:用户在创建连接时指定的特定选项
2.3 解析目标地址并获取解析器构建器 1 2 3 if err := cc.initParsedTargetAndResolverBuilder(); err != nil { return nil , err }
initParsedTargetAndResolverBuilder
方法解析目标地址并获取相应的解析器构建器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (cc *ClientConn) initParsedTargetAndResolverBuilder() error { logger.Infof("original dial target is: %q" , cc.target) var rb resolver.Builder parsedTarget, err := parseTarget(cc.target) if err == nil { rb = cc.getResolver(parsedTarget.URL.Scheme) if rb != nil { cc.parsedTarget = parsedTarget cc.resolverBuilder = rb return nil } } defScheme := cc.dopts.defaultScheme if internal.UserSetDefaultScheme { defScheme = resolver.GetDefaultScheme() } canonicalTarget := defScheme + "://" + "/" + cc.target parsedTarget, err = parseTarget(canonicalTarget) if err != nil { return err } rb = cc.getResolver(parsedTarget.URL.Scheme) if rb == nil { return fmt.Errorf("could not get resolver for default scheme: %q" , parsedTarget.URL.Scheme) } cc.parsedTarget = parsedTarget cc.resolverBuilder = rb return nil }
这个方法的主要功能是:
尝试解析用户提供的目标地址,获取其中的方案(scheme)
如果解析成功且能找到对应的解析器构建器,则使用它
如果解析失败或找不到对应的解析器构建器,则使用默认方案(通常是 DNS)
最终将解析后的目标地址和解析器构建器存储在 ClientConn
中
2.4 设置拦截器链 1 2 chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc)
这一步设置了两种类型的拦截器链:
一元拦截器:用于拦截普通 RPC 调用
流式拦截器:用于拦截流式 RPC 调用
拦截器允许在 RPC 调用前后执行自定义逻辑,如日志记录、认证、监控等。
2.5 验证传输凭证 1 2 3 if err := cc.validateTransportCredentials(); err != nil { return nil , err }
validateTransportCredentials
方法验证传输凭证的有效性,确保:
至少配置了传输凭证或凭证包
不同时配置传输凭证和凭证包
如果配置了凭证包,其中包含有效的传输凭证
如果使用不安全传输,确保没有需要传输层安全的调用凭证
2.6 解析服务配置 1 2 3 4 5 6 7 if cc.dopts.defaultServiceConfigRawJSON != nil { scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts) if scpr.Err != nil { return nil , fmt.Errorf("%s: %v" , invalidDefaultServiceConfigErrPrefix, scpr.Err) } cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig) }
如果用户提供了默认服务配置(如超时设置、重试策略、负载均衡策略等),这一步会解析该配置并存储在 ClientConn
中。
2.7 初始化权限信息 1 2 3 if err = cc.initAuthority(); err != nil { return nil , err }
initAuthority
方法确定通道的权限信息,按以下优先级:
用户通过 WithAuthority
拨号选项指定的权限覆盖
凭证对认证握手的服务器名称的概念
拨号目标中的端点形式 “scheme://[authority]/endpoint”
2.8 注册 Channelz 和初始化连接状态管理器 1 2 3 cc.channelzRegistration(target) cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz) cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
这些步骤完成了:
向 Channelz 注册 ClientConn,用于监控和调试
创建连接状态管理器,管理连接的状态变化
创建选择器包装器,用于选择子连接进行 RPC 调用
2.9 初始化空闲状态 1 2 cc.initIdleStateLocked() cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
这一步初始化了空闲状态相关的组件,包括:
初始化解析器包装器、负载均衡器包装器和首次解析事件
创建空闲管理器,管理连接的空闲状态
3. 连接建立过程 NewClient
方法只创建了 ClientConn
对象,但并未实际建立连接。实际连接是在需要时(如进行 RPC 调用时,这里以SayHello
方法为例)通过 Connect
方法建立的:
1 2 3 4 5 6 7 8 9 10 func (cc *ClientConn) Connect() { if err := cc.idlenessMgr.ExitIdleMode(); err != nil { cc.addTraceEvent(err.Error()) return } cc.mu.Lock() cc.balancerWrapper.exitIdle() cc.mu.Unlock() }
Connect
方法的主要功能是:
退出空闲模式,这会重新创建名称解析器和负载均衡器
调用负载均衡器的 exitIdle
方法,触发连接创建
3.1 退出空闲模式 1 2 3 4 5 6 7 8 func (cc *ClientConn) exitIdleMode() (err error ) { if err := cc.resolverWrapper.start(); err != nil { return err } cc.addTraceEvent("exiting idle mode" ) return nil }
退出空闲模式的关键是启动解析器包装器,这会触发名称解析过程。
3.2 解析器包装器启动 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (ccr *ccResolverWrapper) start() error { errCh := make (chan error ) ccr.serializer.TrySchedule(func (ctx context.Context) { opts := resolver.BuildOptions{...} var err error if ccr.cc.dopts.copts.Dialer != nil || !ccr.cc.dopts.useProxy { ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts) } else { ccr.resolver, err = delegatingresolver.New(ccr.cc.parsedTarget, ccr, opts, ccr.cc.resolverBuilder, ccr.cc.dopts.enableLocalDNSResolution) } errCh <- err }) return <-errCh }
解析器包装器启动时,会根据配置创建适当的解析器:
如果配置了自定义拨号器或禁用代理,则直接使用解析器构建器创建解析器
否则,创建委托解析器,支持代理功能
3.3 解析器更新状态 解析器解析目标地址后,会通过 UpdateState
方法将解析结果通知给 ClientConn
:
1 2 3 4 5 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { ccr.curState = s return ccr.cc.updateResolverStateAndUnlock(s, nil ) }
3.4 更新解析器状态 1 2 3 4 5 6 7 8 9 10 11 12 func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error ) error { var ret error var balCfg serviceconfig.LoadBalancingConfig if sc != nil { balCfg, ret = cc.applyServiceConfigAndBalancer(sc, s, err) } bw := cc.balancerWrapper uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) return ret }
这个方法的主要功能是:
应用服务配置和负载均衡器
更新负载均衡器的客户端连接状态
3.5 负载均衡器更新状态 1 2 3 4 5 6 7 8 9 10 11 func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { name := gracefulswitch.ChildName(ccs.BalancerConfig) if ccb.curBalancerName != name { ccb.curBalancerName = name channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q" , name) } err := ccb.balancer.UpdateClientConnState(*ccs) return err }
负载均衡器更新状态时,会:
确定要使用的负载均衡策略名称
如果需要,切换到新的负载均衡策略
更新负载均衡器的客户端连接状态
4. 负载均衡器创建子连接 负载均衡器根据解析器提供的地址创建子连接:
1 2 3 4 5 6 7 8 9 10 11 func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error ) { ac, err := ccb.cc.newAddrConnLocked(addrs, opts) if err != nil { return nil , err } acbw := &acBalancerWrapper{...} ac.acbw = acbw return acbw, nil }
4.1 创建地址连接 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error ) { ac := &addrConn{ state: connectivity.Idle, cc: cc, addrs: copyAddresses(addrs), scopts: opts, dopts: cc.dopts, channelz: channelz.RegisterSubChannel(cc.channelz, "" ), resetBackoff: make (chan struct {}), } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) cc.conns[ac] = struct {}{} return ac, nil }
创建地址连接时:
创建 addrConn
对象,初始状态为 Idle
设置地址、选项和上下文
注册子通道到 Channelz
将地址连接添加到 ClientConn
的连接映射中
4.2 连接地址 当负载均衡器调用 Connect
方法时,地址连接开始实际建立连接:
1 2 3 func (acbw *acBalancerWrapper) Connect() { go acbw.ac.connect() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (ac *addrConn) connect() { ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() return } ac.updateConnectivityState(connectivity.Connecting, nil ) ac.mu.Unlock() err := ac.tryAllAddrs(addrs, connectDeadline) }
4.3 尝试所有地址 1 2 3 4 5 6 7 8 9 10 11 func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error { for _, addr := range addrs { err := ac.createTransport(addr, connectDeadline, true ) if err == nil { return nil } } }
这个方法尝试连接所有解析到的地址,直到成功或全部失败。
4.4 创建传输 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (ac *addrConn) createTransport(addr resolver.Address, connectDeadline time.Time, skipReset bool ) error { newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, ac.cc.dopts, ac) if err != nil { return err } ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() newTr.Close() return errConnClosing } ac.curAddr = addr ac.transport = newTr ac.updateConnectivityState(connectivity.Ready, nil ) ac.mu.Unlock() return nil }
创建传输时:
调用 transport.NewClientTransport
创建客户端传输
如果成功,更新当前地址和传输
将连接状态更新为 Ready
5. pickFirst 负载均衡策略 pickFirst
是 gRPC 的默认负载均衡策略,它的实现位于 balancer/pickfirst/pickfirst.go
:
1 2 3 4 5 6 7 type pickfirstBalancer struct { cc balancer.ClientConn subConn balancer.SubConn resolverErr error connErr error }
5.1 更新客户端连接状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { if len (state.ResolverState.Endpoints) == 0 { return balancer.ErrBadResolverState } if b.subConn != nil { b.cc.UpdateAddresses(b.subConn, state.ResolverState.Endpoints[0 ].Addresses) return nil } sc, err := b.cc.NewSubConn(state.ResolverState.Endpoints[0 ].Addresses, balancer.NewSubConnOptions{}) if err != nil { return err } b.subConn = sc b.subConn.Connect() return nil }
pickFirst
策略的特点是:
只使用解析器提供的第一个端点
只创建一个子连接
如果已有子连接,则只更新其地址
5.2 更新子连接状态 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (b *pickfirstBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { switch state.ConnectivityState { case connectivity.Ready: b.resolverErr = nil b.connErr = nil b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Ready, Picker: &picker{sc: sc}, }) case connectivity.Connecting: b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Connecting, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) case connectivity.Idle: b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Idle, Picker: &idlePicker{sc: sc}, }) case connectivity.TransientFailure: b.connErr = state.ConnectionError b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.TransientFailure, Picker: &picker{err: state.ConnectionError}, }) } }
当子连接状态变化时,pickFirst
策略会:
更新连接错误状态
根据子连接状态创建相应的选择器
更新客户端连接状态
5.3 选择器实现 1 2 3 4 5 6 7 8 9 10 11 type picker struct { sc balancer.SubConn err error }func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error ) { if p.err != nil { return balancer.PickResult{}, p.err } return balancer.PickResult{SubConn: p.sc}, nil }
pickFirst
策略的选择器非常简单:
如果有错误,返回错误
否则,始终返回同一个子连接
6. 完整的客户端创建和连接流程
7. 总结 gRPC 客户端创建过程是一个复杂而精心设计的流程,主要包括以下几个关键部分:
ClientConn 初始化 :创建并配置 ClientConn 对象,应用各种拨号选项。
名称解析 :通过解析器将目标地址解析为具体的网络地址。
负载均衡 :使用负载均衡策略(默认为 pickFirst)选择要连接的地址。
连接建立 :创建子连接并建立实际的网络连接。
状态管理 :管理连接的各种状态变化,如 Idle、Connecting、Ready、TransientFailure 等。
这种设计使得 gRPC 能够支持各种复杂的连接场景,如名称解析、负载均衡、故障恢复等,同时保持了良好的可扩展性和可配置性。