×

api开发 电商平台 数据挖掘

构建高可用 1688 商品数据采集服务:API 接口设计与优化实践

admin admin 发表于2025-09-23 17:37:04 浏览49 评论0

抢沙发发表评论

在电商数据分析、价格监控和竞品分析等场景中,构建一个高可用的 1688 商品数据采集服务至关重要。本文将从 API 接口设计、服务架构、性能优化和容错机制等方面,详细介绍如何构建一个稳定、高效的 1688 商品数据采集服务。

系统架构设计

一个高可用的商品数据采集服务应该具备以下特点:

  • 可扩展性:能够应对业务增长带来的采集需求增加

  • 容错性:部分组件故障不影响整体服务

  • 可监控:关键指标可监控,问题可预警

  • 性能优:响应迅速,资源占用合理

我们采用分层架构设计:

  1. API 层:提供统一的数据访问接口

  2. 服务层:实现核心业务逻辑

  3. 采集层:负责与 1688 API 交互

  4. 存储层:缓存和持久化商品数据

  5. 监控层:监控系统运行状态

API 接口设计

合理的 API 接口设计是服务易用性和可维护性的基础。我们采用 RESTful 风格设计 API,主要包含以下接口:

核心 API 接口定义

plaintext

# 商品搜索接口
GET /api/products?keyword=笔记本&page=1&limit=20

# 商品详情接口
GET /api/products/{offerId}

# 批量获取商品接口
POST /api/products/batch
Body: { "offerIds": ["123", "456", "789"] }

# 商品类目接口
GET /api/categories

# 采集任务管理接口
POST /api/tasks
GET /api/tasks/{taskId}

服务实现(Python 版本)

下面我们实现核心的服务组件,采用 Python 的 FastAPI 框架构建 API 服务,结合 Redis 进行缓存,使用 Celery 处理异步任务。

1. 核心配置与依赖

import os
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # 1688 API配置
    APP_KEY: str = os.getenv("ALI1688_APP_KEY", "")
    APP_SECRET: str = os.getenv("ALI1688_APP_SECRET", "")
    API_ENDPOINT: str = "https://gw.open.1688.com/openapi/param2/1/"
    
    # 缓存配置
    REDIS_URL: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    CACHE_TTL: int = 3600  # 缓存默认过期时间,单位秒
    
    # 服务配置
    API_RATE_LIMIT: int = 100  # 每分钟最大请求数
    BATCH_MAX_SIZE: int = 50  # 批量请求最大商品数量
    
    # 任务配置
    TASK_QUEUE: str = "product_collector"
    TASK_RETRY_LIMIT: int = 3  # 任务重试次数

settings = Settings()

2. 1688 API 客户端实现

import time
import hashlib
import requests
import json
from typing import Dict, List, Optional
from config import settings

class Ali1688Client:
    def __init__(self, app_key: str = None, app_secret: str = None):
        self.app_key = app_key or settings.APP_KEY
        self.app_secret = app_secret or settings.APP_SECRET
        self.endpoint = settings.API_ENDPOINT
        self.access_token = None
        self.session = requests.Session()
        # 设置连接池大小,提高并发性能
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=3
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
    
    def set_access_token(self, token: str):
        self.access_token = token
        return self
    
    def _generate_signature(self, params: Dict[str, str]) -> str:
        """生成API请求签名"""
        # 按参数名排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        # 拼接参数
        sign_str = self.app_secret
        for key, value in sorted_params:
            sign_str += f"{key}{value}"
        sign_str += self.app_secret
        # 计算MD5并转为大写
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
    
    def _get_common_params(self, method: str) -> Dict[str, str]:
        """获取公共请求参数"""
        return {
            'app_key': self.app_key,
            'method': method,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'format': 'json',
            'v': '1.0',
            'sign_method': 'md5',
            'access_token': self.access_token or ''
        }
    
    def call_api(self, method: str, biz_params: Dict = None, 
                 retry: int = 0, timeout: int = 10) -> Dict:
        """
        调用1688 API
        
        Args:
            method: API方法名
            biz_params: 业务参数
            retry: 重试次数
            timeout: 超时时间(秒)
            
        Returns:
            API返回结果
        """
        biz_params = biz_params or {}
        try:
            # 构建请求参数
            params = self._get_common_params(method)
            # 添加业务参数
            if biz_params:
                params['biz_params'] = json.dumps(biz_params, ensure_ascii=False)
            # 生成签名
            params['sign'] = self._generate_signature(params)
            
            # 发送请求
            url = f"{self.endpoint}{method.replace('.', '/')}"
            response = self.session.get(
                url, 
                params=params,
                timeout=timeout
            )
            response.raise_for_status()
            
            result = response.json()
            
            # 处理API错误
            if 'error_response' in result:
                error = result['error_response']
                error_msg = f"API Error: {error.get('msg', 'Unknown error')} (code: {error.get('code')})"
                # 针对特定错误码进行处理
                if error.get('code') in [10010, 10011] and retry < settings.TASK_RETRY_LIMIT:
                    # Token过期或无效,尝试刷新token后重试
                    if self._refresh_access_token():
                        return self.call_api(method, biz_params, retry + 1, timeout)
                
                raise Exception(error_msg)
                
            return result
            
        except Exception as e:
            if retry < settings.TASK_RETRY_LIMIT:
                # 指数退避重试
                time.sleep(0.5 * (2 ** retry))
                return self.call_api(method, biz_params, retry + 1, timeout)
            raise Exception(f"API调用失败: {str(e)}")
    
    def _refresh_access_token(self) -> bool:
        """刷新access token(实际实现需根据1688 OAuth流程)"""
        # 此处简化实现,实际应根据1688的token刷新机制实现
        try:
            # 调用刷新token的API
            # 实际应用中需要根据1688的OAuth文档实现
            return True
        except:
            return False
    
    def search_products(self, keyword: str, page: int = 1, page_size: int = 20) -> Dict:
        """搜索商品"""
        return self.call_api(
            method="alibaba.icbu.product.search",
            biz_params={
                "keywords": keyword,
                "page": page,
                "page_size": page_size
            }
        )
    
    def get_product_detail(self, offer_id: str) -> Dict:
        """获取商品详情"""
        return self.call_api(
            method="alibaba.icbu.product.get",
            biz_params={
                "offer_id": offer_id,
                "fields": "offer_id,title,price,min_order,detail_url,pictures"
            }
        )
    
    def batch_get_products(self, offer_ids: List[str]) -> Dict:
        """批量获取商品信息"""
        return self.call_api(
            method="alibaba.icbu.product.batchget",
            biz_params={
                "offer_ids": ",".join(offer_ids),
                "fields": "offer_id,title,price,min_order,detail_url,pictures"
            }
        )

3. 缓存与数据服务实现

import json
from typing import List, Dict, Optional
from redis import Redis
from client import Ali1688Client
from config import settings

class ProductService:
    def __init__(self, redis: Redis = None, client: Ali1688Client = None):
        self.redis = redis or Redis.from_url(settings.REDIS_URL)
        self.client = client or Ali1688Client()
        self.cache_ttl = settings.CACHE_TTL
    
    def _get_cache_key(self, offer_id: str) -> str:
        """生成缓存键"""
        return f"product:{offer_id}"
    
    def _get_search_cache_key(self, keyword: str, page: int, page_size: int) -> str:
        """生成搜索缓存键"""
        return f"search:{keyword}:{page}:{page_size}"
    
    def get_product(self, offer_id: str, use_cache: bool = True) -> Dict:
        """
        获取商品详情
        
        Args:
            offer_id: 商品ID
            use_cache: 是否使用缓存
            
        Returns:
            商品详情字典
        """
        # 尝试从缓存获取
        if use_cache:
            cache_key = self._get_cache_key(offer_id)
            cached_data = self.redis.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        
        # 缓存未命中,调用API
        result = self.client.get_product_detail(offer_id)
        
        # 存入缓存
        if result and 'result' in result:
            self.redis.setex(
                cache_key, 
                self.cache_ttl, 
                json.dumps(result['result'])
            )
            return result['result']
        
        return {}
    
    def batch_get_products(self, offer_ids: List[str], use_cache: bool = True) -> Dict[str, Dict]:
        """
        批量获取商品信息
        
        Args:
            offer_ids: 商品ID列表
            use_cache: 是否使用缓存
            
        Returns:
            商品信息字典,key为商品ID
        """
        result = {}
        cache_miss_ids = []
        
        # 先从缓存获取
        if use_cache:
            for offer_id in offer_ids:
                cache_key = self._get_cache_key(offer_id)
                cached_data = self.redis.get(cache_key)
                if cached_data:
                    result[offer_id] = json.loads(cached_data)
                else:
                    cache_miss_ids.append(offer_id)
        else:
            cache_miss_ids = offer_ids
        
        # 处理缓存未命中的商品ID
        if cache_miss_ids:
            # 按API限制分批处理
            batch_size = settings.BATCH_MAX_SIZE
            for i in range(0, len(cache_miss_ids), batch_size):
                batch_ids = cache_miss_ids[i:i+batch_size]
                api_result = self.client.batch_get_products(batch_ids)
                
                if api_result and 'result' in api_result and 'toArray' in api_result['result']:
                    for product in api_result['result']['toArray']:
                        offer_id = product.get('offer_id')
                        if offer_id:
                            result[offer_id] = product
                            # 存入缓存
                            self.redis.setex(
                                self._get_cache_key(offer_id),
                                self.cache_ttl,
                                json.dumps(product)
                            )
        
        return result
    
    def search_products(self, keyword: str, page: int = 1, 
                       page_size: int = 20, use_cache: bool = True) -> Dict:
        """
        搜索商品
        
        Args:
            keyword: 搜索关键词
            page: 页码
            page_size: 每页数量
            use_cache: 是否使用缓存
            
        Returns:
            搜索结果
        """
        # 尝试从缓存获取
        if use_cache:
            cache_key = self._get_search_cache_key(keyword, page, page_size)
            cached_data = self.redis.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        
        # 缓存未命中,调用API
        result = self.client.search_products(keyword, page, page_size)
        
        # 存入缓存
        if result and 'result' in result:
            self.redis.setex(
                cache_key,
                self.cache_ttl // 2,  # 搜索结果缓存时间短于详情
                json.dumps(result['result'])
            )
            return result['result']
        
        return {}
    
    def refresh_product_cache(self, offer_id: str) -> bool:
        """刷新商品缓存"""
        try:
            result = self.client.get_product_detail(offer_id)
            if result and 'result' in result:
                cache_key = self._get_cache_key(offer_id)
                self.redis.setex(
                    cache_key,
                    self.cache_ttl,
                    json.dumps(result['result'])
                )
                return True
            return False
        except:
            return False

4. API 服务实现

from fastapi import FastAPI, HTTPException, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from typing import List, Optional, Dict
import json
from redis import Redis
from client import Ali1688Client
from service import ProductService
from config import settings
import time
import asyncio

# 初始化应用
app = FastAPI(
    title="1688商品数据采集服务",
    description="高可用的1688商品数据采集API服务",
    version="1.0.0"
)

# 添加中间件
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 初始化服务
redis = Redis.from_url(settings.REDIS_URL)
client = Ali1688Client()
product_service = ProductService(redis, client)

# 简单的请求计数中间件,用于限流基础
@app.middleware("http")
async def request_counter(request, call_next):
    start_time = time.time()
    
    # 获取客户端IP
    client_ip = request.client.host
    # 记录请求计数
    counter_key = f"rate_limit:{client_ip}:{time.strftime('%Y%m%d%H%M')}"
    current_count = redis.incr(counter_key)
    # 设置1分钟过期
    redis.expire(counter_key, 60)
    
    # 检查是否超过限流
    if current_count > settings.API_RATE_LIMIT:
        return JSONResponse(
            status_code=429,
            content={"error": "请求过于频繁,请稍后再试"}
        )
    
    response = await call_next(request)
    
    # 记录响应时间
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    
    return response

@app.get("/api/products", summary="搜索商品")
async def search_products(
    keyword: str = Query(..., description="搜索关键词"),
    page: int = Query(1, ge=1, description="页码"),
    page_size: int = Query(20, ge=1, le=100, description="每页数量"),
    use_cache: bool = Query(True, description="是否使用缓存")
):
    try:
        result = product_service.search_products(
            keyword=keyword,
            page=page,
            page_size=page_size,
            use_cache=use_cache
        )
        return {
            "success": True,
            "data": result,
            "from_cache": use_cache and "toArray" in result  # 简单判断是否来自缓存
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/products/{offer_id}", summary="获取商品详情")
async def get_product(
    offer_id: str,
    use_cache: bool = Query(True, description="是否使用缓存"),
    background_tasks: BackgroundTasks = None
):
    try:
        result = product_service.get_product(
            offer_id=offer_id,
            use_cache=use_cache
        )
        
        if not result:
            raise HTTPException(status_code=404, detail="商品不存在或获取失败")
        
        # 如果使用缓存,添加后台任务异步刷新缓存
        if use_cache:
            background_tasks.add_task(product_service.refresh_product_cache, offer_id)
        
        return {
            "success": True,
            "data": result,
            "from_cache": use_cache
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/products/batch", summary="批量获取商品信息")
async def batch_get_products(
    offer_ids: List[str],
    use_cache: bool = Query(True, description="是否使用缓存")
):
    if not offer_ids:
        raise HTTPException(status_code=400, detail="商品ID列表不能为空")
    
    if len(offer_ids) > settings.BATCH_MAX_SIZE:
        raise HTTPException(
            status_code=400, 
            detail=f"批量查询最大支持{settings.BATCH_MAX_SIZE}个商品ID"
        )
    
    try:
        result = product_service.batch_get_products(
            offer_ids=offer_ids,
            use_cache=use_cache
        )
        return {
            "success": True,
            "data": result,
            "count": len(result)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/health", summary="服务健康检查")
async def health_check():
    # 检查Redis连接
    redis_ok = False
    try:
        redis.ping()
        redis_ok = True
    except:
        pass
    
    # 检查API客户端
    client_ok = False
    try:
        # 简单调用一个公开API检查连接
        client.call_api("alibaba.icbu.category.get", {"category_id": 1})
        client_ok = True
    except:
        pass
    
    status = "healthy" if (redis_ok and client_ok) else "unhealthy"
    status_code = 200 if status == "healthy" else 503
    
    return {
        "status": status,
        "services": {
            "redis": "up" if redis_ok else "down",
            "1688_api": "up" if client_ok else "down"
        },
        "timestamp": time.time()
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main.py:app", host="0.0.0.0", port=8000, reload=True)

优化实践

1. 缓存策略优化

  • 多级缓存:结合本地内存缓存和 Redis 分布式缓存

  • 差异化 TTL:搜索结果缓存时间较短(30 分钟),商品详情缓存时间较长(1 小时)

  • 缓存预热:针对热门商品提前缓存,减少 API 调用

  • 异步刷新:使用后台任务刷新缓存,避免阻塞用户请求

2. 并发控制与限流

  • API 调用限流:通过 Redis 实现基于 IP 的分钟级限流

  • 连接池管理:设置合理的 HTTP 连接池大小,避免频繁创建连接

  • 批量处理:将多个单品请求合并为批量请求,减少 API 调用次数

  • 请求合并:短时间内相同的请求合并为一个,避免重复调用

3. 容错与重试机制

  • 指数退避重试:API 调用失败时采用指数退避策略重试

  • 断路器模式:当 API 持续失败时,暂时停止调用,避免资源浪费

  • 降级策略:核心接口故障时,返回缓存数据或基础数据

  • 详细日志:记录 API 调用详情,便于问题排查

4. 性能优化

  • 数据压缩:启用 GZip 压缩,减少网络传输量

  • 字段过滤:只请求需要的商品字段,减少数据传输

  • 异步处理:非关键操作使用后台任务异步处理

  • 连接复用:使用 HTTP 长连接,减少握手开销

监控与告警

为确保服务高可用,需要实现完善的监控体系:

  1. 关键指标监控

    • API 调用成功率

    • 平均响应时间

    • 缓存命中率

    • 错误率和错误类型分布

  2. 告警机制

    • 当错误率超过阈值时发送告警

    • 响应时间异常时告警

    • 服务不可用时立即告警

  3. 日志管理

    • 记录详细的访问日志和错误日志

    • 实现日志轮转,避免磁盘空间耗尽

    • 关键操作日志持久化存储

总结

构建高可用的 1688 商品数据采集服务需要从多个维度进行设计和优化。本文介绍的方案通过合理的 API 设计、缓存策略、并发控制和容错机制,能够有效提高服务的可用性和性能。

在实际应用中,还需要根据业务需求和流量特点持续优化系统,例如:

  • 根据商品热度动态调整缓存时间

  • 实现更精细的限流策略

  • 针对不同 API 错误码设计差异化的重试策略

  • 结合消息队列实现更可靠的异步任务处理

通过不断优化和迭代,可以构建一个稳定、高效、可扩展的商品数据采集服务,为电商数据分析和业务决策提供可靠的数据支持。


少长咸集

群贤毕至

访客