드롭쉬핑 공급사 API 연동: 재고 없이 운영하는 이커머스 자동화

이커머스

드롭쉬핑공급사 연동재고 없는 이커머스자동화물류 연동

이 글은 누구를 위한 것인가

  • 재고 없이 이커머스를 운영하고 싶은 팀
  • 여러 공급사와 연동해서 자동으로 주문을 넘기고 싶은 개발자
  • 드롭쉬핑 마진 관리와 가격 동기화를 자동화하려는 팀

들어가며

드롭쉬핑의 핵심은 공급사 재고와 우리 쇼핑몰 재고를 실시간으로 동기화하고, 주문이 들어오면 자동으로 공급사에 전달하는 것이다. 수동으로 하면 품절 주문과 배송 지연이 생긴다.

이 글은 bluefoxdev.kr의 드롭쉬핑 자동화 가이드 를 참고하여 작성했습니다.


1. 드롭쉬핑 아키텍처

[드롭쉬핑 자동화 흐름]

재고 동기화:
  공급사 API (1시간마다 폴링)
    ↓
  재고 변경 감지
    ↓
  우리 쇼핑몰 재고 업데이트
    ↓
  가격 재계산 (원가 + 마진율)

주문 처리:
  고객 주문 → 결제 완료
    ↓
  공급사 자동 발주 (API)
    ↓
  공급사 송장번호 수신
    ↓
  고객에게 배송 추적 정보 전달

[공급사 API 유형]
  REST API: 실시간 조회, 주문 전달 가능
  FTP/SFTP: 일괄 재고 파일 교환
  이메일 자동화: API 없는 공급사 대안
  EDI: 대형 공급사 표준

2. 공급사 연동 시스템

from abc import ABC, abstractmethod
from dataclasses import dataclass

@dataclass
class SupplierProduct:
    supplier_id: str
    supplier_sku: str
    name: str
    cost_price: int
    stock_quantity: int
    lead_time_days: int

@dataclass  
class SupplierOrder:
    our_order_id: str
    supplier_order_id: str | None
    status: str
    tracking_number: str | None

class SupplierAdapter(ABC):
    """공급사별 어댑터 패턴"""
    
    @abstractmethod
    async def get_inventory(self) -> list[SupplierProduct]:
        pass
    
    @abstractmethod
    async def place_order(self, order: dict) -> SupplierOrder:
        pass
    
    @abstractmethod
    async def get_order_status(self, supplier_order_id: str) -> SupplierOrder:
        pass

class RestApiSupplierAdapter(SupplierAdapter):
    """REST API 기반 공급사 어댑터"""
    
    def __init__(self, base_url: str, api_key: str, supplier_id: str):
        self.base_url = base_url
        self.api_key = api_key
        self.supplier_id = supplier_id
    
    async def get_inventory(self) -> list[SupplierProduct]:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{self.base_url}/inventory",
                headers={"X-API-Key": self.api_key},
            )
        
        return [
            SupplierProduct(
                supplier_id=self.supplier_id,
                supplier_sku=item["sku"],
                name=item["name"],
                cost_price=int(item["price"]),
                stock_quantity=item["stock"],
                lead_time_days=item.get("lead_time", 3),
            )
            for item in response.json()["products"]
        ]
    
    async def place_order(self, order: dict) -> SupplierOrder:
        payload = {
            "reference": order["our_order_id"],
            "shipping_address": order["delivery_address"],
            "items": [
                {"sku": item["supplier_sku"], "quantity": item["quantity"]}
                for item in order["items"]
            ],
        }
        
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/orders",
                json=payload,
                headers={"X-API-Key": self.api_key},
            )
        
        data = response.json()
        return SupplierOrder(
            our_order_id=order["our_order_id"],
            supplier_order_id=data["order_id"],
            status="placed",
            tracking_number=None,
        )

async def sync_supplier_inventory(supplier_id: str):
    """공급사 재고 동기화"""
    
    adapter = get_supplier_adapter(supplier_id)
    products = await adapter.get_inventory()
    
    for product in products:
        # 기존 매핑 조회
        mapping = await get_supplier_product_mapping(
            supplier_id, product.supplier_sku
        )
        
        if not mapping:
            continue
        
        our_product_id = mapping["our_product_id"]
        
        # 재고 업데이트
        await update_product_stock(our_product_id, product.stock_quantity)
        
        # 가격 업데이트 (마진 + 배송비 포함)
        margin_rate = mapping.get("margin_rate", 0.30)
        shipping_cost = mapping.get("shipping_cost", 3000)
        our_price = round(product.cost_price * (1 + margin_rate) + shipping_cost)
        
        await update_product_price(our_product_id, our_price)
    
    await log_sync_result(supplier_id, len(products))

async def auto_dropship_order(order_id: str):
    """주문 자동 발주"""
    
    order = await get_order_with_items(order_id)
    
    # 공급사별로 아이템 그룹화
    supplier_groups = {}
    for item in order["items"]:
        mapping = await get_product_supplier_mapping(item["product_id"])
        if not mapping:
            continue
        
        sid = mapping["supplier_id"]
        if sid not in supplier_groups:
            supplier_groups[sid] = []
        supplier_groups[sid].append({
            **item,
            "supplier_sku": mapping["supplier_sku"],
        })
    
    # 각 공급사에 발주
    for supplier_id, items in supplier_groups.items():
        adapter = get_supplier_adapter(supplier_id)
        
        supplier_order = await adapter.place_order({
            "our_order_id": order_id,
            "delivery_address": order["delivery_address"],
            "items": items,
        })
        
        await save_supplier_order(order_id, supplier_id, supplier_order)

마무리

드롭쉬핑 자동화에서 가장 중요한 것은 재고 동기화 주기다. 공급사 재고는 빠르게 변하므로, 최소 1시간마다 동기화해야 "재고 있음"으로 표시된 상품이 품절되는 사태를 막을 수 있다. 재고 10개 이하는 실시간 확인 로직을 추가하는 것이 안전하다.