LIVE DEMO
Real-time feature store for programmatic bidding · Click "Run" to execute — latency measured server-side
WHAT IS THIS?
Beava is a single-binary feature store. This demo implements a DSP feature store in about 140 lines of Python: frequency capping, pacing, yield, and fraud scoring, all expressed as windowed aggregates and served in under 10 ms. Jump to Tab 2 to watch a bid decision fan out in 14 ms, then see the aggregate update less than 300 ms after you record a win.

What Beava does for RTB

📦
One binary, one pipeline
Replaces Aerospike (KV) + Flink (stream aggregates) + custom glue. 5 feature tables plus 1 conversion table, ~140 LOC of Python.
Sub-10ms feature reads
Composite keys (user×campaign, publisher×slot) read in a few ms. 15 parallel lookups for a bid decision complete in under one display frame (about 16 ms at 60 Hz).
🔄
Sub-second write freshness
Push an impression and the frequency-cap aggregates reflect it in under 300 ms. By comparison, Feast materializes on a schedule (minutes) and Flink windows close on intervals (5-30 s). Beava updates aggregates inline with the stream.
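The 15 parallel lookups mentioned above can be sketched as a thread-pool fan-out. This is a minimal illustration and not Beava's client: `fetch_features` is a hypothetical stand-in that returns canned data, and the key list is invented for the example. A real bidder would issue one feature read per key inside it.

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_features(key: str) -> dict:
    # Hypothetical stand-in for one feature read; a real bidder would
    # issue an HTTP GET for this key here.
    return {"key": key, "imps_1h": 0}

def fan_out(keys: list[str], max_workers: int = 15) -> dict[str, dict]:
    # Issue all lookups concurrently so total latency tracks the slowest
    # single read rather than the sum of all reads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(fetch_features, keys)  # preserves input order
        return dict(zip(keys, results))

keys = [f"u4812|c{100 + i}" for i in range(15)]  # illustrative keys only
features = fan_out(keys)
```

Threads suit this case because each lookup is I/O-bound; an asyncio client would serve the same purpose.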
Classic RTB feature stack: Kafka + Flink + Aerospike + custom glue
Beava: one binary

1. Define the pipeline in plain Python

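Five feature tables for a DSP, all fed by one BidEvent stream: frequency cap (user × campaign), audience interest (user), pacing (campaign), yield (publisher × ad slot), and fraud / brand safety (site). A sixth table aggregates a separate ConversionEvent stream for attribution. No SQL, no materialized views, no Flink job.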

pipeline-dsp.py
import beava as bv

@bv.stream
class BidEvent:
    user_id: str          # anonymous cookie / device id
    campaign_id: str      # advertiser campaign
    publisher_id: str     # e.g. "examplenews_com"
    site_id: str          # same granularity as publisher here
    ad_slot: str          # above_fold | sidebar | video_preroll | in_feed
    device: str           # mobile | desktop | ctv
    category: str         # news | sports | finance | tech | lifestyle
    event_type: str       # bid_request | win | impression | click
    price: float          # CPM in USD cents

@bv.table(key=("user_id", "campaign_id"))
def UserImpressionFeatures(ev: BidEvent) -> bv.Table:
    # Frequency capping. Composite key (user_id, campaign_id).
    # Wire format: values joined by '|'.  GET /features/u4812%7Cc101
    return ev.group_by("user_id", "campaign_id").agg(
        imps_1h      = bv.count(window="1h",  where="_event.event_type == 'impression'"),
        imps_24h     = bv.count(window="24h", where="_event.event_type == 'impression'"),
        last_seen_ts = bv.max("ts", window="24h"),
    )

@bv.table(key="user_id")
def UserInterestFeatures(ev: BidEvent) -> bv.Table:
    # Audience targeting — is this an active user?
    return ev.group_by("user_id").agg(
        sessions_24h            = bv.count(window="24h", where="_event.event_type == 'bid_request'"),
        distinct_sites_24h      = bv.count_distinct("site_id", window="24h"),
        distinct_categories_24h = bv.count_distinct("category", window="24h"),
    )

@bv.table(key="campaign_id")
def CampaignFeatures(ev: BidEvent) -> bv.Table:
    # Pacing — slow down if spend_1h is near the hourly budget.
    return ev.group_by("campaign_id").agg(
        spend_1h          = bv.sum("price", window="1h",  where="_event.event_type == 'win'"),
        spend_today       = bv.sum("price", window="24h", where="_event.event_type == 'win'"),
        wins_today        = bv.count(window="24h", where="_event.event_type == 'win'"),
        impressions_today = bv.count(window="24h", where="_event.event_type == 'impression'"),
    )

@bv.table(key=("publisher_id", "ad_slot"))
def PublisherSlotFeatures(ev: BidEvent) -> bv.Table:
    # Yield — is this publisher/slot worth the CPM?
    return ev.group_by("publisher_id", "ad_slot").agg(
        bid_requests_1h = bv.count(window="1h", where="_event.event_type == 'bid_request'"),
        wins_1h         = bv.count(window="1h", where="_event.event_type == 'win'"),
        viewability_1h  = bv.count(window="1h", where="_event.event_type == 'impression'"),
        avg_cpm_24h     = bv.avg("price", window="24h"),
    )

@bv.table(key="site_id")
def SiteQualityFeatures(ev: BidEvent) -> bv.Table:
    # Fraud / brand safety — bot_ratio derived as 1 - valid_imps_1h / bid_requests_1h.
    return ev.group_by("site_id").agg(
        bid_requests_1h   = bv.count(window="1h", where="_event.event_type == 'bid_request'"),
        valid_imps_1h     = bv.count(window="1h", where="_event.event_type == 'impression'"),
        distinct_users_1h = bv.count_distinct("user_id", window="1h"),
    )

# Conversion attribution — separate stream, joined at read time
@bv.stream
class ConversionEvent:
    user_id: str          # same id space as BidEvent
    campaign_id: str      # last-touch attribution
    conversion_type: str  # signup | purchase | lead
    revenue: float        # USD cents
    ts: float

@bv.table(key="campaign_id")
def CampaignConversionFeatures(ev: ConversionEvent) -> bv.Table:
    # Read alongside CampaignFeatures for real-time conv_rate_1h.
    return ev.group_by("campaign_id").agg(
        conversions_1h     = bv.count(window="1h"),
        conversions_24h    = bv.count(window="24h"),
        revenue_24h        = bv.sum("revenue", window="24h"),
        last_conversion_ts = bv.max("ts", window="24h"),
    )

# Register 2 streams + 6 tables with Beava over TCP
app = bv.App("beava:6400")
app.register(BidEvent, ConversionEvent,
             UserImpressionFeatures, UserInterestFeatures, CampaignFeatures,
             PublisherSlotFeatures, SiteQualityFeatures, CampaignConversionFeatures)
$ python pipeline-dsp.py
Click Run to check the pipeline is registered (idempotent — safe to re-run)
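How these aggregates might feed a bid decision, as a sketch. The thresholds (`FREQ_CAP_1H`, `HOURLY_BUDGET_CENTS`, `MAX_BOT_RATIO`) and the `should_bid` function are hypothetical and not part of Beava. Only the feature names and the bot_ratio formula come from the pipeline above.

```python
FREQ_CAP_1H = 3               # hypothetical: max impressions per user per campaign per hour
HOURLY_BUDGET_CENTS = 50_000  # hypothetical hourly budget, in USD cents
MAX_BOT_RATIO = 0.5           # hypothetical fraud threshold

def bot_ratio(site: dict) -> float:
    # 1 - valid_imps_1h / bid_requests_1h, as in the SiteQualityFeatures comment.
    requests = site.get("bid_requests_1h", 0)
    if requests == 0:
        return 0.0  # no traffic seen yet; avoid dividing by zero
    return 1.0 - site.get("valid_imps_1h", 0) / requests

def should_bid(user_campaign: dict, campaign: dict, site: dict) -> bool:
    # Frequency cap: skip users who already saw this campaign enough this hour.
    if user_campaign.get("imps_1h", 0) >= FREQ_CAP_1H:
        return False
    # Pacing: stop bidding once the hourly budget is spent.
    if campaign.get("spend_1h", 0.0) >= HOURLY_BUDGET_CENTS:
        return False
    # Fraud / brand safety: avoid sites with a high derived bot ratio.
    return bot_ratio(site) <= MAX_BOT_RATIO
```

Each argument is the feature dict for one entity, for example the result of reading `UserImpressionFeatures` for a user × campaign key.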

2. Send a BidEvent over HTTP

Events arrive via HTTP POST. Edit any field, then Run.

$ curl — push BidEvent
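The curl command itself is not reproduced here, so the following is a rough Python equivalent under stated assumptions: the host `localhost:8080` and the path `/events/BidEvent` are hypothetical placeholders, not documented Beava endpoints. The field names match the `BidEvent` stream defined in the pipeline. The request is built but not sent.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # hypothetical host and port
EVENT_PATH = "/events/BidEvent"     # hypothetical ingest path

def build_bid_event_request(event: dict) -> urllib.request.Request:
    # Serialize the event as JSON and wrap it in an HTTP POST request.
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        BASE_URL + EVENT_PATH,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

event = {
    "user_id": "u4812",
    "campaign_id": "c101",
    "publisher_id": "examplenews_com",
    "site_id": "examplenews_com",
    "ad_slot": "above_fold",
    "device": "mobile",
    "category": "news",
    "event_type": "impression",
    "price": 250.0,  # CPM in USD cents
}
req = build_bid_event_request(event)
# To actually send it: urllib.request.urlopen(req, timeout=2)
```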

3. Read features over HTTP GET

One GET returns all features for an entity. The demo serves six tables across two streams, including two composite keys (user × campaign, publisher × slot) and one cross-stream join (CampaignConversionFeatures joins with CampaignFeatures at read time).
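The read-time join described above might look like this on the client side. It is a sketch: the two dicts stand in for the results of two separate feature reads with made-up values, and `conv_rate_24h` is a derived name introduced here. It uses the 24-hour counts because the pipeline as written defines `conversions_1h` but no 1-hour win count on `CampaignFeatures`.

```python
from typing import Optional

def conversion_rate(conversions: int, wins: int) -> Optional[float]:
    # Return None rather than divide by zero when there are no wins yet.
    return conversions / wins if wins else None

def join_campaign_features(campaign: dict, conversions: dict) -> dict:
    # Merge both tables' features for one campaign_id, then add the derived rate.
    merged = {**campaign, **conversions}
    merged["conv_rate_24h"] = conversion_rate(
        conversions.get("conversions_24h", 0),
        campaign.get("wins_today", 0),
    )
    return merged

# Made-up values standing in for two feature reads on the same campaign_id.
campaign_row = {"spend_today": 41_250.0, "wins_today": 200, "impressions_today": 180}
conversion_row = {"conversions_1h": 1, "conversions_24h": 5, "revenue_24h": 12_500.0}
joined = join_campaign_features(campaign_row, conversion_row)
```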

$ curl — get features
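For composite keys, the pipeline comment gives the wire format: values joined by '|', percent-encoded in the URL (`GET /features/u4812%7Cc101`). A small helper that builds such a path, as a sketch; `feature_path` is a name introduced here, not part of Beava.

```python
from urllib.parse import quote

def feature_path(*key_parts: str) -> str:
    # Join the key values with '|' and percent-encode the result,
    # so '|' becomes %7C as in the pipeline comment.
    key = "|".join(key_parts)
    return "/features/" + quote(key, safe="")

user_campaign_path = feature_path("u4812", "c101")
publisher_slot_path = feature_path("examplenews_com", "above_fold")
single_key_path = feature_path("c101")
```

Passing `safe=""` makes `quote` encode '/' as well, which matters if a key value ever contains a slash.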