Dubbo3 Triple Protocol

Updated 2022-04-24 16:41

Using the Triple Protocol

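The Triple protocol is the primary protocol of Dubbo3. It is fully compatible with gRPC over HTTP/2 and extends the protocol layer with mechanisms for load balancing and flow control. This document explains how to use the Triple protocol correctly.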
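
Because Triple is compatible with gRPC over HTTP/2, each message on the wire uses gRPC's length-prefixed framing: a 1-byte compressed flag, a 4-byte big-endian length, then the payload. The sketch below only illustrates that framing. The class and method names are hypothetical, and this is not Dubbo's own implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative only: gRPC length-prefixed message framing (hypothetical helper, not a Dubbo API). */
class GrpcFrameSketch {

    /** Prefixes the payload with a 1-byte compressed flag and a 4-byte big-endian length. */
    static byte[] frame(byte[] payload, boolean compressed) {
        ByteBuffer buf = ByteBuffer.allocate(5 + payload.length); // ByteBuffer is big-endian by default
        buf.put((byte) (compressed ? 1 : 0));
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    /** Reads one frame and returns its payload; rejects a frame whose declared length exceeds the data. */
    static byte[] unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        buf.get(); // compressed flag, ignored in this sketch
        int length = buf.getInt();
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("truncated frame");
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8), false);
        // Print the 5-byte header, then the recovered payload.
        System.out.println(Arrays.toString(Arrays.copyOf(framed, 5)));
        System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```

In a real call the payload would be the serialized request, for example a protobuf-encoded HelloRequest.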

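Before starting, decide which serialization the service will use. For a new service, protobuf is recommended as the default serialization, since it gives better performance and cross-language support. For an existing service that only needs a protocol upgrade, Triple also supports other serialization methods such as Hessian and JSON.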

Protobuf

  1. Write the IDL file
    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "org.apache.dubbo.hello";
    option java_outer_classname = "HelloWorldProto";
    option objc_class_prefix = "HLW";
    
    package helloworld;
    
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }
    
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
  2. Add the extension and plugin for compiling protobuf (Maven example)
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.1</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>triple-java</pluginId>
                <outputDirectory>build/generated/source/proto/main/java</outputDirectory>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>test-compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
  3. Build to generate the protobuf Message classes
    $ mvn clean install

Unary mode

  1. Write the Java interface
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public interface IGreeter {
        /**
         * <pre>
         *  Sends a greeting
         * </pre>
         */
        HelloReply sayHello(HelloRequest request);
    }
  2. Create the Provider
    public static void main(String[] args) throws InterruptedException {
        ServiceConfig<IGreeter> service = new ServiceConfig<>();
        service.setInterface(IGreeter.class);
        service.setRef(new IGreeter1Impl());
        // The protocol must be explicitly declared as triple here
        service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
        service.setApplication(new ApplicationConfig("demo-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        service.export();
        System.out.println("dubbo service started");
        // Block the main thread so the exported service stays available
        new CountDownLatch(1).await();
    }
  3. Create the Consumer
    public static void main(String[] args) throws IOException {
        ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
        ref.setInterface(IGreeter.class);
        ref.setCheck(false);
        ref.setProtocol(CommonConstants.TRIPLE);
        ref.setLazy(true);
        ref.setTimeout(100000);
        ref.setApplication(new ApplicationConfig("demo-consumer"));
        ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        final IGreeter iGreeter = ref.get();
    
        System.out.println("dubbo ref started");
        try {
            final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
                    .setName("name")
                    .build());
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Reply:" + reply);
        } catch (Throwable t) {
            t.printStackTrace();
        }
        System.in.read();
    }
  4. Run the Provider and the Consumer; the request returns normally
    Reply:message: "name"
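
The Provider above references an implementation class, IGreeter1Impl, that is not shown. A minimal sketch of what it might look like follows, assuming it simply echoes the request name back as the reply message, which is consistent with the output above. To keep the sketch compilable on its own, HelloRequest and HelloReply are simplified stand-ins for the generated protobuf classes; with the generated classes, the reply would be built with HelloReply.newBuilder().setMessage(...).build().

```java
/** Sketch of a unary implementation; the message types are stand-ins for the generated protobuf classes. */
class UnaryGreeterSketch {

    /** Stand-in for the generated HelloRequest message. */
    static final class HelloRequest {
        private final String name;
        HelloRequest(String name) { this.name = name; }
        String getName() { return name; }
    }

    /** Stand-in for the generated HelloReply message. */
    static final class HelloReply {
        private final String message;
        HelloReply(String message) { this.message = message; }
        String getMessage() { return message; }
    }

    /** Same shape as the IGreeter interface from step 1. */
    interface IGreeter {
        HelloReply sayHello(HelloRequest request);
    }

    /** Assumed behavior: echo the request name back as the reply message. */
    static final class IGreeter1Impl implements IGreeter {
        @Override
        public HelloReply sayHello(HelloRequest request) {
            return new HelloReply(request.getName());
        }
    }

    public static void main(String[] args) {
        // Local call without Dubbo, only to show the request/response contract.
        IGreeter greeter = new IGreeter1Impl();
        System.out.println(greeter.sayHello(new HelloRequest("name")).getMessage());
    }
}
```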

Stream mode

  1. Write the Java interface
    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public interface IStreamGreeter {
        /**
         * <pre>
         *  Sends greeting by stream
         * </pre>
         */
        StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    }
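  2. Write the implementation class
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public class IStreamGreeterImpl implements IStreamGreeter {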
    
        @Override
        public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
    
            return new StreamObserver<HelloRequest>() {
                private List<HelloReply> replyList = new ArrayList<>();
    
                @Override
                public void onNext(HelloRequest helloRequest) {
                    System.out.println("onNext receive request name:" + helloRequest.getName());
                    replyList.add(HelloReply.newBuilder()
                        .setMessage("receive name:" + helloRequest.getName())
                        .build());
                }
    
                @Override
                public void onError(Throwable cause) {
                    System.out.println("onError");
                    replyObserver.onError(cause);
                }
    
                @Override
                public void onCompleted() {
                    System.out.println("onComplete receive request size:" + replyList.size());
                    for (HelloReply reply : replyList) {
                        replyObserver.onNext(reply);
                    }
                    replyObserver.onCompleted();
                }
            };
        }
    }
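  3. Create the Provider
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.config.ApplicationConfig;
    import org.apache.dubbo.config.ProtocolConfig;
    import org.apache.dubbo.config.RegistryConfig;
    import org.apache.dubbo.config.ServiceConfig;
    
    public class StreamProvider {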
        public static void main(String[] args) throws InterruptedException {
            ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
            service.setInterface(IStreamGreeter.class);
            service.setRef(new IStreamGreeterImpl());
            service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
            service.setApplication(new ApplicationConfig("stream-provider"));
            service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
            service.export();
            System.out.println("dubbo service started");
            new CountDownLatch(1).await();
        }
    }
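  4. Create the Consumer
    import java.io.IOException;
    
    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.config.ApplicationConfig;
    import org.apache.dubbo.config.ReferenceConfig;
    import org.apache.dubbo.config.RegistryConfig;
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public class StreamConsumer {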
        public static void main(String[] args) throws InterruptedException, IOException {
            ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
            ref.setInterface(IStreamGreeter.class);
            ref.setCheck(false);
            ref.setProtocol(CommonConstants.TRIPLE);
            ref.setLazy(true);
            ref.setTimeout(100000);
            ref.setApplication(new ApplicationConfig("stream-consumer"));
            // Use the same registry address as the Provider
            ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
            final IStreamGreeter iStreamGreeter = ref.get();
    
            System.out.println("dubbo ref started");
            try {
    
                StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
                    @Override
                    public void onNext(HelloReply reply) {
                        System.out.println("onNext");
                        System.out.println(reply.getMessage());
                    }
    
                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println("onError:" + throwable.getMessage());
                    }
    
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                });
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("tony")
                    .build());
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("nick")
                    .build());
    
                streamObserver.onCompleted();
            } catch (Throwable t) {
                t.printStackTrace();
            }
            System.in.read();
        }
    }
  5. Run the Provider and the Consumer; the request returns normally
    onNext
    receive name:tony
    onNext
    receive name:nick
    onCompleted
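
The order of the output above follows from the implementation class: it buffers one reply per request and only sends them once the client calls onCompleted. The sketch below reproduces that flow in-process, without Dubbo or a network. StreamObserver here is a local stand-in declaring the same three callbacks, and plain strings replace the protobuf messages; both are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** In-process sketch of the stream flow above; no Dubbo classes are involved. */
class StreamFlowSketch {

    /** Local stand-in with the same three callbacks used in the example. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable cause);
        void onCompleted();
    }

    /** Server side: buffers one reply per request and flushes them when the client completes. */
    static StreamObserver<String> sayHello(StreamObserver<String> replyObserver) {
        return new StreamObserver<String>() {
            private final List<String> replies = new ArrayList<>();

            @Override
            public void onNext(String name) {
                replies.add("receive name:" + name);
            }

            @Override
            public void onError(Throwable cause) {
                replyObserver.onError(cause);
            }

            @Override
            public void onCompleted() {
                for (String reply : replies) {
                    replyObserver.onNext(reply);
                }
                replyObserver.onCompleted();
            }
        };
    }

    /** Client side: sends two names and records every event in the order it happens. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        StreamObserver<String> requests = sayHello(new StreamObserver<String>() {
            @Override
            public void onNext(String reply) {
                events.add("onNext");
                events.add(reply);
            }

            @Override
            public void onError(Throwable cause) {
                events.add("onError:" + cause.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }
        });
        requests.onNext("tony");
        requests.onNext("nick");
        events.add("client completed"); // marker: no reply has been delivered before this point
        requests.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
    }
}
```

To stream replies as requests arrive, the server-side onNext could call replyObserver.onNext directly instead of buffering.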

Other serialization methods

Skip steps 1-3 of the Protobuf section above. Specifying the protocol used by the Provider and the Consumer is enough to complete the protocol upgrade.

Sample programs

The sample programs for this document can be found in triple-samples.

