fix: use api boundary nodes as socks proxies #2712

Open · wants to merge 40 commits into master (changes shown from 28 of 40 commits)

Commits (40)
7995aa1
tmep
mihailjianu1 Nov 13, 2024
0a06cf0
add socks caches
mihailjianu1 Nov 14, 2024
bce7d54
fixes
mihailjianu1 Nov 19, 2024
2088183
fix things
mihailjianu1 Nov 19, 2024
2116a1f
cache
mihailjianu1 Nov 19, 2024
0d4cb86
comments
mihailjianu1 Nov 19, 2024
4e57980
remove btreemap
mihailjianu1 Nov 19, 2024
1ada60d
renamings
mihailjianu1 Nov 19, 2024
6cc0bc6
.
mihailjianu1 Nov 20, 2024
66f2446
mutex
mihailjianu1 Nov 20, 2024
f28f4d4
fix
mihailjianu1 Nov 20, 2024
ae1469a
mutex, retry
mihailjianu1 Nov 21, 2024
91bafd8
parking_lot
mihailjianu1 Nov 22, 2024
8de8072
metrics
mihailjianu1 Nov 22, 2024
85f1574
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Nov 22, 2024
c85671a
clippy
mihailjianu1 Nov 22, 2024
0aae15c
buildifier
mihailjianu1 Nov 22, 2024
e68ebd1
attempt
mihailjianu1 Nov 27, 2024
6a4e052
ch
mihailjianu1 Nov 29, 2024
149f49e
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Nov 29, 2024
9433d02
debug
mihailjianu1 Dec 2, 2024
a1f32a1
final changes
mihailjianu1 Dec 4, 2024
dd401e9
clippy
mihailjianu1 Dec 4, 2024
4169bf0
better debug messages
mihailjianu1 Dec 4, 2024
ed73d7b
renaming
mihailjianu1 Dec 4, 2024
e7bbcb0
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Dec 4, 2024
2ea658b
fix
mihailjianu1 Dec 4, 2024
5b14628
fix
mihailjianu1 Dec 4, 2024
9054a99
coments
mihailjianu1 Dec 13, 2024
117a763
refactor
mihailjianu1 Dec 13, 2024
3a650a7
rename metric
mihailjianu1 Dec 13, 2024
a40e10d
comments
mihailjianu1 Dec 23, 2024
a4329d9
lock
mihailjianu1 Dec 27, 2024
95aee5a
lock
mihailjianu1 Dec 27, 2024
5f3a200
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Dec 27, 2024
92da343
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Jan 13, 2025
609fb05
comments
mihailjianu1 Jan 14, 2025
9876cf2
Merge branch 'master' into mihailjianu1/tmep2
mihailjianu1 Jan 21, 2025
fc831e0
comments
mihailjianu1 Jan 22, 2025
8be6dd8
remove todo
mihailjianu1 Jan 22, 2025
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

3 changes: 2 additions & 1 deletion rs/https_outcalls/adapter/BUILD.bazel
@@ -18,7 +18,9 @@ DEPENDENCIES = [
"@crate_index//:hyper-rustls",
"@crate_index//:hyper-socks2",
"@crate_index//:hyper-util",
"@crate_index//:parking_lot",
"@crate_index//:prometheus",
"@crate_index//:rand",
"@crate_index//:serde",
"@crate_index//:serde_json",
"@crate_index//:slog",
@@ -35,7 +37,6 @@ DEV_DEPENDENCIES = [
"@crate_index//:async-stream",
"@crate_index//:bytes",
"@crate_index//:once_cell",
"@crate_index//:rand",
"@crate_index//:rstest",
"@crate_index//:rustls",
"@crate_index//:rustls-pemfile",
4 changes: 3 additions & 1 deletion rs/https_outcalls/adapter/Cargo.toml
@@ -20,7 +20,9 @@ ic-config = { path = "../../config" }
ic-https-outcalls-service = { path = "../service" }
ic-logger = { path = "../../monitoring/logger" }
ic-metrics = { path = "../../monitoring/metrics" }
parking_lot = { workspace = true }
prometheus = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slog = { workspace = true }
@@ -29,11 +31,11 @@ tokio = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }


[dev-dependencies]
async-stream = { workspace = true }
bytes = { workspace = true }
once_cell = "1.13.1"
rand = { workspace = true }
rustls = { workspace = true }
rustls-pemfile = "2.1.2"
rstest = { workspace = true }
27 changes: 26 additions & 1 deletion rs/https_outcalls/adapter/src/metrics.rs
@@ -1,5 +1,5 @@
use ic_metrics::MetricsRegistry;
use prometheus::{IntCounter, IntCounterVec};
use prometheus::{IntCounter, IntCounterVec, IntGauge};

/// Labels for request errors
pub(crate) const LABEL_BODY_RECEIVE_SIZE: &str = "body_receive_size";
@@ -20,6 +20,14 @@ pub struct AdapterMetrics {
pub requests: IntCounter,
/// The number of requests served via a SOCKS proxy.
pub requests_socks: IntCounter,
/// The number of SOCKS connection attempts
pub socks_connections_attempts: IntCounter,
/// The number of SOCKS clients in the cache
pub socks_cache_size: IntGauge,
/// The number of cache misses for SOCKS clients
pub socks_cache_miss: IntCounter,
/// The number of successful SOCKS connections
pub successful_socks_connections: IntCounterVec,
/// Network traffic generated by adapter.
pub network_traffic: IntCounterVec,
/// Request failure types.
@@ -38,6 +46,23 @@ impl AdapterMetrics {
"requests_socks_total",
"Total number of requests served via a SOCKS proxy",
),
socks_connections_attempts: metrics_registry.int_counter(
"socks_connections_attempts",
"Total number of time the adapter tries to proxy a request via a SOCKS proxy",
),
socks_cache_size: metrics_registry.int_gauge(
"socks_cache_size",
"The size of the cache for SOCKS clients",
),
socks_cache_miss: metrics_registry.int_counter(
"socks_cache_miss",
"Total number of times the adapter failed to find a SOCKS client in the cache",
),
successful_socks_connections: metrics_registry.int_counter_vec(
"successful_socks_connections_total",
"Total number of successful SOCKS connections",
&["number_of_tries"],
),
network_traffic: metrics_registry.int_counter_vec(
"network_traffic_bytes_total",
"Network traffic generated by adapter.",
182 changes: 178 additions & 4 deletions rs/https_outcalls/adapter/src/rpc_server.rs
@@ -12,6 +12,9 @@ use hyper::{
header::{HeaderMap, ToStrError},
Method,
};
use rand::Rng;

use hyper::body::Incoming;
use hyper_rustls::HttpsConnector;
use hyper_rustls::HttpsConnectorBuilder;
use hyper_socks2::SocksConnector;
@@ -23,7 +26,11 @@ use ic_https_outcalls_service::{
};
use ic_logger::{debug, ReplicaLogger};
use ic_metrics::MetricsRegistry;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rand::{seq::SliceRandom, thread_rng};
use std::collections::BTreeMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tonic::{Request, Response, Status};

@@ -41,15 +48,24 @@ const USER_AGENT_ADAPTER: &str = "ic/1.0";
/// "the total number of bytes representing the header names and values must not exceed 48KiB".
const MAX_HEADER_LIST_SIZE: u32 = 52 * 1024;

/// The maximum number of times we will try to connect to a SOCKS proxy.
const MAX_SOCKS_PROXY_RETRIES: usize = 3;

/// The percentage of requests for which API boundary node addresses are used as SOCKS proxies (dark launch).
const REGISTRY_SOCKS_PROXY_DARK_LAUNCH_PERCENTAGE: u32 = 0;

type OutboundRequestBody = Full<Bytes>;

/// Implements HttpsOutcallsService
// TODO: consider making this private
type Cache =
BTreeMap<String, Client<HttpsConnector<SocksConnector<HttpConnector>>, OutboundRequestBody>>;

pub struct CanisterHttp {
client: Client<HttpsConnector<HttpConnector>, OutboundRequestBody>,
socks_client: Client<HttpsConnector<SocksConnector<HttpConnector>>, OutboundRequestBody>,
cache: Arc<RwLock<Cache>>,
logger: ReplicaLogger,
metrics: AdapterMetrics,
http_connect_timeout_secs: u64,
}

impl CanisterHttp {
@@ -98,8 +114,148 @@
Self {
client,
socks_client,
cache: Arc::new(RwLock::new(BTreeMap::new())),
logger,
metrics: AdapterMetrics::new(metrics),
http_connect_timeout_secs: config.http_connect_timeout_secs,
}
}

fn create_socks_client_for_address(
&self,
address: &str,
) -> Option<Client<HttpsConnector<SocksConnector<HttpConnector>>, OutboundRequestBody>> {
// Create a new HTTP connector
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
http_connector
.set_connect_timeout(Some(Duration::from_secs(self.http_connect_timeout_secs)));

match address.parse() {
Ok(proxy_addr) => {
let proxy_connector = SocksConnector {
proxy_addr,
auth: None,
connector: http_connector,
};

let proxied_https_connector = HttpsConnectorBuilder::new()
.with_native_roots()
.expect("Failed to set native roots")
.https_only()
.enable_all_versions()
.wrap_connector(proxy_connector);

let socks_client = Client::builder(TokioExecutor::new())
.build::<_, Full<Bytes>>(proxied_https_connector);

Some(socks_client)
}
Err(e) => {
debug!(self.logger, "Failed to parse SOCKS address: {}", e);
None
}
}
}

fn compare_results(
&self,
result: &Result<http::Response<Incoming>, String>,
dl_result: &Result<http::Response<Incoming>, String>,
) {
match (result, dl_result) {
(Ok(_), Ok(_)) => {
debug!(self.logger, "SOCKS_PROXY_DL: Both requests succeeded");
}
(Err(_), Err(_)) => {
debug!(self.logger, "SOCKS_PROXY_DL: Both requests failed");
}
(Ok(_), Err(err)) => {
debug!(
self.logger,
"SOCKS_PROXY_DL: regular request succeeded, DL request failed with error {}",
err,
);
}
(Err(err), Ok(_)) => {
debug!(
self.logger,
"SOCKS_PROXY_DL: DL request succeeded, regular request failed with error {}",
err,
);
}
}
}

async fn https_outcall_dl_socks_proxy(
&self,
socks_proxy_addrs: &[String],
request: http::Request<Full<Bytes>>,
) -> Result<http::Response<Incoming>, String> {
let mut socks_proxy_addrs = socks_proxy_addrs.to_owned();

socks_proxy_addrs.shuffle(&mut thread_rng());

let mut last_error = None;

let mut tries = 0;

for socks_proxy_addr in &socks_proxy_addrs {
tries += 1;
[Reviewer comment (Member)]: I would convert the socks_proxy_addr to URI here and use the URI from here on. You will have fewer errors to handle and propagate.

[Author reply (mihailjianu1)]: Hm, I would argue that working with strings is easier. First, we don't always need to parse to URI, only when there is a cache miss. Second, the cache still works with string addresses, so we would still need to keep track of both the URI and the string address.

(A sketch of the URI-based alternative appears after this file's diff.)

if tries > MAX_SOCKS_PROXY_RETRIES {
break;
}
let next_socks_proxy_addr = socks_proxy_addr.clone();

let socks_client = {
let cache_guard = self.cache.upgradable_read();

if let Some(client) = cache_guard.get(&next_socks_proxy_addr) {
client.clone()
} else {
let mut cache_guard = RwLockUpgradableReadGuard::upgrade(cache_guard);
self.metrics.socks_cache_miss.inc();

match self.create_socks_client_for_address(&next_socks_proxy_addr) {
Some(client) => {
cache_guard.insert(next_socks_proxy_addr.clone(), client.clone());
self.metrics.socks_cache_size.set(cache_guard.len() as i64);
client
}
None => {
// There is something wrong with this address; try another one.
continue;
}
}
}
};

self.metrics.socks_connections_attempts.inc();

match socks_client.request(request.clone()).await {
Ok(resp) => {
self.metrics
.successful_socks_connections
.with_label_values(&[&tries.to_string()])
.inc();
return Ok(resp);
}
Err(socks_err) => {
debug!(
self.logger,
"Failed to connect through SOCKS with address {}: {}",
next_socks_proxy_addr,
socks_err
);
last_error = Some(socks_err);
}
}
}

if let Some(last_error) = last_error {
Err(last_error.to_string())
} else {
Err("No SOCKS proxy addresses provided".to_string())
}
}
}
@@ -198,9 +354,18 @@
// fail fast because our interface does not have an ipv4 assigned.
Err(direct_err) => {
self.metrics.requests_socks.inc();
self.socks_client.request(http_req_clone).await.map_err(|e| {

let result = self.socks_client.request(http_req_clone.clone()).await.map_err(|e| {
format!("Request failed direct connect {direct_err} and connect through socks {e}")
})
});

if should_dl_socks_proxy() {
let dl_result = self.https_outcall_dl_socks_proxy(&req.socks_proxy_addrs, http_req_clone).await;

self.compare_results(&result, &dl_result);
}

result
}
Ok(resp) => Ok(resp),
}
@@ -308,6 +473,15 @@
}
}

#[allow(clippy::absurd_extreme_comparisons)]
fn should_dl_socks_proxy() -> bool {
let mut rng = rand::thread_rng();
let random_number: u32 = rng.gen_range(0..100);
// This is a dark launch feature. We want to test the SOCKS proxy with a small percentage of requests.
// Currently this is set to 0%, hence always false.
random_number < REGISTRY_SOCKS_PROXY_DARK_LAUNCH_PERCENTAGE
}

fn validate_headers(raw_headers: Vec<HttpHeader>) -> Result<HeaderMap, Status> {
// Check we are within limit for number of headers.
if raw_headers.len() > HEADERS_LIMIT {
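Note on the review discussion above about converting SOCKS proxy addresses to URIs: a minimal sketch of that alternative follows. It assumes a free-standing helper; the name parse_socks_proxy_uris and the eprintln! logging are illustrative only and are not part of this PR (the adapter would log through its ReplicaLogger instead).

use http::Uri;

/// Hypothetical helper: parse every candidate SOCKS proxy address once, up front,
/// dropping malformed entries so the retry loop never has to handle parse errors.
fn parse_socks_proxy_uris(socks_proxy_addrs: &[String]) -> Vec<Uri> {
    socks_proxy_addrs
        .iter()
        .filter_map(|addr| match addr.parse::<Uri>() {
            Ok(uri) => Some(uri),
            Err(e) => {
                // Illustrative logging only; not the adapter's logging mechanism.
                eprintln!("skipping malformed SOCKS proxy address {addr}: {e}");
                None
            }
        })
        .collect()
}

The PR instead keeps string addresses as cache keys, so an address is parsed only on a cache miss inside create_socks_client_for_address.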