이 글은 누구를 위한 것인가
- 스마트스토어·쿠팡·자사몰 등 여러 채널에서 같은 상품을 팔다가 과매도가 생긴 팀
- 채널별 재고를 엑셀로 수동 관리하다 지친 운영팀
- 멀티채널 주문을 하나의 시스템에서 처리하고 싶은 팀
들어가며
한 창고에 재고 100개인데, 스마트스토어에 100개, 쿠팡에 100개로 각각 등록하면 과매도가 생긴다. 멀티채널 재고 동기화는 총 재고를 채널별로 분배하고, 주문마다 실시간으로 차감하는 구조가 핵심이다.
이 글은 bluefoxdev.kr의 멀티채널 이커머스 운영 가이드 를 참고하여 작성했습니다.
1. 재고 분배 전략
[멀티채널 재고 분배 방법]
방법 1: 채널별 고정 분배
총 재고 100개
스마트스토어: 40개
쿠팡: 40개
자사몰: 20개
단점: 채널별 판매 속도 차이로 낭비 발생
방법 2: 중앙 집중식 (권장)
총 재고 풀: 100개
채널에는 실시간 재고 표시
주문 시 중앙 풀에서 차감
장점: 과매도 없음, 재고 효율 최대
방법 3: 버퍼 설정
실제 재고: 100개
채널 표시 재고: 실제 - 버퍼(10개)
과매도 안전장치로 사용
[채널별 가격 정책]
자사몰: 정가
스마트스토어: 정가 (수수료 낮음)
쿠팡: 경쟁가 (알고리즘 상위 노출)
11번가: 프로모션 가격
→ 최저가 정책 채널은 자사몰 정가 이하 금지 설정
2. 중앙 재고 동기화 시스템
import asyncio
from typing import Protocol
class ChannelAdapter(Protocol):
"""채널별 API 어댑터 인터페이스"""
async def update_stock(self, channel_product_id: str, quantity: int) -> bool:
...
async def get_new_orders(self) -> list[dict]:
...
async def confirm_order(self, channel_order_id: str) -> bool:
...
class InventoryOrchestrator:
"""중앙 재고 조율기"""
def __init__(self, channels: dict[str, ChannelAdapter]):
self.channels = channels
async def deduct_and_sync(self, product_id: str, quantity: int, source_channel: str):
"""주문 발생 시 중앙 재고 차감 후 모든 채널 동기화"""
async with db.transaction():
# 원자적 재고 차감 (Race Condition 방지)
result = await db.fetchrow("""
UPDATE products
SET stock_quantity = stock_quantity - $1
WHERE id = $2
AND stock_quantity >= $1
RETURNING stock_quantity
""", quantity, product_id)
if not result:
raise ValueError("재고 부족")
new_stock = result["stock_quantity"]
# 모든 채널에 새 재고 동기화 (비동기)
sync_tasks = []
channel_products = await get_channel_product_mappings(product_id)
for mapping in channel_products:
if mapping["channel"] == source_channel:
continue # 주문 발생 채널은 자동 처리됨
adapter = self.channels[mapping["channel"]]
sync_tasks.append(
adapter.update_stock(
mapping["channel_product_id"],
max(0, new_stock - mapping.get("buffer", 0)),
)
)
results = await asyncio.gather(*sync_tasks, return_exceptions=True)
# 실패한 채널 재시도 큐에 추가
for i, result in enumerate(results):
if isinstance(result, Exception):
failed_channel = channel_products[i]["channel"]
await queue_retry_sync(product_id, new_stock, failed_channel)
return new_stock
async def pull_all_channel_orders(self):
"""모든 채널 주문 수집 (10분마다 실행)"""
all_orders = []
for channel_name, adapter in self.channels.items():
try:
orders = await adapter.get_new_orders()
for order in orders:
order["source_channel"] = channel_name
all_orders.extend(orders)
except Exception as e:
await alert_channel_error(channel_name, str(e))
# 통합 주문 처리
for order in all_orders:
await process_unified_order(order)
return len(all_orders)
class SmartStoreAdapter:
"""네이버 스마트스토어 API 어댑터"""
BASE_URL = "https://api.commerce.naver.com/external"
def __init__(self, client_id: str, client_secret: str):
self.client_id = client_id
self.client_secret = client_secret
async def update_stock(self, channel_product_id: str, quantity: int) -> bool:
token = await self.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.put(
f"{self.BASE_URL}/v2/products/{channel_product_id}/stocks",
json={"stockQuantity": quantity},
headers={"Authorization": f"Bearer {token}"},
)
return response.status_code == 200
async def get_new_orders(self) -> list[dict]:
token = await self.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/v1/pay-order/seller/orders/new",
headers={"Authorization": f"Bearer {token}"},
)
return response.json().get("data", {}).get("contents", [])
마무리
멀티채널 재고 동기화의 핵심 원칙은 "중앙 풀에서 관리, 채널은 UI"다. 채널에 각각 재고를 등록하는 것이 아니라, 주문이 들어올 때마다 중앙 재고에서 차감하고 나머지 채널을 업데이트한다. 채널 API 장애 시를 위한 버퍼(실제 재고 - 10%)를 설정하면 과매도 리스크를 더 줄일 수 있다.