use sep2_common::packages::{
drlc::{DemandResponseProgram, EndDeviceControl},
identification::ResponseStatus,
objects::EventStatusType as EventStatus,
types::MRIDType,
};
use crate::{
client::SEPResponse,
event::{EIPair, EIStatus, EventCallback, EventInstance, Schedule},
};
use std::{
sync::{atomic::AtomicI64, Arc},
time::Duration,
};
use tokio::sync::{broadcast::Receiver, RwLock};
use crate::{
client::Client,
device::SEDevice,
event::{Events, Scheduler},
};
/// Decides which of two `EndDeviceControl` event pairs, if either, supersedes the other.
///
/// Returns `Some((superseded, superseding))` — the pair that lost first, the pair that won
/// second — or `None` when neither `does_supersede` check succeeds. `a` is tested against
/// `b` first; `b` is only tested against `a` when that first check fails.
fn drlc_supersedes<'a>(
    a: EIPair<'a, EndDeviceControl>,
    b: EIPair<'a, EndDeviceControl>,
) -> Option<(EIPair<'a, EndDeviceControl>, EIPair<'a, EndDeviceControl>)> {
    // `a` wins: `b` is the superseded one.
    if a.0.does_supersede(b.0) {
        return Some((b, a));
    }
    // `b` wins: `a` is the superseded one.
    if b.0.does_supersede(a.0) {
        return Some((a, b));
    }
    None
}
// Background tasks and server-response helpers specific to `EndDeviceControl` schedules.
impl Schedule<EndDeviceControl> {
    /// Background loop that moves due events to `EIStatus::Active`.
    ///
    /// Every iteration waits until either one `tickrate` interval has elapsed or
    /// `rx.recv()` resolves; the latter ends the loop. After a tick, if `next_start`
    /// reports an event whose time is before the current schedule time, that event is
    /// marked active, the handler is invoked with it, and the status the handler returns
    /// is posted via `auto_drlc_response`.
    async fn drlc_start_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("EndDeviceControlSchedule: Shutting down event start task...");
                    break
                },
            }
            let mut schedule = self.events.write().await;
            // Only act when the next start time has already passed.
            let due = schedule
                .next_start()
                .filter(|(time, _)| *time < self.schedule_time().into());
            let mrid = match due {
                Some((_, mrid)) => mrid,
                None => continue,
            };
            schedule.update_event(&mrid, EIStatus::Active);
            // The mutation is done; keep only a read guard while the handler runs.
            let schedule = schedule.downgrade();
            let instance = schedule.get(&mrid).unwrap();
            let status = (self.handler)(instance).await;
            self.auto_drlc_response(instance.event(), status).await;
        }
    }

    /// Background loop that moves finished events to `EIStatus::Complete`.
    ///
    /// Mirrors `drlc_start_task`, but consults `next_end` and marks the due event as
    /// complete before invoking the handler and posting the resulting status.
    async fn drlc_end_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("EndDeviceControlSchedule: Shutting down event end task...");
                    break
                },
            }
            let mut schedule = self.events.write().await;
            // Only act when the next end time has already passed.
            let due = schedule
                .next_end()
                .filter(|(time, _)| *time < self.schedule_time().into());
            let mrid = match due {
                Some((_, mrid)) => mrid,
                None => continue,
            };
            schedule.update_event(&mrid, EIStatus::Complete);
            // The mutation is done; keep only a read guard while the handler runs.
            let schedule = schedule.downgrade();
            let instance = schedule.get(&mrid).unwrap();
            let status = (self.handler)(instance).await;
            self.auto_drlc_response(instance.event(), status).await;
        }
    }

    /// Cancels the event identified by `target_mrid` and reports the outcome to the server.
    ///
    /// `cancel_reason` is forwarded to `cancel_event` together with the current schedule
    /// time. The handler is only invoked when `current_status` is `EIStatus::Active`; for
    /// every other status `ResponseStatus::EventCancelled` is sent without calling it.
    ///
    /// Panics if `target_mrid` is not present in the schedule.
    async fn cancel_enddevicecontrol(
        &mut self,
        target_mrid: &MRIDType,
        current_status: EIStatus,
        cancel_reason: EIStatus,
    ) {
        let mut schedule = self.events.write().await;
        schedule.cancel_event(target_mrid, cancel_reason, self.schedule_time().into());
        let schedule = schedule.downgrade();
        let instance = schedule.get(target_mrid).unwrap();
        let status = match current_status {
            // An event that was running gets its response from the handler.
            EIStatus::Active => (self.handler)(instance).await,
            EIStatus::Scheduled
            | EIStatus::Complete
            | EIStatus::Cancelled
            | EIStatus::CancelledRandom
            | EIStatus::Superseded => ResponseStatus::EventCancelled,
        };
        self.auto_drlc_response(instance.event(), status).await;
    }

    /// Posts a DRLC response with `status` for `event` and logs the outcome.
    ///
    /// Failures (a transport error, or a `BadRequest` / `NotFound` / `MethodNotAllowed`
    /// reply) are logged as warnings and otherwise ignored; nothing is returned to the
    /// caller.
    async fn auto_drlc_response(&self, event: &EndDeviceControl, status: ResponseStatus) {
        let device = self.device.read().await;
        let outcome = self
            .client
            .send_drlc_response(&*device, event, status, self.schedule_time())
            .await;
        match outcome {
            Err(e) => log::warn!(
                "Client: DRLC response POST attempt failed with reason: {}",
                e
            ),
            Ok(r @ (SEPResponse::Created(_) | SEPResponse::NoContent)) => {
                log::info!(
                    "Client: DRLC response POST attempt succeeded with reason: {}",
                    r
                )
            }
            Ok(
                e @ (SEPResponse::BadRequest(_)
                | SEPResponse::NotFound
                | SEPResponse::MethodNotAllowed(_)),
            ) => {
                log::warn!(
                    "Client: DRLC response POST attempt failed with HTTP status code: {}",
                    e
                );
            }
        }
    }
}
// `Scheduler` implementation for `EndDeviceControl` events, whose programs are
// `DemandResponseProgram`s.
#[async_trait::async_trait]
impl Scheduler<EndDeviceControl> for Schedule<EndDeviceControl> {
    type Program = DemandResponseProgram;

    /// Builds an empty `EndDeviceControl` schedule and spawns its background tasks.
    ///
    /// * `client` - used to post DRLC responses to the server.
    /// * `device` - shared device state; its `device_categories` decide which events apply.
    /// * `handler` - callback whose `event_update` is invoked on event state changes.
    /// * `tickrate` - sleep interval of the start/end background tasks.
    ///
    /// Three tasks are spawned with `tokio::spawn`: `clean_events`, `drlc_start_task` and
    /// `drlc_end_task`, each holding a receiver of the same broadcast channel. Because
    /// `tokio::spawn` is used, this must be called from within a Tokio runtime.
    fn new(
        client: Client,
        device: Arc<RwLock<SEDevice>>,
        handler: impl EventCallback<EndDeviceControl>,
        tickrate: Duration,
    ) -> Self {
        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
        let out = Schedule {
            client,
            device,
            events: Arc::new(RwLock::new(Events::new())),
            // Wrap the user callback so every invocation gets its own clone of the handler
            // inside a boxed future.
            handler: Arc::new(move |ei| {
                let handler = handler.clone();
                Box::pin(async move { handler.event_update(ei).await })
            }),
            bc_sd: tx.clone(),
            tickrate,
            time_offset: Arc::new(AtomicI64::new(0)),
        };
        tokio::spawn(out.clone().clean_events(rx));
        tokio::spawn(out.clone().drlc_start_task(tx.subscribe()));
        tokio::spawn(out.clone().drlc_end_task(tx.subscribe()));
        out
    }

    /// Adds `event` to the schedule, or reconciles it with an already-known instance.
    ///
    /// * Events whose `device_category` does not intersect the device's categories are
    ///   dropped with a warning.
    /// * If an event with the same mRID is already scheduled, the local status is compared
    ///   with the server-reported status: cancellations (and, for locally active events,
    ///   supersessions) cancel the local event; other combinations are only logged or
    ///   ignored.
    /// * Otherwise `EventReceived` is sent to the server. Events that arrive already
    ///   cancelled/superseded, or that have already ended, are answered and not scheduled.
    ///   Remaining events are checked against every existing event for supersession (the
    ///   superseded side is updated, and a response is sent for it) and then inserted.
    async fn add_event(&mut self, event: EndDeviceControl, program: &Self::Program, server_id: u8) {
        let mrid = event.mrid;
        let incoming_status = event.event_status.current_status;
        // Skip events that do not target any of this device's categories.
        if !event
            .device_category
            .intersects(self.device.read().await.device_categories)
        {
            log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) does not target this category of device. Not scheduling event.");
            return;
        }
        // The read guard is dropped at the end of this block, before any branch below
        // acquires the write lock.
        let cur = { self.events.read().await.get(&mrid).map(|e| e.status()) };
        if let Some(current_status) = cur {
            // Known event: compare (local status, server status).
            match (current_status, incoming_status) {
                (EIStatus::Active, EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded) => {
                    // Report the status the server actually sent; this arm also covers
                    // cancellations, not only supersession.
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has been marked as {incoming_status:?} by the server, yet it is active locally. The event will be cancelled");
                    self.cancel_enddevicecontrol(&mrid, current_status, incoming_status.into()).await;
                },
                (EIStatus::Scheduled, EventStatus::Cancelled | EventStatus::CancelledRandom) => {
                    log::info!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has been marked as cancelled by the server. It will not be started");
                    self.cancel_enddevicecontrol(&mrid, current_status, incoming_status.into()).await;
                },
                (EIStatus::Scheduled, EventStatus::Active) => {
                    log::info!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has entered its earliest effective start time.")
                }
                (EIStatus::Scheduled, EventStatus::Superseded) =>
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has been marked as superseded by the server, yet it is not superseded locally."),
                (EIStatus::Active, EventStatus::Scheduled) =>
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) is active locally, and scheduled on the server. Is the client clock ahead?"),
                // Nothing to do: statuses agree, or the event has already reached a final
                // local state.
                (EIStatus::Active, EventStatus::Active) => (),
                (EIStatus::Complete, _) => (),
                (EIStatus::Cancelled | EIStatus::CancelledRandom | EIStatus::Superseded, _) => (),
                (EIStatus::Scheduled, EventStatus::Scheduled) => (),
            }
        } else {
            // New event. The write lock is taken up front and held until the event has been
            // inserted (or rejected).
            // NOTE(review): server POSTs below happen while the write lock is held —
            // presumably intentional so the schedule update is atomic; confirm.
            let mut events = self.events.write().await;
            self.auto_drlc_response(&event, ResponseStatus::EventReceived)
                .await;
            // The event arrived already cancelled or superseded: answer, don't schedule.
            if matches!(
                incoming_status,
                EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded
            ) {
                log::warn!("EndDeviceControlSchedule: Told to schedule EndDeviceControl ({mrid}) which is already {:?}, sending server response and not scheduling.", incoming_status);
                self.auto_drlc_response(&event, incoming_status.into())
                    .await;
                return;
            }
            let ei = EventInstance::new_rand(
                program.primacy,
                event.randomize_duration,
                event.randomize_start,
                event,
                program.mrid,
                server_id,
            );
            // The event is already over according to the schedule clock.
            if ei.end_time() <= self.schedule_time().into() {
                log::warn!("EndDeviceControlSchedule: Told to schedule EndDeviceControl ({mrid}) which has already ended, sending server response and not scheduling.");
                self.auto_drlc_response(ei.event(), ResponseStatus::EventExpired)
                    .await;
                return;
            }
            let mut target = ei;
            // Check the new event against every existing one for supersession, in either
            // direction.
            for (o_mrid, other) in events.iter_mut() {
                if let Some(((superseded, _), (superseding, superseding_mrid))) =
                    drlc_supersedes((&mut target, &mrid), (other, o_mrid))
                {
                    let prev_status = superseded.status();
                    superseded.update_status(EIStatus::Superseded);
                    superseded.superseded_by(superseding_mrid);
                    // Only an event that was active is passed to the handler, and only for
                    // such an event is the response refined by where the superseding event
                    // came from (other program / other server / same).
                    let status = if prev_status == EIStatus::Active {
                        (self.handler)(superseded).await;
                        if superseded.program_mrid() != superseding.program_mrid() {
                            ResponseStatus::EventAbortedProgram
                        } else if superseded.server_id() != superseding.server_id() {
                            ResponseStatus::EventAbortedServer
                        } else {
                            ResponseStatus::EventSuperseded
                        }
                    } else {
                        ResponseStatus::EventSuperseded
                    };
                    self.auto_drlc_response(superseded.event(), status).await;
                }
            }
            // NOTE(review): `update_nexts` runs before the new event is inserted —
            // presumably `insert` accounts for the new event itself; confirm.
            events.update_nexts();
            events.insert(&mrid, target);
        };
    }
}