CatWing Pipeline Design

11. Pipeline Chronology — 13 Steps

Master Flowchart

flowchart TB
    subgraph S1["Step 1: Data Ingestion"]
        S1N["ERP APIs, Static Excel,
Market Data, Web Analytics,
Social Ads, Weather, Price Intel"]
    end
    subgraph S2["Step 2: DataQualityGateway"]
        S2N["Completeness, Consistency,
Freshness, Statistical Anomaly
→ staging DB → gate decision"]
    end
    subgraph S3["Step 3: Transform & Load"]
        S3N["Parse, Validate,
Normalize, Upsert
(idempotent)"]
    end
    subgraph S4["Step 4: DataHealthMonitor"]
        S4N["Post-load sanity checks,
extreme data shift detection,
data quality warnings"]
    end
    subgraph S5["Step 5: BehaviorCalculator"]
        S5N["Product / Supplier /
Location / Category
enrichment,
Phase-out scoring"]
    end
    subgraph S6["Step 6: Assortment Optimizer"]
        S6N["Global (tier+brand) →
Local (store-level) →
Workflow"]
    end
    subgraph S7["Step 7: Demand Estimation"]
        S7N["product × location × horizon,
OOS adjustment"]
    end
    subgraph S8["Step 8: Aggregation"]
        S8N["daily → weekly →
monthly → quarterly,
base currency conversion"]
    end
    subgraph S9["Step 9: AI/ML Preprocessor
(Feature Engineering)"]
        S9N["DB → 40 features ×
10 groups → S3 Parquet"]
    end
    subgraph S10["Step 10: AI/ML Engine
(Model Training)"]
        S10N["Parquet → LightGBM
classifier + regressor
→ .pkl/.json in S3"]
    end
    subgraph S11["Step 11: Inventory Strategy
Optimization (MEIO)"]
        S11N["3 echelons:
Stores (9) / DC (planned) /
Warehouse (5),
simulation backtesting"]
    end
    subgraph S12["Step 12: AI/ML Inference
(Recommender)"]
        S12N["Load models → predict
→ confidence scoring
→ PO/TO → DB"]
    end
    subgraph S13["Step 13: UpliftMonitor"]
        S13N["Recommendation respect tracking,
revenue/fill-rate/inventory
impact measurement"]
    end
    S1 --> S2
    S2 -->|"gate passed"| S3
    S2 -.->|"HALT on
critical failure"| S2
    S3 --> S4
    S4 --> S5
    S5 --> S6
    S5 --> S7
    S7 --> S8 & S9
    S8 --> S9
    S9 --> S10
    S10 --> S11
    S10 --> S12
    S11 --> S12
    S12 -.->|"periodic"| S13
    style S1 fill:#E8F5E9,stroke:#388E3C
    style S2 fill:#FFCDD2,stroke:#C62828
    style S3 fill:#E3F2FD,stroke:#1976D2
    style S4 fill:#FFECB3,stroke:#FF8F00
    style S5 fill:#F3E5F5,stroke:#7B1FA2
    style S6 fill:#FFF8E1,stroke:#F9A825
    style S7 fill:#FFF3E0,stroke:#F57C00
    style S8 fill:#FFF8E1,stroke:#F9A825
    style S9 fill:#E1BEE7,stroke:#7B1FA2
    style S10 fill:#E1BEE7,stroke:#7B1FA2
    style S11 fill:#FCE4EC,stroke:#C62828
    style S12 fill:#E1BEE7,stroke:#7B1FA2
    style S13 fill:#FFECB3,stroke:#FF8F00

Step 1 — Data Ingestion

| Source | Type | Frequency | Status | Storage |
| --- | --- | --- | --- | --- |
| Client ERP APIs | JSON | Daily | Implemented | s3://raw-data/{client}/{date}/*.json |
| Static Excel | XLSX | Manual upload | Implemented | s3://client-data/{client}/ |
| Open-Meteo | Weather (archive + forecast) | Daily | Implemented | s3://raw-data/weather/{city}/{date}/ |
| Kolkostruva | Price intelligence | Daily | Implemented | s3://raw-data/others/prices/{date}/ |
| Nielsen / JFK | Market data | Periodic | Planned | s3://raw-data/{client}/{date}/market/ |
| Google Analytics | Web analytics | Daily | Planned | s3://raw-data/{client}/{date}/analytics/ |
| Facebook / Instagram Ads | Social ad metrics | Daily | Planned | s3://raw-data/{client}/{date}/ads/ |

All raw data is immutable and append-only in S3 — the ultimate source of truth (see §8).

Output: S3 raw-data/ directory (JSON, CSV, Excel).

Step 2 — DataQualityGateway

A hard gate between Data Ingestion (Step 1) and Transform & Load (Step 3). Validates raw source data quality before it enters the production database. If the client's ERP data is corrupt, incomplete, or stale, the gateway blocks the pipeline and notifies the client with specific fix instructions.

Architecture — staging DB + two mappers:

The gateway uses a separate lightweight staging database with simplified tables (staging_products, staging_locations, staging_suppliers, staging_purchase_orders, staging_sales_orders, staging_inventory_balances). This enables cross-table consistency checks without touching the main production DB. Tables are rebuilt on every gateway run (truncate + insert).

Each client gets two client-specific mappers:

| Mapper | Step | Input | Output | Purpose |
| --- | --- | --- | --- | --- |
| Quality Mapper | 2 (DQG) | Raw S3 JSON | Staging DB (simplified tables) | Minimal transform for quality checks |
| Production Mapper | 3 (Transform & Load) | Raw S3 JSON | Main DB (full schema) | Full normalization, FK resolution, upsert |

Both mappers are client-specific (e.g., CW200_QualityMapper, CW200_Transformer). The Quality Mapper does lightweight parsing — just enough structure to run consistency checks. The Production Mapper does the full transform with Pydantic validation, deduplication, and FK resolution.

Execution flow:

  1. Load raw JSON → staging DB (via Quality Mapper)
  2. Run completeness checks (on raw JSON files)
  3. Run consistency checks (on staging DB — cross-table joins)
  4. Run freshness checks (on raw JSON + staging DB)
  5. Run statistical anomaly checks (on staging DB — requires full picture, runs last)
  6. Write all results to main DB (data_quality_runs, data_quality_checks)
  7. Gate decision: any CRITICAL failure → HALT; all critical pass → proceed to Step 3

Completeness checks (5):

| Check | Severity | Description |
| --- | --- | --- |
| Required endpoints present | CRITICAL | All mandatory JSON files exist in S3 for this run date |
| Required fields present | CRITICAL | Each record has all non-nullable schema fields |
| Null rate threshold | WARNING | % NULL per field exceeds configured threshold |
| Missing date ranges | WARNING | Gaps in daily time-series within lookback window |
| Empty endpoint | CRITICAL | File exists but contains zero records |

Consistency checks (7):

| Check | Severity | Description |
| --- | --- | --- |
| Cross-endpoint product existence | CRITICAL | All product_ids in SO/PO/inventory exist in products |
| Cross-endpoint location existence | CRITICAL | All location_ids in inventory/SO exist in locations |
| Cross-endpoint supplier existence | WARNING | supplier_ids in PO exist in suppliers |
| PO lifecycle quantities | WARNING | qty_ordered ≥ qty_confirmed ≥ qty_delivered |
| RO-to-PO line matching | WARNING | RO lines reference valid PO lines |
| SO revenue reasonableness | WARNING | revenue ≈ qty × unit_price (within 5% tolerance) |
| Inventory quantity consistency | WARNING | qty_all ≥ qty_available + qty_blocked + qty_reserved |

Note: Field names in raw JSON are client-specific (e.g., CW-200 uses ERP.net Bulgarian names). The Quality Mapper normalizes them to the staging schema's common field names.
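The SO revenue reasonableness check can be sketched as a small post-staging validation. This is a minimal illustration, assuming staging rows are available as dicts with `qty`, `unit_price`, and `revenue` keys (a hypothetical row shape — the real check runs as a cross-table query against the staging DB):

```python
def check_revenue_reasonableness(rows, tolerance=0.05):
    """Flag sales-order lines whose revenue deviates from qty * unit_price
    by more than the configured tolerance (WARNING severity)."""
    failures = []
    for row in rows:
        expected = row["qty"] * row["unit_price"]
        if expected == 0:
            continue  # skip zero-priced lines to avoid division by zero
        if abs(row["revenue"] - expected) / expected > tolerance:
            failures.append(row)
    return failures
```

Flagged rows would be written to data_quality_checks with the expected vs. actual values for the client-facing report.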

Freshness checks (3):

| Check | Severity | Description |
| --- | --- | --- |
| File timestamp staleness | CRITICAL | Most recent records within configurable window of run date |
| Missing recent dates | WARNING | Time-series endpoints have data for last N business days |
| Endpoint age consistency | WARNING | All endpoints from approximately the same extraction time |

Statistical anomaly checks (4 — run last):

| Check | Severity | Description |
| --- | --- | --- |
| Row count vs baseline | CRITICAL (>3σ) / WARNING (>2σ) | Today's count vs 30-day mean per endpoint |
| Volume shift detection | WARNING | Aggregate quantities (SO/PO/inventory) vs 30-day mean |
| Price/cost distribution shift | WARNING | Median shift >20% or new extreme outliers |
| New/disappeared entities | WARNING | >10% new or >5% disappeared products/locations/suppliers |
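The row-count-vs-baseline check is a plain z-score test against the trailing baseline. A minimal sketch, assuming the baseline is passed in as a list of daily counts (the real implementation reads it from accumulated run history):

```python
from statistics import mean, stdev

def row_count_anomaly(today_count, baseline_counts):
    """Compare today's row count for an endpoint against its 30-day baseline.
    Returns 'CRITICAL' beyond 3 sigma, 'WARNING' beyond 2 sigma, else None.
    Auto-passes when no baseline exists yet (first-run behavior)."""
    if len(baseline_counts) < 2:
        return None  # no baseline — first run
    mu, sigma = mean(baseline_counts), stdev(baseline_counts)
    if sigma == 0:
        return None if today_count == mu else "CRITICAL"
    z = abs(today_count - mu) / sigma
    if z > 3:
        return "CRITICAL"
    if z > 2:
        return "WARNING"
    return None
```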

Gate logic:

if any CRITICAL check fails:
    gate_passed = False
    pipeline HALTED
    client receives detailed instructions:
      - Which checks failed (with expected vs actual values)
      - What to fix in their ERP data
      - How to trigger a re-check after fixing
else:
    gate_passed = True
    pipeline continues to Step 3 (warnings logged)

First-run behavior: On the very first run, no historical baselines exist. Statistical anomaly checks auto-pass with a note "no baseline — first run". Baselines begin accumulating from this run.

Two-layer validation architecture: The DataQualityGateway (Step 2) catches bad SOURCE data (client's problem) as a hard gate. The DataHealthMonitor (Step 4) catches TRANSFORMATION bugs and data drift (our problem) as an advisory layer. See §9c "Data Quality & Health" for the shared ERD.

Output: data_quality_runs (per-run gate decision, scores, counts), data_quality_checks (per-check results with fix instructions), data_quality_check_reports (trend data for historical tracking). Pipeline proceeds to Step 3 only if gate_passed = True.

Step 3 — Transform & Load

See §5 for the detailed Step 3 flowchart.

Output: DB core tables (§9a, §9b), plus external data tables (product_market_observations, category_market_observations, product_web_analytics_daily, location_web_analytics_daily, product_social_ad_daily, category_social_ad_daily).

Step 4 — DataHealthMonitor

Runs immediately after Transform & Load (Step 3). Performs automated data quality validation on the freshly loaded data in the production database. Advisory only — raises warnings but does not block pipeline execution.

Two-layer architecture: This is the second validation layer. The DataQualityGateway (Step 2) validates raw SOURCE data before it enters the DB (hard gate, client-facing). The DataHealthMonitor validates TRANSFORMED data after loading (advisory, internal). Together they catch both client data problems and transformation bugs.

Sanity checks:
- Row count validation: compare today's load against historical daily averages per table (e.g., products, sales_orders, inventory_quantities). Flag if the count deviates > 2σ from the 30-day mean.
- Null rate monitoring: track % NULL per column per table. Flag if the null rate exceeds historical baseline + threshold.
- Duplicate detection: verify PK uniqueness post-upsert. Flag unexpected duplicate patterns.
- Referential integrity: validate all FKs resolve (e.g., every sales_order_lines.product_id exists in products).

Extreme data shift detection:
- Revenue distribution shift: compare today's per-location revenue totals against the trailing 30-day distribution. Flag locations with > 3σ deviation.
- Inventory level shifts: detect sudden bulk increases or decreases in inventory_quantities that may indicate data corruption or system migration artifacts.
- Price anomalies: flag products where cost or price changed > 50% from the previous load.
- New/disappeared entities: report products, locations, or suppliers that appeared or vanished since the last load.

Data quality scoring:
- Compute per-table and overall data quality scores (0–100) based on completeness, consistency, and freshness.
- Track scores over time to detect gradual degradation.

Output: Warning log entries plus the data_health_reports table (id PK, run_date, step_name, checks_passed, checks_failed, quality_score, anomalies jsonb nullable, created_at), a per-run summary of checks, scores, and flagged anomalies. The pipeline continues to Step 5 regardless of warnings. Critical failures (e.g., zero rows loaded, broken FK integrity) may optionally halt execution via configurable thresholds.

Step 5 — BehaviorCalculator

Computes enrichment metrics and writes to behavior tables. Four sub-calculators:

Product enrichment → product_behaviors:
- Hotness score (5 components: velocity ×0.25, trend ×0.15, shelf ×0.15, margin ×0.15, volume ×0.30)
- FMR/ABC/XYZ segmentation (FMR: Fast/Medium/Rare by % days with sales; ABC: 80/95/100 cumulative revenue; XYZ: CV ≤0.5/≤1.0/>1.0)
- Seasonality strength and shelf time
- Supplier availability scoring (formulas TBD — will be defined during implementation):
  - supplier_availability_score [0–100] — algorithmic assessment of supplier availability for this product
  - is_supplier_availability_confirmed — user confirms availability status (set via UI)
- Phase-out scoring (formulas TBD — will be defined during implementation based on assortment workflow data):
  - phase_out_recommendation_score [0–100] — algorithmic recommendation to phase out (based on velocity trend, assortment status, supplier availability)
  - is_phase_out_recommendation_confirmed — user confirms recommendation accepted (set via UI)
  - phase_out_execution_score [0–100] — how well the phase-out is being executed (discounts, margins, inventory reduction rate)
  - is_phase_out_execution_confirmed — user confirms execution started (set via UI)
  - phase_out_efficiency_score [0–100] — inventory drawdown efficiency (calculated only for confirmed NOT-STOCK products)
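The ABC and XYZ segmentations follow standard definitions (80/95/100 cumulative-revenue cuts; CV thresholds 0.5 and 1.0). A minimal sketch under those definitions — the input shapes (`revenues_by_product` dict, plain demand list) are illustrative, not the real table layout:

```python
from statistics import mean, stdev

def abc_classes(revenues_by_product):
    """ABC segmentation: rank products by revenue descending; A covers the
    first 80% of cumulative revenue, B up to 95%, C the remainder (100%)."""
    total = sum(revenues_by_product.values())
    ranked = sorted(revenues_by_product.items(), key=lambda kv: kv[1], reverse=True)
    classes, cum = {}, 0.0
    for pid, rev in ranked:
        cum += rev
        share = cum / total
        classes[pid] = "A" if share <= 0.80 else ("B" if share <= 0.95 else "C")
    return classes

def xyz_class(demand_series):
    """XYZ segmentation by coefficient of variation: X ≤ 0.5, Y ≤ 1.0, Z > 1.0."""
    cv = stdev(demand_series) / mean(demand_series)
    return "X" if cv <= 0.5 else ("Y" if cv <= 1.0 else "Z")
```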

Supplier enrichment → supplier_behaviors:
- Average/P90 lead time, fill rate, on-time rate, open PO count

Location enrichment → location_behaviors:
- Fill rate, stockout rate, avg inventory value, revenue, active product count

Category enrichment → category_behaviors:
- Product counts (total/active), revenue, inventory value, avg hotness, avg velocity, growth rate

Output: product_behaviors (global classifications), supplier_behaviors, location_behaviors, category_behaviors. Note: product_location_behaviors classifications (abc/fmr/freq/xyz/hotness) are computed at location level in Step 7 — they are independent calculations, not copies from global product_behaviors.

Step 6 — Assortment Optimizer

Two-stage optimization:

  1. Global assortment — per price tier (Luxury, Premium, Mid, Budget) and per brand. Decides keep vs. phase-out at portfolio level.
  2. Local assortment — applies global decisions to store-level data. Computes store-specific actions (STOCK/NOT-STOCK with boolean flags: is_launch, is_phase_out, is_retired, is_on_demand).

Workflow progression: in_review → confirmed → locked (tracked via assortment_workflow_statuses).

Output: assortments, assortment_products (with assortment_product_updates audit trail)

Step 7 — Demand Estimation

Generates product × location × horizon demand records.

Core rule:
- If inv_qty > 0 AND so_qty = 0 → demand_qty = 0 (product is stocked but genuinely not selling)
- If inv_qty = 0 (OOS) → estimate missed demand via cross-location global demand signals:
  1. Compute global_demand_qty = global_so_qty / global_periods_in_stock
  2. Compute global_delta = trend from previous period (clipped −0.5 to +1.0)
  3. Apply: demand_qty = prev_demand × (1 + global_delta)

Demand financials: demand_revenue = demand_qty × unit_price, demand_cost = demand_qty × unit_cost

Stockout computation: stockout_qty = max(0, demand_qty − so_qty)
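The core rule, financials, and stockout computation can be sketched as below. Assumptions worth flagging: the function signature is hypothetical, and in-stock periods with sales are taken to have demand_qty = so_qty (the source states only the so_qty = 0 and OOS cases explicitly):

```python
def estimate_demand(inv_qty, so_qty, prev_demand,
                    global_so_qty, global_periods_in_stock, prev_global_demand):
    """Step 7 core rule sketch: stocked periods take observed sales as demand
    (0 when nothing sold); OOS periods reconstruct missed demand from the
    cross-location global trend, clipped to [-0.5, +1.0]."""
    if inv_qty > 0:
        return float(so_qty)  # in stock: observed sales (0 if genuinely not selling)
    # OOS: estimate missed demand from the global demand signal
    global_demand = global_so_qty / global_periods_in_stock
    global_delta = (global_demand - prev_global_demand) / prev_global_demand
    global_delta = max(-0.5, min(1.0, global_delta))  # clip trend
    return prev_demand * (1 + global_delta)

def stockout_qty(demand_qty, so_qty):
    """stockout_qty = max(0, demand_qty - so_qty)"""
    return max(0.0, demand_qty - so_qty)
```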

Output: product_location_behaviors — per product×location demand estimation stored alongside denormalized global behavior fields (fmr, abc, trend, etc.). Historical demand corrections only — not forward-looking predictions. The statistical approach may be replaced with ML in the future.

Step 8 — Aggregation

Roll up time-series data across 4 granularities × 4 metrics = 16 stats tables:

| Granularity | Tables |
| --- | --- |
| Daily | daily_inventory_values, daily_sales_values, daily_pending_po_values, daily_expected_demand |
| Weekly (ISO) | weekly_* |
| Monthly | monthly_* |
| Quarterly | quarterly_* |

Each table aggregated per location + global (NULL location = global).

All monetary values converted to base currency (see §10 convention).
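The weekly rollup keys on ISO calendar weeks. A minimal sketch of that bucketing, assuming daily values arrive as a `{date: value}` mapping (the real step aggregates SQL tables per location):

```python
from collections import defaultdict
from datetime import date

def weekly_rollup(daily_values):
    """Roll up {date: value} daily figures into ISO (year, week) buckets,
    the granularity used by the weekly_* stats tables."""
    weekly = defaultdict(float)
    for day, value in daily_values.items():
        iso_year, iso_week, _ = day.isocalendar()
        weekly[(iso_year, iso_week)] += value
    return dict(weekly)
```

Keying on the ISO year (not the calendar year) matters at year boundaries, where late-December days can belong to week 1 of the following ISO year.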

See §6 for the detailed aggregation flowchart. (Note: §6 title updated to "Phase 2 Detail (Legacy)" — the 13-step chronology is now the primary reference.)

Output: cw_200_stats schema (16 tables)

Step 9 — AI/ML Preprocessor (Feature Engineering)

The AI/ML Preprocessor reads directly from the database and exports feature Parquet files to S3. This is the first of three ML stages — it bridges the relational world (DB) with the ML world (Parquet).

Builds the ML feature matrix. 40 features across 10 groups:

| # | Group | Features |
| --- | --- | --- |
| 1 | Lag features | demand_lag1, demand_lag2 |
| 2 | Horizon lags | lag_horizon1, lag_horizon2, lag_horizon3, lag_horizon4 |
| 3 | Rolling statistics | demand_roll4_mean, demand_roll8_mean, demand_roll8_std, demand_roll8_max |
| 4 | Rolling horizon stats | demand_roll_horizon_mean, demand_roll_horizon_std, demand_roll_horizon_max, zero_count_horizon |
| 5 | Category trend | brand_trend8w |
| 6 | Demand recency | weeks_since_last_sale, demand_recency_score, demand_consistency_8w, demand_velocity_ratio, has_recent_demand_4w, has_recent_demand_8w, has_recent_demand_12w |
| 7 | Category demand rates | category_demand_rate, collection_demand_rate, brand_demand_rate |
| 8 | Product economics | horizon_time, price, price_vs_category, price_vs_brand, unit_cost, shortage_cost, critical_ratio, unit_cost_vs_category, cost_criticality, margin_ratio |
| 9 | Intermittent demand | adi (Avg Demand Interval), cv2 (CV²) |
| 10 | Temporal | week_of_year, week_sin, cat_month_ratio |

Classifier vs. Regressor split:
- Classifier uses all 40 features
- Regressor excludes has_recent_demand_4w/8w/12w and lag_horizon1 (36 features) — binary flags hurt regression precision; horizon1 lag causes overfitting
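The lag and trailing-rolling features (groups 1 and 3) can be sketched without the Parquet plumbing. A minimal illustration over a per-product weekly demand list — the real preprocessor builds these per product×location from the DB:

```python
from statistics import mean, pstdev

def lag_and_rolling_features(series, lags=(1, 2), window=8):
    """Per-week feature rows: demand_lag1/2 and demand_roll8_mean/std/max
    over a trailing window that excludes the current week (no leakage)."""
    rows = []
    for t in range(len(series)):
        row = {f"demand_lag{k}": (series[t - k] if t - k >= 0 else None)
               for k in lags}
        hist = series[max(0, t - window):t]  # trailing window, current week excluded
        if hist:
            row[f"demand_roll{window}_mean"] = mean(hist)
            row[f"demand_roll{window}_std"] = pstdev(hist)
            row[f"demand_roll{window}_max"] = max(hist)
        rows.append(row)
    return rows
```

Excluding the current week from the rolling window is the key design point: rolling features must only see the past, or training targets leak into the inputs.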

Output: s3://ml-output/{client}/{date}/features.parquet

Step 10 — AI/ML Engine (Model Training)

The AI/ML Engine reads Parquet feature files produced by the Preprocessor (Step 9) and trains all demand prediction models. Trained artifacts (.pkl models + .json configs) are written back to S3.

First echelon (stores) demand models:
- Classifier (LGBMClassifier): P(demand > 0), 40 features, threshold = 0.60, F1 = 95.5%
- Regressor (LightGBM with custom newsvendor loss): E[demand | demand > 0], 36 features, MAE = 0.63
- Regressor excludes has_recent_demand_4w/8w/12w and lag_horizon1 (binary flags hurt regression; horizon1 causes overfitting)
- Temporal split: ~80% train / 20% test (split at ~2025-08-04)
- Per-FMR_FREQ-segment critical ratios determine quantile selection
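The "custom newsvendor loss" is not specified in this document; the standard formulation is an asymmetric (pinball-style) loss where the critical ratio sets the quantile. A sketch of that objective, offered as an assumption about what the custom loss optimizes, not as the actual implementation:

```python
def newsvendor_loss(y_true, y_pred, critical_ratio):
    """Asymmetric newsvendor-style loss: under-prediction (lost sales) is
    penalized by CR, over-prediction (excess stock) by (1 - CR). Minimizing
    it pushes predictions toward the CR-quantile of demand."""
    err = y_true - y_pred
    return critical_ratio * max(err, 0.0) + (1 - critical_ratio) * max(-err, 0.0)
```

With CR = 0.8, under-predicting by one unit costs 4× as much as over-predicting by one unit, which is why high-margin (high-CR) segments get pushed toward higher quantiles.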

Third echelon (central warehouse) demand model:
- LightGBM quantile regression (q50, q75, q90, q95, q99) on monthly product×location data
- Features: lag, rolling mean/std, seasonality, trend, downstream demand

Output: Trained model artifacts (.pkl files) and configuration (.json), stored in s3://ml-output/{client}/{date}/models/

Step 11 — Inventory Strategy Optimization (MEIO)

Uses trained models from Step 10 for simulation backtesting to select optimal inventory strategy per store/warehouse.

Architecture — 3 echelons:

| Echelon | Level | Status | # Strategies | Review Cycle | Lead Time | Target Fill Rate |
| --- | --- | --- | --- | --- | --- | --- |
| First | Stores | Implemented | 9 | Mon/Thu (3.5d) | 2d (DC→store) | ≥ 85% |
| Second | Distribution Centers | Planned | TBD | TBD | TBD | TBD |
| Third | Central Warehouse | Implemented | 5 | Mon/Wed/Fri (2.3d) | 1–8d (supplier) | ≥ 95% |

First Echelon — 9 Store Strategies

| # | Strategy | Formula | Tunable Parameters |
| --- | --- | --- | --- |
| 1 | FMR-ABC Quantile | target = μ×7 + Z_segment × σ × √7 | fa_quantile, fb_quantile, fc_quantile ∈ [0.70–0.95] |
| 2 | Product-Level | target = (L+R)×μ + safety_multiplier × σ × √(L+R) | safety_multiplier ∈ {1.0, 1.5, 2.0, 2.5} |
| 3 | Service Level | target = (L+R)×μ + Z_sl × σ × √(L+R) | service_level ∈ {0.85, 0.90, 0.92, 0.95, 0.97} |
| 4 | Newsvendor | target = Z_cr × σ × √(L+R) + (L+R)×μ | holding_cost_rate ∈ [0.01–0.08] |
| 5 | Seasonal+Trend | target = base_days × μ × seasonal_idx × trend | seasonal_weight × base_multiplier (3×3) |
| 6 | Moving Average | target = MA_4wk × 7 × safety_factor | ma_window ∈ [2–8w], safety_factor ∈ [1.1–1.5] |
| 7 | ABC-XYZ Matrix | target = μ×7 × multiplier(abc, xyz) | 3×3 matrix multipliers ∈ [1.1–2.5] |
| 8 | Dynamic Safety | target = (L+R)×μ + 2.0 × recent_σ × √(L+R) | safety_multiplier ∈ {1.5, 2.0, 2.5} |
| 9 | Margin-Weighted | target = μ×7 × (1 + margin_ratio × weight) | base_days ∈ {5,7,9}, margin_weight ∈ {0.5,1.0,1.5} |

Where: L = lead time, R = review cycle, μ = mean daily demand, σ = std dev of demand.
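Strategy 3 (Service Level) is directly computable from the table's formula. A minimal sketch using the standard-normal quantile for Z_sl:

```python
from math import sqrt
from statistics import NormalDist

def service_level_target(mu, sigma, lead_time, review_cycle, service_level=0.95):
    """Strategy 3: target = (L+R)*mu + Z_sl * sigma * sqrt(L+R),
    where Z_sl = standard-normal quantile of the chosen service level."""
    period = lead_time + review_cycle
    z = NormalDist().inv_cdf(service_level)
    return period * mu + z * sigma * sqrt(period)
```

For example, with μ = 10/day, σ = 2, L = 2d, R = 2d, and a 95% service level, the target is 40 units of cycle stock plus roughly 6.6 units of safety stock.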

Second Echelon — Distribution Centers (Planned)

Regional inventory pools between stores and central warehouse. Not yet implemented. Will optimize DC-to-store replenishment using downstream demand aggregation.

Third Echelon — 5 Central Warehouse Strategies

| # | Strategy | Formula |
| --- | --- | --- |
| 1 | Forecast-Based | target = Q50×(L+R) + (Q95−Q50) × √(L+R) |
| 2 | Downstream Aggregate | target = agg_demand×(L+R) + diversification × √(n_stores) |
| 3 | Periodic Review (R,S) | target = Q95_daily × (L+R) via Normal CDF |
| 4 | Base-Stock | target = Q90/30 × (L+R) + 2×σ × √(L+R) |
| 5 | Buffer Min-qty | target = peak_downstream × transfer_LT × safety |

Post-processing caps: Sparse demand (demand_ratio < 0.50), lumpy demand (CV > 1.5 AND Q95 < 3.0), demand inflation (so_qty < 10% of demand_qty).

Note: These strategies are labeled "DC strategies" in the current codebase (cw_201/cw_202). This is incorrect — they operate at the central warehouse level. The codebase naming will be corrected as part of the MEIO migration.

Strategy Selection Process

Per-store/warehouse grid search over strategy × parameter combinations:

  1. Run inventory simulation backtesting using demand predictions from trained models
  2. Evaluate the fill rate vs. inventory efficiency trade-off
  3. Select the optimal strategy per location
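The selection loop above can be sketched as a grid search with a simple objective. The objective shown (minimize average inventory subject to the echelon's fill-rate floor) is an illustrative assumption — the document states only that the trade-off is evaluated, not how it is scored:

```python
def select_strategy(candidates, simulate, min_fill_rate=0.85):
    """Grid search over (strategy, params) combos for one location.
    `simulate` is a caller-supplied backtest returning
    (fill_rate, avg_inventory_value) for a combo. Picks the cheapest
    combo that meets the fill-rate floor (hypothetical objective)."""
    best, best_inv = None, float("inf")
    for strategy, params in candidates:
        fill_rate, inventory = simulate(strategy, params)
        if fill_rate >= min_fill_rate and inventory < best_inv:
            best, best_inv = (strategy, params), inventory
    return best
```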

Output: inventory_strategies (lookup) + inventory_targets (per product×location×snapshot: winning strategy, target stock, reorder point, safety stock) + inventory_target_params (per target: named parameter key-value pairs replacing the former strategy_params JSONB column)

Step 12 — AI/ML Inference (Recommender)

The AI/ML Inference stage loads trained models from S3 and applies them for forward-looking demand prediction, then generates PO/TO recommendations using inventory targets from Step 11. This is the final ML stage — predictions and recommendations are written back to the DB.

Inference:
- Apply classifier → P(demand > 0) per product×location×horizon
- Apply regressor → E[demand | demand > 0]
- Create a new demand_forecasts record (one per daily run), then write per-product×location predictions to demand_forecast_products (prob_demand, predicted_qty, critical_ratio, confidence)

PO Recommendations — 3-tier quantile optimization:

| Tier | Criteria | Method |
| --- | --- | --- |
| 1 | ≥ 30 days sales history | Per-product backtest: select quantile (q50–q99) minimizing overage + underage cost |
| 2 | Insufficient history | Newsvendor analytical: CR = Cu/(Cu+Co), map CR to quantile |
| 3 | Sparse products | Segment pooling: (FMR, ABC) segment median cost profile |

3-layer demand caps:

  1. Demand cap: max(1, hist_monthly_rate × horizon × 5)
  2. Network sufficiency: tighten to 1× historical when network inventory > target
  3. Sparse/lumpy: cap when demand_ratio < 0.50 or CV > 1.5

Core PO formula:

network_inv = all_dc_hybrid_inv + store_excess
PO = max(0, ⌈demand_hz + dc_target − network_inv⌉)
target_floor = max(0, store_target + dc_target − total_network_inv)
PO = max(PO, target_floor)
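The core PO formula translates directly. A minimal sketch, keeping the source's variable names:

```python
from math import ceil

def po_quantity(demand_hz, dc_target, network_inv, store_target, total_network_inv):
    """Step 12 core PO formula: order up to horizon demand plus the DC target,
    net of network inventory, floored by the combined store+DC target shortfall."""
    po = max(0, ceil(demand_hz + dc_target - network_inv))
    target_floor = max(0, store_target + dc_target - total_network_inv)
    return max(po, target_floor)
```

The floor guarantees that even when horizon demand is fully covered, the order still tops inventory up to the combined target levels.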

Confidence scoring (5 components):

confidence = s1×0.25 + s2×0.30 + s3×0.15 + s4×0.15 + s5×0.15

TO Recommendations — inter-location transfer suggestions based on inventory imbalance.

Output: demand_forecasts + demand_forecast_products, po_recommendations, to_recommendations

Demand accumulation (UI): Forecasted demand must be accumulable at (1) reorder time level — summing predicted_qty over lead_time + review_cycle horizons for a given product×location (partial weeks pro-rated: predicted_qty × remaining_days / 7), and (2) arbitrary time ranges — user-selected date windows for planning. These aggregations are computed in the UI application layer and cached in Redis (see §3) to avoid repeated heavy queries against demand_forecast_products.

Step 13 — UpliftMonitor

Measures the actual added value that purchase order and transfer order recommendations bring. Runs periodically (e.g., weekly/monthly) after enough time has elapsed to observe outcomes. Not part of the daily pipeline — triggered on a separate schedule.

Recommendation respect tracking:
- Compare po_recommendations.recommended_qty against actual purchase_order_lines.qty_requested for matching product×supplier×location within a configurable time window.
- Set po_recommendations.is_respected = true when the actual order quantity is within a tolerance band (e.g., ±20%) of the recommended quantity.
- Set po_recommendations.is_respected = false when the recommendation was ignored (no matching PO) or the actual quantity diverges significantly.
- Same logic for to_recommendations vs. actual transfer_order_lines.
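The tolerance-band test reduces to a relative-deviation check. A minimal sketch, assuming a missing matching PO surfaces as `actual_qty=None`:

```python
def is_respected(recommended_qty, actual_qty, tolerance=0.20):
    """True when the actual ordered quantity falls within ±tolerance of the
    recommendation; an ignored recommendation (no matching PO) or a
    non-positive recommendation is not respected."""
    if actual_qty is None or recommended_qty <= 0:
        return False
    return abs(actual_qty - recommended_qty) / recommended_qty <= tolerance
```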

Impact measurement — PO recommendations: For each recommendation where enough post-recommendation data exists, compute:

| Impact Type | Measurement |
| --- | --- |
| revenue_uplift | Incremental revenue attributable to following (or not following) the recommendation, compared to a baseline (e.g., previous period or control group) |
| fill_rate_improvement | Change in product×location fill rate in the measurement window vs. the pre-recommendation baseline |
| inventory_optimization | Reduction in excess inventory value or days-of-stock, measured as the delta between target and actual inventory levels |
| stockout_reduction | Decrease in stockout events or stockout days for the recommended product×location |
| overstock_reduction | Decrease in overstock quantity (inventory above target) for the recommended product×location |
| cost_savings | Holding cost or markdown cost avoided by following the recommendation |

Each impact is stored as a row in purchase_order_recommendation_impacts with baseline_value, actual_value, impact_value (actual − baseline), and impact_pct.

Impact measurement — TO recommendations: Same impact types, stored in transfer_order_recommendation_impacts. Measures the value of inter-location transfers (e.g., revenue recovered by moving stock from overstocked to understocked locations).

Aggregation:
- Per-recommendation impacts roll up to per-location, per-supplier, and global summaries.
- Respected vs. not-respected cohort comparison: what would have happened if all recommendations had been followed?
- Trend over time: is the system's recommendation quality improving?

Output: purchase_order_recommendation_impacts, transfer_order_recommendation_impacts, is_respected flags on po_recommendations and to_recommendations