CatWing Pipeline Design

5. Step 3: Transform & Load (Detail)

flowchart LR
    subgraph S3["S3: raw-data/{client}/{date}/"]
        J_PROD["products.json"]
        J_SUP["suppliers.json"]
        J_LOC["locations.json"]
        J_SO["sales_orders.json"]
        J_PO["purchase_orders.json"]
        J_RO["receiving_orders.json"]
        J_STO["store_orders.json"]
        J_TO["transfer_orders.json"]
        J_INV["inventory.json"]
        J_PL["pricing_lists.json"]
        J_STATIC["static_suppliers_list.xlsx<br/>locations.xlsx<br/>supplier_available_products_*.xlsx"]
        J_MKT["market_data.json"]
        J_GA["analytics.json"]
        J_ADS["social_ads.json"]
    end

    subgraph LAMBDA["Transform Lambda"]
        direction TB
        PARSE["1. Parse JSON + Excel"]
        VALIDATE["2. Validate schema<br/>(Pydantic models)"]
        NORM["3. Normalize<br/>• flatten nested<br/>• type-cast<br/>• dedup by PK<br/>• resolve FKs"]
        UPSERT["4. Upsert<br/>(idempotent:<br/>INSERT ON CONFLICT)"]
        PARSE --> VALIDATE --> NORM --> UPSERT
    end

    subgraph DB["cw_200 schema"]
        T1["products<br/>brands"]
        T2["suppliers<br/>x_supplier_brands"]
        T3["locations<br/>location_relationships"]
        T4["sales_orders<br/>sales_order_lines<br/>customers"]
        T5["purchase_orders<br/>purchase_order_lines"]
        T6["receiving_orders<br/>receiving_order_lines"]
        T7["store_orders<br/>store_order_lines"]
        T8["transfer_orders<br/>transfer_order_lines"]
        T9["inventory_quantities"]
        T10["pricing_lists"]
        T11["product_market_observations<br/>category_market_observations"]
        T12["product_web_analytics_daily<br/>location_web_analytics_daily"]
        T13["product_social_ad_daily<br/>category_social_ad_daily"]
    end

    J_PROD & J_SUP & J_LOC --> PARSE
    J_SO & J_PO & J_RO --> PARSE
    J_STO & J_TO & J_INV & J_PL & J_STATIC --> PARSE
    J_MKT & J_GA & J_ADS --> PARSE

    UPSERT --> T1 & T2 & T3
    UPSERT --> T4 & T5 & T6
    UPSERT --> T7 & T8 & T9 & T10
    UPSERT --> T11 & T12 & T13

    style LAMBDA fill:#E3F2FD,stroke:#1976D2
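
The Normalize step in the diagram (flatten nested, type-cast, dedup by PK, resolve FKs) could look roughly like the sketch below. It is illustrative only: the field names (`id`, `price`, `supplier.id`), the "last record wins" dedup rule, and the choice to null out unknown supplier references are all assumptions, not the pipeline's confirmed behavior.

```python
from decimal import Decimal
from typing import Any


def flatten(record: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten nested dicts into one level, joining keys with '_'."""
    flat: dict[str, Any] = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}_"))
        else:
            flat[name] = value
    return flat


def normalize_products(
    raw: list[dict[str, Any]], known_supplier_ids: set[int]
) -> list[dict[str, Any]]:
    """Flatten, type-cast, dedup by primary key, and resolve the supplier FK.

    Hypothetical rules: the last record for a given id wins, and a
    supplier_id that is not in known_supplier_ids is replaced by None.
    """
    by_pk: dict[int, dict[str, Any]] = {}
    for record in raw:
        row = flatten(record)
        # Type-cast: ids to int, money to Decimal (via str to avoid float noise).
        row["id"] = int(row["id"])
        row["price"] = Decimal(str(row["price"]))
        # Resolve FK: keep the reference only if the parent row is known.
        supplier_id = row.get("supplier_id")
        if supplier_id is not None:
            supplier_id = int(supplier_id)
        row["supplier_id"] = supplier_id if supplier_id in known_supplier_ids else None
        # Dedup by PK: a later record overwrites an earlier one.
        by_pk[row["id"]] = row
    return list(by_pk.values())


raw_products = [
    {"id": "1", "price": "9.90", "supplier": {"id": "7"}},
    {"id": 1, "price": 10, "supplier": {"id": 7}},
    {"id": "2", "price": "3", "supplier": {"id": 99}},
]
normalized = normalize_products(raw_products, known_supplier_ids={7})
```

Here the two records for product 1 collapse into one row, and product 2 keeps its row but loses the reference to the unknown supplier 99. A real implementation might instead reject or quarantine rows with unresolved references.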

Idempotency: Every transform is re-runnable. Rows are written with INSERT ... ON CONFLICT (primary_key) DO UPDATE, so replaying the same JSON file leaves the database in the same state as the first run.
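
The upsert pattern can be tried end to end with a small sketch. It uses an in-memory SQLite database from the Python standard library as a stand-in, because SQLite accepts the same `INSERT ... ON CONFLICT ... DO UPDATE` form; the `products` table, its columns, and the helper names are hypothetical and not taken from the cw_200 schema.

```python
import sqlite3

# Hypothetical table and columns; "excluded" refers to the row that
# was proposed for insertion and hit the conflict.
UPSERT_SQL = """
INSERT INTO products (id, name, price)
VALUES (:id, :name, :price)
ON CONFLICT (id) DO UPDATE SET
    name = excluded.name,
    price = excluded.price
"""


def upsert_products(conn: sqlite3.Connection, rows: list[dict]) -> None:
    """Insert new rows and overwrite existing ones in a single transaction."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(UPSERT_SQL, rows)


def snapshot(conn: sqlite3.Connection) -> list[tuple]:
    """Return the full table contents in a stable order for comparison."""
    return conn.execute(
        "SELECT id, name, price FROM products ORDER BY id"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT NOT NULL, price REAL)"
)

rows = [
    {"id": 1, "name": "widget", "price": 9.5},
    {"id": 2, "name": "gadget", "price": 20.0},
]

upsert_products(conn, rows)
first_run = snapshot(conn)

upsert_products(conn, rows)  # replay the same input
second_run = snapshot(conn)
```

After the replay, `first_run` and `second_run` hold the same rows. Note that the guarantee depends on the `DO UPDATE` clause overwriting every non-key column with values derived from the input; a column set from the clock (for example a hypothetical `updated_at`) would differ between runs.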

Note: Reference and lookup tables (product_attributes, product_images, product_category_types, product_categories, supplier_available_products, and all domain-specific lookup tables) are also populated during Step 3 (Transform & Load) but are omitted from the diagram for readability.


6. Phase 2 Detail (Legacy)

Note: This section predates the 13-step pipeline chronology (§11). For the current pipeline reference, see §11 Steps 5 (BehaviorCalculator) and 8 (Aggregation).

flowchart TB
    subgraph SOURCE["cw_200 schema (general)"]
        SO["sales_orders + lines"]
        PO["purchase_orders + lines"]
        INV["inventory_quantities"]
        PROD["products"]
        LOC["locations"]
    end

    subgraph ENRICH_STEP["Enrichment Lambda"]
        direction TB
        E1["Product enrichment<br/>• hotness score<br/>• FMR / ABC / XYZ<br/>• embeddings"]
        E2["Feature engineering<br/>• demand estimation<br/>• velocity / trend<br/>• seasonality<br/>• lag features"]
        E3["Demand forecasting<br/>• LightGBM classifier<br/>• Newsvendor regressor"]
    end

    subgraph WRITE_BACK["Write back → cw_200"]
        PB["product_behaviors"]
        DF["demand_forecast_products"]
    end

    subgraph AGG_STEP["Aggregation Lambda"]
        AGG["Roll up by:<br/>• day<br/>• week (ISO)<br/>• month<br/>• quarter<br/><br/>Per location + global"]
    end

    subgraph STATS["cw_200_stats schema"]
        D_T["daily_inventory_values<br/>daily_sales_values<br/>daily_pending_po_values<br/>daily_expected_demand"]
        W_T["weekly_*"]
        M_T["monthly_*"]
        Q_T["quarterly_*"]
    end

    SOURCE --> ENRICH_STEP
    ENRICH_STEP --> WRITE_BACK
    SOURCE --> AGG_STEP
    WRITE_BACK --> AGG_STEP
    AGG_STEP --> D_T & W_T & M_T & Q_T

    style ENRICH_STEP fill:#E8F5E9,stroke:#388E3C
    style AGG_STEP fill:#FFF8E1,stroke:#F9A825
    style STATS fill:#FFF8E1,stroke:#F9A825
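
The roll-up performed by the Aggregation Lambda (day, ISO week, month, quarter; per location plus a global total) can be sketched as follows. This is a minimal illustration under assumptions: the input tuple shape, the period label formats, and the `GLOBAL` sentinel are hypothetical, and the real Lambda presumably aggregates in SQL against the cw_200_stats tables rather than in Python.

```python
from collections import defaultdict
from datetime import date

# Hypothetical sentinel standing for the all-locations (global) roll-up.
GLOBAL = None


def period_keys(d: date) -> dict[str, str]:
    """Return the day, ISO week, month, and quarter labels for a date."""
    iso_year, iso_week, _ = d.isocalendar()
    quarter = (d.month - 1) // 3 + 1
    return {
        "day": d.isoformat(),
        # ISO weeks belong to the ISO year, which can differ from d.year
        # around New Year.
        "week": f"{iso_year}-W{iso_week:02d}",
        "month": f"{d.year}-{d.month:02d}",
        "quarter": f"{d.year}-Q{quarter}",
    }


def roll_up(sales: list[tuple[date, str, float]]) -> dict[tuple, float]:
    """Sum amounts per (grain, period, location) and per (grain, period, GLOBAL)."""
    totals: dict[tuple, float] = defaultdict(float)
    for sale_date, location_id, amount in sales:
        for grain, period in period_keys(sale_date).items():
            totals[(grain, period, location_id)] += amount
            totals[(grain, period, GLOBAL)] += amount
    return dict(totals)


sales = [
    (date(2024, 12, 30), "A", 10.0),
    (date(2024, 12, 31), "B", 5.0),
    (date(2025, 1, 2), "A", 2.5),
]
totals = roll_up(sales)
```

All three sample dates fall in ISO week 2025-W01 even though two of them are in December 2024, which is why the week label is built from the ISO year rather than the calendar year.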