use std::{
sync::{atomic::AtomicI64, Arc},
time::Duration,
};
use sep2_common::packages::{
identification::ResponseStatus,
objects::EventStatusType as EventStatus,
pricing::{RateComponent, TariffProfile, TimeTariffInterval},
types::MRIDType,
};
use tokio::sync::{broadcast::Receiver, RwLock};
use crate::{
client::{Client, SEPResponse},
device::SEDevice,
event::{EIPair, EIStatus, EventCallback, EventInstance, Events, Schedule, Scheduler},
};
fn pricing_supersedes<'a>(
a: EIPair<'a, TimeTariffInterval>,
b: EIPair<'a, TimeTariffInterval>,
) -> Option<(
EIPair<'a, TimeTariffInterval>,
EIPair<'a, TimeTariffInterval>,
)> {
let same_rate_component = a.0.program_mrid() == b.0.program_mrid();
if a.0.does_supersede(b.0) && same_rate_component {
Some((b, a))
} else if b.0.does_supersede(a.0) && same_rate_component {
Some((a, b))
} else {
None
}
}
impl Schedule<TimeTariffInterval> {
async fn pricing_start_task(self, mut rx: Receiver<()>) {
loop {
tokio::select! {
_ = tokio::time::sleep(self.tickrate) => (),
_ = rx.recv() => {
log::info!("TimeTariffIntervalSchedule: Shutting down event start task...");
break
},
}
let mut events = self.events.write().await;
let mrid = match events.next_start() {
Some((time, mrid)) if time < self.schedule_time().into() => mrid,
_ => continue,
};
events.update_event(&mrid, EIStatus::Active);
let events = events.downgrade();
let target = events.get(&mrid).unwrap();
let resp = (self.handler)(target).await;
self.auto_pricing_response(target.event(), resp).await;
}
}
async fn pricing_end_task(self, mut rx: Receiver<()>) {
loop {
tokio::select! {
_ = tokio::time::sleep(self.tickrate) => (),
_ = rx.recv() => {
log::info!("TimeTariffIntervalSchedule: Shutting down event end task...");
break
},
}
let mut events = self.events.write().await;
let mrid = match events.next_end() {
Some((time, mrid)) if time < self.schedule_time().into() => mrid,
_ => continue,
};
events.update_event(&mrid, EIStatus::Complete);
let events = events.downgrade();
let target = events.get(&mrid).unwrap();
let resp = (self.handler)(target).await;
self.auto_pricing_response(target.event(), resp).await;
}
}
async fn cancel_timetariffinterval(
&mut self,
target_mrid: &MRIDType,
current_status: EIStatus,
cancel_reason: EIStatus,
) {
let mut events = self.events.write().await;
events.cancel_event(target_mrid, cancel_reason, self.schedule_time().into());
let events = events.downgrade();
let ei = events.get(target_mrid).unwrap();
let resp = if current_status == EIStatus::Active {
(self.handler)(ei).await
} else {
ResponseStatus::EventCancelled
};
self.auto_pricing_response(ei.event(), resp).await;
}
async fn auto_pricing_response(&self, event: &TimeTariffInterval, status: ResponseStatus) {
match self
.client
.send_pricing_response(
self.device.read().await.lfdi,
event,
status,
self.schedule_time(),
)
.await
{
Ok(
e @ (SEPResponse::BadRequest(_)
| SEPResponse::NotFound
| SEPResponse::MethodNotAllowed(_)),
) => log::warn!(
"Client: Pricing response POST attempt failed with HTTP status code: {}",
e
),
Err(e) => log::warn!(
"Client: Pricing response POST attempted failed with reason: {}",
e
),
Ok(r @ (SEPResponse::Created(_) | SEPResponse::NoContent)) => {
log::info!(
"Client: Pricing response POST attempt succeeded with reason: {}",
r
)
}
}
}
}
#[async_trait::async_trait]
impl Scheduler<TimeTariffInterval> for Schedule<TimeTariffInterval> {
type Program = (TariffProfile, RateComponent);
fn new(
client: Client,
device: Arc<RwLock<SEDevice>>,
handler: impl EventCallback<TimeTariffInterval>,
tickrate: Duration,
) -> Self {
let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
let out = Schedule {
client,
device,
events: Arc::new(RwLock::new(Events::new())),
handler: Arc::new(move |ei| {
let handler = handler.clone();
Box::pin(async move { handler.event_update(ei).await })
}),
bc_sd: tx.clone(),
tickrate,
time_offset: Arc::new(AtomicI64::new(0)),
};
tokio::spawn(out.clone().clean_events(rx));
tokio::spawn(out.clone().pricing_start_task(tx.subscribe()));
tokio::spawn(out.clone().pricing_end_task(tx.subscribe()));
out
}
async fn add_event(
&mut self,
event: TimeTariffInterval,
program: &Self::Program,
server_id: u8,
) {
let tariff_profile = &program.0;
let rate_component = &program.1;
let mrid = event.mrid;
let incoming_status = event.event_status.current_status;
let cur = { self.events.read().await.get(&mrid).map(|e| e.status()) };
if let Some(current_status) = cur {
match (current_status, incoming_status) {
(EIStatus::Active, EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded) => {
log::warn!("TimeTariffIntervalSchedule: TimeTariffInterval ({mrid}) has been marked as superseded by the server, yet it is active locally. The event will be cancelled");
self.cancel_timetariffinterval(&mrid, current_status, incoming_status.into()).await;
},
(EIStatus::Scheduled, EventStatus::Cancelled | EventStatus::CancelledRandom) => {
log::info!("TimeTariffIntervalSchedule: TimeTariffInterval ({mrid} has been marked as cancelled by the server. It will not be started");
self.cancel_timetariffinterval(&mrid, current_status, incoming_status.into()).await;
},
(EIStatus::Scheduled, EventStatus::Active) => {
log::info!("TimeTariffIntervalSchedule: TimeTariffInterval ({mrid}) has entered it's earliest effective start time.")
}
(EIStatus::Scheduled, EventStatus::Superseded) =>
log::warn!("TimeTariffIntervalSchedule: TimeTariffInterval ({mrid}) has been marked as superseded by the server, yet it is not locally."),
(EIStatus::Active, EventStatus::Scheduled) =>
log::warn!("TimeTariffIntervalSchedule: TimeTariffInterval ({mrid}) is active locally, and scheduled on the server. Is the client clock ahead?"),
(EIStatus::Active, EventStatus::Active) => (),
(EIStatus::Complete, _) => (),
(EIStatus::Cancelled | EIStatus::CancelledRandom | EIStatus::Superseded, _) => (),
(EIStatus::Scheduled, EventStatus::Scheduled) => (),
}
} else {
let mut events = self.events.write().await;
self.auto_pricing_response(&event, ResponseStatus::EventReceived)
.await;
if matches!(
incoming_status,
EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded
) {
log::warn!("TimeTariffIntervalSchedule: Told to schedule TimeTariffInterval ({mrid}) which is already {:?}, sending server response and not scheduling.", incoming_status);
self.auto_pricing_response(&event, incoming_status.into())
.await;
return;
}
let ei = EventInstance::new_rand(
tariff_profile.primacy,
event.randomize_duration,
event.randomize_start,
event,
rate_component.mrid,
server_id,
);
if ei.end_time() <= self.schedule_time().into() {
log::warn!("TimeTariffIntervalSchedule: Told to schedule TimeTariffInterval ({mrid}) which has already ended, sending server response and not scheduling.");
self.auto_pricing_response(ei.event(), ResponseStatus::EventExpired)
.await;
return;
}
let mut target = ei;
for (o_mrid, other) in events.iter_mut() {
if let Some(((superseded, _), (superseding, superseding_mrid))) =
pricing_supersedes((&mut target, &mrid), (other, o_mrid))
{
let prev_status = superseded.status();
superseded.update_status(EIStatus::Superseded);
superseded.superseded_by(superseding_mrid);
let status = if prev_status == EIStatus::Active {
(self.handler)(superseded).await;
if superseded.program_mrid() != superseding.program_mrid() {
ResponseStatus::EventAbortedProgram
} else if superseded.server_id() != superseding.server_id() {
ResponseStatus::EventAbortedServer
} else {
ResponseStatus::EventSuperseded
}
} else {
ResponseStatus::EventSuperseded
};
self.auto_pricing_response(superseded.event(), status).await;
}
}
events.update_nexts();
events.insert(&mrid, target);
};
}
}