-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added a new Tonic project with anonymisation
- Loading branch information
Showing
9 changed files
with
439 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
[package] | ||
name = "seank-tonic-project-rs" | ||
version = "0.1.1" | ||
edition = "2021" | ||
|
||
[dependencies] | ||
tonic = "0.11.0" | ||
tokio = { version = "1", features = ["full"] } | ||
prost = "0.12.6" | ||
async-stream = "0.3.5" | ||
tokio-stream = "0.1.15" | ||
async-channel = "2.3.1" | ||
h2 = "0.3.26" | ||
log = "0.4.21" | ||
env_logger = "0.11.3" | ||
|
||
[build-dependencies] | ||
tonic-build = "0.11" | ||
|
||
[build-dependencies.config_struct] | ||
version = "~0.5.0" | ||
features = ["toml-parsing"] | ||
|
||
[dev-dependencies] | ||
tonic-mock = "0.3.0" | ||
|
||
[[bin]] | ||
name = "client" | ||
path = "src/client.rs" | ||
|
||
[[bin]] | ||
name = "mock_server" | ||
path = "src/mock_server.rs" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
# gRPC Client & Mock Server Project | ||
|
||
## Introduction | ||
|
||
This repository contains a gRPC client project implemented in Rust. | ||
|
||
## Technologies Used | ||
This project uses the Rust language and the following libraries: | ||
- Tonic: A Rust gRPC library | ||
- Tokio: An asynchronous runtime for Rust | ||
|
||
## Repository Structure | ||
|
||
### 1. `Cargo.toml` | ||
|
||
This is a Rust build file containing required dependencies and the 'client' binary setup. | ||
|
||
### 2. `build.rs` | ||
|
||
This is a Tonic-specific build script that runs automatically as part of the Rust build process. | ||
|
||
### 3. `src/client.rs` | ||
|
||
This file contains the main client implementation. | ||
|
||
### 4. `proto/service.proto` | ||
|
||
This is the protocol buffer definition file for the gRPC service. | ||
|
||
## How To Run | ||
|
||
1. Ensure that the token and endpoint used in the code are valid by testing with the grpcurl command. (e.g., grpcurl -plaintext -proto ./proto/service.proto -H "Authorization: Bearer TOKEN" -d @ -v 'example.com:3001' service.Controller/Connection) | ||
2. Run 'cargo build' in the project root directory to build the project. | ||
3. Run 'RUST_LOG=debug cargo run --bin client' to start the client. | ||
4. Use ctrl-c to cancel the execution. | ||
|
||
Note: | ||
1. The client is designed to run continuously, restarting the connection to the server after certain events. | ||
2. The client may print out some state information between sessions. Please ignore these logs, as they do not indicate the start of a new session. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
use std::error::Error; | ||
use std::{env, path::PathBuf}; | ||
use config_struct::StructOptions; | ||
|
||
fn main() -> Result<(), Box<dyn Error>> { | ||
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); | ||
|
||
tonic_build::configure() | ||
.file_descriptor_set_path(out_dir.join("service_descriptor.bin")) | ||
.compile(&["proto/service.proto"], &["proto"])?; | ||
|
||
tonic_build::compile_protos("proto/service.proto") | ||
.unwrap_or_else(|e| panic!("Failed to compile protos {:?}", e)); | ||
|
||
config_struct::create_struct( | ||
"config.toml", | ||
"src/config.rs", | ||
&StructOptions::default()).unwrap(); | ||
|
||
Ok(()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# config.toml | ||
endpoint = "http://[::1]:50051" | ||
token = "Bearer MYTOKEN" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
// there was some proper names here. I modified them to anonymise | ||
|
||
syntax = "proto3"; | ||
|
||
package service; | ||
|
||
service Controller { | ||
rpc Connection(stream ClientMsg) returns (stream ServerMsg); | ||
} | ||
|
||
message ClientMsg { | ||
int32 c = 3; | ||
} | ||
|
||
message Point { | ||
float x = 1; | ||
} | ||
|
||
message Area { | ||
Point n_point = 1; | ||
Point m_point = 2; | ||
} | ||
|
||
message ServerMsg { | ||
message SimOver { | ||
bool success = 1; | ||
optional string details = 2; | ||
} | ||
|
||
message SimStart { | ||
Point l = 1; | ||
Area limit = 2; | ||
Area g = 3; | ||
} | ||
|
||
message SimUpdate { | ||
Point l = 1; | ||
} | ||
|
||
oneof data { | ||
SimStart start = 1; | ||
SimOver ended = 2; | ||
SimUpdate update = 3; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
#![recursion_limit = "512"] | ||
|
||
use std::sync::Arc; | ||
use async_channel::{Receiver, Sender, unbounded}; | ||
use tonic::metadata::MetadataValue; | ||
use tonic::{Request, Status}; | ||
use tonic::transport::Channel; | ||
use tokio::sync::{OnceCell}; | ||
use tonic::codegen::InterceptedService; | ||
use tonic::service::Interceptor; | ||
use tokio::time::{Duration, interval}; | ||
use log::{debug, info, warn}; | ||
|
||
pub mod proto { | ||
tonic::include_proto!("service"); | ||
} | ||
|
||
mod config; | ||
|
||
use proto::controller_client::ControllerClient; | ||
use proto::ClientMsg; | ||
use crate::proto::server_msg::{Data, SimOver, SimStart, SimUpdate}; | ||
use crate::proto::Area; | ||
|
||
#[derive(Clone)] | ||
struct ServiceState { | ||
x: f32, | ||
} | ||
|
||
#[allow(dead_code)] | ||
#[derive(Clone, Debug)] | ||
struct ServiceInstruction { | ||
b_minimum_x: f32, | ||
b_maximum_x: f32, | ||
g_minimum_x: f32, | ||
g_maximum_x: f32, | ||
} | ||
|
||
struct MyInterceptor { | ||
token: String | ||
} | ||
|
||
impl Interceptor for MyInterceptor { | ||
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> { | ||
let token: MetadataValue<_> = self.token.parse() | ||
.map_err(|e| Status::invalid_argument(format!("Invalid metadata value: {:?}", e)))?; | ||
request.metadata_mut().insert("authorization", token.clone()); | ||
Ok(request) | ||
} | ||
} | ||
|
||
fn calculate_control_values(service_state: &ServiceState, prev_service_state: &ServiceState, instruction: &ServiceInstruction, dt: f32) -> i32 { | ||
// there was some algorithm here. I deleted it to anonymise the code and hard coded the return value | ||
let g_x = (instruction.g_minimum_x + instruction.g_maximum_x) / 2.0; | ||
let distance_x = g_x - service_state.x; | ||
distance_x as i32 | ||
} | ||
|
||
|
||
async fn establish_connection( | ||
state_receiver: Arc<Receiver<Option<ServiceState>>>, | ||
server_instruction: Arc<OnceCell<ServiceInstruction>>, | ||
navigation_success: Arc<OnceCell<bool>>, | ||
config: config::Config, | ||
) -> Result<(ControllerClient<InterceptedService<Channel, MyInterceptor>>, tonic::Streaming<proto::ServerMsg>), Box<dyn std::error::Error>> { | ||
let channel = Channel::from_shared(config.endpoint.to_string())?.connect().await?; | ||
let mut client = ControllerClient::with_interceptor(channel, MyInterceptor { token: config.token.to_string() } ); | ||
let delta_time = 0.02; | ||
|
||
let outbound = async_stream::stream! { | ||
let mut interval = interval(Duration::from_secs_f32(delta_time)); | ||
let mut prev_service_state = ServiceState { x: 0.0 }; | ||
|
||
while let (None, server_instruction, Ok(service_state_maybe)) = | ||
(navigation_success.get(), server_instruction.get(), state_receiver.recv().await) { | ||
interval.tick().await; | ||
|
||
match (service_state_maybe, server_instruction) { | ||
(Some(service_state), Some(instruction)) => { | ||
let c = calculate_control_values(&service_state, &prev_service_state, &instruction, delta_time); | ||
prev_service_state = service_state; | ||
debug!("both information present: c:{:?}", c); | ||
yield ClientMsg { c }; | ||
}, | ||
_ => { | ||
debug!("Not supposed to happen but returning empty Service Client Message "); | ||
yield ClientMsg { c: 0 }; | ||
}, | ||
} | ||
} | ||
}; | ||
|
||
let response = client.connection(Request::new(outbound)).await?; | ||
let inbound = response.into_inner(); | ||
|
||
Ok((client, inbound)) | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
env_logger::init(); | ||
info!("Endpoint: {}", config::CONFIG.endpoint); | ||
info!("Token: {}", config::CONFIG.token); | ||
|
||
let empty_string = String::from(""); | ||
loop { | ||
let (state_sender, state_receiver): (Sender<Option<ServiceState>>, Receiver<Option<ServiceState>>) = unbounded(); | ||
let state_receiver_arc = Arc::new(state_receiver); | ||
let server_instruction = Arc::new(OnceCell::new()); | ||
let navigation_success = Arc::new(OnceCell::new()); | ||
let (_, mut inbound) = establish_connection(state_receiver_arc, server_instruction.clone(), navigation_success.clone(), config::CONFIG).await?; | ||
|
||
while let Some(server_msg) = inbound.message().await? { | ||
match &server_msg.data { | ||
Some(Data::Start(SimStart { | ||
l: Some(l), | ||
limit: Some(Area { n_point: Some(boundary_min), m_point: Some(boundary_max) }), | ||
g: Some(Area { n_point: Some(goal_min), m_point: Some(goal_max) })})) => { | ||
let state = ServiceState { x: l.x }; | ||
|
||
let service_instruction = ServiceInstruction { | ||
b_minimum_x: boundary_min.x, | ||
b_maximum_x: boundary_max.x, | ||
g_minimum_x: goal_min.x, | ||
g_maximum_x: goal_max.x, | ||
}; | ||
|
||
server_instruction.set(service_instruction).unwrap(); | ||
state_sender.send(Some(state)).await.unwrap_or_else(|e| warn!("async-channel error, SendError is {:?}", e.to_string())); | ||
} | ||
|
||
Some(Data::Start(SimStart { l: _, limit: _, g: _, })) => { | ||
panic!("this shouldn't happen, SimStart should provide all the necessary instructions needed"); | ||
} | ||
Some(Data::Update(SimUpdate { l: Some(location) })) => { | ||
let state = ServiceState { x: location.x, }; | ||
state_sender.send(Some(state)).await?; | ||
} | ||
Some(Data::Ended(SimOver { success: true, details })) => { | ||
info!("\n\n******** Simulation ended with success ******** {:?}\n", details.as_ref().unwrap_or(&empty_string)); | ||
navigation_success.set(true).unwrap(); | ||
break; | ||
} | ||
Some(Data::Ended(SimOver { success: false, details })) => { | ||
info!("\n\n******** Simulation ended with failure ******** {:?}\n", details.as_ref().unwrap_or(&empty_string)); | ||
navigation_success.set(false).unwrap(); | ||
break; | ||
} | ||
_ => { | ||
state_sender.send(None).await?; | ||
warn!("unknown"); | ||
} | ||
} | ||
} | ||
|
||
info!("\nWaiting for 30 seconds (cool off period) before restarting the connection...\n\n"); | ||
drop(inbound); | ||
drop(state_sender); | ||
tokio::time::sleep(Duration::from_secs(30)).await; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
#![cfg_attr(rustfmt, rustfmt_skip)] | ||
#![allow(dead_code)] | ||
|
||
use std::borrow::Cow; | ||
|
||
#[derive(Debug, Clone)] | ||
#[allow(non_camel_case_types)] | ||
pub struct Config { | ||
pub endpoint: Cow<'static, str>, | ||
pub token: Cow<'static, str>, | ||
} | ||
|
||
pub const CONFIG: Config = Config { | ||
endpoint: Cow::Borrowed("http://[::1]:50051"), | ||
token: Cow::Borrowed("Bearer MYTOKEN"), | ||
}; |
Oops, something went wrong.