5. Step 3: Transform & Load (Detail)
flowchart LR
subgraph S3["S3: raw-data/{client}/{date}/"]
J_PROD["products.json"]
J_SUP["suppliers.json"]
J_LOC["locations.json"]
J_SO["sales_orders.json"]
J_PO["purchase_orders.json"]
J_RO["receiving_orders.json"]
J_STO["store_orders.json"]
J_TO["transfer_orders.json"]
J_INV["inventory.json"]
J_PL["pricing_lists.json"]
J_STATIC["static_suppliers_list.xlsx
locations.xlsx
supplier_available_products_*.xlsx"]
J_MKT["market_data.json"]
J_GA["analytics.json"]
J_ADS["social_ads.json"]
end
subgraph LAMBDA["Transform Lambda"]
direction TB
PARSE["1. Parse JSON + Excel"]
VALIDATE["2. Validate schema
(Pydantic models)"]
NORM["3. Normalize
• flatten nested
• type-cast
• dedup by PK
• resolve FKs"]
UPSERT["4. Upsert
(idempotent:
INSERT ON CONFLICT)"]
PARSE --> VALIDATE --> NORM --> UPSERT
end
subgraph DB["cw_200 schema"]
T1["products
brands"]
T2["suppliers
x_supplier_brands"]
T3["locations
location_relationships"]
T4["sales_orders
sales_order_lines
customers"]
T5["purchase_orders
purchase_order_lines"]
T6["receiving_orders
receiving_order_lines"]
T7["store_orders
store_order_lines"]
T8["transfer_orders
transfer_order_lines"]
T9["inventory_quantities"]
T10["pricing_lists"]
T11["product_market_observations
category_market_observations"]
T12["product_web_analytics_daily
location_web_analytics_daily"]
T13["product_social_ad_daily
category_social_ad_daily"]
end
J_PROD & J_SUP & J_LOC --> PARSE
J_SO & J_PO & J_RO --> PARSE
J_STO & J_TO & J_INV & J_PL & J_STATIC --> PARSE
J_MKT & J_GA & J_ADS --> PARSE
UPSERT --> T1 & T2 & T3
UPSERT --> T4 & T5 & T6
UPSERT --> T7 & T8 & T9 & T10
UPSERT --> T11 & T12 & T13
style LAMBDA fill:#E3F2FD,stroke:#1976D2
Idempotency: Every transform is re-runnable. Each write uses INSERT ... ON CONFLICT (primary_key) DO UPDATE, so replaying the same JSON file leaves the database in the same state as the first run.
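A minimal sketch of the idempotent upsert, to make the replay guarantee concrete. It uses Python's built-in sqlite3 (SQLite 3.24 or newer) only so that it runs without a database server; the engine behind cw_200 is not stated here, and the products columns (product_id, name, price) and the upsert_products helper are hypothetical names chosen for illustration.

```python
import sqlite3


def upsert_products(conn, rows):
    """Insert rows, or update them in place when the primary key already exists."""
    conn.executemany(
        """
        INSERT INTO products (product_id, name, price)
        VALUES (:product_id, :name, :price)
        ON CONFLICT (product_id) DO UPDATE SET
            name = excluded.name,
            price = excluded.price
        """,
        rows,
    )
    conn.commit()


# In-memory stand-in for the target table (hypothetical columns).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (product_id TEXT PRIMARY KEY, name TEXT, price REAL)"
)

batch = [
    {"product_id": "P-1", "name": "Widget", "price": 9.99},
    {"product_id": "P-2", "name": "Gadget", "price": 19.50},
]

upsert_products(conn, batch)
first = conn.execute("SELECT * FROM products ORDER BY product_id").fetchall()

upsert_products(conn, batch)  # replay the same input
second = conn.execute("SELECT * FROM products ORDER BY product_id").fetchall()
```

Running the same batch twice leaves two rows, and `first` equals `second`. A plain INSERT would instead fail on the second run with a primary-key violation.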
Note: Reference and lookup tables (product_attributes, product_images, product_category_types, product_categories, brands, customers, supplier_available_products, and all domain-specific lookup tables) are also populated during Step 3 (Transform & Load) but are omitted from the diagram for readability.
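The Normalize stage in the diagram (flatten nested, type-cast, dedup by PK, resolve FKs) could look roughly like the sketch below. It uses only the standard library, and everything specific in it is an assumption: the order field names, the "last occurrence wins" dedup rule, and the reading of "resolve FKs" as mapping a source identifier to an internal key through a lookup dictionary.

```python
from datetime import date


def flatten(record, parent_key="", sep="_"):
    """Flatten nested dicts: {"a": {"b": 1}} becomes {"a_b": 1}."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def normalize_orders(raw_orders, customer_keys):
    """Flatten, type-cast, dedup by primary key, and resolve the customer FK.

    customer_keys maps a source customer id to an internal key (hypothetical).
    Rows whose customer cannot be resolved are returned separately.
    """
    by_pk = {}
    rejected = []
    for raw in raw_orders:
        row = flatten(raw)
        # Type-cast: source values may arrive as strings or numbers.
        row["order_id"] = str(row["order_id"])
        row["total"] = float(row["total"])
        row["order_date"] = date.fromisoformat(row["order_date"])
        # Resolve FK: swap the source id for the internal key.
        internal_key = customer_keys.get(row["customer_id"])
        if internal_key is None:
            rejected.append(row)
            continue
        row["customer_id"] = internal_key
        # Dedup by PK: a later record with the same key replaces the earlier one.
        by_pk[row["order_id"]] = row
    return list(by_pk.values()), rejected


raw = [
    {"order_id": 1, "total": "12.50", "order_date": "2024-03-01", "customer": {"id": "C-1"}},
    {"order_id": 1, "total": "13.00", "order_date": "2024-03-01", "customer": {"id": "C-1"}},
    {"order_id": 2, "total": "5", "order_date": "2024-03-02", "customer": {"id": "C-9"}},
]
rows, rejected = normalize_orders(raw, {"C-1": 101})
```

Here `rows` holds a single order (the duplicate of order 1 is collapsed), and the order that references the unknown customer C-9 lands in `rejected` rather than being loaded with a dangling key.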
6. Phase 2 Detail (Legacy)
Note: This section predates the 13-step pipeline chronology (§11). For the current pipeline reference, see §11 Steps 5 (BehaviorCalculator) and 8 (Aggregation).
flowchart TB
subgraph SOURCE["cw_200 schema (general)"]
SO["sales_orders + lines"]
PO["purchase_orders + lines"]
INV["inventory_quantities"]
PROD["products"]
LOC["locations"]
end
subgraph ENRICH_STEP["Enrichment Lambda"]
direction TB
E1["Product enrichment
• hotness score
• FMR / ABC / XYZ
• embeddings"]
E2["Feature engineering
• demand estimation
• velocity / trend
• seasonality
• lag features"]
E3["Demand forecasting
• LightGBM classifier
• Newsvendor regressor"]
end
subgraph WRITE_BACK["Write back → cw_200"]
PB["product_behaviors"]
DF["demand_forecast_products"]
end
subgraph AGG_STEP["Aggregation Lambda"]
AGG["Roll up by:
• day
• week (ISO)
• month
• quarter
Per location + global"]
end
subgraph STATS["cw_200_stats schema"]
D_T["daily_inventory_values
daily_sales_values
daily_pending_po_values
daily_expected_demand"]
W_T["weekly_*"]
M_T["monthly_*"]
Q_T["quarterly_*"]
end
SOURCE --> ENRICH_STEP
ENRICH_STEP --> WRITE_BACK
SOURCE --> AGG_STEP
WRITE_BACK --> AGG_STEP
AGG_STEP --> D_T & W_T & M_T & Q_T
style ENRICH_STEP fill:#E8F5E9,stroke:#388E3C
style AGG_STEP fill:#FFF8E1,stroke:#F9A825
style STATS fill:#FFF8E1,stroke:#F9A825
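The roll-up performed by the Aggregation Lambda (day, ISO week, month, quarter, per location and global) can be sketched as follows. This is an illustration under stated assumptions, not the pipeline's code: the period label formats, the (date, location_id, amount) input shape, and the use of None as the global location are all hypothetical.

```python
from collections import defaultdict
from datetime import date


def period_keys(d):
    """Return the day, ISO week, month, and quarter labels for a date."""
    iso_year, iso_week, _ = d.isocalendar()
    quarter = (d.month - 1) // 3 + 1
    return {
        "day": d.isoformat(),
        # ISO weeks belong to the ISO year, which can differ from d.year.
        "week": f"{iso_year}-W{iso_week:02d}",
        "month": f"{d.year}-{d.month:02d}",
        "quarter": f"{d.year}-Q{quarter}",
    }


def roll_up(sales):
    """Sum amounts per (grain, period, location); location None is the global total."""
    totals = defaultdict(float)
    for day, location_id, amount in sales:
        for grain, period in period_keys(day).items():
            totals[(grain, period, location_id)] += amount
            totals[(grain, period, None)] += amount
    return dict(totals)


sales = [
    (date(2024, 12, 30), "L1", 10.0),
    (date(2024, 12, 31), "L2", 5.0),
]
totals = roll_up(sales)
keys = period_keys(date(2024, 12, 30))
```

The sample dates show why the ISO week needs its own year: 30 December 2024 falls in month 2024-12 and quarter 2024-Q4, but in ISO week 2025-W01, so weekly tables cannot be keyed on the calendar year alone.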