grpc-go底层实现三:grpc客户端头帧发送过程

newClientStreamWithParams 方法详细分析

newClientStreamWithParams 方法位于 stream.go 文件的第260-404行,是 gRPC 客户端创建流式连接的核心方法。这个方法负责初始化客户端流,设置上下文、超时、压缩选项,并创建与服务器的连接。下面我将详细分析这个方法的功能及其调用关系。

方法功能分析

1. 上下文和超时处理

1
2
3
4
5
6
var cancel context.CancelFunc
if mc.Timeout != nil && *mc.Timeout >= 0 {
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}

这段代码创建了一个子上下文,如果配置了超时,则设置超时时间;否则创建一个可取消的上下文。这确保了 RPC 调用可以被超时或手动取消。

2. 执行用户自定义的前置方法

1
2
3
4
5
for _, o := range opts {
if err := o.before(callInfo); err != nil {
return nil, toRPCErr(err)
}
}

这段代码执行用户通过 CallOption 提供的前置方法,允许用户在 RPC 调用前进行自定义操作。

3. 设置消息大小限制

1
2
callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize)
callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)

这段代码设置了发送和接收消息的最大大小,确保消息不会超过限制。

4. 设置编解码器

1
2
3
if err := setCallInfoCodec(callInfo); err != nil {
return nil, err
}

这段代码设置了用于序列化和反序列化消息的编解码器。

5. 创建调用头部

1
2
3
4
5
6
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: callInfo.contentSubtype,
DoneFunc: doneFunc,
}

这段代码创建了一个调用头部,包含主机、方法、内容子类型和完成回调函数。

6. 设置压缩选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var compressorV0 Compressor
var compressorV1 encoding.Compressor
if ct := callInfo.compressorName; ct != "" {
callHdr.SendCompress = ct
if ct != encoding.Identity {
compressorV1 = encoding.GetCompressor(ct)
if compressorV1 == nil {
return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
}
}
} else if cc.dopts.compressorV0 != nil {
callHdr.SendCompress = cc.dopts.compressorV0.Type()
compressorV0 = cc.dopts.compressorV0
}

这段代码设置了发送压缩选项,如果用户指定了压缩器,则使用用户指定的;否则使用连接默认的压缩器。

7. 设置凭证

1
2
3
if callInfo.creds != nil {
callHdr.Creds = callInfo.creds
}

这段代码设置了调用凭证,用于认证。

8. 创建客户端流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
cs := &clientStream{
callHdr: callHdr,
ctx: ctx,
methodConfig: &mc,
opts: opts,
callInfo: callInfo,
cc: cc,
desc: desc,
codec: callInfo.codec,
compressorV0: compressorV0,
compressorV1: compressorV1,
cancel: cancel,
firstAttempt: true,
onCommit: onCommit,
}

这段代码创建了一个客户端流对象,包含了所有必要的信息。

9. 设置重试节流器

1
2
3
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}

这段代码设置了重试节流器,用于控制重试频率。

10. 设置二进制日志

1
2
3
4
5
6
7
8
if ml := binarylog.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
if cc.dopts.binaryLogger != nil {
if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
}

这段代码设置了二进制日志记录器,用于记录 RPC 调用的二进制日志。

11. 选择传输并创建新流

1
2
3
4
5
6
7
8
9
10
11
12
13
op := func(a *csAttempt) error {
if err := a.getTransport(); err != nil {
return err
}
if err := a.newStream(); err != nil {
return err
}
cs.attempt = a
return nil
}
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
return nil, err
}

这段代码定义了一个操作函数,该函数获取传输、创建新流,并将尝试对象赋值给客户端流。然后使用 withRetry 方法执行这个操作,如果失败则重试。

12. 记录二进制日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{
OnClientSide: true,
Header: md,
MethodName: method,
Authority: cs.cc.authority,
}
if deadline, ok := ctx.Deadline(); ok {
logEntry.Timeout = time.Until(deadline)
if logEntry.Timeout < 0 {
logEntry.Timeout = 0
}
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}

这段代码记录了客户端头部的二进制日志。

13. 设置上下文清理

1
2
3
4
5
6
7
8
9
10
if desc != unaryStreamDesc {
go func() {
select {
case <-cc.ctx.Done():
cs.finish(ErrClientConnClosing)
case <-ctx.Done():
cs.finish(toRPCErr(ctx.Err()))
}
}()
}

这段代码为非一元流设置了上下文清理,当连接关闭或上下文取消时,结束客户端流。

关键调用方法分析

1. withRetry 方法

withRetry 方法是客户端流重试机制的核心,它接受一个操作函数和一个成功回调函数,执行操作并在失败时重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
cs.mu.Lock()
for {
if cs.committed {
cs.mu.Unlock()
return toRPCErr(op(cs.attempt))
}
if len(cs.replayBuffer) == 0 {
var err error
if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.mu.Unlock()
cs.finish(err)
return err
}
}
a := cs.attempt
cs.mu.Unlock()
err := op(a)
cs.mu.Lock()
if a != cs.attempt {
continue
}
if err == io.EOF {
<-a.transportStream.Done()
}
if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) {
onSuccess()
cs.mu.Unlock()
return err
}
if err := cs.retryLocked(a, err); err != nil {
cs.mu.Unlock()
return err
}
}
}

2. newAttemptLocked 方法

newAttemptLocked 方法创建一个新的客户端流尝试对象,包含上下文、统计处理器和跟踪信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
if err := cs.ctx.Err(); err != nil {
return nil, toRPCErr(err)
}
if err := cs.cc.ctx.Err(); err != nil {
return nil, ErrClientConnClosing
}

ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1)
method := cs.callHdr.Method
var beginTime time.Time
shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
Client: true,
BeginTime: beginTime,
FailFast: cs.callInfo.failFast,
IsClientStream: cs.desc.ClientStreams,
IsServerStream: cs.desc.ServerStreams,
IsTransparentRetryAttempt: isTransparent,
}
sh.HandleRPC(ctx, begin)
}

var trInfo *traceInfo
if EnableTracing {
trInfo = &traceInfo{
tr: newTrace("grpc.Sent."+methodFamily(method), method),
firstLine: firstLine{
client: true,
},
}
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = time.Until(deadline)
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
ctx = newTraceContext(ctx, trInfo.tr)
}

return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
decompressorV0: cs.cc.dopts.dc,
statsHandlers: shs,
trInfo: trInfo,
}, nil
}

3. getTransport 方法

getTransport 方法获取用于传输的客户端传输对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (a *csAttempt) getTransport() error {
cs := a.cs

var err error
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
a.drop = true
}
return err
}
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
}
return nil
}

4. newStream 方法

newStream 方法在传输上创建一个新的流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (a *csAttempt) newStream() error {
cs := a.cs
cs.callHdr.PreviousAttempts = cs.numRetries

// 合并 PickResult 中的元数据
if a.pickResult.Metadata != nil {
md, _ := metadata.FromOutgoingContext(a.ctx)
md = metadata.Join(md, a.pickResult.Metadata)
a.ctx = metadata.NewOutgoingContext(a.ctx, md)
}

s, err := a.transport.NewStream(a.ctx, cs.callHdr)
if err != nil {
nse, ok := err.(*transport.NewStreamError)
if !ok {
return err
}

if nse.AllowTransparentRetry {
a.allowTransparentRetry = true
}

return toRPCErr(nse.Err)
}
a.transportStream = s
a.ctx = s.Context()
a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool}
return nil
}

5. shouldRetry 方法

shouldRetry 方法决定是否应该重试 RPC 调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
func (a *csAttempt) shouldRetry(err error) (bool, error) {
cs := a.cs

if cs.finished || cs.committed || a.drop {
return false, err
}
if a.transportStream == nil && a.allowTransparentRetry {
return true, nil
}

// 等待尾部元数据
unprocessed := false
if a.transportStream != nil {
<-a.transportStream.Done()
unprocessed = a.transportStream.Unprocessed()
}
if cs.firstAttempt && unprocessed {
return true, nil
}
if cs.cc.dopts.disableRetry {
return false, err
}

// 检查服务器推回
pushback := 0
hasPushback := false
if a.transportStream != nil {
if !a.transportStream.TrailersOnly() {
return false, err
}

sps := a.transportStream.Trailer()["grpc-retry-pushback-ms"]
if len(sps) == 1 {
var e error
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
cs.retryThrottler.throttle()
return false, err
}
hasPushback = true
} else if len(sps) > 1 {
cs.retryThrottler.throttle()
return false, err
}
}

// 检查状态码是否可重试
var code codes.Code
if a.transportStream != nil {
code = a.transportStream.Status().Code()
} else {
code = status.Code(err)
}

rp := cs.methodConfig.RetryPolicy
if rp == nil || !rp.RetryableStatusCodes[code] {
return false, err
}

// 检查重试节流
if cs.retryThrottler.throttle() {
return false, err
}

// 检查重试次数
if cs.numRetries+1 >= rp.MaxAttempts {
return false, err
}

// 计算重试延迟
var dur time.Duration
if hasPushback {
dur = time.Millisecond * time.Duration(pushback)
cs.numRetriesSincePushback = 0
} else {
fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
cur := min(float64(rp.InitialBackoff)*fact, float64(rp.MaxBackoff))
cur *= 0.8 + 0.4*rand.Float64()
dur = time.Duration(int64(cur))
cs.numRetriesSincePushback++
}

// 等待重试延迟
t := time.NewTimer(dur)
select {
case <-t.C:
cs.numRetries++
return false, nil
case <-cs.ctx.Done():
t.Stop()
return false, status.FromContextError(cs.ctx.Err()).Err()
}
}

整体流程图

整体流程图

详细调用关系

详细调用关系

重试机制详细流程

重试机制详细流程

总结

newClientStreamWithParams 方法是 gRPC 客户端创建流式连接的核心方法,它负责初始化客户端流,设置各种参数,并创建与服务器的连接。该方法通过 withRetry 机制提供了强大的重试功能,确保在网络不稳定的情况下仍能可靠地完成 RPC 调用。

整个流程涉及多个组件的协作,包括:

  1. 上下文和超时管理
  2. 用户自定义选项处理
  3. 消息大小限制设置
  4. 编解码器和压缩选项配置
  5. 传输层选择和流创建
  6. 重试机制
  7. 二进制日志记录
  8. 上下文清理

这些组件共同工作,提供了一个功能完善、可靠的 gRPC 客户端流实现。