CatWing Pipeline Design

9c. ERD — Behaviors & Recommendations

FK references to products, suppliers, locations, product_categories are defined in 9a.

Enrichment & Behaviors

Behaviors

erDiagram
    product_behaviors {
        int id PK
        int product_id FK
        date snapshot_date
        float velocity
        float trend
        float hotness_score
        int fmr_class_id FK
        int abc_class_id FK
        int xyz_class_id FK
        int freq_class_id FK
        float demand_cv
        float seasonality_strength
        float shelf_time_days
        int lead_time_days
        int reorder_time_days
        int n_stores
        int n_stores_optimized
        int min_inv_qty
        float demand_coverage_scale
        float supplier_availability_score "nullable, 0-100"
        boolean is_supplier_availability_confirmed
        float phase_out_recommendation_score "nullable, 0-100"
        boolean is_phase_out_recommendation_confirmed
        float phase_out_execution_score "nullable, 0-100"
        boolean is_phase_out_execution_confirmed
        float phase_out_efficiency_score "nullable, 0-100, confirmed NOT-STOCK only"
    }

    location_behaviors {
        int id PK
        int location_id FK
        date snapshot_date
        float fill_rate
        float stockout_rate
        float avg_inventory_value
        float revenue
        int currency_id FK "cw_global"
        int active_products
    }

    supplier_behaviors {
        int id PK
        int supplier_id FK
        date snapshot_date
        float avg_lead_time_days
        float lead_time_p90
        float fill_rate
        float on_time_rate
        int open_po_count
    }

    supplier_available_products {
        int id PK
        int product_id FK
        int supplier_id FK
        string supplier_ref
        string code
        boolean is_available
        int match_method_id FK
        date file_date
    }

    match_methods {
        int id PK
        string name UK
    }

    category_behaviors {
        int id PK
        int product_category_id FK
        date snapshot_date
        int product_count
        int active_product_count
        float total_revenue
        float total_inventory_value
        int currency_id FK "cw_global"
        float avg_hotness_score
        float avg_velocity
        float growth_rate
    }

    fmr_classes {
        int id PK
        string code UK "F | M | R"
        string name
    }

    abc_classes {
        int id PK
        string code UK "A | B | C"
        string name
    }

    xyz_classes {
        int id PK
        string code UK "X | Y | Z"
        string name
    }

    freq_classes {
        int id PK
        string code UK "Frequent | Normal | Rare"
        string name
    }

    fmr_classes ||--o{ product_behaviors : ""
    abc_classes ||--o{ product_behaviors : ""
    xyz_classes ||--o{ product_behaviors : ""
    freq_classes ||--o{ product_behaviors : ""
    fmr_classes ||--o{ product_location_behaviors : ""
    abc_classes ||--o{ product_location_behaviors : ""
    xyz_classes ||--o{ product_location_behaviors : ""
    freq_classes ||--o{ product_location_behaviors : ""
    match_methods ||--o{ supplier_available_products : ""
    suppliers ||--o{ supplier_available_products : ""
    product_categories ||--o{ category_behaviors : ""

Product Location Behaviors

erDiagram
    product_location_behaviors {
        int id PK
        int product_id FK
        int location_id FK
        date snapshot_date
        float demand_qty
        float demand_revenue "nullable"
        float demand_cost "nullable"
        float stockout_qty "nullable"
        boolean is_oos_imputed
        float velocity
        float trend
        int fmr_class_id FK
        int abc_class_id FK
        int xyz_class_id FK
        int freq_class_id FK
        float hotness_score
    }

    products ||--o{ product_location_behaviors : ""
    locations ||--o{ product_location_behaviors : ""

product_location_behaviors convention: Classification fields (fmr_class, abc_class, xyz_class, freq_class, hotness_score) are calculated at location level — they reflect product behavior within each specific location, not global values. The product_behaviors table stores the global (cross-location) classifications; product_location_behaviors stores the location-specific ones. These are independent calculations that may differ significantly (e.g., a product can be ABC class "A" globally but "C" at a low-volume store). Location-specific fields (demand_qty, stockout_qty, velocity, trend) are computed per location. demand_qty reflects historical missed demand estimation (not forward predictions — see Step 7).

Demand Forecasting & Embeddings

erDiagram
    embeddings {
        int id PK
        int product_id FK
        int embedding_type_id FK
        vector vector "pgvector; dimension depends on model"
        string model_version
    }

    embedding_types {
        int id PK
        string name UK
    }

    demand_source_types {
        int id PK
        string name UK "actual | imputed_local_global | imputed_global_only | dead_product | online_only | dropship | unknown"
    }

    demand_forecasts {
        int id PK
        date forecast_date
        int demand_forecast_status_id FK
        jsonb metadata "nullable"
        timestamp created_at
    }

    demand_forecast_statuses {
        int id PK
        string name UK "pending | running | completed | failed"
    }

    demand_forecast_products {
        int id PK
        int demand_forecast_id FK
        int product_id FK
        int location_id FK
        int demand_source_type_id FK
        int horizon_days
        float prob_demand
        float predicted_qty
        float critical_ratio
        float confidence
    }

    embedding_types ||--o{ embeddings : ""
    demand_forecast_statuses ||--o{ demand_forecasts : ""
    demand_source_types ||--o{ demand_forecast_products : ""
    demand_forecasts ||--o{ demand_forecast_products : ""
    products ||--o{ demand_forecast_products : ""
    locations ||--o{ demand_forecast_products : ""

Convention — demand forecasts & accumulation: A new demand_forecasts record is created daily for each client, containing all product×location predictions in demand_forecast_products. Multiple forecasts may exist for the same date (e.g., re-runs after failures); the active forecast is always the latest record with demand_forecast_status_id pointing to 'completed': JOIN demand_forecast_statuses dfs ON dfs.id = df.demand_forecast_status_id WHERE dfs.name = 'completed' ORDER BY df.created_at DESC LIMIT 1. The demand_forecast_id column should be indexed on demand_forecast_products for efficient JOIN lookups.

The UI must support accumulating forecasted demand at two levels: (1) reorder time level — total predicted demand over lead_time + review_cycle days for a product×location (since predictions are stored per horizon_weeks, partial-week periods are pro-rated: predicted_qty × (remaining_days / 7)), and (2) arbitrary time ranges — user-selected date ranges for planning purposes. These accumulations are computed on-the-fly in the UI layer and cached in Redis to avoid repeated heavy aggregation queries. Redis also serves as the cache for other frequently-used UI calculations and, optionally, user sessions.

Recommendations & Assortments

Recommendations

erDiagram
    po_recommendation_statuses {
        int id PK
        string name UK "pending | accepted | rejected | ordered"
    }

    po_recommendation_priorities {
        int id PK
        string name UK "critical | high | medium | low"
    }

    po_recommendations {
        int id PK
        int product_id FK
        int supplier_id FK
        int location_id FK
        date recommendation_date
        float recommended_qty
        float predicted_qty
        float order_buffer
        float unit_cost "snapshot — cost assumed at recommendation time"
        int currency_id FK "cw_global"
        float confidence
        string confidence_reasons
        int po_recommendation_priority_id FK
        string priority_reasons
        int po_recommendation_status_id FK
        boolean is_respected "nullable, set by UpliftMonitor"
    }

    to_recommendation_statuses {
        int id PK
        string name UK
    }

    to_recommendation_reasons {
        int id PK
        string name UK
    }

    to_recommendations {
        int id PK
        int product_id FK
        int from_location_id FK
        int to_location_id FK
        date recommendation_date
        float recommended_qty
        int to_recommendation_reason_id FK
        int to_recommendation_status_id FK
        boolean is_respected "nullable, set by UpliftMonitor"
    }

    po_recommendation_statuses ||--o{ po_recommendations : ""
    po_recommendation_priorities ||--o{ po_recommendations : ""
    to_recommendation_statuses ||--o{ to_recommendations : ""
    to_recommendation_reasons ||--o{ to_recommendations : ""

Recommendation Impacts

erDiagram
    purchase_order_recommendation_impact_types {
        int id PK
        string name UK "revenue_uplift | fill_rate_improvement | inventory_optimization | stockout_reduction | overstock_reduction | cost_savings"
    }

    purchase_order_recommendation_impacts {
        int id PK
        int po_recommendation_id FK
        int purchase_order_recommendation_impact_type_id FK
        date measurement_date
        float baseline_value
        float actual_value
        float impact_value "actual - baseline"
        float impact_pct "percentage change"
        int measurement_method_id FK "nullable"
    }

    transfer_order_recommendation_impact_types {
        int id PK
        string name UK "revenue_uplift | fill_rate_improvement | inventory_optimization | stockout_reduction | overstock_reduction | cost_savings"
    }

    transfer_order_recommendation_impacts {
        int id PK
        int to_recommendation_id FK
        int transfer_order_recommendation_impact_type_id FK
        date measurement_date
        float baseline_value
        float actual_value
        float impact_value "actual - baseline"
        float impact_pct "percentage change"
        int measurement_method_id FK "nullable"
    }

    measurement_methods {
        int id PK
        string name UK
    }

    po_recommendations ||--o{ purchase_order_recommendation_impacts : ""
    purchase_order_recommendation_impact_types ||--o{ purchase_order_recommendation_impacts : ""
    measurement_methods o|--o{ purchase_order_recommendation_impacts : ""
    to_recommendations ||--o{ transfer_order_recommendation_impacts : ""
    transfer_order_recommendation_impact_types ||--o{ transfer_order_recommendation_impacts : ""
    measurement_methods o|--o{ transfer_order_recommendation_impacts : ""

Assortments

erDiagram
    assortment_types {
        int id PK
        string name UK "global | local"
    }

    assortment_workflow_statuses {
        int id PK
        string name UK "in_review | confirmed | locked"
    }

    assortment_product_statuses {
        int id PK
        string name UK "STOCK | NOT-STOCK"
    }

    assortments {
        int id PK
        int assortment_type_id FK
        int brand_id FK "nullable"
        int location_id FK "nullable"
        date effective_date
        int assortment_workflow_status_id FK
    }

    assortment_products {
        int id PK
        int assortment_id FK
        int product_id FK
        int cw_product_assortment_status_id FK
        int client_product_assortment_status_id FK
        int assortment_workflow_status_id FK
        float target_stock
        int assortment_zone_id FK
        float ranking_score
        int rank
        boolean is_launch
        boolean is_phase_out
        boolean is_retired
        float phase_out_execution_score "nullable, 0-100"
        string note
        int assortment_reason_id FK
    }

    assortment_product_updates {
        int id PK
        int assortment_product_id FK
        date update_date
        int cw_product_assortment_status_id FK
        int client_product_assortment_status_id FK
        int assortment_workflow_status_id FK
        float target_stock
        int assortment_zone_id FK
        float ranking_score
        int rank
        boolean is_launch
        boolean is_phase_out
        boolean is_retired
        float phase_out_execution_score "nullable, 0-100"
        string note
        int assortment_reason_id FK
    }

    assortment_types ||--o{ assortments : ""
    assortment_workflow_statuses ||--o{ assortments : ""
    assortment_workflow_statuses ||--o{ assortment_products : ""
    assortments ||--o{ assortment_products : ""
    assortment_product_statuses ||--o{ assortment_products : "cw_status"
    assortment_product_statuses ||--o{ assortment_products : "client_status"
    assortment_products ||--o{ assortment_product_updates : "history"

Assortment Lookups & Inventory

erDiagram
    assortment_zones {
        int id PK
        string name UK
    }

    assortment_reasons {
        int id PK
        string name UK "new_launch | overstock_rebalance | phase_out | demand_signal | etc."
    }

    assortment_zones ||--o{ assortment_products : ""
    assortment_reasons ||--o{ assortment_products : ""
    assortment_zones ||--o{ assortment_product_updates : ""
    assortment_reasons ||--o{ assortment_product_updates : ""

Shared status vocabulary: cw_product_assortment_status_id and client_product_assortment_status_id both reference assortment_product_statuses intentionally. CW recommends a status; the client accepts or overrides it using the same binary vocabulary (STOCK, NOT-STOCK). Boolean flags (is_launch, is_phase_out, is_retired) and phase_out_execution_score (float 0–100) provide granular lifecycle detail without complicating the primary decision.

Inventory Strategy

erDiagram
    inventory_strategies {
        int id PK
        string name UK "fmr_abc_quantile | product_level | service_level | newsvendor | seasonal_trend | moving_average | abc_xyz_matrix | dynamic_safety | margin_weighted | forecast_based | downstream_aggregate | periodic_review | base_stock | buffer_min_qty"
        int echelon "1=store | 2=DC | 3=warehouse | 4=central"
    }

    inventory_targets {
        int id PK
        int product_id FK
        int location_id FK
        date snapshot_date
        int inventory_strategy_id FK
        float target_stock
        float reorder_point
        float safety_stock
        float fill_rate_simulated
    }

    inventory_target_params {
        int id PK
        int inventory_target_id FK
        string param_name "e.g. fa_quantile, safety_multiplier, service_level"
        float param_value
    }

    inventory_strategies ||--o{ inventory_targets : ""
    products ||--o{ inventory_targets : ""
    locations ||--o{ inventory_targets : ""
    inventory_targets ||--o{ inventory_target_params : ""

Data Quality & Health

Two-layer validation architecture: The DataQualityGateway (Step 2) catches bad SOURCE data (client's problem) as a hard gate before data enters the production DB. The DataHealthMonitor (Step 4) catches TRANSFORMATION bugs and data drift (our problem) as an advisory layer after data is loaded. Both layers write to the tables below.

erDiagram
    data_quality_run_statuses {
        int id PK
        string name UK "running | passed | failed | error"
    }

    data_quality_check_categories {
        int id PK
        string name UK "completeness | consistency | freshness | statistical_anomaly"
    }

    data_quality_check_severities {
        int id PK
        string name UK "critical | warning"
    }

    data_quality_runs {
        int id PK
        int data_quality_run_status_id FK
        boolean gate_passed
        float overall_score "0-100"
        int critical_count
        int warning_count
        int checks_total
        int checks_passed
        timestamp started_at
        timestamp completed_at
        timestamp created_at
    }

    data_quality_checks {
        int id PK
        int data_quality_run_id FK
        int data_quality_check_category_id FK
        int data_quality_check_severity_id FK
        string endpoint "e.g. products, sales_orders"
        string check_name
        boolean passed
        string expected_value "nullable"
        string actual_value "nullable"
        string detail "nullable, human-readable"
        string fix_instructions "nullable, client-facing"
    }

    data_quality_check_reports {
        int id PK
        string endpoint
        string check_name
        date run_date
        float metric_value
        float baseline "nullable, 30-day mean"
        float deviation_pct "nullable"
        timestamp created_at
    }

    data_health_reports {
        int id PK
        date run_date
        string step_name
        int checks_passed
        int checks_failed
        float quality_score
        jsonb anomalies "nullable"
        timestamp created_at
    }

    data_quality_run_statuses ||--o{ data_quality_runs : ""
    data_quality_runs ||--o{ data_quality_checks : ""
    data_quality_check_categories ||--o{ data_quality_checks : ""
    data_quality_check_severities ||--o{ data_quality_checks : ""

Convention — data_health_reports: This table stores output from Step 4 (DataHealthMonitor), the advisory post-load validation layer. It is grouped here with the quality tables rather than with Behaviors because both layers serve the same domain: data quality and health monitoring. The DataQualityGateway (Step 2) writes to data_quality_runs and data_quality_checks; the DataHealthMonitor (Step 4) writes to data_health_reports. The data_quality_check_reports table accumulates trend data from both layers for historical quality tracking.

Convention — first-run behavior: On the very first pipeline run, no historical baselines exist for statistical anomaly checks. These checks auto-pass with a note "no baseline — first run" in data_quality_checks.detail. Baselines begin accumulating from this run via data_quality_check_reports.

Staging Database

The staging database is a separate lightweight database with simplified tables — just enough structure to run cross-table consistency checks without touching the main production DB. Tables are rebuilt on every gateway run (truncate + insert). No historical state is preserved. Client-specific Quality Mappers normalize raw ERP JSON field names into this common schema.

erDiagram
    staging_products {
        string product_id PK
        string vendor_sku
        string name
        string brand
        string supplier_id "nullable"
        float cost "nullable"
        float price "nullable"
    }

    staging_locations {
        string location_id PK
        string name
        string type "nullable"
        string status "nullable"
    }

    staging_suppliers {
        string supplier_id PK
        string name
    }

    staging_purchase_orders {
        string po_id PK
        string product_id
        string supplier_id "nullable"
        float qty_ordered "nullable"
        float qty_confirmed "nullable"
        float qty_delivered "nullable"
        date date "nullable"
    }

    staging_sales_orders {
        string so_id PK
        string product_id
        string location_id "nullable"
        float qty "nullable"
        float revenue "nullable"
        float unit_price "nullable"
        date date "nullable"
    }

    staging_inventory_balances {
        string product_id
        string location_id
        float qty_available "nullable"
        float qty_blocked "nullable"
        float qty_reserved "nullable"
        float qty_all "nullable"
        date date "nullable"
    }

    staging_products ||--o{ staging_purchase_orders : ""
    staging_products ||--o{ staging_sales_orders : ""
    staging_products ||--o{ staging_inventory_balances : ""
    staging_suppliers ||--o{ staging_purchase_orders : ""
    staging_locations ||--o{ staging_sales_orders : ""
    staging_locations ||--o{ staging_inventory_balances : ""

Convention — staging vs. production schema: Staging tables use string primary keys (raw ERP identifiers) rather than int auto-increment IDs. This avoids FK resolution overhead — the Quality Mapper does minimal transformation. The Production Mapper (Step 3) handles full normalization, FK resolution, deduplication, and type casting into the int-keyed production schema.