关于 http 处理 grpc-gateway stream 流式响应时的问题

2021-05-11 19:29:39 +08:00
 HUNYXV

http 请求 stream 响应时,response body 打印出来是正确的结果:

		{"result":{"code":1,"msg":"1111"}}
        {"result":{"code":2,"msg":"2222"}}
        {"result":{"code":3,"msg":"3333"}}
        {"result":{"code":4,"msg":"4444"}}
        {"result":{"code":5,"msg":"5555"}}
        {"result":{"code":6,"msg":"6666"}}

但是使用runtime.JSONPb.Decode 时,也会得到五个结果,但每个 decode 出来是个 nil... :

=== RUN   TestHttpRespStream
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:147: resp: <nil>
    service_test.go:149: EOF
--- PASS: TestHttpRespStream (0.62s)

这是 proto 文件:

// ./pb/test.proto

syntax = "proto3";

package pb;

option go_package = "/pb;pb";

import "google/api/annotations.proto";

message Req {
    int32 id = 1;
    string name = 2;
}

message Resp {
    int32 code = 1;
    string msg = 2;
}

service TestService {
    rpc QueryStreamResp(Req) returns (stream Resp){
        option (google.api.http) = {
            post: "/query-stream-resp"
            body: "*"
        };
    };
    rpc QueryStreamReq(stream Req) returns (Resp){
        option (google.api.http) = {
            post: "/query-stream-req"
            body: "*"
        };
    };
    
    rpc Query(stream Req) returns (stream Resp){
        option (google.api.http) = {
            post: "/query"
            body: "*"
        };
    };
}

grpc 服务端:

func (ts *TestService) QueryStreamResp(req *pb.Req, stream pb.TestService_QueryStreamRespServer) error {
	log.Printf("QueryStreamResp|start...|req: %+v\n", req)
	result := []*pb.Resp{
		{Code: 1, Msg: "1111"},
		{Code: 2, Msg: "2222"},
		{Code: 3, Msg: "3333"},
		{Code: 4, Msg: "4444"},
		{Code: 5, Msg: "5555"},
		{Code: 6, Msg: "6666"},
	}

	// header := make(metadata.MD)
	// header.Append("content-type", "application/json")
	// stream.SendHeader(header)
	for i := range result {
		log.Printf("resp: %+v", result[i])
		
		if err := stream.Send(result[i]); err != nil {
			log.Fatal(err)
		}
		time.Sleep(100 * time.Millisecond)
	}
	log.Println("QueryStreamResp|stop...")

	return nil
}

单元测试:

func TestHttpRespStream(t *testing.T) {
	url := "http://127.0.0.1:8080/query-stream-resp"
	reqData := &pb.Req{Id: 1, Name: "111"}

	var buffer bytes.Buffer
	encoder := (&runtime.JSONPb{}).NewEncoder(&buffer)

	if err := encoder.Encode(reqData); err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, &buffer)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	// body, err := ioutil.ReadAll(resp.Body)
	// if err != nil {
	// 	t.Fatal(err)
	// }
	// t.Logf("body: %s", string(body))
	jsonb := new(runtime.JSONPb)
	dencoder := jsonb.NewDecoder(resp.Body)

	for {
		var result *pb.Resp
		err := dencoder.Decode(result)
		if err == nil {
			t.Logf("resp: %+v", result)
		} else {
			t.Logf("%+v", err)
			break
		}
	}
}
1035 次点击
所在节点    HTTP
1 条回复
HUNYXV
2021-05-13 17:32:31 +08:00

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/776316

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX