이 글은 누구를 위한 것인가
- 엑셀로 리포팅하다가 데이터 인프라가 필요해진 팀
- 마케팅·상품팀이 데이터를 직접 분석하기를 원하는 CTO
- BigQuery나 Snowflake를 도입하려는 데이터 엔지니어
들어가며
이커머스 데이터는 세 가지로 나뉜다: 트랜잭션 데이터(주문, 결제), 행동 데이터(클릭, 조회, 장바구니), 외부 데이터(광고비, 날씨). 이를 하나의 웨어하우스에서 JOIN해야 진짜 분석이 된다.
이 글은 bluefoxdev.kr의 이커머스 데이터 인프라 가이드 를 참고하여 작성했습니다.
1. 이커머스 데이터 스택 설계
[Modern Data Stack for E-commerce]
수집 (Ingestion):
트랜잭션 DB → Airbyte/Fivetran → S3 Raw
앱/웹 이벤트 → Segment/Rudderstack → S3 Raw
광고 플랫폼 → Airbyte → S3 Raw
저장 (Storage):
S3 (Raw Zone) → Parquet 변환 → S3 (Processed Zone)
변환 (Transform):
dbt → BigQuery/Snowflake Mart Layer
서빙 (Serving):
BI: Metabase, Looker, Tableau
실시간: Superset, Grafana
ML: Python, dbt ML
[레이어 구조]
Raw → Staging → Intermediate → Mart
Mart 테이블:
mart_orders: 일별 주문 집계
mart_products: 상품 성과
mart_customers: 고객 LTV/RFM
mart_marketing: 채널별 ROAS
2. 이벤트 수집 파이프라인
from dataclasses import dataclass, asdict
from datetime import datetime
import json
@dataclass
class EcommerceEvent:
event_id: str
user_id: str | None
session_id: str
event_type: str # 'page_view', 'product_view', 'add_to_cart', 'purchase'
timestamp: str
properties: dict
class EventCollector:
def track_product_view(self, session_id: str, user_id: str | None, product_id: str):
return EcommerceEvent(
event_id=generate_uuid(),
user_id=user_id,
session_id=session_id,
event_type="product_view",
timestamp=datetime.utcnow().isoformat(),
properties={
"product_id": product_id,
"referrer": get_current_referrer(),
"page_url": get_current_url(),
},
)
def track_add_to_cart(
self, session_id: str, user_id: str,
product_id: str, quantity: int, price: int,
):
return EcommerceEvent(
event_id=generate_uuid(),
user_id=user_id,
session_id=session_id,
event_type="add_to_cart",
timestamp=datetime.utcnow().isoformat(),
properties={
"product_id": product_id,
"quantity": quantity,
"price": price,
"total": quantity * price,
},
)
async def flush_to_s3(self, events: list[EcommerceEvent]):
"""이벤트를 S3에 Parquet으로 저장"""
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import boto3
df = pd.DataFrame([asdict(e) for e in events])
# 날짜별 파티셔닝
date_str = datetime.utcnow().strftime("%Y/%m/%d")
hour_str = datetime.utcnow().strftime("%H")
buffer = pa.BufferOutputStream()
pq.write_table(
pa.Table.from_pandas(df),
buffer,
compression="snappy",
)
s3 = boto3.client("s3")
s3.put_object(
Bucket="ecommerce-datalake",
Key=f"events/year={date_str[:4]}/month={date_str[5:7]}/day={date_str[8:10]}/hour={hour_str}/{generate_uuid()}.parquet",
Body=buffer.getvalue().to_pybytes(),
)
3. dbt 변환 모델
-- models/mart/mart_orders.sql
-- 일별 주문 집계 mart
WITH orders_base AS (
SELECT
o.id AS order_id,
o.user_id,
o.created_at,
DATE(o.created_at) AS order_date,
o.total_amount,
o.status,
o.payment_method,
c.channel AS acquisition_channel
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_order_attributions') }} c
ON o.id = c.order_id AND c.model = 'last_click'
WHERE o.status NOT IN ('cancelled', 'failed')
),
daily_metrics AS (
SELECT
order_date,
COUNT(DISTINCT order_id) AS order_count,
COUNT(DISTINCT user_id) AS unique_buyers,
SUM(total_amount) AS gross_revenue,
AVG(total_amount) AS average_order_value,
COUNT(DISTINCT CASE WHEN acquisition_channel IS NOT NULL
THEN user_id END) AS paid_buyer_count
FROM orders_base
GROUP BY 1
)
SELECT
order_date,
order_count,
unique_buyers,
gross_revenue,
ROUND(average_order_value) AS aov,
paid_buyer_count,
ROUND(gross_revenue / NULLIF(order_count, 0)) AS revenue_per_order
FROM daily_metrics
ORDER BY order_date DESC
마무리
이커머스 데이터 스택은 S3(레이크) → BigQuery(웨어하우스) → dbt(변환) → Metabase(BI) 조합으로 월 100만원 이하로 구축 가능하다. 처음에는 주문 데이터와 행동 이벤트 두 개만 연결하고, 분석 요구가 늘면 광고비·반품 데이터를 추가하는 순서로 진행하라.