diff --git a/src/data/v2/stream.rs b/src/data/v2/stream.rs index f0bbcc9f..9ef4dfa7 100644 --- a/src/data/v2/stream.rs +++ b/src/data/v2/stream.rs @@ -61,22 +61,22 @@ use crate::Error; use crate::Str; -type UserMessage = as subscribe::Message>::UserMessage; +type UserMessage = as subscribe::Message>::UserMessage; /// Helper function to drive a [`Subscription`] related future to /// completion. The function makes sure to poll the provided stream, /// which is assumed to be associated with the `Subscription` that the /// future belongs to, so that control messages can be received. #[inline] -pub async fn drive( +pub async fn drive( future: F, stream: &mut S, -) -> Result> +) -> Result> where F: Future + Unpin, - S: FusedStream> + Unpin, + S: FusedStream> + Unpin, { - subscribe::drive::, _, _>(future, stream).await + subscribe::drive::, _, _>(future, stream).await } @@ -193,6 +193,36 @@ where impl private::Sealed for CustomUrl {} +/// A realtime data source for the news feed (currently in beta). +/// +/// More details can be found here: https://alpaca.markets/docs/api-references/market-data-api/news-data/realtime/ +/// +/// This feed can be used like so: +/// ```no_run +/// let (mut stream, mut subscription) = client +/// .subscribe::>() +/// .await +/// .unwrap(); +/// # }) +/// ``` +#[derive(Clone, Copy, Debug)] +pub struct NewsFeed; + +impl ToString for NewsFeed { + fn to_string(&self) -> String { + "wss://stream.data.alpaca.markets/v1beta1/news".into() + } +} + +impl Source for NewsFeed { + #[inline] + fn source() -> SourceVariant { + SourceVariant::Url(NewsFeed.to_string()) + } +} + +impl private::Sealed for NewsFeed {} + /// A symbol. pub type Symbol = Str; @@ -275,6 +305,39 @@ pub struct Bar { pub timestamp: DateTime, } +/// A news from the news feed. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct News { + /// News article ID + pub id: Num, + /// Headline or title of the article + #[serde(rename = "headline")] + pub headline: String, + /// Summary text for article (may be first sentence of content). + #[serde(rename = "summary")] + pub summary: String, + /// Content of news article (might contain HTML) + #[serde(rename = "content")] + pub content: String, + /// Source where the news originated from (e.g. Benzinga) + #[serde(rename = "source")] + pub source: String, + /// List of related or mentioned symbols + #[serde(rename = "symbols")] + pub symbols: Vec, + /// Original author of news article + #[serde(rename = "author")] + pub author: String, + /// URL of article (if applicable) + #[serde(rename = "url")] + pub url: String, + /// Date article was created + #[serde(rename = "created_at")] + pub created_at: DateTime, + /// Date article was updated + #[serde(rename = "updated_at")] + pub updated_at: DateTime, +} /// A quote for an equity. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -340,10 +403,13 @@ pub struct StreamApiError { #[doc(hidden)] #[serde(tag = "T")] #[allow(clippy::large_enum_variant)] -pub enum DataMessage { +pub enum DataMessage { /// A variant representing aggregate data for a given symbol. #[serde(rename = "b")] Bar(B), + /// A variant representing a news for a given set of symbols. + #[serde(rename = "n")] + News(N), /// A variant representing a quote for a given symbol. #[serde(rename = "q")] Quote(Q), @@ -366,9 +432,11 @@ pub enum DataMessage { /// A data item as received over our websocket channel. #[derive(Debug)] #[non_exhaustive] -pub enum Data { +pub enum Data { /// A variant representing aggregate data for a given symbol. Bar(B), + /// A variant representing a news for a set of symbols. + News(N), /// A variant representing quote data for a given symbol. Quote(Q), /// A variant representing trade data for a given symbol. @@ -382,6 +450,12 @@ impl Data { matches!(self, Self::Bar(..)) } + /// Check whether this object is of the `News` variant. + #[inline] + pub fn is_news(&self) -> bool { + matches!(self, Self::News(..)) + } + /// Check whether this object is of the `Quote` variant. #[inline] pub fn is_quote(&self) -> bool { @@ -411,17 +485,18 @@ pub enum ControlMessage { /// A websocket message that we tried to parse. -type ParsedMessage = - MessageResult, JsonError>, WebSocketError>; +type ParsedMessage = + MessageResult, JsonError>, WebSocketError>; -impl subscribe::Message for ParsedMessage { - type UserMessage = Result, JsonError>, WebSocketError>; +impl subscribe::Message for ParsedMessage { + type UserMessage = Result, JsonError>, WebSocketError>; type ControlMessage = ControlMessage; fn classify(self) -> subscribe::Classification { match self { MessageResult::Ok(Ok(message)) => match message { DataMessage::Bar(bar) => subscribe::Classification::UserMessage(Ok(Ok(Data::Bar(bar)))), + DataMessage::News(news) => subscribe::Classification::UserMessage(Ok(Ok(Data::News(news)))), DataMessage::Quote(quote) => { subscribe::Classification::UserMessage(Ok(Ok(Data::Quote(quote)))) }, @@ -581,6 +656,9 @@ pub struct MarketData { /// The aggregate bars to subscribe to. #[serde(default)] pub bars: Symbols, + /// The news to subscribe to. + #[serde(default)] + pub news: Symbols, /// The quotes to subscribe to. #[serde(default)] pub quotes: Symbols, @@ -600,6 +678,16 @@ impl MarketData { self.bars = Symbols::List(symbols.into()); } + /// A convenience function for setting the [`news`][MarketData::news] + /// member. + #[inline] + pub fn set_news(&mut self, symbols: S) + where + S: Into, + { + self.news = Symbols::List(symbols.into()); + } + /// A convenience function for setting the [`quotes`][MarketData::quotes] /// member. #[inline] @@ -656,18 +744,18 @@ pub enum Request<'d> { /// the associated [`MessageStream`] stream needs to be polled; /// consider using the [`drive`] function for that purpose #[derive(Debug)] -pub struct Subscription { +pub struct Subscription { /// Our internally used subscription object for sending control /// messages. - subscription: subscribe::Subscription, wrap::Message>, + subscription: subscribe::Subscription, wrap::Message>, /// The currently active individual market data subscriptions. subscriptions: MarketData, } -impl Subscription { +impl Subscription { /// Create a `Subscription` object wrapping the `websocket_util` based one. #[inline] - fn new(subscription: subscribe::Subscription, wrap::Message>) -> Self { + fn new(subscription: subscribe::Subscription, wrap::Message>) -> Self { Self { subscription, subscriptions: MarketData::default(), @@ -675,7 +763,7 @@ impl Subscription { } } -impl Subscription +impl Subscription where S: Sink + Unpin, { @@ -781,18 +869,18 @@ where } -type ParseFn = fn( +type ParseFn = fn( Result, -) -> Result>, JsonError>, WebSocketError>; -type MapFn = - fn(Result, JsonError>, WebSocketError>) -> ParsedMessage; -type Stream = Map< +) -> Result>, JsonError>, WebSocketError>; +type MapFn = + fn(Result, JsonError>, WebSocketError>) -> ParsedMessage; +type Stream = Map< Unfold< - Map>>, ParseFn>, - DataMessage, + Map>>, ParseFn>, + DataMessage, JsonError, >, - MapFn, + MapFn, >; @@ -804,35 +892,37 @@ type Stream = Map< /// [`Quote`], and [`Trade`], respectively) that are provided by the /// library. #[derive(Debug)] -pub struct RealtimeData { +pub struct RealtimeData { /// Phantom data to make sure that we "use" `S`. - _phantom: PhantomData<(S, B, Q, T)>, + _phantom: PhantomData<(S, B, N, Q, T)>, } #[async_trait] -impl Subscribable for RealtimeData +impl Subscribable for RealtimeData where S: Source, B: Send + Unpin + Debug + DeserializeOwned, + N: Send + Unpin + Debug + DeserializeOwned, Q: Send + Unpin + Debug + DeserializeOwned, T: Send + Unpin + Debug + DeserializeOwned, { type Input = ApiInfo; - type Subscription = Subscription, wrap::Message>, B, Q, T>; - type Stream = Fuse>, ParsedMessage>>; + type Subscription = Subscription, wrap::Message>, B, N, Q, T>; + type Stream = Fuse>, ParsedMessage>>; async fn connect(api_info: &Self::Input) -> Result<(Self::Stream, Self::Subscription), Error> { - fn parse( + fn parse( result: Result, - ) -> Result>, JsonError>, WebSocketError> + ) -> Result>, JsonError>, WebSocketError> where B: DeserializeOwned, + N: DeserializeOwned, Q: DeserializeOwned, T: DeserializeOwned, { result.map(|message| match message { - wrap::Message::Text(string) => json_from_str::>>(&string), - wrap::Message::Binary(data) => json_from_slice::>>(&data), + wrap::Message::Text(string) => json_from_str::>>(&string), + wrap::Message::Binary(data) => json_from_slice::>>(&data), }) } @@ -855,9 +945,9 @@ where let stream = Unfold::new( connect(&url) .await? - .map(parse:: as ParseFn<_, _, _>), + .map(parse:: as ParseFn<_, _, _, _>), ) - .map(MessageResult::from as MapFn); + .map(MessageResult::from as MapFn); let (send, recv) = stream.split(); let (stream, subscription) = subscribe::subscribe(recv, send); let mut stream = stream.fuse(); @@ -1087,7 +1177,7 @@ mod tests { "t": "2022-01-18T23:09:42.151875584Z" }"#; - let message = json_from_str::>(json).unwrap(); + let message = json_from_str::>(json).unwrap(); let quote = match &message { DataMessage::Quote(quote) => quote, _ => panic!("Decoded unexpected message variant: {message:?}"), @@ -1107,7 +1197,7 @@ mod tests { ); assert_eq!( - json_from_str::>(&to_json(&message).unwrap()).unwrap(), + json_from_str::>(&to_json(&message).unwrap()).unwrap(), message ); } @@ -1200,7 +1290,7 @@ mod tests { "u": "corrected" }"#; - let message = json_from_str::>(json).unwrap(); + let message = json_from_str::>(json).unwrap(); let trade = match &message { DataMessage::Trade(trade) => trade, _ => panic!("Decoded unexpected message variant: {message:?}"), @@ -1220,7 +1310,7 @@ mod tests { assert_eq!(trade.update, Some("corrected".to_string())); assert_eq!( - json_from_str::>(&to_json(&message).unwrap()).unwrap(), + json_from_str::>(&to_json(&message).unwrap()).unwrap(), message ); } @@ -1513,6 +1603,50 @@ mod tests { } } + /// Check that we can stream realtime news. + /// + /// Note that we do not have any control over whether the market is + /// open or not and as such we can only try on a best-effort basis to + /// receive and decode updates. + #[test(tokio::test)] + #[serial(realtime_data)] + async fn stream_news() { + async fn test() + where + S: Source, + { + let api_info = ApiInfo::from_env().unwrap(); + let client = Client::new(api_info); + let (mut stream, mut subscription) = client.subscribe::>().await.unwrap(); + + let mut data = MarketData::default(); + data.set_news(["SPY"]); + + let subscribe = subscription.subscribe(&data).boxed_local(); + let () = drive(subscribe, &mut stream) + .await + .unwrap() + .unwrap() + .unwrap(); + + let read = stream + .map_err(Error::WebSocket) + .try_for_each(|result| async { + result + .map(|data| { + assert!(data.is_news()); + }) + .map_err(Error::Json) + }); + + if timeout(Duration::from_millis(100), read).await.is_ok() { + panic!("realtime data stream got exhausted unexpectedly") + } + } + + test::().await; + } + /// Check that we can stream realtime stock quotes. /// /// Note that we do not have any control over whether the market is