Skip to content

Commit

Permalink
making things async
Browse files Browse the repository at this point in the history
  • Loading branch information
alexpasmantier committed Sep 25, 2024
1 parent 98b5830 commit 791ad72
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 223 deletions.
16 changes: 11 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ pub enum Mode {
}

impl App {
pub fn new(channel: UnitTvChannel, tick_rate: f64, frame_rate: f64) -> Result<Self> {
pub async fn new(channel: UnitTvChannel, tick_rate: f64, frame_rate: f64) -> Result<Self> {
let (action_tx, action_rx) = mpsc::unbounded_channel();
let (render_tx, _) = mpsc::unbounded_channel();
let event_loop = EventLoop::new(tick_rate, true);
let television = Arc::new(Mutex::new(Television::new(channel)));
let television = Arc::new(Mutex::new(Television::new(channel).await));

Ok(Self {
tick_rate,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl App {
action_tx.send(action)?;
}

self.handle_actions()?;
self.handle_actions().await?;

if self.should_quit {
self.event_abort_tx.send(())?;
Expand Down Expand Up @@ -195,7 +195,7 @@ impl App {
}
}

fn handle_actions(&mut self) -> Result<()> {
async fn handle_actions(&mut self) -> Result<()> {
while let Ok(action) = self.action_rx.try_recv() {
if action != Action::Tick && action != Action::Render {
debug!("{action:?}");
Expand All @@ -219,7 +219,13 @@ impl App {
Action::Render => self.render_tx.send(RenderingTask::Render)?,
_ => {}
}
if let Some(action) = self.television.lock().unwrap().update(action.clone())? {
if let Some(action) = self
.television
.lock()
.unwrap()
.update(action.clone())
.await?
{
self.action_tx.send(action)?
};
}
Expand Down
4 changes: 1 addition & 3 deletions src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use crate::{action::Action, config::Config};

pub mod channels;
mod finders;
pub mod fps;
pub mod home;
mod input;
mod pickers;
mod previewers;
Expand Down Expand Up @@ -70,7 +68,7 @@ pub trait Component: Send {
/// # Returns
///
/// * `Result<Option<Action>>` - An action to be processed or none.
fn update(&mut self, action: Action) -> Result<Option<Action>> {
async fn update(&mut self, action: Action) -> Result<Option<Action>> {
let _ = action; // to appease clippy
Ok(None)
}
Expand Down
10 changes: 5 additions & 5 deletions src/components/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ pub enum TvChannel {
}

impl TvChannel {
pub fn load_entries(&mut self, pattern: &str) -> Result<()> {
pub async fn load_entries(&mut self, pattern: &str) -> Result<()> {
match self {
TvChannel::Env(picker) => picker.load_entries(pattern),
TvChannel::Files(picker) => picker.load_entries(pattern),
TvChannel::Env(picker) => picker.load_entries(pattern).await,
TvChannel::Files(picker) => picker.load_entries(pattern).await,
}
}

Expand Down Expand Up @@ -43,10 +43,10 @@ impl TvChannel {
}
}

pub fn get_tv_channel(channel: UnitTvChannel) -> TvChannel {
pub async fn get_tv_channel(channel: UnitTvChannel) -> TvChannel {
match channel {
UnitTvChannel::ENV => TvChannel::Env(pickers::env::EnvVarPicker::new()),
UnitTvChannel::FILES => TvChannel::Files(pickers::files::FilePicker::new()),
UnitTvChannel::FILES => TvChannel::Files(pickers::files::FilePicker::new().await),
_ => unimplemented!(),
}
}
3 changes: 2 additions & 1 deletion src/components/finders.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::Stream;
use rust_devicons::FileIcon;

mod env;
Expand Down Expand Up @@ -42,5 +43,5 @@ pub const ENTRY_PLACEHOLDER: Entry = Entry {
/// # Methods
/// - `find`: Find entries based on a pattern.
pub trait Finder {
fn find(&mut self, pattern: &str) -> impl Iterator<Item = Entry>;
async fn find(&mut self, pattern: &str) -> impl Stream<Item = Entry>;
}
5 changes: 3 additions & 2 deletions src/components/finders/env.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{cmp::max, collections::HashMap, env::vars, path::Path};

use futures::{stream, Stream};
use fuzzy_matcher::skim::SkimMatcherV2;
use rust_devicons::{icon_for_file, File, FileIcon};

Expand Down Expand Up @@ -34,7 +35,7 @@ impl EnvVarFinder {
}

impl Finder for EnvVarFinder {
fn find(&mut self, pattern: &str) -> impl Iterator<Item = Entry> {
async fn find(&mut self, pattern: &str) -> impl Stream<Item = Entry> {
let mut results: Vec<Entry> = Vec::new();
// try to get from cache
if let Some(entries) = self.cache.get(pattern) {
Expand Down Expand Up @@ -87,6 +88,6 @@ impl Finder for EnvVarFinder {
// cache the results
self.cache.insert(pattern.to_string(), results.clone());
}
results.into_iter()
stream::iter(results)
}
}
166 changes: 117 additions & 49 deletions src/components/finders/files.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use futures::{stream, StreamExt};
use std::{
collections::HashMap,
path::{Path, PathBuf},
};

use fuzzy_matcher::skim::SkimMatcherV2;
use ignore::{types::TypesBuilder, WalkBuilder};
use tokio::sync::mpsc;
use tracing::info;

use crate::{
components::finders::{Entry, Finder},
Expand All @@ -19,8 +22,8 @@ pub struct FileFinder {
}

impl FileFinder {
pub fn new() -> Self {
let files = load_files(&std::env::current_dir().unwrap());
pub async fn new() -> Self {
let files = load_files(&std::env::current_dir().unwrap()).await;
FileFinder {
current_directory: std::env::current_dir().unwrap(),
files,
Expand All @@ -30,67 +33,98 @@ impl FileFinder {
}
}

/// TODO: we might need to tweak this a bit -> read the paper associated to the matcher to get
/// a sense of what the threshold should be
const FUZZY_THRESHOLD: i64 = 2;

impl Finder for FileFinder {
fn find(&mut self, pattern: &str) -> impl Iterator<Item = Entry> {
//async fn find(&mut self, pattern: &str) -> impl Iterator<Item = Entry> {
// let mut results: Vec<Entry> = Vec::new();
// // try to get from cache
// if let Some(entries) = self.cache.get(pattern) {
// results.extend(entries.iter().cloned());
// } else {
// for file in &self.files {
// let rel_path = file.strip_prefix(&self.current_directory).unwrap();
// let rel_path_str = rel_path.to_string_lossy();
// if !pattern.is_empty() {
// if let Some((score, indices)) = self.matcher.fuzzy(&rel_path_str, pattern, true)
// {
// if score < FUZZY_THRESHOLD {
// continue;
// }
// results.push(Entry {
// name: rel_path_str.to_string(),
// display_name: None,
// preview: None,
// score,
// name_match_ranges: Some(indices.iter().map(|i| (*i, *i + 1)).collect()),
// preview_match_ranges: None,
// icon: None,
// line_number: None,
// });
// }
// }
// }
// self.cache.insert(pattern.to_string(), results.clone());
// }
// results.into_iter()
//}

async fn find(&mut self, pattern: &str) -> impl stream::Stream<Item = Entry> {
let mut results: Vec<Entry> = Vec::new();
// try to get from cache

// Try to get from cache asynchronously
if let Some(entries) = self.cache.get(pattern) {
results.extend(entries.iter().cloned());
} else {
for file in &self.files {
let rel_path = file.strip_prefix(&self.current_directory).unwrap();
let rel_path_str = rel_path.to_string_lossy();
if !pattern.is_empty() {
if let Some((score, indices)) = self.matcher.fuzzy(&rel_path_str, pattern, true)
{
if score < FUZZY_THRESHOLD {
continue;
let entries = stream::iter(self.files)
.filter_map(|file| {
let rel_path = file.strip_prefix(&self.current_directory).ok()?;
let rel_path_str = rel_path.to_string_lossy().to_string();
let matcher = self.matcher.clone(); // Clone matcher to use inside async tasks
let pattern = pattern.to_string(); // Clone pattern for async task

// Spawn async block for each file
async move {
if !pattern.is_empty() {
if let Some((score, indices)) =
matcher.fuzzy(&rel_path_str, &pattern, true)
{
if score >= FUZZY_THRESHOLD {
// If score is acceptable, return the Entry
return Some(Entry {
name: rel_path_str,
display_name: None,
preview: None,
score,
name_match_ranges: Some(
indices.iter().map(|i| (*i, *i + 1)).collect(),
),
preview_match_ranges: None,
icon: None,
line_number: None,
});
}
}
}
results.push(Entry {
name: rel_path_str.to_string(),
display_name: None,
preview: None,
score,
name_match_ranges: Some(indices.iter().map(|i| (*i, *i + 1)).collect()),
preview_match_ranges: None,
icon: None,
line_number: None,
});
None
}
}
}
})
.buffer_unordered(10) // Limit concurrent tasks
.collect::<Vec<Option<Entry>>>()
.await;

results.extend(entries.into_iter().filter_map(|entry| entry));
self.cache.insert(pattern.to_string(), results.clone());
}
results.into_iter()

stream::iter(results) // Return the results as a Stream
}
}

const DEFAULT_RECV_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(250);

fn load_files(path: &Path) -> Vec<PathBuf> {
let (tx, rx) = std::sync::mpsc::channel();
let walker = walk_builder(path, default_num_threads().into()).build_parallel();
walker.run(|| {
let tx = tx.clone();
Box::new(move |result| {
if let Ok(entry) = result {
if entry.file_type().unwrap().is_file() {
tx.send(entry.path().to_path_buf()).unwrap();
}
ignore::WalkState::Continue
} else {
ignore::WalkState::Continue
}
})
});

let mut files = Vec::new();
while let Ok(file) = rx.recv_timeout(DEFAULT_RECV_TIMEOUT) {
files.push(file);
}
files
lazy_static::lazy_static! {
static ref DEFAULT_NUM_THREADS: usize = default_num_threads().into();
}

fn walk_builder(path: &Path, n_threads: usize) -> WalkBuilder {
Expand All @@ -104,3 +138,37 @@ fn walk_builder(path: &Path, n_threads: usize) -> WalkBuilder {
builder.threads(n_threads);
builder
}

async fn load_files(path: &Path) -> Vec<PathBuf> {
let (tx, mut rx) = mpsc::channel(100);
let path = path.to_owned();

// Spawn a blocking task for the file walker
tokio::task::spawn_blocking(move || {
let walker = WalkBuilder::new(path)
.threads(*DEFAULT_NUM_THREADS)
.build_parallel();

walker.run(|| {
let tx = tx.clone();
Box::new(move |result| {
if let Ok(entry) = result {
if entry.file_type().unwrap().is_file() {
// Send the path via the async channel
let _ = tx.blocking_send(entry.path().to_path_buf());
}
}
ignore::WalkState::Continue
})
});
});

// Collect the files asynchronously
let mut files = Vec::new();
while let Some(file) = rx.recv().await {
files.push(file);
}
info!("Loaded {} files", files.len());

files
}
Loading

0 comments on commit 791ad72

Please sign in to comment.