1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
//! Distributed Energy Resources Function Set
//!
//! This module is primarily an implementation of a Schedule for EndDeviceControl events.
//! DRLC uses very similar rules for it's events - the exception being the rule for when events are superseded
//!
//!

use sep2_common::packages::{
    drlc::{DemandResponseProgram, EndDeviceControl},
    identification::ResponseStatus,
    objects::EventStatusType as EventStatus,
    types::MRIDType,
};

use crate::{
    client::SEPResponse,
    event::{EIPair, EIStatus, EventCallback, EventInstance, Schedule},
};

use std::{
    sync::{atomic::AtomicI64, Arc},
    time::Duration,
};

use tokio::sync::{broadcast::Receiver, RwLock};

use crate::{
    client::Client,
    device::SEDevice,
    event::{Events, Scheduler},
};

/// Given two EndDeviceControls, determine which is superseded, and which is superseding, or None if neither supersede one another
fn drlc_supersedes<'a>(
    a: EIPair<'a, EndDeviceControl>,
    b: EIPair<'a, EndDeviceControl>,
) -> Option<(EIPair<'a, EndDeviceControl>, EIPair<'a, EndDeviceControl>)> {
    if a.0.does_supersede(b.0) {
        Some((b, a))
    } else if b.0.does_supersede(a.0) {
        Some((a, b))
    } else {
        None
    }
}

// Demand Response Load Control Function Set
impl Schedule<EndDeviceControl> {
    async fn drlc_start_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                // This and end task should ideally be redesigned to sleep until the next event start/end
                // And then provide a way to signal to this to re-evaluate the next event start/end whenever it changes,
                // but still sleeping intermittently.
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("EndDeviceControlSchedule: Shutting down event start task...");
                    break
                },
            }
            let mut events = self.events.write().await;
            let mrid = match events.next_start() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                _ => continue,
            };

            events.update_event(&mrid, EIStatus::Active);

            let events = events.downgrade();
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_drlc_response(target.event(), resp).await;
        }
    }

    async fn drlc_end_task(self, mut rx: Receiver<()>) {
        loop {
            tokio::select! {
                _ = tokio::time::sleep(self.tickrate) => (),
                _ = rx.recv() => {
                    log::info!("EndDeviceControlSchedule: Shutting down event end task...");
                    break
                },
            }
            let mut events = self.events.write().await;
            let mrid = match events.next_end() {
                Some((time, mrid)) if time < self.schedule_time().into() => mrid,
                _ => continue,
            };

            // Mark event as complete
            events.update_event(&mrid, EIStatus::Complete);

            // Notify client and server
            let events = events.downgrade();
            let target = events.get(&mrid).unwrap();
            let resp = (self.handler)(target).await;
            self.auto_drlc_response(target.event(), resp).await;
        }
    }

    async fn cancel_enddevicecontrol(
        &mut self,
        target_mrid: &MRIDType,
        current_status: EIStatus,
        cancel_reason: EIStatus,
    ) {
        let mut events = self.events.write().await;
        events.cancel_event(target_mrid, cancel_reason, self.schedule_time().into());
        let events = events.downgrade();
        let ei = events.get(target_mrid).unwrap();
        let resp = if current_status == EIStatus::Active {
            (self.handler)(ei).await
        } else {
            ResponseStatus::EventCancelled
        };
        self.auto_drlc_response(ei.event(), resp).await;
    }

    async fn auto_drlc_response(&self, event: &EndDeviceControl, status: ResponseStatus) {
        match self
            .client
            .send_drlc_response(
                &*self.device.read().await,
                event,
                status,
                self.schedule_time(),
            )
            .await
        {
            Ok(
                e @ (SEPResponse::BadRequest(_)
                | SEPResponse::NotFound
                | SEPResponse::MethodNotAllowed(_)),
            ) => {
                log::warn!(
                    "Client: DRLC response POST attempt failed with HTTP status code: {}",
                    e
                );
            }
            Err(e) => log::warn!(
                "Client: DRLC response POST attempt failed with reason: {}",
                e
            ),
            Ok(r @ (SEPResponse::Created(_) | SEPResponse::NoContent)) => {
                log::info!(
                    "Client: DRLC response POST attempt succeeded with reason: {}",
                    r
                )
            }
        }
    }
}

#[async_trait::async_trait]
impl Scheduler<EndDeviceControl> for Schedule<EndDeviceControl> {
    type Program = DemandResponseProgram;
    fn new(
        client: Client,
        device: Arc<RwLock<SEDevice>>,
        handler: impl EventCallback<EndDeviceControl>,
        tickrate: Duration,
    ) -> Self {
        let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
        let out = Schedule {
            client,
            device,
            events: Arc::new(RwLock::new(Events::new())),
            handler: Arc::new(move |ei| {
                let handler = handler.clone();
                Box::pin(async move { handler.event_update(ei).await })
            }),
            bc_sd: tx.clone(),
            tickrate,
            time_offset: Arc::new(AtomicI64::new(0)),
        };
        tokio::spawn(out.clone().clean_events(rx));
        tokio::spawn(out.clone().drlc_start_task(tx.subscribe()));
        tokio::spawn(out.clone().drlc_end_task(tx.subscribe()));
        out
    }

    async fn add_event(&mut self, event: EndDeviceControl, program: &Self::Program, server_id: u8) {
        let mrid = event.mrid;
        let incoming_status = event.event_status.current_status;
        if !event
            .device_category
            .intersects(self.device.read().await.device_categories)
        {
            log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) does not target this category of device. Not scheduling event.");
            return;
        }

        // If the event already exists in the schedule
        // "Editing events shall NOT be allowed, except for updating status"
        let cur = { self.events.read().await.get(&mrid).map(|e| e.status()) };
        if let Some(current_status) = cur {
            match (current_status, incoming_status) {
                // Active -> (Cancelled || CancelledRandom || Superseded)
                (EIStatus::Active, EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded) => {
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has been marked as superseded by the server, yet it is active locally. The event will be cancelled");
                    self.cancel_enddevicecontrol(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> (Cancelled || CancelledRandom)
                (EIStatus::Scheduled, EventStatus::Cancelled | EventStatus::CancelledRandom) => {
                    log::info!("EndDeviceControlSchedule: EndDeviceControl ({mrid} has been marked as cancelled by the server. It will not be started");
                    self.cancel_enddevicecontrol(&mrid, current_status, incoming_status.into()).await;
                },
                // Scheduled -> Active
                (EIStatus::Scheduled, EventStatus::Active) => {
                    log::info!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has entered it's earliest effective start time.")
                }
                // Scheduled -> Superseded
                (EIStatus::Scheduled, EventStatus::Superseded) =>
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) has been marked as superseded by the server, yet it is not locally."),
                // Active -> Scheduled
                (EIStatus::Active, EventStatus::Scheduled) =>
                    log::warn!("EndDeviceControlSchedule: EndDeviceControl ({mrid}) is active locally, and scheduled on the server. Is the client clock ahead?"),
                // Active -> Active
                (EIStatus::Active, EventStatus::Active) => (),
                // Complete -> Any
                (EIStatus::Complete, _) => (),
                // (Cancelled | CancelledRandom | Superseded) -> Any
                (EIStatus::Cancelled | EIStatus::CancelledRandom | EIStatus::Superseded, _) => (),
                // Scheduled -> Scheduled
                (EIStatus::Scheduled, EventStatus::Scheduled) => (),
            }
        } else {
            // We intentionally hold this lock for this entire scope
            let mut events = self.events.write().await;
            // Inform server event was received
            self.auto_drlc_response(&event, ResponseStatus::EventReceived)
                .await;

            // Event arrives cancelled or superseded
            if matches!(
                incoming_status,
                EventStatus::Cancelled | EventStatus::CancelledRandom | EventStatus::Superseded
            ) {
                log::warn!("EndDeviceControlSchedule: Told to schedule EndDeviceControl ({mrid}) which is already {:?}, sending server response and not scheduling.", incoming_status);
                self.auto_drlc_response(&event, incoming_status.into())
                    .await;
                return;
            }

            // Calculate start & end times
            // TODO: Clamp the duration and start time to remove gaps between successive events
            let ei = EventInstance::new_rand(
                program.primacy,
                event.randomize_duration,
                event.randomize_start,
                event,
                program.mrid,
                server_id,
            );

            // The event may have expired already
            if ei.end_time() <= self.schedule_time().into() {
                log::warn!("EndDeviceControlSchedule: Told to schedule EndDeviceControl ({mrid}) which has already ended, sending server response and not scheduling.");
                // Do not add event to schedule
                // For function sets with direct control ... Do this response
                self.auto_drlc_response(ei.event(), ResponseStatus::EventExpired)
                    .await;
                return;
            }

            // For each event that would be superseded by this event starting:
            // - inform the client
            // - inform the server
            // - mark as superseded

            // Determine what events this supersedes
            let mut target = ei;
            for (o_mrid, other) in events.iter_mut() {
                if let Some(((superseded, _), (superseding, superseding_mrid))) =
                    drlc_supersedes((&mut target, &mrid), (other, o_mrid))
                {
                    // Mark as superseded
                    let prev_status = superseded.status();
                    superseded.update_status(EIStatus::Superseded);
                    superseded.superseded_by(superseding_mrid);
                    // Determine appropriate response
                    let status = if prev_status == EIStatus::Active {
                        // Since the newly superseded event is over, tell the client it's finished
                        (self.handler)(superseded).await;
                        if superseded.program_mrid() != superseding.program_mrid() {
                            // If the two events come from different programs
                            ResponseStatus::EventAbortedProgram
                        } else if superseded.server_id() != superseding.server_id() {
                            // If the two events come from different servers
                            ResponseStatus::EventAbortedServer
                        } else {
                            ResponseStatus::EventSuperseded
                        }
                    } else {
                        ResponseStatus::EventSuperseded
                    };
                    self.auto_drlc_response(superseded.event(), status).await;
                }
            }

            // Update `next_start` and `next_end`
            events.update_nexts();

            // Add it to our schedule
            events.insert(&mrid, target);
        };
    }
}