GRPC

官方文档:http://grpc.mydoc.io/

proto文件:

syntax = "proto3";

//以外部类模式生成
option java_multiple_files = true;
//所在包名
option java_package = "cn.dxh.practice";
//最外层类名称
option java_outer_classname = "PracticeService";

// 定义服务
service UserService {
    //按id查询数据
    rpc FindById (FindByIdRequest) returns (FindByIdResponse) {
    }

    //服务器端流式 RPC
    //    客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
    //    通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
    //查询所有符合条件的数据
    rpc FindByAge (FindByAgeRequest) returns (stream UserEntity) {
    }

    //客户端流式 RPC
    //    即客户端用提供的一个数据流写入并发送一系列消息给服务端。
    //    客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
    //    通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
    //删除数据
    rpc DeleteById (DeletedByIdRequest) returns (DeletedByIdResponse) {
    }

    //双向流式 RPC
    //    即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,
    //    例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
    //通过在请求和响应前加 stream 关键字去制定方法的类型。
    rpc FindByName (stream FindByNameRequest) returns (stream UserEntity) {
    }
}

//按id查询数据参数
message FindByIdRequest {
    int64 id = 1;
}
//按id查询数据返回结果
message FindByIdResponse {
    string name = 2;
    int32 age = 3;
}
//查询语句的条件参数
message FindByAgeRequest {
    int32 age = 1;
}

//删除数据的参数
message DeletedByIdRequest {
    int64 id = 1;
}

//删除数据的返回结果
message DeletedByIdResponse {
}

//参数
message FindByNameRequest {
}

//实例
message UserEntity {
    int64 id = 1;
    string name = 2;
    int32 age = 3;
    string from = 4;
    string pwd = 5;
}

服务端

编写serviceImpl类

此类需要继承 UserServiceGrpc.UserServiceImplBase ,这个类提供了我们定义的接口,继承后并覆盖需要实现的方法。

@Slf4j
@Service
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {

  @Resource
  private UserMapper userMapper;

  /**
   * 简单rpc
   * 其中StreamObserver是一个应答观察者,用于封装返回的信息,服务器把该信息传给客户端.请求结束要调用onCompleted()方法.
   * @param request
   * @param responseObserver
   */
  @Override
  public void findById(FindByIdRequest request, StreamObserver responseObserver) {
    try {
      long id = request.getId();
      userMapper.findById(id)
      responseObserver.onNext(UserEntity.newBuilder().build());
//      请求结束
      responseObserver.onCompleted();
    } catch (Exception e) {
      log.error("findById is error");
    }
  }

  /**
   * 服务器端流式 RPC
   *
   * @param request
   * @param responseObserver
   */
  @Override
  public void findByAge(FindByAgeRequest request, StreamObserver responseObserver) {
    try {
      int age = request.getAge();
      List userEntities = userMapper.findByAgs(age);
      userEntities.forEach(userEntity -> responseObserver.onNext(UserEntity.newBuilder().build()));
      responseObserver.onCompleted();
    } catch (Exception e) {
      log.error("");
    }
  }


  /**
   * 客户端流式 RPC
   * 服务端就需要一直监控客户端写入情况,因此需要一个StreamObserver接口,其中onNext方法会在客户端每次写入时调用,当写入完毕时调用onCompleted()方法
   * @param responseObserver
   * @return
   */
  @Override
  public StreamObserver deleteById(StreamObserver responseObserver) {
    return new StreamObserver() {
      @Override
      public void onNext(DeletedByIdRequest deletedByIdRequest) {
        userMapper.deletedById(deletedByIdRequest.getId());
      }

      @Override
      public void onError(Throwable throwable) {
        throwable.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onNext(DeletedByIdResponse.newBuilder().build());
        responseObserver.onCompleted();
      }
    };
  }

  /**
   * 双向流式 RPC
   *
   * @param responseObserver
   * @return
   */
  @Override
  public StreamObserver findByName(StreamObserver responseObserver) {
    return new StreamObserver() {
      @Override
      public void onNext(FindByNameRequest findByNameRequest) {
        userMapper.findByName(findByNameRequest.getName());
      }

      @Override
      public void onError(Throwable throwable) {
        throwable.printStackTrace();
      }

      @Override
      public void onCompleted() {
        responseObserver.onNext(UserEntity.newBuilder().build());
        responseObserver.onCompleted();
      }
    };
  }
}

创建服务端

@Component
public class GrpcService {

  private int port = 19090;
  private Server server;

  @Resource
  private UserServiceGrpc.UserServiceImplBase userService;

  @PostConstruct
  public void init() throws IOException {
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
                                                            .setNameFormat("grpc-server-thread-%d")
                                                            .build();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                                                         100,
                                                         60L,
                                                         TimeUnit.SECONDS,
                                                         new LinkedBlockingQueue<>(),
                                                         threadFactory,
                                                         new ThreadPoolExecutor.CallerRunsPolicy());
    server = ServerBuilder.forPort(port)
                          .addService(userService)
                          .executor(executor).build();
    server.start();
  }

  @PreDestroy
  public void destroy() {
    if (server != null) {
      server.shutdown();
    }
  }

}

客户端