aboutsummaryrefslogtreecommitdiff
path: root/src/jobs.rs
blob: a01a70d489c54b56c4f64d2733076a276eccaf2a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::collections::HashSet;
use std::path::Path;

use chrono::Local;
use tokio::fs;
use tokio::time::{sleep, Duration};
use tokio_rusqlite::Connection;

use crate::db;

pub async fn start(db_conn: Connection, files_dir: String) {
    loop {
        log::info!("Starting removing expired files");
        cleanup_expired(&db_conn, &files_dir).await;

        // Sleeping 1 day
        sleep(Duration::from_secs(24 * 60 * 60)).await;
    }
}

async fn cleanup_expired(db_conn: &Connection, files_dir: &String) {
    let time = Local::now();

    match read_dir(files_dir).await {
        Err(msg) => log::error!("Listing files: {msg}"),
        Ok(files) => match db::list_expire_after(db_conn, time).await {
            Err(msg) => log::error!("Getting non expirable files: {msg}"),
            Ok(non_expirable) => {
                let non_expirable = HashSet::<String>::from_iter(non_expirable.iter().cloned());
                let expired_ids = files.difference(&non_expirable);
                let count = remove_files(files_dir, expired_ids.cloned()).await;
                log::info!("Removed {} files", count);
                if let Err(msg) = db::remove_expire_before(db_conn, time).await {
                    log::error!("Removing files: {msg}")
                }
            }
        },
    }
}

async fn read_dir(files_dir: &String) -> Result<HashSet<String>, String> {
    match fs::read_dir(files_dir).await {
        Err(msg) => Err(msg.to_string()),
        Ok(mut read_dir) => {
            let mut files = HashSet::<String>::new();
            loop {
                let entry = read_dir.next_entry().await;
                match entry {
                    Ok(Some(entry)) => match entry.file_name().into_string() {
                        Ok(filename) => {
                            files.insert(filename.clone());
                        }
                        Err(_) => log::error!("Decoding filename"),
                    },
                    Ok(None) => break,
                    Err(msg) => log::error!("File entry: {msg}"),
                }
            }
            Ok(files)
        }
    }
}

async fn remove_files<I>(files_dir: &String, ids: I) -> i32
where
    I: Iterator<Item = String>,
{
    let mut count = 0;
    for id in ids {
        let path = Path::new(&files_dir).join(id.clone());
        match fs::remove_file(path).await {
            Err(msg) => log::error!("Removing file: {msg}"),
            Ok(_) => count += 1
        }
    }
    count
}