torvyn_types/
traits.rs

1//! Shared traits for the Torvyn runtime.
2//!
3//! The primary trait here is [`EventSink`], the hot-path interface for
4//! recording observability events. All methods must be non-blocking and
5//! allocation-free on the hot path.
6
7use crate::{
8    enums::{CopyReason, ObservabilityLevel},
9    error::ProcessErrorKind,
10    ComponentId, FlowId, ResourceId, StreamId,
11};
12
13/// The hot-path trait for recording observability events.
14///
15/// Implemented by the observability collector (`torvyn-observability`) and
16/// provided to the reactor, resource manager, and host lifecycle manager.
17///
18/// Per Doc 05, Section 9.1: all methods must be non-blocking and
19/// allocation-free on the hot path.
20///
21/// Use [`NoopEventSink`] for testing or when observability is disabled.
22///
23/// # Examples
24/// ```
25/// use torvyn_types::{NoopEventSink, EventSink, ObservabilityLevel};
26///
27/// let sink = NoopEventSink;
28/// assert_eq!(sink.level(), ObservabilityLevel::Off);
29/// ```
30pub trait EventSink: Send + Sync + 'static {
31    /// Record a component invocation completion.
32    ///
33    /// Called by the reactor after every component invocation.
34    ///
35    /// # HOT PATH — must be non-blocking, allocation-free.
36    fn record_invocation(
37        &self,
38        flow_id: FlowId,
39        component_id: ComponentId,
40        start_ns: u64,
41        end_ns: u64,
42        status: InvocationStatus,
43    );
44
45    /// Record a stream element transfer between components.
46    ///
47    /// Called by the reactor when an element moves through a stream queue.
48    ///
49    /// # HOT PATH — must be non-blocking, allocation-free.
50    fn record_element_transfer(
51        &self,
52        flow_id: FlowId,
53        stream_id: StreamId,
54        element_sequence: u64,
55        queue_depth_after: u32,
56    );
57
58    /// Record a backpressure state change.
59    ///
60    /// Called by the reactor when backpressure activates or deactivates.
61    ///
62    /// # WARM PATH — called per backpressure event.
63    fn record_backpressure(
64        &self,
65        flow_id: FlowId,
66        stream_id: StreamId,
67        activated: bool,
68        queue_depth: u32,
69        timestamp_ns: u64,
70    );
71
72    /// Record a resource copy operation.
73    ///
74    /// Called by the resource manager when data is copied across a boundary.
75    ///
76    /// # HOT PATH — must be non-blocking, allocation-free.
77    fn record_copy(
78        &self,
79        flow_id: FlowId,
80        resource_id: ResourceId,
81        from_component: ComponentId,
82        to_component: ComponentId,
83        copy_bytes: u64,
84        reason: CopyReason,
85    );
86
87    /// Returns the current observability level.
88    ///
89    /// Hot-path callers can skip expensive recording at lower levels.
90    ///
91    /// # HOT PATH — checked per element to skip recording.
92    fn level(&self) -> ObservabilityLevel;
93}
94
95/// Status of a component invocation, for observability recording.
96///
97/// # HOT PATH — created per invocation.
98#[derive(Clone, Copy, Debug, PartialEq, Eq)]
99pub enum InvocationStatus {
100    /// Invocation completed successfully.
101    Ok,
102    /// Invocation completed with an error.
103    Error(ProcessErrorKind),
104    /// Invocation timed out.
105    Timeout,
106    /// Invocation was cancelled.
107    Cancelled,
108}
109
110impl std::fmt::Display for InvocationStatus {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        match self {
113            InvocationStatus::Ok => write!(f, "ok"),
114            InvocationStatus::Error(kind) => write!(f, "error({:?})", kind),
115            InvocationStatus::Timeout => write!(f, "timeout"),
116            InvocationStatus::Cancelled => write!(f, "cancelled"),
117        }
118    }
119}
120
/// A no-op implementation of [`EventSink`] for testing and benchmarking.
///
/// All recording methods are empty, and `level()` returns
/// [`ObservabilityLevel::Off`].
///
/// This is a zero-sized unit struct. Besides `Clone` and `Copy` it derives
/// `Debug` and `Default`, so it can be stored in types that themselves derive
/// those traits and can be printed in diagnostics.
///
/// # Examples
/// ```
/// use torvyn_types::{NoopEventSink, EventSink, FlowId, ComponentId, ObservabilityLevel};
/// use torvyn_types::InvocationStatus;
///
/// let sink = NoopEventSink;
/// sink.record_invocation(FlowId::new(1), ComponentId::new(1), 0, 100, InvocationStatus::Ok);
/// assert_eq!(sink.level(), ObservabilityLevel::Off);
/// assert_eq!(format!("{:?}", sink), "NoopEventSink");
/// ```
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopEventSink;
136
137impl EventSink for NoopEventSink {
138    #[inline]
139    fn record_invocation(
140        &self,
141        _flow_id: FlowId,
142        _component_id: ComponentId,
143        _start_ns: u64,
144        _end_ns: u64,
145        _status: InvocationStatus,
146    ) {
147        // No-op: zero cost when observability is off.
148    }
149
150    #[inline]
151    fn record_element_transfer(
152        &self,
153        _flow_id: FlowId,
154        _stream_id: StreamId,
155        _element_sequence: u64,
156        _queue_depth_after: u32,
157    ) {
158        // No-op.
159    }
160
161    #[inline]
162    fn record_backpressure(
163        &self,
164        _flow_id: FlowId,
165        _stream_id: StreamId,
166        _activated: bool,
167        _queue_depth: u32,
168        _timestamp_ns: u64,
169    ) {
170        // No-op.
171    }
172
173    #[inline]
174    fn record_copy(
175        &self,
176        _flow_id: FlowId,
177        _resource_id: ResourceId,
178        _from_component: ComponentId,
179        _to_component: ComponentId,
180        _copy_bytes: u64,
181        _reason: CopyReason,
182    ) {
183        // No-op.
184    }
185
186    #[inline]
187    fn level(&self) -> ObservabilityLevel {
188        ObservabilityLevel::Off
189    }
190}
191
192// ---------------------------------------------------------------------------
193// Tests
194// ---------------------------------------------------------------------------
195
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: can only be instantiated for types that are
    /// `Send + Sync + 'static`.
    fn assert_send_sync<T>()
    where
        T: Send + Sync + 'static,
    {
    }

    #[test]
    fn test_noop_event_sink_level() {
        assert_eq!(NoopEventSink.level(), ObservabilityLevel::Off);
    }

    #[test]
    fn test_noop_event_sink_record_invocation_does_not_panic() {
        let flow = FlowId::new(1);
        let component = ComponentId::new(1);

        NoopEventSink.record_invocation(flow, component, 0, 100, InvocationStatus::Ok);
    }

    #[test]
    fn test_noop_event_sink_record_copy_does_not_panic() {
        let flow = FlowId::new(1);
        let resource = ResourceId::new(0, 0);
        let source = ComponentId::new(1);
        let destination = ComponentId::new(2);

        NoopEventSink.record_copy(
            flow,
            resource,
            source,
            destination,
            1024,
            CopyReason::CrossComponent,
        );
    }

    #[test]
    fn test_invocation_status_display() {
        // Table of (status, expected rendering) for the payload-free variants.
        let cases = [
            (InvocationStatus::Ok, "ok"),
            (InvocationStatus::Timeout, "timeout"),
            (InvocationStatus::Cancelled, "cancelled"),
        ];

        for (status, expected) in cases.iter() {
            assert_eq!(status.to_string(), *expected);
        }
    }

    #[test]
    fn test_noop_event_sink_is_send_sync() {
        assert_send_sync::<NoopEventSink>();
    }
}