torvyn_types/traits.rs
1//! Shared traits for the Torvyn runtime.
2//!
3//! The primary trait here is [`EventSink`], the hot-path interface for
4//! recording observability events. All methods must be non-blocking and
5//! allocation-free on the hot path.
6
7use crate::{
8 enums::{CopyReason, ObservabilityLevel},
9 error::ProcessErrorKind,
10 ComponentId, FlowId, ResourceId, StreamId,
11};
12
13/// The hot-path trait for recording observability events.
14///
15/// Implemented by the observability collector (`torvyn-observability`) and
16/// provided to the reactor, resource manager, and host lifecycle manager.
17///
18/// Per Doc 05, Section 9.1: all methods must be non-blocking and
19/// allocation-free on the hot path.
20///
21/// Use [`NoopEventSink`] for testing or when observability is disabled.
22///
23/// # Examples
24/// ```
25/// use torvyn_types::{NoopEventSink, EventSink, ObservabilityLevel};
26///
27/// let sink = NoopEventSink;
28/// assert_eq!(sink.level(), ObservabilityLevel::Off);
29/// ```
30pub trait EventSink: Send + Sync + 'static {
31 /// Record a component invocation completion.
32 ///
33 /// Called by the reactor after every component invocation.
34 ///
35 /// # HOT PATH — must be non-blocking, allocation-free.
36 fn record_invocation(
37 &self,
38 flow_id: FlowId,
39 component_id: ComponentId,
40 start_ns: u64,
41 end_ns: u64,
42 status: InvocationStatus,
43 );
44
45 /// Record a stream element transfer between components.
46 ///
47 /// Called by the reactor when an element moves through a stream queue.
48 ///
49 /// # HOT PATH — must be non-blocking, allocation-free.
50 fn record_element_transfer(
51 &self,
52 flow_id: FlowId,
53 stream_id: StreamId,
54 element_sequence: u64,
55 queue_depth_after: u32,
56 );
57
58 /// Record a backpressure state change.
59 ///
60 /// Called by the reactor when backpressure activates or deactivates.
61 ///
62 /// # WARM PATH — called per backpressure event.
63 fn record_backpressure(
64 &self,
65 flow_id: FlowId,
66 stream_id: StreamId,
67 activated: bool,
68 queue_depth: u32,
69 timestamp_ns: u64,
70 );
71
72 /// Record a resource copy operation.
73 ///
74 /// Called by the resource manager when data is copied across a boundary.
75 ///
76 /// # HOT PATH — must be non-blocking, allocation-free.
77 fn record_copy(
78 &self,
79 flow_id: FlowId,
80 resource_id: ResourceId,
81 from_component: ComponentId,
82 to_component: ComponentId,
83 copy_bytes: u64,
84 reason: CopyReason,
85 );
86
87 /// Returns the current observability level.
88 ///
89 /// Hot-path callers can skip expensive recording at lower levels.
90 ///
91 /// # HOT PATH — checked per element to skip recording.
92 fn level(&self) -> ObservabilityLevel;
93}
94
95/// Status of a component invocation, for observability recording.
96///
97/// # HOT PATH — created per invocation.
98#[derive(Clone, Copy, Debug, PartialEq, Eq)]
99pub enum InvocationStatus {
100 /// Invocation completed successfully.
101 Ok,
102 /// Invocation completed with an error.
103 Error(ProcessErrorKind),
104 /// Invocation timed out.
105 Timeout,
106 /// Invocation was cancelled.
107 Cancelled,
108}
109
110impl std::fmt::Display for InvocationStatus {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 InvocationStatus::Ok => write!(f, "ok"),
114 InvocationStatus::Error(kind) => write!(f, "error({:?})", kind),
115 InvocationStatus::Timeout => write!(f, "timeout"),
116 InvocationStatus::Cancelled => write!(f, "cancelled"),
117 }
118 }
119}
120
121/// A no-op implementation of [`EventSink`] for testing and benchmarking.
122///
123/// All methods are empty. Returns [`ObservabilityLevel::Off`].
124///
125/// # Examples
126/// ```
127/// use torvyn_types::{NoopEventSink, EventSink, FlowId, ComponentId, ObservabilityLevel};
128/// use torvyn_types::InvocationStatus;
129///
130/// let sink = NoopEventSink;
131/// sink.record_invocation(FlowId::new(1), ComponentId::new(1), 0, 100, InvocationStatus::Ok);
132/// assert_eq!(sink.level(), ObservabilityLevel::Off);
133/// ```
134#[derive(Clone, Copy)]
135pub struct NoopEventSink;
136
137impl EventSink for NoopEventSink {
138 #[inline]
139 fn record_invocation(
140 &self,
141 _flow_id: FlowId,
142 _component_id: ComponentId,
143 _start_ns: u64,
144 _end_ns: u64,
145 _status: InvocationStatus,
146 ) {
147 // No-op: zero cost when observability is off.
148 }
149
150 #[inline]
151 fn record_element_transfer(
152 &self,
153 _flow_id: FlowId,
154 _stream_id: StreamId,
155 _element_sequence: u64,
156 _queue_depth_after: u32,
157 ) {
158 // No-op.
159 }
160
161 #[inline]
162 fn record_backpressure(
163 &self,
164 _flow_id: FlowId,
165 _stream_id: StreamId,
166 _activated: bool,
167 _queue_depth: u32,
168 _timestamp_ns: u64,
169 ) {
170 // No-op.
171 }
172
173 #[inline]
174 fn record_copy(
175 &self,
176 _flow_id: FlowId,
177 _resource_id: ResourceId,
178 _from_component: ComponentId,
179 _to_component: ComponentId,
180 _copy_bytes: u64,
181 _reason: CopyReason,
182 ) {
183 // No-op.
184 }
185
186 #[inline]
187 fn level(&self) -> ObservabilityLevel {
188 ObservabilityLevel::Off
189 }
190}
191
192// ---------------------------------------------------------------------------
193// Tests
194// ---------------------------------------------------------------------------
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199
200 #[test]
201 fn test_noop_event_sink_level() {
202 let sink = NoopEventSink;
203 assert_eq!(sink.level(), ObservabilityLevel::Off);
204 }
205
206 #[test]
207 fn test_noop_event_sink_record_invocation_does_not_panic() {
208 let sink = NoopEventSink;
209 sink.record_invocation(
210 FlowId::new(1),
211 ComponentId::new(1),
212 0,
213 100,
214 InvocationStatus::Ok,
215 );
216 }
217
218 #[test]
219 fn test_noop_event_sink_record_copy_does_not_panic() {
220 let sink = NoopEventSink;
221 sink.record_copy(
222 FlowId::new(1),
223 ResourceId::new(0, 0),
224 ComponentId::new(1),
225 ComponentId::new(2),
226 1024,
227 CopyReason::CrossComponent,
228 );
229 }
230
231 #[test]
232 fn test_invocation_status_display() {
233 assert_eq!(format!("{}", InvocationStatus::Ok), "ok");
234 assert_eq!(format!("{}", InvocationStatus::Timeout), "timeout");
235 assert_eq!(format!("{}", InvocationStatus::Cancelled), "cancelled");
236 }
237
238 #[test]
239 fn test_noop_event_sink_is_send_sync() {
240 fn assert_send_sync<T: Send + Sync + 'static>() {}
241 assert_send_sync::<NoopEventSink>();
242 }
243}