High-performance RPC with tonic
gRPC is a high-performance RPC framework that uses Protocol Buffers for serialization. It supports streaming and strong typing, and is well suited to communication between microservices.
Building gRPC services in Rust requires:
tonic-build generates Rust types from the .proto file at build time.
// proto/user_service.proto
syntax = "proto3";
package userservice;
// A user record as stored and returned by the service.
message User {
// Assigned by the server when the user is created.
string id = 1;
string email = 2;
string name = 3;
// Filled in by the server at creation time.
int64 created_at = 4;
}
// Request/Response messages
// Input for CreateUser and for each element of the BatchCreateUsers stream.
message CreateUserRequest {
// The server rejects (or, in batch mode, skips) values without an '@'.
string email = 1;
string name = 2;
string password = 3;
}
message CreateUserResponse {
User user = 1;
}
message GetUserRequest {
string id = 1;
}
// Also used as the response type of UpdateUser.
message GetUserResponse {
User user = 1;
}
message ListUsersRequest {
// 1-based page number; the server treats values below 1 as 1.
int32 page = 1;
// Page size; the server clamps this to the range 1..=100.
int32 per_page = 2;
}
// Returned by ListUsers and by BatchCreateUsers.
message ListUsersResponse {
repeated User users = 1;
// ListUsers: number of users stored. BatchCreateUsers: number of users created.
int32 total = 2;
}
message UpdateUserRequest {
string id = 1;
// Fields left unset are not modified.
optional string email = 2;
optional string name = 3;
}
message DeleteUserRequest {
string id = 1;
}
message DeleteUserResponse {
bool success = 1;
}
// Streaming example
message WatchUsersRequest {
// Empty - watch all users
}
// One change notification delivered on the WatchUsers stream.
message UserEvent {
enum EventType {
CREATED = 0;
UPDATED = 1;
DELETED = 2;
}
EventType event_type = 1;
// The user the event refers to.
User user = 2;
}
// Service definition
service UserService {
// Unary RPCs
rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
rpc GetUser(GetUserRequest) returns (GetUserResponse);
rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
// Responds with the updated user wrapped in GetUserResponse.
rpc UpdateUser(UpdateUserRequest) returns (GetUserResponse);
rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);
// Server streaming
rpc WatchUsers(WatchUsersRequest) returns (stream UserEvent);
// Client streaming
rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
// Bidirectional streaming
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
// A single message exchanged on the Chat stream, in either direction.
message ChatMessage {
string user_id = 1;
string content = 2;
int64 timestamp = 3;
}
// build.rs
/// Build script entry point: compiles the service's `.proto` file with
/// `tonic_build` so the generated Rust code is available to the crate.
///
/// # Errors
/// Propagates whatever error `tonic_build::compile_protos` reports.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    const PROTO_FILE: &str = "proto/user_service.proto";

    tonic_build::compile_protos(PROTO_FILE)?;
    Ok(())
}
// src/server.rs
use tonic::{Request, Response, Status, Streaming};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::pin::Pin;
use futures::Stream;
// Include generated code
// The argument is the proto package name (`package userservice;` in the
// .proto file), not a file path. The message types and the
// `user_service_server` module imported below come from this module.
pub mod user_service {
tonic::include_proto!("userservice");
}
use user_service::{
user_service_server::{UserService, UserServiceServer},
User, CreateUserRequest, CreateUserResponse,
GetUserRequest, GetUserResponse,
ListUsersRequest, ListUsersResponse,
UpdateUserRequest, DeleteUserRequest, DeleteUserResponse,
WatchUsersRequest, UserEvent, user_event::EventType,
ChatMessage,
};
// Server state
/// In-memory implementation of the `UserService` gRPC service.
///
/// Every field is behind an `Arc`, so the derived `Clone` yields a handle
/// that shares the same users, ID counter and subscriber list.
#[derive(Debug, Clone)]
pub struct UserServiceImpl {
/// Stored users, keyed by `User::id`.
users: Arc<RwLock<HashMap<String, User>>>,
/// Next ID to hand out; starts at 1 and is incremented on every allocation.
next_id: Arc<RwLock<u64>>,
/// One sender per `watch_users` subscriber; events are fanned out to all of them.
event_senders: Arc<RwLock<Vec<mpsc::Sender<UserEvent>>>>,
}
impl UserServiceImpl {
pub fn new() -> Self {
UserServiceImpl {
users: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(RwLock::new(1)),
event_senders: Arc::new(RwLock::new(Vec::new())),
}
}
fn generate_id(&self) -> String {
let mut id = self.next_id.write().unwrap();
let current = *id;
*id += 1;
current.to_string()
}
fn broadcast_event(&self, event: UserEvent) {
let senders = self.event_senders.read().unwrap();
for sender in senders.iter() {
let _ = sender.try_send(event.clone());
}
}
}
impl Default for UserServiceImpl {
fn default() -> Self {
Self::new()
}
}
#[tonic::async_trait]
impl UserService for UserServiceImpl {
/// Unary RPC: creates a user and notifies watchers with a `Created` event.
///
/// The `password` field of the request is not read by this implementation.
///
/// # Errors
/// * `invalid_argument` if the email does not contain `'@'`.
/// * `already_exists` if another stored user has the same email.
async fn create_user(
    &self,
    request: Request<CreateUserRequest>,
) -> Result<Response<CreateUserResponse>, Status> {
    let req = request.into_inner();

    // Validate
    if !req.email.contains('@') {
        return Err(Status::invalid_argument("Invalid email format"));
    }

    let user = {
        // The uniqueness check and the insert share one write lock so that
        // two concurrent requests cannot both pass the check and register
        // the same email.
        let mut users = self.users.write().unwrap();
        if users.values().any(|u| u.email == req.email) {
            return Err(Status::already_exists("Email already registered"));
        }

        let user = User {
            id: self.generate_id(),
            email: req.email,
            name: req.name,
            created_at: chrono::Utc::now().timestamp(),
        };
        users.insert(user.id.clone(), user.clone());
        user
    };

    // Broadcast only after the users lock has been released.
    self.broadcast_event(UserEvent {
        event_type: EventType::Created as i32,
        user: Some(user.clone()),
    });

    Ok(Response::new(CreateUserResponse { user: Some(user) }))
}
/// Unary RPC: looks up a single user by ID.
///
/// # Errors
/// `not_found` if no user is stored under the requested ID.
async fn get_user(
    &self,
    request: Request<GetUserRequest>,
) -> Result<Response<GetUserResponse>, Status> {
    let wanted_id = request.into_inner().id;
    let store = self.users.read().unwrap();

    match store.get(&wanted_id) {
        Some(found) => Ok(Response::new(GetUserResponse {
            user: Some(found.clone()),
        })),
        None => Err(Status::not_found("User not found")),
    }
}
/// Unary RPC: returns one page of users plus the total number stored.
///
/// `page` is 1-based (values below 1 are treated as 1) and `per_page` is
/// clamped to `1..=100`. Users are ordered by ascending ID so that
/// consecutive pages are stable and do not overlap; plain `HashMap`
/// iteration order is arbitrary and may change as users are inserted.
async fn list_users(
    &self,
    request: Request<ListUsersRequest>,
) -> Result<Response<ListUsersResponse>, Status> {
    let req = request.into_inner();
    // Both values are >= 1 after max/clamp, so the casts cannot wrap.
    let page = req.page.max(1) as usize;
    let per_page = req.per_page.clamp(1, 100) as usize;
    // Saturate rather than overflow where `usize` is 32 bits wide.
    let offset = (page - 1).saturating_mul(per_page);

    let users = self.users.read().unwrap();
    // Cap instead of wrapping if the map ever holds more than i32::MAX users.
    let total = users.len().min(i32::MAX as usize) as i32;

    let mut ordered: Vec<&User> = users.values().collect();
    // IDs are decimal strings without leading zeros, so comparing by length
    // first and then lexicographically yields numeric order.
    ordered.sort_unstable_by(|a, b| {
        a.id.len()
            .cmp(&b.id.len())
            .then_with(|| a.id.cmp(&b.id))
    });

    let page_items: Vec<User> = ordered
        .into_iter()
        .skip(offset)
        .take(per_page)
        .cloned()
        .collect();

    Ok(Response::new(ListUsersResponse {
        users: page_items,
        total,
    }))
}
/// Unary RPC: updates the email and/or name of an existing user and
/// notifies watchers with an `Updated` event.
///
/// Fields that are absent from the request are left unchanged. A new email
/// is held to the same rules as in `create_user`.
///
/// # Errors
/// * `invalid_argument` if a new email is given and does not contain `'@'`.
/// * `not_found` if no user is stored under the requested ID.
/// * `already_exists` if the new email belongs to a different user.
async fn update_user(
    &self,
    request: Request<UpdateUserRequest>,
) -> Result<Response<GetUserResponse>, Status> {
    let req = request.into_inner();

    // Same format rule as create_user; checked before taking any lock.
    if let Some(email) = req.email.as_deref() {
        if !email.contains('@') {
            return Err(Status::invalid_argument("Invalid email format"));
        }
    }

    let user = {
        let mut users = self.users.write().unwrap();

        // Computed before the mutable lookup, but reported after it, so an
        // unknown ID still yields `not_found` first.
        let email_taken = req.email.as_deref().map_or(false, |email| {
            users.values().any(|u| u.id != req.id && u.email == email)
        });

        let user = users
            .get_mut(&req.id)
            .ok_or_else(|| Status::not_found("User not found"))?;
        if email_taken {
            return Err(Status::already_exists("Email already registered"));
        }

        if let Some(email) = req.email {
            user.email = email;
        }
        if let Some(name) = req.name {
            user.name = name;
        }
        user.clone()
    };

    // Broadcast only after the users lock has been released.
    self.broadcast_event(UserEvent {
        event_type: EventType::Updated as i32,
        user: Some(user.clone()),
    });

    Ok(Response::new(GetUserResponse { user: Some(user) }))
}
/// Unary RPC: removes a user and notifies watchers with a `Deleted` event
/// that carries the removed record.
///
/// # Errors
/// `not_found` if no user is stored under the requested ID.
async fn delete_user(
    &self,
    request: Request<DeleteUserRequest>,
) -> Result<Response<DeleteUserResponse>, Status> {
    let target_id = request.into_inner().id;

    // The write guard is a temporary of this statement, so the users lock
    // is released before the event is broadcast.
    let removed = self.users.write().unwrap().remove(&target_id);

    match removed {
        None => Err(Status::not_found("User not found")),
        Some(gone) => {
            let notice = UserEvent {
                event_type: EventType::Deleted as i32,
                user: Some(gone),
            };
            self.broadcast_event(notice);
            Ok(Response::new(DeleteUserResponse { success: true }))
        }
    }
}
/// Boxed stream type returned by the server-streaming `watch_users` RPC.
type WatchUsersStream = Pin<Box<dyn Stream<Item = Result<UserEvent, Status>> + Send>>;

/// Server streaming RPC: subscribes the caller to user events.
///
/// Creates a bounded channel (capacity 128), stores its sending half in
/// `event_senders` and returns the receiving half as the response stream.
/// The request carries no filter and is ignored. This method does not
/// unregister the sender.
async fn watch_users(
    &self,
    _request: Request<WatchUsersRequest>,
) -> Result<Response<Self::WatchUsersStream>, Status> {
    let (subscriber, inbox) = mpsc::channel(128);

    // Register this subscriber; the guard is released at the end of the block.
    {
        let mut registry = self.event_senders.write().unwrap();
        registry.push(subscriber);
    }

    // Wrap each event in `Ok` to match the stream's `Result` item type.
    let events = ReceiverStream::new(inbox).map(Ok::<UserEvent, Status>);
    let boxed: Self::WatchUsersStream = Box::pin(events);
    Ok(Response::new(boxed))
}
// Client streaming RPC: Batch create users
async fn batch_create_users(
&self,
request: Request<Streaming<CreateUserRequest>>,
) -> Result<Response<ListUsersResponse>, Status> {
let mut stream = request.into_inner();
let mut created_users = Vec::new();
while let Some(req) = stream.message().await? {
// Validate and create each user
if req.email.contains('@') {
let user = User {
id: self.generate_id(),
email: req.email,
name: req.name,
created_at: chrono::Utc::now().timestamp(),
};
self.users.write().unwrap().insert(user.id.clone(), user.clone());
created_users.push(user);
}
}
let total = created_users.len() as i32;
Ok(Response::new(ListUsersResponse {
users: created_users,
total,
}))
}
/// Boxed stream type returned by the bidirectional `chat` RPC.
type ChatStream = Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;

/// Bidirectional streaming RPC: echoes every inbound message back with an
/// `"Echo: "` prefix and a server-side timestamp.
///
/// A spawned task reads the inbound stream and pushes replies into a
/// bounded channel (capacity 128) that backs the response stream. The task
/// stops when the client finishes sending, when the response stream is
/// dropped, or after an inbound error. The error `Status` is forwarded to
/// the client as the final stream item, not silently discarded.
async fn chat(
    &self,
    request: Request<Streaming<ChatMessage>>,
) -> Result<Response<Self::ChatStream>, Status> {
    let mut stream = request.into_inner();
    let (tx, rx) = mpsc::channel(128);

    tokio::spawn(async move {
        loop {
            let reply = match stream.message().await {
                Ok(Some(msg)) => Ok(ChatMessage {
                    user_id: msg.user_id,
                    content: format!("Echo: {}", msg.content),
                    timestamp: chrono::Utc::now().timestamp(),
                }),
                // Client closed its side of the stream.
                Ok(None) => break,
                // Surface the failure to the client, then stop.
                Err(status) => Err(status),
            };

            let is_terminal = reply.is_err();
            // `send` awaits free capacity (backpressure) and fails only
            // when the response stream has been dropped.
            if tx.send(reply).await.is_err() || is_terminal {
                break;
            }
        }
    });

    let stream = ReceiverStream::new(rx);
    Ok(Response::new(Box::pin(stream)))
}
}
// Minimal stand-in for the `chrono` crate: only `Utc::now().timestamp()`
// is provided, backed by the standard-library system clock.
mod chrono {
    /// Handle to the UTC clock.
    pub struct Utc;

    impl Utc {
        /// Returns a clock handle; the time itself is read by [`Utc::timestamp`].
        pub fn now() -> Self {
            Utc
        }

        /// Returns the current time as whole seconds since the Unix epoch.
        ///
        /// The system clock is read when this method is called. If the clock
        /// is set before the epoch, the result is negative.
        pub fn timestamp(&self) -> i64 {
            use std::time::{SystemTime, UNIX_EPOCH};

            // `as i64` is lossless here: a seconds count only exceeds
            // i64::MAX hundreds of billions of years from the epoch.
            match SystemTime::now().duration_since(UNIX_EPOCH) {
                Ok(since_epoch) => since_epoch.as_secs() as i64,
                Err(before_epoch) => -(before_epoch.duration().as_secs() as i64),
            }
        }
    }
}
// Use futures::StreamExt for map
use futures::StreamExt;
/// Server entry point: binds the user service to `[::1]:50051` and serves
/// gRPC requests until the server stops or fails.
///
/// # Errors
/// Returns an error if the address cannot be parsed or the transport fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr: std::net::SocketAddr = "[::1]:50051".parse()?;
    let user_service = UserServiceServer::new(UserServiceImpl::new());

    println!("gRPC server listening on {}", addr);

    let router = tonic::transport::Server::builder().add_service(user_service);
    router.serve(addr).await?;
    Ok(())
}
| Mode | Description | Use Case |
|------|-------------|----------|
| Unary | Single request, single response | CRUD operations |
| Server streaming | Single request, stream of responses | Real-time feeds |
| Client streaming | Stream of requests, single response | Batch uploads |
| Bidirectional | Both sides stream | Chat, gaming |
// Use Status codes appropriately
Err(Status::invalid_argument("message")) // Bad input
Err(Status::not_found("message")) // Resource missing
Err(Status::already_exists("message")) // Conflict
Err(Status::permission_denied("message")) // Auth failure
Err(Status::internal("message")) // Server error
// DON'T: Return generic errors
Err(Status::unknown("Error occurred"))
// DO: Be specific
Err(Status::invalid_argument("Email must contain @"))
// DON'T: Ignore streaming backpressure
while let Some(msg) = stream.next().await {
// Process without considering channel capacity
}
// DO: Handle channel full scenarios
if tx.try_send(msg).is_err() {
// Client too slow, handle appropriately
}
Run this code in the official Rust Playground