官方文档:http://grpc.mydoc.io/
proto文件:
syntax = "proto3";
//以外部类模式生成
option java_multiple_files = true;
//所在包名
option java_package = "cn.dxh.practice";
//最外层类名称
option java_outer_classname = "PracticeService";
// 定义服务
service UserService {
//按id查询数据
rpc FindById (FindByIdRequest) returns (FindByIdResponse) {
}
//服务器端流式 RPC
// 客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
// 通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
//查询所有符合条件的数据
rpc FindByAge (FindByAgeRequest) returns (stream UserEntity) {
}
//客户端流式 RPC
// 即客户端用提供的一个数据流写入并发送一系列消息给服务端。
// 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
// 通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
//删除数据
rpc DeleteById (DeletedByIdRequest) returns (DeletedByIdResponse) {
}
//双向流式 RPC
// 即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,
// 例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
//通过在请求和响应前加 stream 关键字去制定方法的类型。
rpc FindByName (stream FindByNameRequest) returns (stream UserEntity) {
}
}
//按id查询数据参数
message FindByIdRequest {
int64 id = 1;
}
//按id查询数据返回结果
message FindByIdResponse {
string name = 2;
int32 age = 3;
}
//查询语句的条件参数
message FindByAgeRequest {
int32 age = 1;
}
//删除数据的参数
message DeletedByIdRequest {
int64 id = 1;
}
//删除数据的返回结果
message DeletedByIdResponse {
}
//参数
message FindByNameRequest {
}
//实例
message UserEntity {
int64 id = 1;
string name = 2;
int32 age = 3;
string from = 4;
string pwd = 5;
}
服务端
编写serviceImpl类
此类需要继承 UserServiceGrpc.UserServiceImplBase ,这个类提供了我们定义的接口,继承后并覆盖需要实现的方法。
@Slf4j
@Service
public class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {
@Resource
private UserMapper userMapper;
/**
* 简单rpc
* 其中StreamObserver是一个应答观察者,用于封装返回的信息,服务器把该信息传给客户端.请求结束要调用onCompleted()方法.
* @param request
* @param responseObserver
*/
@Override
public void findById(FindByIdRequest request, StreamObserver responseObserver) {
try {
long id = request.getId();
userMapper.findById(id)
responseObserver.onNext(UserEntity.newBuilder().build());
// 请求结束
responseObserver.onCompleted();
} catch (Exception e) {
log.error("findById is error");
}
}
/**
* 服务器端流式 RPC
*
* @param request
* @param responseObserver
*/
@Override
public void findByAge(FindByAgeRequest request, StreamObserver responseObserver) {
try {
int age = request.getAge();
List userEntities = userMapper.findByAgs(age);
userEntities.forEach(userEntity -> responseObserver.onNext(UserEntity.newBuilder().build()));
responseObserver.onCompleted();
} catch (Exception e) {
log.error("");
}
}
/**
* 客户端流式 RPC
* 服务端就需要一直监控客户端写入情况,因此需要一个StreamObserver接口,其中onNext方法会在客户端每次写入时调用,当写入完毕时调用onCompleted()方法
* @param responseObserver
* @return
*/
@Override
public StreamObserver deleteById(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(DeletedByIdRequest deletedByIdRequest) {
userMapper.deletedById(deletedByIdRequest.getId());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onNext(DeletedByIdResponse.newBuilder().build());
responseObserver.onCompleted();
}
};
}
/**
* 双向流式 RPC
*
* @param responseObserver
* @return
*/
@Override
public StreamObserver findByName(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(FindByNameRequest findByNameRequest) {
userMapper.findByName(findByNameRequest.getName());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
responseObserver.onNext(UserEntity.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}
创建服务端
@Component
public class GrpcService {
private int port = 19090;
private Server server;
@Resource
private UserServiceGrpc.UserServiceImplBase userService;
@PostConstruct
public void init() throws IOException {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("grpc-server-thread-%d")
.build();
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
100,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy());
server = ServerBuilder.forPort(port)
.addService(userService)
.executor(executor).build();
server.start();
}
@PreDestroy
public void destroy() {
if (server != null) {
server.shutdown();
}
}
}