//! Distributed Energy Resources Function Set
//!
//! This module is primarily an implementation of a Schedule for DERControl events.
//!
//! The interface for a DERControl Schedule can be found in [`Schedule`].

use std::{
    sync::{atomic::AtomicI64, Arc},
    time::Duration,
};

use sep2_common::packages::{
    der::{DERControl, DERProgram},
    identification::ResponseStatus,
    objects::EventStatusType as EventStatus,
    types::MRIDType,
};
use tokio::sync::{broadcast::Receiver, RwLock};

use crate::{
    client::{Client, SEPResponse},
    device::SEDevice,
    event::{EIPair, EIStatus, EventCallback, EventInstance, Events, Schedule, Scheduler},
};

impl EventInstance<DERControl> {
    /// Returns `true` when `same_target` reports that this event's `der_control_base`
    /// and `other`'s `der_control_base` address the same target.
    fn has_same_target(&self, other: &Self) -> bool {
        let ours = &self.event().der_control_base;
        let theirs = &other.event().der_control_base;
        ours.same_target(theirs)
    }
}

/// Decide which of two DERControl events, if either, displaces the other.
///
/// Returns `Some((superseded, superseding))` when one event supersedes the other and both
/// share the same target, or `None` when neither supersedes the other.
fn der_supersedes<'a>(
    a: EIPair<'a, DERControl>,
    b: EIPair<'a, DERControl>,
) -> Option<(EIPair<'a, DERControl>, EIPair<'a, DERControl>)> {
    // The target comparison is the more expensive check, so it sits on the right-hand side
    // of `&&` and only runs once the supersession test has already passed.
    let (superseded, superseding) = if a.0.does_supersede(b.0) && a.0.has_same_target(b.0) {
        (b, a)
    } else if b.0.does_supersede(a.0) && a.0.has_same_target(b.0) {
        (a, b)
    } else {
        return None;
    };
    Some((superseded, superseding))
}

impl Schedule<DERControl> {
    /// Background task that activates DERControls once their start time has passed.
    ///
    /// Each iteration sleeps for `tickrate`, then compares the schedule's `next_start()`
    /// entry against the current schedule time. A due event is marked [`EIStatus::Active`],
    /// the event handler is invoked, and the handler's response is posted to the server.
    /// If the handler opts out (or reports the event as not applicable / invalid) the event
    /// is marked [`EIStatus::Cancelled`] locally.
    ///
    /// The task stops as soon as `rx.recv()` resolves.
    async fn der_start_task(self, mut rx: Receiver<()>) {
        loop {
            // Intermittently sleep until next event start time
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("DERControlSchedule: Shutting down event start task...");
                    break
                },
            }
            // The write guard is kept for the rest of the iteration: the event may still
            // need its status updated after the handler has responded.
            let mut events = self.events.write().await;
            let mrid = match events.next_start() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                // If no next, or not time yet
                _ => continue,
            };

            // Mark event as active
            events.update_event(&mrid, EIStatus::Active);

            // Notify client and server
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_der_response(target.event(), resp).await;
            // If the device opts-out or if the event cannot be active, we update its internal status.
            match resp {
                ResponseStatus::EventOptOut
                | ResponseStatus::EventNotApplicable
                | ResponseStatus::EventInvalid => events.update_event(&mrid, EIStatus::Cancelled),
                _ => (),
            }
        }
    }

    /// Background task that completes DERControls once their end time has passed.
    ///
    /// Each iteration sleeps for `tickrate`, then compares the schedule's `next_end()` entry
    /// against the current schedule time. A due event is marked [`EIStatus::Complete`], the
    /// event handler is invoked, and the handler's response is posted to the server.
    ///
    /// The task stops as soon as `rx.recv()` resolves.
    async fn der_end_task(self, mut rx: Receiver<()>) {
        loop {
            // Intermittently sleep until next event end time
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("DERControlSchedule: Shutting down event end task...");
                    break
                },
            }
            let mut events = self.events.write().await;
            let mrid = match events.next_end() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                // If no next, or not time yet
                _ => continue,
            };

            // Mark event as complete
            events.update_event(&mrid, EIStatus::Complete);

            // Notify client and server.
            // No further mutation is needed, so only a read guard is held while doing so.
            let events = events.downgrade();
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_der_response(target.event(), resp).await;
        }
    }

    /// Cancel an [`EventInstance<DERControl>`] that has been previously added to the schedule
    ///
    /// Update the internal [`EventInstance<DERControl>`] state.
    /// If the event is responsible for superseding other events,
    /// and those events have not started, they will be marked as scheduled internally - the client will not be informed.
    ///
    /// `current_status` is the status the event had locally before this call; the event
    /// handler is only invoked when it was [`EIStatus::Active`].
    ///
    /// `cancel_reason` must/will be one of [`EIStatus::Cancelled`] | [`EIStatus::CancelledRandom`] | [`EIStatus::Superseded`]
    ///
    /// # Panics
    ///
    /// Panics if no event with `target_mrid` exists in the schedule.
    async fn cancel_dercontrol(
        &mut self,
        target_mrid: &MRIDType,
        current_status: EIStatus,
        cancel_reason: EIStatus,
    ) {
        let mut events = self.events.write().await;
        events.cancel_event(target_mrid, cancel_reason, self.schedule_time().into());
        // The mutation is done; a read guard suffices for notifying the client and server.
        let events = events.downgrade();
        let ei = events.get(target_mrid).unwrap();
        let resp = if current_status == EIStatus::Active {
            // If the event was active, let the client know it is over
            (self.handler)(ei).await
        } else {
            // If it's not active, the client doesn't even know about this event
            ResponseStatus::EventCancelled
        };
        self.auto_der_response(ei.event(), resp).await;
    }

    /// Post a response for `event` with the given `status` to the server, logging the outcome.
    ///
    /// Failures are logged as warnings and never propagated to the caller.
    async fn auto_der_response(&self, event: &DERControl, status: ResponseStatus) {
        // Copy the LFDI out first so the device read guard is dropped at the end of this
        // statement. As a temporary inside the `match` scrutinee it would otherwise live until
        // the end of the `match`, keeping the device locked for the whole POST.
        let lfdi = self.device.read().await.lfdi;
        match self
            .client
            .send_der_response(lfdi, event, status, self.schedule_time())
            .await
        {
            Ok(
                e @ (SEPResponse::BadRequest(_)
                | SEPResponse::NotFound
                | SEPResponse::MethodNotAllowed(_)),
            ) => {
                log::warn!(
                    "DERControlSchedule: DERControlResponse POST attempt failed with HTTP status code: {}",
                    e
                );
            }
            Err(e) => log::warn!(
                "DERControlSchedule: DERControlResponse POST attempt failed with reason: {}",
                e
            ),
            Ok(r @ (SEPResponse::Created(_) | SEPResponse::NoContent)) => {
                log::info!(
                    "DERControlSchedule: DERControlResponse POST attempt succeeded with reason: {}",
                    r
                )
            }
        }
    }
}

/// DER is a function set where events exhibit 'direct control', according to the specification.
/// Thus, this Schedule will determine when overlapping or superseded events should be superseded based off their target, their primacy, and their creation time and when they are no longer superseded, if their superseding event is cancelled.
#[async_trait::async_trait]
impl Scheduler<DERControl> for Schedule<DERControl> {
    type Program = DERProgram;
    /// Create a schedule for the given [`Client`] & it's [`SEDevice`] representation.
    ///
    /// Any instance of [`Client`] can be used, as responses are made in accordance to the hostnames within the provided events.
    fn new(
        client: Client,
        device: Arc<RwLock<SEDevice>>,
        handler: impl EventCallback<DERControl>,
        tickrate: Duration,
    ) -> Self {
        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
        let out = Schedule {
            client,
            device,
            events: Arc::new(RwLock::new(Events::new())),
            handler: Arc::new(move |ei| {
                let handler = handler.clone();
                Box::pin(async move { handler.event_update(ei).await })
            }),
            bc_sd: tx.clone(),
            tickrate,
            time_offset: Arc::new(AtomicI64::new(0)),
        };
        tokio::spawn(out.clone().clean_events(rx));
        tokio::spawn(out.clone().der_start_task(tx.subscribe()));
        tokio::spawn(out.clone().der_end_task(tx.subscribe()));
        out
    }

    async fn add_event(&mut self, event: DERControl, program: &Self::Program, server_id: u8) {
        let mrid = event.mrid;
        let incoming_status = event.event_status.current_status;
        // Devices SHOULD ignore events that do not indicate their device category.
        // If not present, all devices SHOULD respond
        if let Some(category) = event.device_category {
            if !category.intersects(self.device.read().await.device_categories) {
                log::warn!("DERControlSchedule: DERControl ({mrid}) does not target this category of device. Not scheduling event.");
                return;
            }
        }

        // If the event already exists in the schedule
        // "Editing events shall NOT be allowed, except for updating status"
        let cur = { self.events.read().await.get(&mrid).map(|e| e.status()) };
        if let Some(current_status) = cur {
            match (current_status, incoming_status) {
                // Active -> (Cancelled || CancelledRandom || Superseded)
                (EIStatus::Active, EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded) => {
                    log::info!("DERControlSchedule: DERControl ({mrid}) has been marked as superseded by the server, yet it is active locally. The event will be cancelled");
                    self.cancel_dercontrol(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> (Cancelled || CancelledRandom)
                (EIStatus::Scheduled, EventStatus::Cancelled | EventStatus::CancelledRandom) => {
                    log::info!("DERControlSchedule: DERControl ({mrid} has been marked as cancelled by the server. It will not be started");
                    self.cancel_dercontrol(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> Active
                (EIStatus::Scheduled, EventStatus::Active) => {
                    log::info!("DERControlSchedule: DERControl ({mrid}) has entered it's earliest effective start time.")
                }
                // Scheduled -> Superseded
                (EIStatus::Scheduled, EventStatus::Superseded) =>
                    log::warn!("DERControlSchedule: DERControl ({mrid}) has been marked as superseded by the server, yet it is not locally."),
                // Active -> Scheduled
                (EIStatus::Active, EventStatus::Scheduled) =>
                    log::warn!("DERControlSchedule: DERControl ({mrid}) is active locally, and scheduled on the server. Is the client clock ahead?"),
                // Active -> Active
                (EIStatus::Active, EventStatus::Active) => (),
                // Complete -> Any
                (EIStatus::Complete, _) => (),
                // (Cancelled | CancelledRandom | Superseded) -> Any
                (EIStatus::Cancelled | EIStatus::CancelledRandom | EIStatus::Superseded, _) => (),
                // Scheduled -> Scheduled
                (EIStatus::Scheduled, EventStatus::Scheduled) => (),
            }
        } else {
            // We intentionally hold this lock for this entire scope
            let mut events = self.events.write().await;
            // Inform server event was received
            self.auto_der_response(&event, ResponseStatus::EventReceived)
                .await;

            // Event arrives cancelled or superseded
            if matches!(
                incoming_status,
                EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded
            ) {
                log::warn!("DERControlSchedule: Told to schedule DERControl ({mrid}) which is already {:?}, sending server response and not scheduling.", incoming_status);
                self.auto_der_response(&event, incoming_status.into()).await;
                return;
            }

            // Calculate start & end times
            // TODO: Clamp the duration and start time to remove gaps between successive events
            let ei = EventInstance::new_rand(
                program.primacy,
                event.randomize_duration,
                event.randomize_start,
                event,
                program.mrid,
                server_id,
            );

            // The event may have expired already
            if ei.end_time() <= self.schedule_time().into() {
                log::warn!("DERControlSchedule: Told to schedule DERControl ({mrid}) which has already ended, sending server response and not scheduling.");
                // Do not add event to schedule
                // For function sets with direct control ... Do this response
                self.auto_der_response(ei.event(), ResponseStatus::EventExpired)
                    .await;
                return;
            }

            // For each event that would be superseded by this event starting:
            // - inform the client
            // - inform the server
            // - mark as superseded

            // Determine what events this supersedes
            let mut target = ei;
            for (o_mrid, other) in events.iter_mut() {
                if let Some(((superseded, _), (superseding, superseding_mrid))) =
                    der_supersedes((&mut target, &mrid), (other, o_mrid))
                {
                    // Mark as superseded
                    let prev_status = superseded.status();
                    superseded.update_status(EIStatus::Superseded);
                    superseded.superseded_by(superseding_mrid);

                    // Determine appropriate status
                    let status = if prev_status == EIStatus::Active {
                        // Since the newly superseded event is over, tell the client it's finished
                        // We override whatever response the client provides to the more correct one
                        (self.handler)(superseded).await;
                        if superseded.program_mrid() != superseding.program_mrid() {
                            // If the two events come from different programs
                            ResponseStatus::EventAbortedProgram
                        } else if superseded.server_id() != superseding.server_id() {
                            // If the two events come from different servers
                            ResponseStatus::EventAbortedServer
                        } else {
                            ResponseStatus::EventSuperseded
                        }
                    } else {
                        ResponseStatus::EventSuperseded
                    };
                    self.auto_der_response(superseded.event(), status).await;
                }
            }

            // Update `next_start` and `next_end`
            events.update_nexts();

            // Add it to our schedule
            events.insert(&mrid, target);
        };
    }
}