Skip to content

Commit

Permalink
fix log streaming (still not perfect)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepu105 committed Apr 22, 2021
1 parent 7ad6956 commit 55ba5fb
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 47 deletions.
21 changes: 10 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,26 @@ A simple terminal dashboard for Kubernetes built with Rust
- [ ] StatefulSets
- [ ] ReplicaSets
- Describe resources
- [ ] Pods
- [ ] Nodes
- [ ] Services
- [ ] Deployments
- [ ] ConfigMaps
- [ ] StatefulSets
- [ ] ReplicaSets
- [x] Pods
- [x] Nodes
- [ ] Services (simulated)
- [ ] Deployments (simulated)
- [ ] ConfigMaps (simulated)
- [ ] StatefulSets (simulated)
- [ ] ReplicaSets (simulated)
- [ ] as YAML
- Stream logs/events
- [ ] Pods
- [ ] Containers
- [x] Containers
- [ ] Services
- [ ] Deployments
- [ ] ConfigMaps
- [ ] StatefulSets
- [ ] ReplicaSets
- Context
- [x] Context info
- [x] Node metrics
- [x] Context watch
- [x] Change namespace?
- [ ] Context switch
- [ ] Resources utilizations
- [x] Dark/Light themes
- [ ] Custom keymap
- [ ] Custom theme
Expand Down
11 changes: 10 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ pub struct App {
pub context_tabs: TabsState,
pub show_info_bar: bool,
pub is_loading: bool,
pub is_streaming: bool,
pub is_routing: bool,
pub tick_until_poll: u64,
pub tick_count: u64,
Expand All @@ -168,6 +169,7 @@ pub struct App {
pub confirm: bool,
pub light_theme: bool,
pub refresh: bool,
pub log_auto_scroll: bool,
pub data: Data,
}

Expand Down Expand Up @@ -234,6 +236,7 @@ impl Default for App {
),
show_info_bar: true,
is_loading: false,
is_streaming: false,
is_routing: false,
tick_until_poll: 0,
tick_count: 0,
Expand All @@ -245,6 +248,7 @@ impl Default for App {
confirm: false,
light_theme: false,
refresh: true,
log_auto_scroll: true,
data: Data::default(),
}
}
Expand Down Expand Up @@ -365,7 +369,7 @@ impl App {
pub async fn dispatch_container_logs(&mut self, c: String) {
self.data.logs = LogsState::new(c);
self.push_navigation_stack(RouteId::Home, ActiveBlock::Logs);
self.dispatch_stream(IoStreamEvent::GetPodLogs).await;
self.dispatch_stream(IoStreamEvent::GetPodLogs(true)).await;
}

pub async fn on_tick(&mut self, first_render: bool) {
Expand Down Expand Up @@ -394,6 +398,11 @@ impl App {
ActiveBlock::Services => {
self.dispatch(IoEvent::GetServices).await;
}
ActiveBlock::Logs => {
if !self.is_streaming {
self.dispatch_stream(IoStreamEvent::GetPodLogs(false)).await;
}
}
_ => {}
}
}
Expand Down
79 changes: 66 additions & 13 deletions src/app/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use std::collections::VecDeque;

use super::ActiveBlock;
use tui::{
backend::Backend,
layout::Rect,
style::Style,
text::Span,
widgets::{List, ListItem, TableState},
widgets::{Block, List, ListItem, ListState, TableState},
Frame,
};

#[derive(Clone)]
Expand Down Expand Up @@ -163,19 +165,53 @@ pub struct LogsState {
/// (original_message, (wrapped_message, wrapped_at_width))
#[allow(clippy::type_complexity)]
records: VecDeque<(String, Option<(Vec<ListItem<'static>>, u16)>)>,
wrapped_length: usize,
pub state: ListState,
pub id: String,
}

impl LogsState {
pub fn new(id: String) -> LogsState {
LogsState {
records: VecDeque::with_capacity(512),
state: ListState::default(),
wrapped_length: 0,
id,
}
}

/// Get the current state as a list widget
pub fn get_list(&mut self, logs_area: Rect, style: Style) -> List {
pub fn scroll_down(&mut self) {
let i = self.state.selected().map_or(0, |i| {
if i >= self.wrapped_length.wrapping_sub(1) {
i
} else {
i + 1
}
});
self.state.select(Some(i));
}

pub fn scroll_up(&mut self) {
let i = self
.state
.selected()
.map_or(0, |i| if i != 0 { i - 1 } else { 0 });
self.state.select(Some(i));
}

fn unselect(&mut self) {
self.state.select(None);
}

/// Render the current state as a list widget
pub fn render_list<B: Backend>(
&mut self,
f: &mut Frame<B>,
logs_area: Rect,
block: Block,
style: Style,
follow: bool,
) {
let available_lines = logs_area.height as usize;
let logs_area_width = logs_area.width as usize;

Expand All @@ -186,12 +222,19 @@ impl LogsState {

let mut items = Vec::with_capacity(logs_area.height as usize);

let lines_to_skip = if follow {
self.unselect();
num_records.saturating_sub(available_lines)
} else {
0
};

items.extend(
self
.records
.iter_mut()
// Only wrap the records we could potentially be displaying
.skip(num_records.saturating_sub(available_lines))
.skip(lines_to_skip)
.map(|r| {
// See if we can use a cached wrapped line
if let Some(wrapped) = &r.1 {
Expand All @@ -218,18 +261,28 @@ impl LogsState {
.flatten(),
);

// TODO: we should be wrapping text with paragraph, but it currently
let lines_to_skip = if follow {
wrapped_lines_len.saturating_sub(available_lines)
} else {
0
};

let items = items
.into_iter()
// Wrapping could have created more lines than what we can display;
// skip them
.skip(lines_to_skip)
.collect::<Vec<_>>();

self.wrapped_length = items.len();

// TODO: All this is a workaround. we should be wrapping text with paragraph, but it currently
// doesn't support wrapping and staying scrolled to the bottom
//
// see https://github.com/fdehau/tui-rs/issues/89
List::new(
items
.into_iter()
// Wrapping could have created more lines than what we can display;
// skip them
.skip(wrapped_lines_len.saturating_sub(available_lines))
.collect::<Vec<_>>(),
)
let list = List::new(items).block(block);

f.render_stateful_widget(list, logs_area, &mut self.state);
}
/// Add a record to be displayed
pub fn add_record(&mut self, record: String) {
Expand Down
40 changes: 30 additions & 10 deletions src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub async fn handle_key_events(key: Key, app: &mut App) {

pub async fn handle_mouse_events(mouse: MouseEvent, app: &mut App) {
match mouse.kind {
MouseEventKind::ScrollDown => handle_scroll(app, false).await,
MouseEventKind::ScrollUp => handle_scroll(app, true).await,
MouseEventKind::ScrollDown => handle_scroll(app, true, true).await,
MouseEventKind::ScrollUp => handle_scroll(app, false, true).await,
_ => {}
}
}
Expand All @@ -101,11 +101,11 @@ fn handle_escape(app: &mut App) {
async fn handle_block_events(key: Key, app: &mut App) {
// handle scrolling with keys
match key {
_ if key == DEFAULT_KEYBINDING.up.key => {
handle_scroll(app, true).await;
}
_ if key == DEFAULT_KEYBINDING.down.key => {
handle_scroll(app, false).await;
handle_scroll(app, true, false).await;
}
_ if key == DEFAULT_KEYBINDING.up.key => {
handle_scroll(app, false, false).await;
}
_ => {}
}
Expand Down Expand Up @@ -198,6 +198,11 @@ async fn handle_block_events(key: Key, app: &mut App) {
ActiveBlock::Contexts => {
let _ctx = handle_table_action(key, &mut app.data.contexts);
}
ActiveBlock::Logs => {
if key == DEFAULT_KEYBINDING.log_auto_scroll.key {
app.log_auto_scroll = !app.log_auto_scroll;
}
}
_ => {
// do nothing
}
Expand All @@ -220,15 +225,23 @@ fn handle_table_action<T: Clone>(key: Key, item: &mut StatefulTable<T>) -> Optio
}

fn handle_table_scroll<T: Clone>(item: &mut StatefulTable<T>, down: bool) {
// this is inverse for trackpad
if down {
item.previous();
} else {
item.next();
}
}

async fn handle_scroll(app: &mut App, down: bool) {
// inverse direction for natural scrolling on mouse and keyboard
fn inverse_dir(down: bool, is_mouse: bool) -> bool {
if is_mouse {
down
} else {
!down
}
}

async fn handle_scroll(app: &mut App, down: bool, is_mouse: bool) {
match app.get_current_route().active_block {
ActiveBlock::Pods => handle_table_scroll(&mut app.data.pods, down),
ActiveBlock::Containers => handle_table_scroll(
Expand All @@ -243,9 +256,16 @@ async fn handle_scroll(app: &mut App, down: bool) {
ActiveBlock::Nodes => handle_table_scroll(&mut app.data.nodes, down),
ActiveBlock::Namespaces => handle_table_scroll(&mut app.data.namespaces, down),
ActiveBlock::Contexts => handle_table_scroll(&mut app.data.contexts, down),
ActiveBlock::Logs => {}
ActiveBlock::Logs => {
app.log_auto_scroll = false;
if inverse_dir(down, is_mouse) {
app.data.logs.scroll_down();
} else {
app.data.logs.scroll_up();
}
}
ActiveBlock::Describe => {
if down {
if inverse_dir(down, is_mouse) {
app.data.describe_out.scroll_down();
} else {
app.data.describe_out.scroll_up();
Expand Down
32 changes: 22 additions & 10 deletions src/network/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use anyhow::anyhow;
use k8s_openapi::api::core::v1::Pod;
use kube::Client;
use kube::{api::LogParams, Api};
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_stream::StreamExt;

#[derive(Debug)]
pub enum IoStreamEvent {
RefreshClient,
GetPodLogs,
GetPodLogs(bool),
}

#[derive(Clone)]
Expand Down Expand Up @@ -41,8 +41,8 @@ impl<'a> NetworkStream<'a> {
IoStreamEvent::RefreshClient => {
self.refresh_client().await;
}
IoStreamEvent::GetPodLogs => {
self.stream_container_logs().await;
IoStreamEvent::GetPodLogs(tail) => {
self.stream_container_logs(tail).await;
}
};

Expand All @@ -55,7 +55,7 @@ impl<'a> NetworkStream<'a> {
app.handle_error(e);
}

pub async fn stream_container_logs(&self) {
pub async fn stream_container_logs(&self, tail: bool) {
let (namespace, pod_name, cont_name) = {
let app = self.app.lock().await;
if let Some(p) = app.data.pods.get_selected_item() {
Expand Down Expand Up @@ -83,22 +83,31 @@ impl<'a> NetworkStream<'a> {
container: Some(cont_name.clone()),
follow: true,
previous: false,
timestamps: true,
tail_lines: Some(20),
// timestamps: true,
tail_lines: if tail { Some(10) } else { Some(0) },
..Default::default()
};

{
let mut app = self.app.lock().await;
app.is_streaming = true;
}

// TODO investigate why this stops working at times
match pods.log_stream(&pod_name, &lp).await {
Ok(mut logs) => {
Ok(logs) => {
// set a timeout so we dont wait for next item and block the thread
let logs = logs.timeout(Duration::from_secs(5));
tokio::pin!(logs);

#[allow(clippy::eval_order_dependence)]
while let (true, Some(line)) = (
while let (true, Ok(Some(Ok(line)))) = (
{
let app = self.app.lock().await;
app.get_current_route().active_block == ActiveBlock::Logs
|| app.data.logs.id == cont_name
},
logs.try_next().await.unwrap_or(None),
logs.try_next().await,
) {
let line = String::from_utf8_lossy(&line).trim().to_string();
if !line.is_empty() {
Expand All @@ -111,5 +120,8 @@ impl<'a> NetworkStream<'a> {
self.handle_error(anyhow!(e)).await;
}
};

let mut app = self.app.lock().await;
app.is_streaming = false;
}
}
6 changes: 4 additions & 2 deletions src/ui/overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,10 @@ fn draw_logs<B: Backend>(f: &mut Frame<B>, app: &mut App, area: Rect) {
let block = layout_block_top_border_span(title);

if container_name == app.data.logs.id {
let list = app.data.logs.get_list(area, style_primary()).block(block);
f.render_widget(list, area);
app
.data
.logs
.render_list(f, area, block, style_primary(), app.log_auto_scroll);
} else {
loading(f, block, area, app.is_loading);
}
Expand Down

0 comments on commit 55ba5fb

Please sign in to comment.