Crate timer_deque_rs

Crate timer_deque_rs 

Source
Expand description

§timer-deque-rs

§A crate which combines a timer with the different deque types.

§Timer dequeue

A deque which uses timer to deque objects.

§Two deque opeartion modes:

  • DequeOnce - after a timeout an item is removed from the queue.

  • DequePeriodic - after the timeout an item’s timeout is extended and item is returned back to queue. It should be manually removed.

§Features

§MIO

  • bsd_use_timerfd - use timerfd instead of kqueue
  • bsd_use_poll - use poll instread of kqueue

§Two deque types:

§TimerDequeConsumer consumes the item and places it on the queue.
let time_list = 
    OrderTimerDeque
        ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
        ::new("test_label".into(), 4, false, true).unwrap();

or

let time_list = 
    OrderTimerDeque
        ::<DequePeriodic, TimerDequeConsumer<Arc<TestItem>, _>>
        ::new("test_label".into(), 4, false, true).unwrap();
§TimerDequeTicketIssuer issues a ticket for each iem in the queue.
let time_list = 
    OrderTimerDeque
        ::<DequeOnce, TimerDequeTicketIssuer<_>>
        ::new("test_label".into(), 4, false, true).unwrap();

or

let time_list = 
    OrderTimerDeque
        ::<DequePeriodic, TimerDequeTicketIssuer<_>>
        ::new("test_label".into(), 4, false, true).unwrap();

§Timers polling

§Async poll using Tokio

let time_list = 
    OrderTimerDeque
        ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
        ::new("test_label_async".into(), 4, false, true)
           .unwrap();

let mut time_list = 
    AsyncFd::try_with_interest(time_list, Interest::READABLE).unwrap();
 
// poll for event
let mut guard = time_list.readable_mut().await.unwrap();
 
// process
let timeout_items = guard.get_inner_mut().async_poll_for_event_and_process().await.unwrap();
drop(guard);

§Async poll using SMOLL

For SMOLL the same method can be used as for Tokio.

§Async poll

Not very efficient but timer provides the Future, so by calling

    let timeout_items = guard.get_inner_mut().async_poll_for_event_and_process().await;

depending on the timer FD mode, this funcion will either block until result or return imidiatly with some result i.e WouldBlock.

§Sync poll

For the sync a Epoll for Linux and Kqueue for BSD are used. Both are wrapped into TimerPoll.

let ev_watch = TimerPoll::new().unwrap();
 
let time_list = 
    OrderTimerDeque
        ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
        ::new("test_label".into(), 4, false, true).unwrap();
    
// adds timer to event 
let mut time_list_poll = ev_watch.add(time_list).unwrap();
 
// adds timeout
let tss_set1 = DequeOnce::new(abs_time.clone().add_sec(3));
let ent1 = Arc::new(TestItem(1));
 
time_list_poll.get_inner_mut().add(ent1.clone(), tss_set1).unwrap();
 
// polling
let res = ev_watch.poll(Option::None).unwrap();
 
// processing
let timeout_items = time_list_poll.get_inner_mut().handle_timer_event(res.unwrap().pop().unwrap()).unwrap();

§Timer

A timer can be used directly.

// timer init as blocking
let timer = 
    TimerFd::new(Cow::Borrowed("test"), TimerType::CLOCK_REALTIME, TimerFlags::empty()).unwrap();
 
// setting the timout and timer mode
let abs_time = AbsoluteTime::now().add_sec(3);
 
// the flags will be set automatically
let exp_time = 
    TimerExpMode::<AbsoluteTime>::new_oneshot(abs_time);

// setting timer
   let res = 
       timer.set_time(exp_time);
 
// read timer
let ovf = timer.read().unwrap().unwrap();

The above is not too efficient because in case of nonblocking, it will return none immidiatly and in case of blocking it would block thread.

A Epoll, Select, KQueue can be used to poll timer more efficient. Or, for example, AsyncFd from tokio can be used too.

§Task spawn and executer

A parallel task execution based on the task timeout.

 
#[derive(Debug)]
struct TaskStruct1
{
    a1: u64,
    s: Sender<u64>,
}
 
impl TaskStruct1
{
    fn new(a1: u64, s: Sender<u64>) -> Self
    {
        return Self{ a1: a1, s };
    }
}
 
impl PeriodicTask for TaskStruct1
{
    fn exec(&mut self) -> PeriodicTaskResult
    {
        println!("taskstruct1 val: {}", self.a1);
 
        let _ = self.s.send(self.a1);
 
        return PeriodicTaskResult::Ok;
    }
}
 
// ...
 
let s = SyncPeriodicTasks::new(2.try_into().unwrap()).unwrap();
 
// ...
 
let task1 = TaskStruct1::new(0, send.clone());
   let task1_ptt = 
        PeriodicTaskTime::interval(RelativeTime::new_time(1, 0));
 
let task5 = TaskStruct1::new(4, send.clone());
   let task5_ptt = 
        PeriodicTaskTime::exact_time(AbsoluteTime::now() + RelativeTime::new_time(5, 0));
 
let task1_guard = s.add("task1", task1, task1_ptt).unwrap();
let task5_guard = s.add("task5", task5, task5_ptt).unwrap();
 
// ...

Re-exports§

pub extern crate bitflags;
pub extern crate chrono;
pub extern crate crossbeam_deque;
pub extern crate nix;
pub extern crate rand;

Re-exports§

pub use deque_timeout::DequeOnce;
pub use deque_timeout::DequePeriodic;
pub use deque_timeout::OrderTimerDeque;
pub use deque_timeout::OrderedTimerDequeMode;
pub use deque_timeout::timer_tickets::TimerDequeTicketIssuer;
pub use deque_timeout::timer_tickets::TimerDequeTicket;
pub use deque_timeout::timer_consumer::TimerDequeConsumer;
pub use timer_portable::TimerFd;
pub use timer_portable::TimerPoll;
pub use timer_portable::TimerReadRes;
pub use timer_portable::FdTimerCom;
pub use timer_portable::AbsoluteTime;
pub use timer_portable::RelativeTime;
pub use common::TimerDequeId;
pub use periodic_task::PeriodicTask;
pub use periodic_task::PeriodicTaskResult;
pub use periodic_task::SyncPeriodicTasks;
pub use periodic_task::PeriodicTaskTime;

Modules§

common
Common things.
deque_timeout
A base implementation of the sorted timer timeout queue.
error
Crates error handling.
periodic_task
A periodic task sheduler.
timer_portable
All code which should be ported to the specific OS. Contains a system timer implementation and poll.

Macros§

map_portable_err
map_timer_err
portable_err
timer_err