Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Fan-Out

What It Demonstrates

A pipeline with fan-out topology: a single source produces events, a router component inspects each event and directs it to one of two branches based on a routing criterion, and each branch has its own processor and sink.

Concepts Covered

  • The torvyn:filtering/router interface for multi-port dispatch
  • Named output ports in pipeline configuration
  • Fan-out topology (one-to-many routing)
  • Branch-specific processing

Router Component

components/event-router/src/lib.rs

#![allow(unused)]
fn main() {
//! Event router.
//!
//! Routes events to named output ports based on the "type" field.
//! Events with type "metric" go to port "metrics".
//! Events with type "log" go to port "logs".
//! Events with any other type are broadcast to both ports.
//!
//! The router interface returns a list of port name strings.
//! The runtime uses this list to forward the element's borrowed
//! buffer handle to each named downstream edge. No additional
//! buffer allocation occurs for fan-out — the same host buffer
//! is borrowed by each receiving component in sequence.

wit_bindgen::generate!({
    world: "content-router",
    path: "../../wit",
});

use exports::torvyn::filtering::router::Guest as RouterGuest;
use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use torvyn::streaming::types::*;

struct EventRouter;
static mut INITIALIZED: bool = false;

impl LifecycleGuest for EventRouter {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { INITIALIZED = true; }
        Ok(())
    }
    fn teardown() { unsafe { INITIALIZED = false; } }
}

impl RouterGuest for EventRouter {
    fn route(element: StreamElement) -> Result<Vec<String>, ProcessError> {
        let bytes = element.payload.read_all();
        let text = String::from_utf8_lossy(&bytes);

        if text.contains("\"type\":\"metric\"") {
            Ok(vec!["metrics".to_string()])
        } else if text.contains("\"type\":\"log\"") {
            Ok(vec!["logs".to_string()])
        } else {
            // Unknown type: broadcast to both branches.
            Ok(vec!["metrics".to_string(), "logs".to_string()])
        }
    }
}

export!(EventRouter);
}

Source Component: event-source

components/event-source/src/lib.rs

#![allow(unused)]
fn main() {
//! Event source for the fan-out example.
//! Produces a mix of metric and log events.

wit_bindgen::generate!({
    world: "data-source",
    path: "../../wit/torvyn-streaming",
});

use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use exports::torvyn::streaming::source::Guest as SourceGuest;
use torvyn::streaming::buffer_allocator;
use torvyn::streaming::types::*;

const EVENTS: &[&str] = &[
    r#"{"type":"metric","name":"cpu_usage","value":72.5}"#,
    r#"{"type":"log","level":"info","message":"user login successful"}"#,
    r#"{"type":"metric","name":"mem_used","value":4096}"#,
    r#"{"type":"log","level":"warn","message":"disk usage above 80%"}"#,
];

struct EventSource { index: usize }
static mut STATE: Option<EventSource> = None;
fn state() -> &'static mut EventSource {
    unsafe { STATE.as_mut().expect("not initialized") }
}

impl LifecycleGuest for EventSource {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { STATE = Some(EventSource { index: 0 }); }
        Ok(())
    }
    fn teardown() { unsafe { STATE = None; } }
}

impl SourceGuest for EventSource {
    fn pull() -> Result<Option<OutputElement>, ProcessError> {
        let s = state();
        if s.index >= EVENTS.len() { return Ok(None); }
        let payload = EVENTS[s.index].as_bytes();
        let buf = buffer_allocator::allocate(payload.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.append(payload).map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.set_content_type("application/json");
        let frozen = buf.freeze();
        s.index += 1;
        Ok(Some(OutputElement {
            meta: ElementMeta {
                sequence: (s.index - 1) as u64,
                timestamp_ns: 0,
                content_type: "application/json".to_string(),
            },
            payload: frozen,
        }))
    }
    fn notify_backpressure(_signal: BackpressureSignal) {}
}

export!(EventSource);
}

Branch Processors

components/metric-processor/src/lib.rs

#![allow(unused)]
fn main() {
//! Metric processor — adds a "processed_by":"metrics-pipeline" tag.

wit_bindgen::generate!({
    world: "managed-transform",
    path: "../../wit/torvyn-streaming",
});

use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use exports::torvyn::streaming::processor::Guest as ProcessorGuest;
use torvyn::streaming::buffer_allocator;
use torvyn::streaming::types::*;

struct MetricProcessor;
static mut INITIALIZED: bool = false;

impl LifecycleGuest for MetricProcessor {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { INITIALIZED = true; }
        Ok(())
    }
    fn teardown() { unsafe { INITIALIZED = false; } }
}

impl ProcessorGuest for MetricProcessor {
    fn process(input: StreamElement) -> Result<ProcessResult, ProcessError> {
        let bytes = input.payload.read_all();
        let text = String::from_utf8(bytes)
            .map_err(|e| ProcessError::InvalidInput(format!("{e}")))?;

        let tagged = text.trim_end_matches('}').to_string()
            + ",\"processed_by\":\"metrics-pipeline\"}";

        let out_bytes = tagged.as_bytes();
        let buf = buffer_allocator::allocate(out_bytes.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.append(out_bytes).map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.set_content_type("application/json");
        let frozen = buf.freeze();

        Ok(ProcessResult::Emit(OutputElement {
            meta: ElementMeta {
                sequence: input.meta.sequence,
                timestamp_ns: 0,
                content_type: "application/json".to_string(),
            },
            payload: frozen,
        }))
    }
}

export!(MetricProcessor);
}

components/log-processor/src/lib.rs

#![allow(unused)]
fn main() {
//! Log processor — adds a "processed_by":"log-pipeline" tag.

wit_bindgen::generate!({
    world: "managed-transform",
    path: "../../wit/torvyn-streaming",
});

use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use exports::torvyn::streaming::processor::Guest as ProcessorGuest;
use torvyn::streaming::buffer_allocator;
use torvyn::streaming::types::*;

struct LogProcessor;
static mut INITIALIZED: bool = false;

impl LifecycleGuest for LogProcessor {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { INITIALIZED = true; }
        Ok(())
    }
    fn teardown() { unsafe { INITIALIZED = false; } }
}

impl ProcessorGuest for LogProcessor {
    fn process(input: StreamElement) -> Result<ProcessResult, ProcessError> {
        let bytes = input.payload.read_all();
        let text = String::from_utf8(bytes)
            .map_err(|e| ProcessError::InvalidInput(format!("{e}")))?;

        let tagged = text.trim_end_matches('}').to_string()
            + ",\"processed_by\":\"log-pipeline\"}";

        let out_bytes = tagged.as_bytes();
        let buf = buffer_allocator::allocate(out_bytes.len() as u64)
            .map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.append(out_bytes).map_err(|e| ProcessError::Internal(format!("{e:?}")))?;
        buf.set_content_type("application/json");
        let frozen = buf.freeze();

        Ok(ProcessResult::Emit(OutputElement {
            meta: ElementMeta {
                sequence: input.meta.sequence,
                timestamp_ns: 0,
                content_type: "application/json".to_string(),
            },
            payload: frozen,
        }))
    }
}

export!(LogProcessor);
}

Branch Sinks

components/metric-sink/src/lib.rs

#![allow(unused)]
fn main() {
//! Metric sink — prints received metric events.

wit_bindgen::generate!({
    world: "data-sink",
    path: "../../wit/torvyn-streaming",
});

use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use exports::torvyn::streaming::sink::Guest as SinkGuest;
use torvyn::streaming::types::*;

struct MetricSink;
static mut INITIALIZED: bool = false;

impl LifecycleGuest for MetricSink {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { INITIALIZED = true; }
        Ok(())
    }
    fn teardown() { unsafe { INITIALIZED = false; } }
}

impl SinkGuest for MetricSink {
    fn push(element: StreamElement) -> Result<BackpressureSignal, ProcessError> {
        let bytes = element.payload.read_all();
        let text = String::from_utf8_lossy(&bytes);
        println!("[metric-sink] Received metric: {}", text);
        Ok(BackpressureSignal::Ready)
    }
    fn complete() -> Result<(), ProcessError> {
        println!("[metric-sink] Stream complete.");
        Ok(())
    }
}

export!(MetricSink);
}

components/log-sink/src/lib.rs

#![allow(unused)]
fn main() {
//! Log sink — prints received log events.

wit_bindgen::generate!({
    world: "data-sink",
    path: "../../wit/torvyn-streaming",
});

use exports::torvyn::streaming::lifecycle::Guest as LifecycleGuest;
use exports::torvyn::streaming::sink::Guest as SinkGuest;
use torvyn::streaming::types::*;

struct LogSink;
static mut INITIALIZED: bool = false;

impl LifecycleGuest for LogSink {
    fn init(_config: String) -> Result<(), ProcessError> {
        unsafe { INITIALIZED = true; }
        Ok(())
    }
    fn teardown() { unsafe { INITIALIZED = false; } }
}

impl SinkGuest for LogSink {
    fn push(element: StreamElement) -> Result<BackpressureSignal, ProcessError> {
        let bytes = element.payload.read_all();
        let text = String::from_utf8_lossy(&bytes);
        println!("[log-sink] Received log: {}", text);
        Ok(BackpressureSignal::Ready)
    }
    fn complete() -> Result<(), ProcessError> {
        println!("[log-sink] Stream complete.");
        Ok(())
    }
}

export!(LogSink);
}

Pipeline Configuration

Torvyn.toml

[torvyn]
name = "fan-out"
version = "0.1.0"
contract_version = "0.1.0"
description = "Fan-out pipeline with router"

[[component]]
name = "event-source"
path = "components/event-source"

[[component]]
name = "event-router"
path = "components/event-router"

[[component]]
name = "metric-processor"
path = "components/metric-processor"

[[component]]
name = "log-processor"
path = "components/log-processor"

[[component]]
name = "metric-sink"
path = "components/metric-sink"

[[component]]
name = "log-sink"
path = "components/log-sink"

[flow.main]
description = "Source → Router → (Metrics branch, Logs branch)"

[flow.main.nodes.source]
component = "event-source"
interface = "torvyn:streaming/source"

[flow.main.nodes.router]
component = "event-router"
interface = "torvyn:filtering/router"

[flow.main.nodes.metric-proc]
component = "metric-processor"
interface = "torvyn:streaming/processor"

[flow.main.nodes.log-proc]
component = "log-processor"
interface = "torvyn:streaming/processor"

[flow.main.nodes.metric-sink]
component = "metric-sink"
interface = "torvyn:streaming/sink"

[flow.main.nodes.log-sink]
component = "log-sink"
interface = "torvyn:streaming/sink"

# Source feeds the router.
[[flow.main.edges]]
from = { node = "source", port = "output" }
to = { node = "router", port = "input" }

# Router "metrics" port feeds the metrics branch.
[[flow.main.edges]]
from = { node = "router", port = "metrics" }
to = { node = "metric-proc", port = "input" }

# Router "logs" port feeds the logs branch.
[[flow.main.edges]]
from = { node = "router", port = "logs" }
to = { node = "log-proc", port = "input" }

# Each branch flows to its sink.
[[flow.main.edges]]
from = { node = "metric-proc", port = "output" }
to = { node = "metric-sink", port = "input" }

[[flow.main.edges]]
from = { node = "log-proc", port = "output" }
to = { node = "log-sink", port = "input" }

Expected Output

$ torvyn run flow.main
[torvyn] Running flow 'main'

[metric-sink] Received metric: {"type":"metric","name":"cpu_usage","value":72.5}
[log-sink] Received log: {"type":"log","level":"info","message":"user login successful"}
[metric-sink] Received metric: {"type":"metric","name":"mem_used","value":4096}
[log-sink] Received log: {"type":"log","level":"warn","message":"disk usage above 80%"}

[torvyn] Flow 'main' completed. 4 events routed: 2 to metrics, 2 to logs.

Performance Characteristics (Design Targets)

MetricTarget
Router overhead per element< 15 us (read + string match)
Fan-out to N portsNo additional buffer allocation (borrow forwarding)
Independent backpressureEach branch has its own queue and backpressure

Learn More