CatWing Pipeline Design

2. Design Principles

Each principle below is paired with the implementation that realizes it.
Data recoverability & reproducibility: S3 raw JSON is immutable, append-only, and dated; it is the ultimate source of truth. Dual backup: DB snapshots (fast restore) plus raw JSON (full rebuild). The DB can be regenerated by replaying all JSON files chronologically.
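A minimal sketch of the replay idea, assuming ISO-dated filenames (the `YYYY-MM-DD_*` layout and the `apply_file` callback are hypothetical illustrations, not the actual pipeline API):

```python
# Rebuild by replaying dated raw JSON files in chronological order.
# ISO-dated names (YYYY-MM-DD_*) sort chronologically as plain strings,
# so no date parsing is needed.
def replay(files, apply_file):
    for name in sorted(files):
        apply_file(name)  # e.g., transform + upsert into the DB

applied = []
replay(["2024-03-02_products.json", "2024-01-15_products.json"], applied.append)
print(applied[0])  # → 2024-01-15_products.json
```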
Idempotent transforms: INSERT ... ON CONFLICT DO UPDATE. Re-running Step 3 on the same JSON produces an identical DB state. Safe to retry, safe to replay.
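The upsert pattern can be demonstrated end to end with SQLite (which shares PostgreSQL's `ON CONFLICT ... DO UPDATE` syntax); the table and column names here are illustrative, not the production schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (sku TEXT PRIMARY KEY, name TEXT, price REAL)")

def upsert(rows):
    # The UNIQUE/PK constraint on sku is the conflict target, so replaying
    # the same batch converges to the same DB state.
    conn.executemany(
        """INSERT INTO products (sku, name, price) VALUES (?, ?, ?)
           ON CONFLICT(sku) DO UPDATE SET name = excluded.name, price = excluded.price""",
        rows,
    )

batch = [("SKU-1", "Cat tree", 89.0), ("SKU-2", "Scratch post", 19.5)]
upsert(batch)
upsert(batch)  # re-running on the same input changes nothing
print(conn.execute("SELECT COUNT(*) FROM products").fetchone()[0])  # → 2
```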
Multi-tenancy & client isolation: Separate database schemas per client (cw_200, cw_151). Shared reference data lives in cw_global. No cross-client queries.
Per-client customization: Base Transformer / Aggregator / Extractor classes with client-specific subclass overrides. Each client defines only its delta from the base.
Different input ERP data structures: A client-specific transform_raw_data() normalizes arbitrary ERP JSON into the common schema. A new ERP means a new subclass, with no changes to shared code.
Strong cohesion, loose coupling: Each phase is a self-contained Lambda with explicit inputs and outputs. Each table has a single responsibility. The _stats schema is a pure derivative: droppable and rebuildable.
Maintainability & easy extensibility: A new client is a new schema plus optional subclass overrides. A new metric is a new stats table. Alembic-managed migrations keep changes versioned.
Data scientists work with Parquet: The ML pipeline exports features and predictions as Parquet to S3. Data scientists use pandas/notebooks against Parquet; they never query the DB directly. The ML pipeline is organized in three stages: (1) the AI/ML Preprocessor reads from the DB and exports feature Parquet, (2) the AI/ML Engine trains models from Parquet and writes .pkl/.json artifacts to S3, (3) AI/ML Inference loads models, predicts, and writes forecasts back to the DB.
DB-level domain constraints: Lookup/type tables are scoped per entity domain. When two entity types have overlapping but distinct valid values, they get separate lookup tables — even if some values repeat across tables. FK constraints enforce valid-value sets at the DB level, not the application layer.
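A small SQLite sketch of domain scoping (the status names and the `orders` table are hypothetical examples): `'cancelled'` appears in both lookup tables, yet each FK can only reference its own domain, and the database itself rejects unknown values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite needs FK enforcement enabled
conn.executescript("""
CREATE TABLE order_statuses (id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL);
CREATE TABLE shipment_statuses (id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL);
CREATE TABLE orders (
  id INTEGER PRIMARY KEY,
  order_status_id INTEGER NOT NULL REFERENCES order_statuses(id)
);
INSERT INTO order_statuses (name) VALUES ('open'), ('cancelled');
INSERT INTO shipment_statuses (name) VALUES ('in_transit'), ('cancelled');
""")
conn.execute("INSERT INTO orders (order_status_id) VALUES (1)")  # 'open': accepted
rejected = False
try:
    conn.execute("INSERT INTO orders (order_status_id) VALUES (99)")  # no such status
except sqlite3.IntegrityError:
    rejected = True  # the DB, not the application layer, refuses the value
print(rejected)  # → True
```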
Audit-proof configuration: All UI-managed data (settings, topology, assortments) is append-only and versioned. Every change records who made it, when, and the before/after state via audit columns. Configuration history is never deleted — only superseded by newer rows. Current state = latest row per entity. ERP-sourced entity history is preserved in S3 raw JSON archives, not in the DB — point-in-time ERP state can be reconstructed by replaying JSON up to the target date (see principle 1: Data recoverability).
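The "current state = latest row per entity" rule can be sketched as follows (a `location_settings` table with a `reorder_point` column is an assumed example, not the documented schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE location_settings (
  id INTEGER PRIMARY KEY,
  location_id INTEGER NOT NULL,
  reorder_point INTEGER NOT NULL,
  created_at TEXT NOT NULL,
  created_by TEXT NOT NULL
);
-- Append-only: every change is a new row, never an UPDATE or DELETE.
INSERT INTO location_settings (location_id, reorder_point, created_at, created_by) VALUES
  (1, 10, '2024-01-01T09:00:00Z', 'alice'),
  (1, 15, '2024-02-01T09:00:00Z', 'bob');
""")
# Current state is simply the latest row per entity; the full history stays queryable.
row = conn.execute("""
    SELECT reorder_point FROM location_settings
    WHERE location_id = 1
    ORDER BY created_at DESC, id DESC LIMIT 1
""").fetchone()
print(row[0])  # → 15
```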
Explicit naming: Table names and column names are always fully qualified — no abbreviations or ambiguous short forms. PKs are always id. FK columns use the singular form of the referenced table name suffixed with _id (e.g., inventory_strategy_id not strategy_id; purchase_order_id not order_id). Role-prefixed FKs keep the full entity name after the prefix (e.g., cw_product_assortment_status_id). Junction (many-to-many cross) tables use the x_ prefix (e.g., x_supplier_products, x_supplier_brands, x_user_roles).
No cyclical data dependencies: The FK dependency graph must be a strict DAG — no table may reference another table that directly or transitively references it back. Self-referential FKs (e.g., product_categories.parent_product_category_id) are the sole exception and must enforce acyclicity via application-level validation (a category cannot be its own ancestor). The pipeline execution order (Steps 1–13) must also be acyclic: no step may read from a table that a later step writes to within the same run. Temporal feedback loops between runs (e.g., today's recommendations influence tomorrow's orders which influence next week's behaviors) are expected and acceptable — they are not intra-run cycles. Determinism guarantee: given identical input state (DB snapshot + S3 files), the pipeline must produce identical output regardless of whether previous runs succeeded or failed.
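A DAG check like this could run in CI against the FK list introspected from the schema; the sketch below uses Kahn's topological sort (self-referential FKs would be filtered out first, since they are the documented exception):

```python
from collections import defaultdict, deque

def is_dag(fk_edges):
    """Return True if the FK graph has no cycles. Each edge (src, dst)
    means table src holds a FK pointing at table dst."""
    indeg, adj, nodes = defaultdict(int), defaultdict(list), set()
    for src, dst in fk_edges:
        adj[src].append(dst)
        indeg[dst] += 1
        nodes |= {src, dst}
    queue = deque(n for n in nodes if indeg[n] == 0)
    seen = 0
    while queue:
        n = queue.popleft()
        seen += 1
        for m in adj[n]:
            indeg[m] -= 1
            if indeg[m] == 0:
                queue.append(m)
    return seen == len(nodes)  # every node drained means no cycle remained

fks = [("sales_order_lines", "products"), ("products", "brands")]
print(is_dag(fks))                                        # → True
print(is_dag(fks + [("brands", "sales_order_lines")]))    # → False (cycle)
```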
Minimal nullable FKs: Every FK is NOT NULL by default. Nullable FKs are explicit, documented exceptions with a stated reason (e.g., self-referential hierarchy roots, optional associations like anonymous walk-in customers). Each nullable FK must be approved in the design document. Current approved list: product_categories.parent_product_category_id (hierarchy root), store_orders.purchase_order_id (direct store orders), assortments.location_id (global assortments), products.brand_id (unknown brands), sales_order_lines.customer_id (anonymous walk-ins).
No polymorphic tables: Generic tables that serve multiple entity types via a discriminator column (e.g., a single translations table with entity_type + entity_id) are replaced by per-entity typed tables with proper FK enforcement (e.g., product_translations, supplier_translations). This eliminates orphan rows, enables DB-level referential integrity, and makes the schema self-documenting.
No bare string enums (4NF): Every column that holds an enumerable value (status, type, source, priority, reason, method, platform, zone, gender, kind) must be a FK reference to a dedicated lookup table — no bare strings for enumerable values. Bare strings are reserved for truly free-text fields (names, descriptions, notes, SKUs). This eliminates multi-valued dependencies and enables rename-safe, typo-proof value management at the DB level.
Cached columns with documented authority: Pipeline-updated cached snapshots on entity tables (e.g., products.cost from x_supplier_products.unit_cost, products.price from pricing_lists) are acceptable for query performance. Each cached column must have a convention note documenting: (a) the authoritative source table, (b) the derivation rule, (c) which pipeline step refreshes it, and (d) staleness detection — either a companion {column}_refreshed_at timestamp on the row, or verification via the pipeline_runs table that the responsible step completed successfully in the current run.
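A refresh step for such a cached column might look like this sketch (the MIN-over-suppliers derivation rule is an assumed example; the actual rule belongs in the column's convention note):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE x_supplier_products (
  product_id INTEGER NOT NULL, supplier_id INTEGER NOT NULL, unit_cost REAL NOT NULL
);
CREATE TABLE products (
  id INTEGER PRIMARY KEY,
  cost REAL,                -- cached from x_supplier_products (authoritative source)
  cost_refreshed_at TEXT    -- staleness marker stamped by the refresh step
);
INSERT INTO products (id) VALUES (1);
INSERT INTO x_supplier_products VALUES (1, 7, 4.20), (1, 9, 5.10);
""")
# The pipeline step recomputes the cache from the authoritative table
# and stamps cost_refreshed_at so staleness is detectable per row.
conn.execute("""
    UPDATE products SET
      cost = (SELECT MIN(unit_cost) FROM x_supplier_products xsp
              WHERE xsp.product_id = products.id),
      cost_refreshed_at = datetime('now')
""")
print(conn.execute("SELECT cost FROM products WHERE id = 1").fetchone()[0])  # → 4.2
```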
Structured over schemaless: When a JSONB column's keys and value types are known and stable, it must be decomposed into a structured child table (e.g., inventory_target_params replaces inventory_targets.strategy_params JSONB). JSONB is reserved for truly dynamic/unpredictable data like client_config.config.
Aggressive DB-level constraints: Data integrity is enforced at the database level via constraints — not the application layer. Every natural key gets a UNIQUE constraint (single-column marked UK in ERDs; compound keys documented in convention notes). Combined with NOT NULL defaults, FK constraints, and domain-scoped lookups, the DB rejects invalid data on write. The INSERT ... ON CONFLICT upsert pattern uses these UNIQUE constraints as conflict targets for idempotent transforms. Application-level validation is defence-in-depth, not the primary guard.
Serialized pipeline execution: Only one pipeline run may execute at a time per client schema. This is enforced by a database advisory lock (or a pipeline_runs table with status = 'running' checked at startup). A second run that starts while one is active must wait or fail fast — never run concurrently. This eliminates write conflicts between overlapping runs and guarantees that each step reads a consistent snapshot produced by the previous step. UI writes to *_settings tables can safely happen concurrently (rows are append-only; the pipeline reads the latest row at step start, not mid-step).
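The pipeline_runs variant can be made race-free by letting a constraint do the check, rather than a read-then-write at startup. This sketch uses a unique partial index so at most one 'running' row can exist per client schema (on PostgreSQL, `pg_try_advisory_lock` would be the alternative named in the principle):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE pipeline_runs (
  id INTEGER PRIMARY KEY, client_schema TEXT NOT NULL, status TEXT NOT NULL
);
-- At most one 'running' row per client schema, enforced by the DB itself.
CREATE UNIQUE INDEX one_running_per_client
  ON pipeline_runs (client_schema) WHERE status = 'running';
""")

def start_run(schema):
    """Try to claim the run slot; fail fast if another run is active."""
    try:
        conn.execute(
            "INSERT INTO pipeline_runs (client_schema, status) VALUES (?, 'running')",
            (schema,),
        )
        return True
    except sqlite3.IntegrityError:
        return False  # a concurrent run holds the slot

print(start_run("cw_200"))  # → True
print(start_run("cw_200"))  # → False
```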
No hard deletes on ERP-sourced entities: When an entity (product, supplier, location, order) disappears from the ERP feed, it is soft-deleted (is_active = false or equivalent status FK) — never hard-deleted. FK integrity is preserved, historical data (behaviors, sales, inventory) remains queryable, and the entity can be reactivated if the ERP feed resumes it. Hard deletes are reserved for developer-initiated data corrections with explicit migration scripts, never as part of normal pipeline operation.
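A feed-sync step following this principle might look like the sketch below (SKU-keyed products and a boolean is_active flag are assumed simplifications):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, sku TEXT UNIQUE, "
    "is_active INTEGER NOT NULL DEFAULT 1)"
)
conn.executemany("INSERT INTO products (sku) VALUES (?)", [("A",), ("B",), ("C",)])

def sync_feed(feed_skus):
    """Soft-delete products absent from today's ERP feed; reactivate returning ones."""
    placeholders = ",".join("?" * len(feed_skus))
    conn.execute(f"UPDATE products SET is_active = 0 WHERE sku NOT IN ({placeholders})",
                 feed_skus)
    conn.execute(f"UPDATE products SET is_active = 1 WHERE sku IN ({placeholders})",
                 feed_skus)

sync_feed(["A", "C"])  # 'B' vanished from the feed: deactivated, never deleted
print(conn.execute("SELECT sku FROM products WHERE is_active = 0").fetchall())  # → [('B',)]
```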
Atomic pipeline steps: Each pipeline step (Steps 1–13) runs inside a single database transaction. If the step fails mid-execution, all its writes are rolled back — downstream steps never see partial results. Combined with idempotent transforms, this means recovery is always "re-run the failed step from scratch." For bulk steps that exceed practical transaction size (e.g., Step 8 stats aggregation across millions of rows), the step must write to a staging table and swap atomically (ALTER TABLE ... RENAME or truncate-and-insert within a transaction).
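The transaction-per-step contract can be sketched like this: a truncate-and-insert that fails mid-step rolls back entirely, leaving the previous results intact (the `product_stats` table and the failure injection are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.isolation_level = None  # manage BEGIN/COMMIT/ROLLBACK explicitly
conn.execute("CREATE TABLE product_stats (product_id INTEGER PRIMARY KEY, units_sold INTEGER)")
conn.execute("INSERT INTO product_stats VALUES (1, 100)")

def run_step(writes, fail=False):
    """Run one step's writes inside a single transaction."""
    conn.execute("BEGIN")
    try:
        for sql in writes:
            conn.execute(sql)
        if fail:
            raise RuntimeError("simulated mid-step failure")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # downstream steps never see partial writes
        raise

try:
    run_step(["DELETE FROM product_stats",               # truncate...
              "INSERT INTO product_stats VALUES (1, 120)"],  # ...and re-insert
             fail=True)
except RuntimeError:
    pass
print(conn.execute("SELECT units_sold FROM product_stats").fetchone()[0])  # → 100
```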
Date partitioning for time-series tables: Tables that accumulate daily snapshots (product_behaviors, product_location_behaviors, inventory_quantities, all 16 _stats tables) must be partitioned by date using database declarative partitioning (PARTITION BY RANGE (snapshot_date) or period_start). Partition granularity: monthly for behavior/inventory tables, quarterly for stats tables. Old partitions can be detached and archived to S3 without affecting query performance on recent data. Retention policy is configured per table category and enforced by a scheduled maintenance job, not by the pipeline itself.
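Monthly partitions for PostgreSQL declarative partitioning could be generated by a maintenance script along these lines (the partition naming scheme `{table}_{YYYY}_{MM}` is an assumption, not a documented convention):

```python
from datetime import date

def monthly_partition_ddl(table, start, months):
    """Emit one CREATE TABLE ... PARTITION OF statement per month,
    with half-open [first-of-month, first-of-next-month) ranges."""
    stmts = []
    y, m = start.year, start.month
    for _ in range(months):
        ny, nm = (y + 1, 1) if m == 12 else (y, m + 1)
        stmts.append(
            f"CREATE TABLE {table}_{y:04d}_{m:02d} PARTITION OF {table} "
            f"FOR VALUES FROM ('{date(y, m, 1)}') TO ('{date(ny, nm, 1)}');"
        )
        y, m = ny, nm
    return stmts

for stmt in monthly_partition_ddl("product_behaviors", date(2024, 1, 1), 3):
    print(stmt)
```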
Dates client-local, timestamps UTC: All date columns (order_date, snapshot_date, period_start, birth_date) represent the client's business day in their local timezone — no timezone conversion at query time for daily aggregations. All timestamp columns (created_at, updated_at, refreshed_at) are stored as timestamptz in UTC. The client's timezone is defined in cw_global.client_config and used by the pipeline to determine the current business date at the start of each run. The boundary between "today" and "yesterday" is always the client's local midnight.
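Deriving the business date at run start can be sketched with the standard library (`Europe/Amsterdam` is a hypothetical example of a timezone value read from cw_global.client_config):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def business_date(now_utc, client_tz):
    """Convert a UTC timestamp to the client's local calendar date."""
    return now_utc.astimezone(ZoneInfo(client_tz)).date()

# 23:30 UTC on Jan 1 is already past local midnight in Amsterdam (UTC+1 in winter),
# so the business date rolls over to Jan 2.
now = datetime(2024, 1, 1, 23, 30, tzinfo=timezone.utc)
print(business_date(now, "Europe/Amsterdam"))  # → 2024-01-02
```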
Data quality gating: Raw data from client ERP systems passes through a hard quality gate (Step 2: DataQualityGateway) before entering the production database. The gateway loads raw S3 JSON into a separate staging database, runs completeness, consistency, freshness, and statistical anomaly checks, and blocks the pipeline on critical failures. Each client supplies two mappers: a lightweight Quality Mapper (Step 2, raw JSON → staging DB) for validation checks, and a full Production Mapper (Step 3, raw JSON → main DB) for normalization and upsert. Results are stored in the main DB (data_quality_* tables) and surfaced in the UI.
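Two of the gate's check categories might be shaped like the sketch below; the field names, thresholds, and function signatures are illustrative, not the actual DataQualityGateway API:

```python
from datetime import date

def completeness_check(rows, required_fields):
    """Flag rows where any required field is missing or empty."""
    failures = [r for r in rows if any(not r.get(f) for f in required_fields)]
    return len(failures) == 0, failures

def freshness_check(feed_date, run_date, max_lag_days=2):
    """Block the pipeline if the ERP feed is older than the allowed lag."""
    return (run_date - feed_date).days <= max_lag_days

rows = [{"sku": "A", "price": 10.0}, {"sku": "", "price": 5.0}]
ok, bad = completeness_check(rows, ["sku", "price"])
print(ok)                                                    # → False
print(freshness_check(date(2024, 1, 1), date(2024, 1, 2)))   # → True
```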