newClientStreamWithParams
方法详细分析newClientStreamWithParams
方法位于 stream.go
文件的第260-404行,是 gRPC 客户端创建流式连接的核心方法。这个方法负责初始化客户端流,设置上下文、超时、压缩选项,并创建与服务器的连接。下面我将详细分析这个方法的功能及其调用关系。
方法功能分析 1. 上下文和超时处理 1 2 3 4 5 6 var cancel context.CancelFuncif mc.Timeout != nil && *mc.Timeout >= 0 { ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) } else { ctx, cancel = context.WithCancel(ctx) }
这段代码创建了一个子上下文,如果配置了超时,则设置超时时间;否则创建一个可取消的上下文。这确保了 RPC 调用可以被超时或手动取消。
2. 执行用户自定义的前置方法 1 2 3 4 5 for _, o := range opts { if err := o.before(callInfo); err != nil { return nil , toRPCErr(err) } }
这段代码执行用户通过 CallOption
提供的前置方法,允许用户在 RPC 调用前进行自定义操作。
3. 设置消息大小限制 1 2 callInfo.maxSendMessageSize = getMaxSize(mc.MaxReqSize, callInfo.maxSendMessageSize, defaultClientMaxSendMessageSize) callInfo.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, callInfo.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
这段代码设置了发送和接收消息的最大大小,确保消息不会超过限制。
4. 设置编解码器 1 2 3 if err := setCallInfoCodec(callInfo); err != nil { return nil , err }
这段代码设置了用于序列化和反序列化消息的编解码器。
5. 创建调用头部 1 2 3 4 5 6 callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, ContentSubtype: callInfo.contentSubtype, DoneFunc: doneFunc, }
这段代码创建了一个调用头部,包含主机、方法、内容子类型和完成回调函数。
6. 设置压缩选项 1 2 3 4 5 6 7 8 9 10 11 12 13 14 var compressorV0 Compressorvar compressorV1 encoding.Compressorif ct := callInfo.compressorName; ct != "" { callHdr.SendCompress = ct if ct != encoding.Identity { compressorV1 = encoding.GetCompressor(ct) if compressorV1 == nil { return nil , status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q" , ct) } } } else if cc.dopts.compressorV0 != nil { callHdr.SendCompress = cc.dopts.compressorV0.Type() compressorV0 = cc.dopts.compressorV0 }
这段代码设置了发送压缩选项,如果用户指定了压缩器,则使用用户指定的;否则使用连接默认的压缩器。
7. 设置凭证 1 2 3 if callInfo.creds != nil { callHdr.Creds = callInfo.creds }
这段代码设置了调用凭证,用于认证。
8. 创建客户端流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 cs := &clientStream{ callHdr: callHdr, ctx: ctx, methodConfig: &mc, opts: opts, callInfo: callInfo, cc: cc, desc: desc, codec: callInfo.codec, compressorV0: compressorV0, compressorV1: compressorV1, cancel: cancel, firstAttempt: true , onCommit: onCommit, }
这段代码创建了一个客户端流对象,包含了所有必要的信息。
9. 设置重试节流器 1 2 3 if !cc.dopts.disableRetry { cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) }
这段代码设置了重试节流器,用于控制重试频率。
10. 设置二进制日志 1 2 3 4 5 6 7 8 if ml := binarylog.GetMethodLogger(method); ml != nil { cs.binlogs = append (cs.binlogs, ml) }if cc.dopts.binaryLogger != nil { if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil { cs.binlogs = append (cs.binlogs, ml) } }
这段代码设置了二进制日志记录器,用于记录 RPC 调用的二进制日志。
11. 选择传输并创建新流 1 2 3 4 5 6 7 8 9 10 11 12 13 op := func (a *csAttempt) error { if err := a.getTransport(); err != nil { return err } if err := a.newStream(); err != nil { return err } cs.attempt = a return nil }if err := cs.withRetry(op, func () { cs.bufferForRetryLocked(0 , op, nil ) }); err != nil { return nil , err }
这段代码定义了一个操作函数,该函数获取传输、创建新流,并将尝试对象赋值给客户端流。然后使用 withRetry
方法执行这个操作,如果失败则重试。
12. 记录二进制日志 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 if len (cs.binlogs) != 0 { md, _ := metadata.FromOutgoingContext(ctx) logEntry := &binarylog.ClientHeader{ OnClientSide: true , Header: md, MethodName: method, Authority: cs.cc.authority, } if deadline, ok := ctx.Deadline(); ok { logEntry.Timeout = time.Until(deadline) if logEntry.Timeout < 0 { logEntry.Timeout = 0 } } for _, binlog := range cs.binlogs { binlog.Log(cs.ctx, logEntry) } }
这段代码记录了客户端头部的二进制日志。
13. 设置上下文清理 1 2 3 4 5 6 7 8 9 10 if desc != unaryStreamDesc { go func () { select { case <-cc.ctx.Done(): cs.finish(ErrClientConnClosing) case <-ctx.Done(): cs.finish(toRPCErr(ctx.Err())) } }() }
这段代码为非一元流设置了上下文清理,当连接关闭或上下文取消时,结束客户端流。
关键调用方法分析 1. withRetry
方法 withRetry
方法是客户端流重试机制的核心,它接受一个操作函数和一个成功回调函数,执行操作并在失败时重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (cs *clientStream) withRetry(op func (a *csAttempt) error , onSuccess func () ) error { cs.mu.Lock() for { if cs.committed { cs.mu.Unlock() return toRPCErr(op(cs.attempt)) } if len (cs.replayBuffer) == 0 { var err error if cs.attempt, err = cs.newAttemptLocked(false ); err != nil { cs.mu.Unlock() cs.finish(err) return err } } a := cs.attempt cs.mu.Unlock() err := op(a) cs.mu.Lock() if a != cs.attempt { continue } if err == io.EOF { <-a.transportStream.Done() } if err == nil || (err == io.EOF && a.transportStream.Status().Code() == codes.OK) { onSuccess() cs.mu.Unlock() return err } if err := cs.retryLocked(a, err); err != nil { cs.mu.Unlock() return err } } }
2. newAttemptLocked
方法 newAttemptLocked
方法创建一个新的客户端流尝试对象,包含上下文、统计处理器和跟踪信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 func (cs *clientStream) newAttemptLocked(isTransparent bool ) (*csAttempt, error ) { if err := cs.ctx.Err(); err != nil { return nil , toRPCErr(err) } if err := cs.cc.ctx.Err(); err != nil { return nil , ErrClientConnClosing } ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.compressorV0, cs.compressorV1) method := cs.callHdr.Method var beginTime time.Time shs := cs.cc.dopts.copts.StatsHandlers for _, sh := range shs { ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast}) beginTime = time.Now() begin := &stats.Begin{ Client: true , BeginTime: beginTime, FailFast: cs.callInfo.failFast, IsClientStream: cs.desc.ClientStreams, IsServerStream: cs.desc.ServerStreams, IsTransparentRetryAttempt: isTransparent, } sh.HandleRPC(ctx, begin) } var trInfo *traceInfo if EnableTracing { trInfo = &traceInfo{ tr: newTrace("grpc.Sent." +methodFamily(method), method), firstLine: firstLine{ client: true , }, } if deadline, ok := ctx.Deadline(); ok { trInfo.firstLine.deadline = time.Until(deadline) } trInfo.tr.LazyLog(&trInfo.firstLine, false ) ctx = newTraceContext(ctx, trInfo.tr) } return &csAttempt{ ctx: ctx, beginTime: beginTime, cs: cs, decompressorV0: cs.cc.dopts.dc, statsHandlers: shs, trInfo: trInfo, }, nil }
3. getTransport
方法 getTransport
方法获取用于传输的客户端传输对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (a *csAttempt) getTransport() error { cs := a.cs var err error a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method) if err != nil { if de, ok := err.(dropError); ok { err = de.error a.drop = true } return err } if a.trInfo != nil { a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr()) } return nil }
4. newStream
方法 newStream
方法在传输上创建一个新的流。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (a *csAttempt) newStream() error { cs := a.cs cs.callHdr.PreviousAttempts = cs.numRetries if a.pickResult.Metadata != nil { md, _ := metadata.FromOutgoingContext(a.ctx) md = metadata.Join(md, a.pickResult.Metadata) a.ctx = metadata.NewOutgoingContext(a.ctx, md) } s, err := a.transport.NewStream(a.ctx, cs.callHdr) if err != nil { nse, ok := err.(*transport.NewStreamError) if !ok { return err } if nse.AllowTransparentRetry { a.allowTransparentRetry = true } return toRPCErr(nse.Err) } a.transportStream = s a.ctx = s.Context() a.parser = &parser{r: s, bufferPool: a.cs.cc.dopts.copts.BufferPool} return nil }
5. shouldRetry
方法 shouldRetry
方法决定是否应该重试 RPC 调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 func (a *csAttempt) shouldRetry(err error ) (bool , error ) { cs := a.cs if cs.finished || cs.committed || a.drop { return false , err } if a.transportStream == nil && a.allowTransparentRetry { return true , nil } unprocessed := false if a.transportStream != nil { <-a.transportStream.Done() unprocessed = a.transportStream.Unprocessed() } if cs.firstAttempt && unprocessed { return true , nil } if cs.cc.dopts.disableRetry { return false , err } pushback := 0 hasPushback := false if a.transportStream != nil { if !a.transportStream.TrailersOnly() { return false , err } sps := a.transportStream.Trailer()["grpc-retry-pushback-ms" ] if len (sps) == 1 { var e error if pushback, e = strconv.Atoi(sps[0 ]); e != nil || pushback < 0 { cs.retryThrottler.throttle() return false , err } hasPushback = true } else if len (sps) > 1 { cs.retryThrottler.throttle() return false , err } } var code codes.Code if a.transportStream != nil { code = a.transportStream.Status().Code() } else { code = status.Code(err) } rp := cs.methodConfig.RetryPolicy if rp == nil || !rp.RetryableStatusCodes[code] { return false , err } if cs.retryThrottler.throttle() { return false , err } if cs.numRetries+1 >= rp.MaxAttempts { return false , err } var dur time.Duration if hasPushback { dur = time.Millisecond * time.Duration(pushback) cs.numRetriesSincePushback = 0 } else { fact := math.Pow(rp.BackoffMultiplier, float64 (cs.numRetriesSincePushback)) cur := min(float64 (rp.InitialBackoff)*fact, float64 (rp.MaxBackoff)) cur *= 0.8 + 0.4 *rand.Float64() dur = time.Duration(int64 (cur)) cs.numRetriesSincePushback++ } t := time.NewTimer(dur) select { case <-t.C: cs.numRetries++ return false , nil case <-cs.ctx.Done(): t.Stop() return false , status.FromContextError(cs.ctx.Err()).Err() } }
整体流程图
详细调用关系
重试机制详细流程
总结 newClientStreamWithParams
方法是 gRPC 客户端创建流式连接的核心方法,它负责初始化客户端流,设置各种参数,并创建与服务器的连接。该方法通过 withRetry
机制提供了强大的重试功能,确保在网络不稳定的情况下仍能可靠地完成 RPC 调用。
整个流程涉及多个组件的协作,包括:
上下文和超时管理
用户自定义选项处理
消息大小限制设置
编解码器和压缩选项配置
传输层选择和流创建
重试机制
二进制日志记录
上下文清理
这些组件共同工作,提供了一个功能完善、可靠的 gRPC 客户端流实现。