Five feature tables for a DSP: frequency cap (user × campaign), audience interest (user), pacing (campaign), yield (publisher × ad slot), and fraud / brand safety (site). No SQL, no materialized views, no Flink job.
import beava as bv
@bv.stream
class BidEvent:
    """One bid-pipeline event (bid_request | win | impression | click) from the exchange."""
    user_id: str       # anonymous cookie / device id
    campaign_id: str   # advertiser campaign
    publisher_id: str  # e.g. "examplenews_com"
    site_id: str       # same granularity as publisher here
    ad_slot: str       # above_fold | sidebar | video_preroll | in_feed
    device: str        # mobile | desktop | ctv
    category: str      # news | sports | finance | tech | lifestyle
    event_type: str    # bid_request | win | impression | click
    price: float       # CPM in USD cents
    # Fix: `ts` was missing, but UserImpressionFeatures aggregates bv.max("ts", ...)
    # over this stream; ConversionEvent already declares the same field.
    ts: float          # event timestamp (epoch seconds — presumably; confirm producer)
@bv.table(key=("user_id", "campaign_id"))
def UserImpressionFeatures(ev: BidEvent) -> bv.Table:
    """Frequency-capping features, composite key (user_id, campaign_id).

    Wire format: key values joined by '|', e.g. GET /features/u4812%7Cc101
    """
    is_impression = "_event.event_type == 'impression'"
    by_user_campaign = ev.group_by("user_id", "campaign_id")
    return by_user_campaign.agg(
        imps_1h=bv.count(window="1h", where=is_impression),
        imps_24h=bv.count(window="24h", where=is_impression),
        last_seen_ts=bv.max("ts", window="24h"),
    )
@bv.table(key="user_id")
def UserInterestFeatures(ev: BidEvent) -> bv.Table:
    """Audience-targeting features keyed by user_id: activity signals for 'is this user active?'."""
    per_user = ev.group_by("user_id")
    return per_user.agg(
        sessions_24h=bv.count(window="24h", where="_event.event_type == 'bid_request'"),
        distinct_sites_24h=bv.count_distinct("site_id", window="24h"),
        distinct_categories_24h=bv.count_distinct("category", window="24h"),
    )
@bv.table(key="campaign_id")
def CampaignFeatures(ev: BidEvent) -> bv.Table:
    """Pacing features keyed by campaign_id — throttle bidding as spend_1h nears the hourly budget."""
    is_win = "_event.event_type == 'win'"
    per_campaign = ev.group_by("campaign_id")
    return per_campaign.agg(
        spend_1h=bv.sum("price", window="1h", where=is_win),
        spend_today=bv.sum("price", window="24h", where=is_win),
        wins_today=bv.count(window="24h", where=is_win),
        impressions_today=bv.count(window="24h", where="_event.event_type == 'impression'"),
    )
@bv.table(key=("publisher_id", "ad_slot"))
def PublisherSlotFeatures(ev: BidEvent) -> bv.Table:
    """Yield features, composite key (publisher_id, ad_slot): is this publisher/slot worth the CPM?"""
    by_pub_slot = ev.group_by("publisher_id", "ad_slot")
    return by_pub_slot.agg(
        bid_requests_1h=bv.count(window="1h", where="_event.event_type == 'bid_request'"),
        wins_1h=bv.count(window="1h", where="_event.event_type == 'win'"),
        # NOTE(review): this counts impressions, not measured viewability — name kept for compatibility.
        viewability_1h=bv.count(window="1h", where="_event.event_type == 'impression'"),
        avg_cpm_24h=bv.avg("price", window="24h"),
    )
@bv.table(key="site_id")
def SiteQualityFeatures(ev: BidEvent) -> bv.Table:
    """Fraud / brand-safety features keyed by site_id.

    The reader derives bot_ratio as 1 - valid_imps_1h / bid_requests_1h.
    """
    per_site = ev.group_by("site_id")
    return per_site.agg(
        bid_requests_1h=bv.count(window="1h", where="_event.event_type == 'bid_request'"),
        valid_imps_1h=bv.count(window="1h", where="_event.event_type == 'impression'"),
        distinct_users_1h=bv.count_distinct("user_id", window="1h"),
    )
# Conversion attribution — a second stream, joined with bid features at read time.
@bv.stream
class ConversionEvent:
    """Post-bid conversion event (signup / purchase / lead)."""
    user_id: str          # shares BidEvent's anonymous id space
    campaign_id: str      # attributed campaign (last-touch)
    conversion_type: str  # signup | purchase | lead
    revenue: float        # USD cents
    ts: float             # conversion timestamp
@bv.table(key="campaign_id")
def CampaignConversionFeatures(ev: ConversionEvent) -> bv.Table:
    """Conversion-attribution features keyed by campaign_id.

    Read alongside CampaignFeatures to derive a real-time conv_rate_1h.
    """
    per_campaign = ev.group_by("campaign_id")
    return per_campaign.agg(
        conversions_1h=bv.count(window="1h"),
        conversions_24h=bv.count(window="24h"),
        revenue_24h=bv.sum("revenue", window="24h"),
        last_conversion_ts=bv.max("ts", window="24h"),
    )
# Register both streams and all six feature tables with the Beava server over TCP.
app = bv.App("beava:6400")
_STREAMS_AND_TABLES = (
    BidEvent,
    ConversionEvent,
    UserImpressionFeatures,
    UserInterestFeatures,
    CampaignFeatures,
    PublisherSlotFeatures,
    SiteQualityFeatures,
    CampaignConversionFeatures,
)
app.register(*_STREAMS_AND_TABLES)
Events arrive via HTTP POST. Edit any field, then Run.
One GET returns all features for an entity. Six entity types span two streams — including two composite keys (user × campaign, publisher × slot) and one cross-stream join: CampaignConversionFeatures is read alongside CampaignFeatures at read time on the shared campaign_id key.
That's cross-stream attribution without a manual Flink join. Click Record-the-outcome
to push a win event and measure write-to-read freshness end-to-end.
bid_ok(campaign, user, site) =
        UserImpressionFeatures[user|campaign].imps_24h < campaign.freq_cap        // frequency cap
    AND CampaignFeatures[campaign].spend_1h < campaign.hourly_budget × 0.95       // pacing (rolling 1h)
    AND SiteQualityFeatures[site].bot_ratio_1h < 0.90                             // fraud (2 of 10 sites flagged)
    AND PublisherSlotFeatures[pub|slot].avg_cpm_24h × 0.9 < max_bid               // yield
| System | Write-to-read | Why |
|---|---|---|
| Beava | — | stream-inline with storage |
| Feast | minutes | scheduled materialization |
| Flink | 5-30s | window-close interval |
| Aerospike (raw KV) | n/a | no aggregates — need Flink for that |
Your simulator writes under its own key prefix (sim-{token}-*), so its pacing and wins
are isolated from this firehose. Same Beava, same pipeline, two parallel worlds. Polling every 100ms.