gRPC Services

High-performance RPC with tonic

advanced
grpctonicprotobuf
🎮 Interactive Playground

What is gRPC?

gRPC is a high-performance RPC framework using Protocol Buffers for serialization. It supports streaming, strong typing, and is ideal for microservices communication.

The Problem

Building gRPC services in Rust requires:

  • Proto definitions: Schema for messages and services
  • Code generation: tonic-build generates Rust types
  • Server implementation: Implement generated traits
  • Client usage: Type-safe RPC calls

Example Code

// proto/user_service.proto
syntax = "proto3";

package userservice;

// User message
message User {
  string id = 1;
  string email = 2;
  string name = 3;
  int64 created_at = 4;
}

// Request/Response messages
message CreateUserRequest {
  string email = 1;
  string name = 2;
  string password = 3;
}

message CreateUserResponse {
  User user = 1;
}

message GetUserRequest {
  string id = 1;
}

message GetUserResponse {
  User user = 1;
}

message ListUsersRequest {
  int32 page = 1;
  int32 per_page = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  int32 total = 2;
}

message UpdateUserRequest {
  string id = 1;
  optional string email = 2;
  optional string name = 3;
}

message DeleteUserRequest {
  string id = 1;
}

message DeleteUserResponse {
  bool success = 1;
}

// Streaming example
message WatchUsersRequest {
  // Empty - watch all users
}

message UserEvent {
  enum EventType {
    CREATED = 0;
    UPDATED = 1;
    DELETED = 2;
  }
  EventType event_type = 1;
  User user = 2;
}

// Service definition
service UserService {
  // Unary RPCs
  rpc CreateUser(CreateUserRequest) returns (CreateUserResponse);
  rpc GetUser(GetUserRequest) returns (GetUserResponse);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  rpc UpdateUser(UpdateUserRequest) returns (GetUserResponse);
  rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);

  // Server streaming
  rpc WatchUsers(WatchUsersRequest) returns (stream UserEvent);

  // Client streaming
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);

  // Bidirectional streaming
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user_id = 1;
  string content = 2;
  int64 timestamp = 3;
}
// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/user_service.proto")?;
    Ok(())
}
// src/server.rs
use tonic::{Request, Response, Status, Streaming};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::pin::Pin;
use futures::Stream;

// Include generated code
pub mod user_service {
    tonic::include_proto!("userservice");
}

use user_service::{
    user_service_server::{UserService, UserServiceServer},
    User, CreateUserRequest, CreateUserResponse,
    GetUserRequest, GetUserResponse,
    ListUsersRequest, ListUsersResponse,
    UpdateUserRequest, DeleteUserRequest, DeleteUserResponse,
    WatchUsersRequest, UserEvent, user_event::EventType,
    ChatMessage,
};

// Server state
#[derive(Debug, Clone)]
pub struct UserServiceImpl {
    users: Arc<RwLock<HashMap<String, User>>>,
    next_id: Arc<RwLock<u64>>,
    event_senders: Arc<RwLock<Vec<mpsc::Sender<UserEvent>>>>,
}

impl UserServiceImpl {
    pub fn new() -> Self {
        UserServiceImpl {
            users: Arc::new(RwLock::new(HashMap::new())),
            next_id: Arc::new(RwLock::new(1)),
            event_senders: Arc::new(RwLock::new(Vec::new())),
        }
    }

    fn generate_id(&self) -> String {
        let mut id = self.next_id.write().unwrap();
        let current = *id;
        *id += 1;
        current.to_string()
    }

    fn broadcast_event(&self, event: UserEvent) {
        let senders = self.event_senders.read().unwrap();
        for sender in senders.iter() {
            let _ = sender.try_send(event.clone());
        }
    }
}

impl Default for UserServiceImpl {
    fn default() -> Self {
        Self::new()
    }
}

#[tonic::async_trait]
impl UserService for UserServiceImpl {
    // Unary RPC: Create user
    async fn create_user(
        &self,
        request: Request<CreateUserRequest>,
    ) -> Result<Response<CreateUserResponse>, Status> {
        let req = request.into_inner();

        // Validate
        if !req.email.contains('@') {
            return Err(Status::invalid_argument("Invalid email format"));
        }

        // Check duplicate
        {
            let users = self.users.read().unwrap();
            if users.values().any(|u| u.email == req.email) {
                return Err(Status::already_exists("Email already registered"));
            }
        }

        let user = User {
            id: self.generate_id(),
            email: req.email,
            name: req.name,
            created_at: chrono::Utc::now().timestamp(),
        };

        self.users.write().unwrap().insert(user.id.clone(), user.clone());

        // Broadcast event
        self.broadcast_event(UserEvent {
            event_type: EventType::Created as i32,
            user: Some(user.clone()),
        });

        Ok(Response::new(CreateUserResponse { user: Some(user) }))
    }

    // Unary RPC: Get user
    async fn get_user(
        &self,
        request: Request<GetUserRequest>,
    ) -> Result<Response<GetUserResponse>, Status> {
        let req = request.into_inner();

        let users = self.users.read().unwrap();
        let user = users.get(&req.id)
            .cloned()
            .ok_or_else(|| Status::not_found("User not found"))?;

        Ok(Response::new(GetUserResponse { user: Some(user) }))
    }

    // Unary RPC: List users
    async fn list_users(
        &self,
        request: Request<ListUsersRequest>,
    ) -> Result<Response<ListUsersResponse>, Status> {
        let req = request.into_inner();
        let page = req.page.max(1) as usize;
        let per_page = req.per_page.clamp(1, 100) as usize;

        let users = self.users.read().unwrap();
        let total = users.len() as i32;

        let users: Vec<User> = users
            .values()
            .skip((page - 1) * per_page)
            .take(per_page)
            .cloned()
            .collect();

        Ok(Response::new(ListUsersResponse { users, total }))
    }

    // Unary RPC: Update user
    async fn update_user(
        &self,
        request: Request<UpdateUserRequest>,
    ) -> Result<Response<GetUserResponse>, Status> {
        let req = request.into_inner();

        let mut users = self.users.write().unwrap();
        let user = users.get_mut(&req.id)
            .ok_or_else(|| Status::not_found("User not found"))?;

        if let Some(email) = req.email {
            user.email = email;
        }
        if let Some(name) = req.name {
            user.name = name;
        }

        let user = user.clone();

        // Broadcast event
        drop(users);
        self.broadcast_event(UserEvent {
            event_type: EventType::Updated as i32,
            user: Some(user.clone()),
        });

        Ok(Response::new(GetUserResponse { user: Some(user) }))
    }

    // Unary RPC: Delete user
    async fn delete_user(
        &self,
        request: Request<DeleteUserRequest>,
    ) -> Result<Response<DeleteUserResponse>, Status> {
        let req = request.into_inner();

        let user = {
            let mut users = self.users.write().unwrap();
            users.remove(&req.id)
                .ok_or_else(|| Status::not_found("User not found"))?
        };

        // Broadcast event
        self.broadcast_event(UserEvent {
            event_type: EventType::Deleted as i32,
            user: Some(user),
        });

        Ok(Response::new(DeleteUserResponse { success: true }))
    }

    // Server streaming RPC: Watch users
    type WatchUsersStream = Pin<Box<dyn Stream<Item = Result<UserEvent, Status>> + Send>>;

    async fn watch_users(
        &self,
        _request: Request<WatchUsersRequest>,
    ) -> Result<Response<Self::WatchUsersStream>, Status> {
        let (tx, rx) = mpsc::channel(128);

        // Register this subscriber
        self.event_senders.write().unwrap().push(tx);

        let stream = ReceiverStream::new(rx).map(Ok);
        Ok(Response::new(Box::pin(stream)))
    }

    // Client streaming RPC: Batch create users
    async fn batch_create_users(
        &self,
        request: Request<Streaming<CreateUserRequest>>,
    ) -> Result<Response<ListUsersResponse>, Status> {
        let mut stream = request.into_inner();
        let mut created_users = Vec::new();

        while let Some(req) = stream.message().await? {
            // Validate and create each user
            if req.email.contains('@') {
                let user = User {
                    id: self.generate_id(),
                    email: req.email,
                    name: req.name,
                    created_at: chrono::Utc::now().timestamp(),
                };

                self.users.write().unwrap().insert(user.id.clone(), user.clone());
                created_users.push(user);
            }
        }

        let total = created_users.len() as i32;
        Ok(Response::new(ListUsersResponse {
            users: created_users,
            total,
        }))
    }

    // Bidirectional streaming RPC: Chat
    type ChatStream = Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;

    async fn chat(
        &self,
        request: Request<Streaming<ChatMessage>>,
    ) -> Result<Response<Self::ChatStream>, Status> {
        let mut stream = request.into_inner();
        let (tx, rx) = mpsc::channel(128);

        tokio::spawn(async move {
            while let Ok(Some(msg)) = stream.message().await {
                // Echo back with timestamp
                let response = ChatMessage {
                    user_id: msg.user_id,
                    content: format!("Echo: {}", msg.content),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                if tx.send(Ok(response)).await.is_err() {
                    break;
                }
            }
        });

        let stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(stream)))
    }
}

// Simulate chrono
mod chrono {
    pub struct Utc;
    impl Utc {
        pub fn now() -> Self { Utc }
    }
    impl Utc {
        pub fn timestamp(&self) -> i64 { 0 }
    }
}

// Use futures::StreamExt for map
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let service = UserServiceImpl::new();

    println!("gRPC server listening on {}", addr);

    tonic::transport::Server::builder()
        .add_service(UserServiceServer::new(service))
        .serve(addr)
        .await?;

    Ok(())
}

Why This Works

  1. Proto-first: Schema defines contract, generates code
  2. Type safety: Rust types match proto definitions
  3. Streaming: Built-in support for all streaming modes
  4. Performance: Binary protocol, HTTP/2, multiplexing

gRPC Streaming Modes

| Mode | Description | Use Case |

|------|-------------|----------|

| Unary | Single request, single response | CRUD operations |

| Server streaming | Single request, stream of responses | Real-time feeds |

| Client streaming | Stream of requests, single response | Batch uploads |

| Bidirectional | Both sides stream | Chat, gaming |

Error Handling

// Use Status codes appropriately
Err(Status::invalid_argument("message")) // Bad input
Err(Status::not_found("message"))         // Resource missing
Err(Status::already_exists("message"))    // Conflict
Err(Status::permission_denied("message")) // Auth failure
Err(Status::internal("message"))          // Server error

⚠️ Anti-patterns

// DON'T: Return generic errors
Err(Status::unknown("Error occurred"))

// DO: Be specific
Err(Status::invalid_argument("Email must contain @"))

// DON'T: Ignore streaming backpressure
while let Some(msg) = stream.next().await {
    // Process without considering channel capacity
}

// DO: Handle channel full scenarios
if tx.try_send(msg).is_err() {
    // Client too slow, handle appropriately
}

Exercises

  1. Add authentication interceptor using tonic middleware
  2. Implement health checking service
  3. Add reflection for grpcurl/grpcui support
  4. Create client library with connection pooling

🎮 Try it Yourself

🎮

gRPC Services - Playground

Run this code in the official Rust Playground