diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 0b5751f7b..222a6af55 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use bytes::Bytes; -use fred::clients::RedisPool; +use fred::clients::{RedisClient, RedisPool, SubscriberClient}; use fred::interfaces::{ClientLike, KeysInterface, PubsubInterface}; use fred::types::{Builder, ConnectionConfig, PerformanceConfig, ReconnectPolicy, RedisConfig}; use nativelink_config::stores::RedisMode; @@ -46,6 +46,10 @@ pub struct RedisStore { /// A channel to publish updates to when a key is added, removed, or modified. pub_sub_channel: Option, + /// A redis client for managing subscriptions. + /// TODO: This should be moved into the store in followups once a standard use pattern has been determined. + subscriber_client: SubscriberClient, + /// A function used to generate names for temporary keys. temp_name_generator_fn: fn() -> String, @@ -114,12 +118,17 @@ impl RedisStore { .build_pool(CONNECTION_POOL_SIZE) .err_tip(|| "while creating redis connection pool")?; + let subscriber_client = builder + .build_subscriber_client() + .err_tip(|| "while creating redis subscriber client")?; // Fires off a background task using `tokio::spawn`. client_pool.connect(); + subscriber_client.connect(); Ok(Self { client_pool, pub_sub_channel, + subscriber_client, temp_name_generator_fn, key_prefix, }) @@ -147,6 +156,17 @@ impl RedisStore { } } } + + // TODO: These helpers eventually should not be necessary, as they are only used for functionality + // that could hypothetically be moved behind this API with some non-trivial logic adjustments + // and the addition of one or two new endpoints. + pub fn get_subscriber_client(&self) -> SubscriberClient { + self.subscriber_client.clone() + } + + pub fn get_client(&self) -> RedisClient { + self.client_pool.next().clone() + } } #[async_trait]