【go语言之thrift协议三之client端分析】
创始人
2024-05-28 10:07:44
0

go语言之thrift协议之client端分析

  • runClient
    • Open
    • protocolFactory.GetProtocol
    • handleClient
      • NewTStandardClient
      • NewCalculatorClient
      • handleClient的具体实现

上一篇文章分析了thrift协议server端的实现,这边还是基于官方的示例去分析。

import ("crypto/tls""flag""fmt""os""github.com/apache/thrift/lib/go/thrift"
)func Usage() {fmt.Fprint(os.Stderr, "Usage of ", os.Args[0], ":\n")flag.PrintDefaults()fmt.Fprint(os.Stderr, "\n")
}func main() {flag.Usage = Usageserver := flag.Bool("server", false, "Run server")protocol := flag.String("P", "compact", "Specify the protocol (binary, compact, json, simplejson)")framed := flag.Bool("framed", false, "Use framed transport")buffered := flag.Bool("buffered", false, "Use buffered transport")addr := flag.String("addr", "localhost:9090", "Address to listen to")secure := flag.Bool("secure", false, "Use tls secure transport")flag.Parse()var protocolFactory thrift.TProtocolFactoryswitch *protocol {case "compact":protocolFactory = thrift.NewTCompactProtocolFactoryConf(nil)case "simplejson":protocolFactory = thrift.NewTSimpleJSONProtocolFactoryConf(nil)case "json":protocolFactory = thrift.NewTJSONProtocolFactory()case "binary", "":protocolFactory = thrift.NewTBinaryProtocolFactoryConf(nil)default:fmt.Fprint(os.Stderr, "Invalid protocol specified", protocol, "\n")Usage()os.Exit(1)}var transportFactory thrift.TTransportFactorycfg := &thrift.TConfiguration{TLSConfig: &tls.Config{InsecureSkipVerify: true,},}if *buffered {transportFactory = thrift.NewTBufferedTransportFactory(8192)} else {transportFactory = thrift.NewTTransportFactory()}if *framed {transportFactory = thrift.NewTFramedTransportFactoryConf(transportFactory, cfg)}if *server {if err := runServer(transportFactory, protocolFactory, *addr, *secure); err != nil {fmt.Println("error running server:", err)}} else {if err := runClient(transportFactory, protocolFactory, *addr, *secure, cfg); err != nil {fmt.Println("error running client:", err)}}
}

这里的代码因为是和server端是公用的,所以就不进行具体的分析,详情可以看上一篇。

runClient

还是先看一下具体的实现

func runClient(transportFactory thrift.TTransportFactory, protocolFactory thrift.TProtocolFactory, addr string, secure bool, cfg *thrift.TConfiguration) error {// 初始化transportvar transport thrift.TTransportif secure {transport = thrift.NewTSSLSocketConf(addr, cfg)} else {transport = thrift.NewTSocketConf(addr, cfg)}// 根据transportFactory获取transporttransport, err := transportFactory.GetTransport(transport)if err != nil {return err}defer transport.Close()// 根据地址获取连接if err := transport.Open(); err != nil {return err}// 获取inbound的protoiprot := protocolFactory.GetProtocol(transport)// 获取inbound的protooprot := protocolFactory.GetProtocol(transport)// 进行请求return handleClient(tutorial.NewCalculatorClient(thrift.NewTStandardClient(iprot, oprot)))
}

首先就是初始化client的transport,这里因为没有走https,所以这里走的是transport = thrift.NewTSocketConf(addr, cfg).然后和server一样,其实最终是初始化了TSocket这个结构体,如下

// NewTSocketFromAddrConf creates a TSocket from a net.Addr
func NewTSocketFromAddrConf(addr net.Addr, conf *TConfiguration) *TSocket {return &TSocket{addr: addr,cfg:  conf,}
}

然后调用transportFactory.GetTransport。这里的GetTransport其实还是原封不动的返回了,所以这里就是TSocket的这个结构体,然后就是open方法去根据地址获取连接。

Open

// Connects the socket, creating a new socket object if necessary.
func (p *TSocket) Open() error {if p.conn.isValid() {return NewTTransportException(ALREADY_OPEN, "Socket already connected.")}if p.addr == nil {return NewTTransportException(NOT_OPEN, "Cannot open nil address.")}if len(p.addr.Network()) == 0 {return NewTTransportException(NOT_OPEN, "Cannot open bad network name.")}if len(p.addr.String()) == 0 {return NewTTransportException(NOT_OPEN, "Cannot open bad address.")}var err error// 根据地址获取连接if p.conn, err = createSocketConnFromReturn(net.DialTimeout(p.addr.Network(),p.addr.String(),p.cfg.GetConnectTimeout(),)); err != nil {return &tTransportException{typeId: NOT_OPEN,err:    err,msg:    err.Error(),}}p.addr = p.conn.RemoteAddr()return nil
}

这里的逻辑其实也是比较简单,通过官方的net.DialTimeout方法去获取一个链接。

protocolFactory.GetProtocol

这里的实现是和server端一样,根据compact的proto去返回一个TProtocol。


func (p *TCompactProtocolFactory) GetProtocol(trans TTransport) TProtocol {return NewTCompactProtocolConf(trans, p.cfg)
}
func NewTCompactProtocolConf(trans TTransport, conf *TConfiguration) *TCompactProtocol {PropagateTConfiguration(trans, conf)p := &TCompactProtocol{origTransport: trans,cfg:           conf,}if et, ok := trans.(TRichTransport); ok {p.trans = et} else {p.trans = NewTRichTransport(trans)}return p
}

handleClient

注意这里首先是需要初始化client的方法也就是 tutorial.NewCalculatorClient(thrift.NewTStandardClient(iprot, oprot))。接下来进行逐步的解析

NewTStandardClient

// TStandardClient implements TClient, and uses the standard message format for Thrift.
// It is not safe for concurrent use.
func NewTStandardClient(inputProtocol, outputProtocol TProtocol) *TStandardClient {return &TStandardClient{iprot: inputProtocol,oprot: outputProtocol,}
}type TClient interface {Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error)
}

首先需要注意的是这个TStandardClient结构体实现了TClient 这个interface,这个也是这个结构体核心的功能,在后面的很多地方也都有使用。然后看一下call这个方法

type TStruct interface {Write(ctx context.Context, p TProtocol) errorRead(ctx context.Context, p TProtocol) error
}
func (p *TStandardClient) Call(ctx context.Context, method string, args, result TStruct) (ResponseMeta, error) {p.seqId++seqId := p.seqIdif err := p.Send(ctx, p.oprot, seqId, method, args); err != nil {return ResponseMeta{}, err}// method is onewayif result == nil {return ResponseMeta{}, nil}err := p.Recv(ctx, p.iprot, seqId, method, result)var headers THeaderMapif hp, ok := p.iprot.(*THeaderProtocol); ok {headers = hp.transport.readHeaders}return ResponseMeta{Headers: headers,}, err
}

这里的seqId每次调用都会自增一次。
然后可以看出来这个方法的主要是由Send 和Recv这两个方法构成。分别看一下Send方法和Recv方法,

func (p *TStandardClient) Send(ctx context.Context, oprot TProtocol, seqId int32, method string, args TStruct) error {// Set headers from context object on THeaderProtocolif headerProt, ok := oprot.(*THeaderProtocol); ok {headerProt.ClearWriteHeaders()for _, key := range GetWriteHeaderList(ctx) {if value, ok := GetHeader(ctx, key); ok {headerProt.SetWriteHeader(key, value)}}}// 注意这里的oprot就是compact所生成的TProtocol。if err := oprot.WriteMessageBegin(ctx, method, CALL, seqId); err != nil {return err}// 因为这里的args是一个interface,所以后面根据具体的实现分析if err := args.Write(ctx, oprot); err != nil {return err}// 这里compact的WriteMessageEnd方法if err := oprot.WriteMessageEnd(ctx); err != nil {return err}// 调用compact的Flush方法return oprot.Flush(ctx)
}

这里没有多余的逻辑主要就是调用compact中的WriteMessageBegin,WriteMessageEnd方法,然后调用args的args方法。
然后看一下recv方法。

func (p *TStandardClient) Recv(ctx context.Context, iprot TProtocol, seqId int32, method string, result TStruct) error {// 读取compact的ReadMessageBegin方法 并且获取对应的参数rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin(ctx)if err != nil {return err}// 判断是不是自己指定的方法if method != rMethod {return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))} else if seqId != rSeqId {return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))} else if rTypeId == EXCEPTION {var exception tApplicationExceptionif err := exception.Read(ctx, iprot); err != nil {return err}if err := iprot.ReadMessageEnd(ctx); err != nil {return err}return &exception} else if rTypeId != REPLY {return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))}// result读取结果if err := result.Read(ctx, iprot); err != nil {return err}// 调用compact的ReadMessageEnd方法return iprot.ReadMessageEnd(ctx)
}

看起来Recv方法其实和Send类似,读取的是compact的ReadMessageBegin和ReadMessageEnd方法,然后就是调用args的write和result.Read方法。

NewCalculatorClient

需要注意这个是通过thrift实现的方法,入参就是上面实现的TStandardClient结构体,然后实现了TClient这个interface也就是Call方法。

func NewCalculatorClient(c thrift.TClient) *CalculatorClient {return &CalculatorClient{SharedServiceClient: shared.NewSharedServiceClient(c),}
}
func NewSharedServiceClient(c thrift.TClient) *SharedServiceClient {return &SharedServiceClient{c: c,}
}

handleClient的具体实现

func handleClient(client *tutorial.CalculatorClient) (err error) {// 调用ping方法client.Ping(defaultCtx)fmt.Println("ping()")// 调用Add方法sum, _ := client.Add(defaultCtx, 1, 1)fmt.Print("1+1=", sum, "\n")// 调用Calculate方法work := tutorial.NewWork()work.Op = tutorial.Operation_DIVIDEwork.Num1 = 1work.Num2 = 0quotient, err := client.Calculate(defaultCtx, 1, work)if err != nil {switch v := err.(type) {case *tutorial.InvalidOperation:fmt.Println("Invalid operation:", v)default:fmt.Println("Error during operation:", err)}} else {fmt.Println("Whoa we can divide by 0 with new value:", quotient)}// 调用Calculate方法 work.Op = tutorial.Operation_SUBTRACTwork.Num1 = 15work.Num2 = 10diff, err := client.Calculate(defaultCtx, 1, work)if err != nil {switch v := err.(type) {case *tutorial.InvalidOperation:fmt.Println("Invalid operation:", v)default:fmt.Println("Error during operation:", err)}return err} else {fmt.Print("15-10=", diff, "\n")}// 调用GetStruct方法log, err := client.GetStruct(defaultCtx, 1)if err != nil {fmt.Println("Unable to get struct:", err)return err} else {fmt.Println("Check log:", log.Value)}return err
}

这个方法从使用上来看是好理解的,主要作用就是组装参数,调用client所实现的方法,接下来看一下这些方法在thrift中的实现。因为这里的实现都是大同小异的,所以这里看一下Add这个方法,代码如下

type CalculatorAddArgs struct {Num1 int32 `thrift:"num1,1" db:"num1" json:"num1"`Num2 int32 `thrift:"num2,2" db:"num2" json:"num2"`
}// Attributes:
//   - Success
type CalculatorAddResult struct {Success *int32 `thrift:"success,0" db:"success" json:"success,omitempty"`
}// ResponseMeta represents the metadata attached to the response.
type ResponseMeta struct {// The headers in the response, if any.// If the underlying transport/protocol is not THeader, this will always be nil.Headers THeaderMap
}// Parameters:
//   - Num1
//   - Num2
func (p *CalculatorClient) Add(ctx context.Context, num1 int32, num2 int32) (_r int32, _err error) {var _args3 CalculatorAddArgs_args3.Num1 = num1_args3.Num2 = num2var _result5 CalculatorAddResultvar _meta4 thrift.ResponseMeta_meta4, _err = p.Client_().Call(ctx, "add", &_args3, &_result5)p.SetLastResponseMeta_(_meta4)if _err != nil {return}return _result5.GetSuccess(), nil
}

这里的CalculatorAddArgs,CalculatorAddResult和ResponseMeta都如上所示,然后也是调用的Call方法。之所以要传入add是因为在server那边根据方法名称去获取不同的的handler,所以method名称是add。然后就是获取结果,然后调用CalculatorAddResult 中的success的成员,当然这里就是int32。

到这里基本上就说完了,当然这还远远不够,因为接下来需要说一下thrift中不同的proto对于TProtocol的具体实现。

相关内容

热门资讯

iPhone手机怎么玩安卓系统... 你有没有想过,你的iPhone手机竟然也能玩安卓系统?没错,就是那个一直以来让你觉得遥不可及的安卓世...
平板删安卓系统更新不了,原因及... 最近是不是你也遇到了这样的烦恼?平板电脑上的安卓系统更新不了,是不是让你头疼得要命?别急,今天就来给...
苹果组装机安卓系统卡,卡顿背后... 你有没有发现,最近用苹果手机的时候,有时候系统有点卡呢?这可真是让人头疼啊!你知道吗,其实这背后还有...
安卓系统原生浏览器,功能与体验... 你有没有发现,每次打开手机,那个小小的浏览器窗口总是默默无闻地在那里,陪你浏览网页、搜索信息、看视频...
安卓机如何上苹果系统,跨平台体... 你是不是也和我一样,对安卓机和苹果系统之间的切换充满了好奇?想象你的安卓手机里装满了各种应用,而苹果...
安卓导入系统证书失败,原因分析... 最近在使用安卓手机的时候,你是不是也遇到了一个让人头疼的问题——导入系统证书失败?别急,今天就来给你...
安卓原生系统有哪些手机,盘点搭... 你有没有想过,为什么有些手机用起来就是那么流畅,那么顺心呢?这背后可大有学问哦!今天,就让我带你一起...
安卓系统关机了怎么定位,安卓系... 手机突然关机了,是不是有点慌张呢?别担心,今天就来教你一招,让你的安卓手机即使关机了,也能轻松定位到...
安卓系统游戏加速器,畅享无延迟... 你有没有发现,手机游戏越来越好玩了?不过,有时候游戏体验可能并不那么顺畅,是不是因为手机性能不够强大...
安卓4系统天气功能,尽在掌握 安卓4系统天气功能大揭秘在当今这个数字化的世界里,手机已经不仅仅是一个通信工具,它更是一个集成了各种...
安卓系统如何玩碧蓝幻想,攻略与... 你有没有想过,在安卓系统上玩《碧蓝幻想》竟然可以这么酷炫?没错,就是那个让你沉迷其中的二次元大作!今...
安卓系统搜不到图朵,图朵生成之... 最近是不是你也遇到了这样的烦恼?手机里明明有那么多美美的图片,但是用安卓系统搜索的时候,却怎么也找不...
魁族8刷安卓系统,系统升级后的... 哇,你知道吗?最近在安卓系统圈子里,有一个话题可是引起了不小的轰动,那就是魁族8刷安卓系统。你是不是...
微信正版安装安卓系统,畅享沟通... 你有没有想过,你的微信是不是正版安装的安卓系统呢?这可不是一个小问题哦,它关系到你的微信使用体验和隐...
电视能刷安卓系统吗,电视也能刷... 电视能刷安卓系统吗?揭秘智能电视的无限可能想象你家的电视不再只是用来观看节目的工具,而是变成了一个功...
安卓系统开通通知功能,畅享智能... 你知道吗?最近安卓系统更新后,新增了一个超级实用的功能——开通通知功能!这可是个大喜事,让咱们的生活...
苹果系统安卓爱思助手,系统兼容... 你有没有发现,手机的世界里,苹果系统和安卓系统就像是一对欢喜冤家,总是各有各的粉丝,各有各的拥趸。而...
安卓系统占用很大内存,揭秘内存... 手机里的安卓系统是不是让你感觉内存不够用,就像你的房间堆满了杂物,总是找不到地方放新东西?别急,今天...
安卓系统p30,安卓系统下的摄... 你有没有发现,最近安卓系统P30在手机圈里可是火得一塌糊涂呢!这不,我就来给你好好扒一扒这款手机的那...
siri被安卓系统进入了,智能... 你知道吗?最近科技圈可是炸开了锅,因为一个大家伙——Siri,竟然悄悄地溜进了安卓系统!这可不是什么...