×

api开发 电商平台 数据挖掘

构建高效数据采集服务:基于 1688 API 的商品详情拉取方案

admin admin 发表于2025-11-04 10:27:46 浏览8 评论0

抢沙发发表评论

电商数据分析、竞品监控和供应链管理等场景中,高效采集 1688 平台的商品详情数据具有重要价值。本文将介绍如何基于 1688 平台 API 构建专业的数据采集服务,涵盖系统设计、接口调用、数据处理及性能优化等关键环节,并提供可落地的代码实现。

一、方案设计思路

1688 提供了标准化的 API 接口,相比爬虫方案具有稳定性高、合规性强、数据结构统一等优势。构建高效采集服务需重点解决以下问题:

  • 接口权限与认证管理

  • 批量请求的并发控制

  • 数据解析与结构化存储

  • 异常处理与重试机制

  • 性能优化与资源控制

整体架构采用 "调度层 - 采集层 - 存储层 - 应用层" 四层设计:

  • 调度层:负责任务分发与优先级管理

  • 采集层:处理 API 调用、数据解析与异常重试

  • 存储层:实现结构化数据持久化

  • 应用层:提供查询与数据导出能力

二、开发准备

1. 环境配置

  • 开发语言:Python 3.8+

  • 核心库:requests(HTTP 请求)、pymysql(数据库连接)、redis(缓存)、concurrent.futures(并发控制)

  • 依赖安装:

pip install requests pymysql redis python-dotenv

2. 1688 开放准备

  1. 注册开发者账号并获取apikeyapisecret

  2. 申请商品详情相关接口权限(如alibaba.item.get

  3. 配置 IP 白名单及回调地址

  4. 了解接口限流规则(通常为 QPS=10-30)

三、核心功能实现

1. 认证模块

1688 API 采用 OAuth2.0 认证机制,需要先获取访问令牌:

import time
import hashlib
import requests
from dotenv import load_dotenv
import os

load_dotenv()  # 加载环境变量

class Ali1688Auth:
    def __init__(self):
        self.appkey = os.getenv("ALI_APPKEY")
        self.appsecret = os.getenv("ALI_APPSECRET")
        self.token_url = "https://open.1688.com/auth/token"
        
    def get_access_token(self, refresh_token=None):
        """获取或刷新访问令牌"""
        params = {
            "grant_type": "client_credentials" if not refresh_token else "refresh_token",
            "client_id": self.appkey,
            "client_secret": self.appsecret
        }
        if refresh_token:
            params["refresh_token"] = refresh_token
            
        try:
            response = requests.get(self.token_url, params=params, timeout=10)
            result = response.json()
            
            if "error" in result:
                raise Exception(f"认证失败: {result['error_description']}")
                
            return {
                "access_token": result["access_token"],
                "expires_in": result["expires_in"],
                "refresh_token": result.get("refresh_token"),
                "expires_time": time.time() + result["expires_in"] - 300  # 提前5分钟过期
            }
        except Exception as e:
            print(f"获取令牌失败: {str(e)}")
            return None

2. 签名工具

1688 API 要求对请求参数进行签名验证:

class Ali1688Signer:
    @staticmethod
    def sign(params, appsecret):
        """生成API请求签名"""
        # 按参数名ASCII排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        # 拼接参数
        sign_str = appsecret + "".join([f"{k}{v}" for k, v in sorted_params]) + appsecret
        # SHA1加密并转为大写
        return hashlib.sha1(sign_str.encode("utf-8")).hexdigest().upper()

3. 商品详情采集器

实现核心的数据采集逻辑,包含并发控制和重试机制:

import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import redis

class ProductFetcher:
    def __init__(self, auth_client, max_workers=5):
        self.auth_client = auth_client
        self.token_info = auth_client.get_access_token()
        self.api_url = "https://gw.open.1688.com/openapi/param2/1/com.alibaba.product/alibaba.item.get"
        self.redis_client = redis.Redis(host="localhost", port=6379, db=0)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.retry_limit = 3  # 最大重试次数
        
    def _check_token(self):
        """检查令牌有效性,过期则刷新"""
        if not self.token_info or time.time() >= self.token_info["expires_time"]:
            self.token_info = self.auth_client.get_access_token(
                self.token_info.get("refresh_token") if self.token_info else None
            )
        return self.token_info is not None
        
    def fetch_single_product(self, product_id):
        """获取单个商品详情"""
        if not self._check_token():
            return None
            
        # 检查缓存,避免重复请求
        cache_key = f"ali1688:product:{product_id}"
        cached_data = self.redis_client.get(cache_key)
        if cached_data:
            return json.loads(cached_data)
            
        # 构建请求参数
        params = {
            "app_key": self.auth_client.appkey,
            "access_token": self.token_info["access_token"],
            "method": "alibaba.item.get",
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "format": "json",
            "v": "2.0",
            "sign_method": "sha1",
            "item_id": product_id,
            "fields": "item_id,title,price,quantity,sale_count,detail_url,images"  # 需要的字段
        }
        
        # 生成签名
        params["sign"] = Ali1688Signer.sign(params, self.auth_client.appsecret)
        
        # 带重试的请求
        for retry in range(self.retry_limit):
            try:
                response = requests.get(self.api_url, params=params, timeout=15)
                result = response.json()
                
                if "error_response" in result:
                    error = result["error_response"]
                    print(f"请求错误 {product_id}: {error['msg']} (重试 {retry+1}/{self.retry_limit})")
                    continue
                    
                # 缓存结果(有效期1小时)
                self.redis_client.setex(cache_key, 3600, json.dumps(result))
                return result
                
            except Exception as e:
                print(f"请求异常 {product_id}: {str(e)} (重试 {retry+1}/{self.retry_limit})")
                time.sleep(0.5 * (retry + 1))  # 指数退避
                
        return None
        
    def fetch_batch_products(self, product_ids, max_concurrent=10):
        """批量获取商品详情"""
        results = []
        futures = []
        
        # 控制并发数
        with ThreadPoolExecutor(max_workers=min(max_concurrent, len(product_ids))) as executor:
            for pid in product_ids:
                futures.append(executor.submit(self.fetch_single_product, pid))
                
            for future in as_completed(futures):
                result = future.result()
                if result:
                    results.append(result)
                    
        return results

4. 数据存储模块

将采集到的商品数据结构化存储到 MySQL:

import pymysql
from pymysql.cursors import DictCursor

class ProductStorage:
    def __init__(self):
        self.db_config = {
            "host": os.getenv("DB_HOST"),
            "user": os.getenv("DB_USER"),
            "password": os.getenv("DB_PASSWORD"),
            "db": os.getenv("DB_NAME"),
            "charset": "utf8mb4"
        }
        self.connection = None
        
    def connect(self):
        """建立数据库连接"""
        if not self.connection or self.connection closed:
            self.connection = pymysql.connect(**self.db_config, cursorclass=DictCursor)
        return self.connection
        
    def save_product(self, product_data):
        """保存商品数据到数据库"""
        try:
            item = product_data.get("item", {})
            conn = self.connect()
            
            with conn.cursor() as cursor:
                sql = """
                INSERT INTO ali1688_products 
                (item_id, title, price, quantity, sale_count, detail_url, images, raw_data, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
                ON DUPLICATE KEY UPDATE
                title = VALUES(title), price = VALUES(price), quantity = VALUES(quantity),
                sale_count = VALUES(sale_count), detail_url = VALUES(detail_url),
                images = VALUES(images), raw_data = VALUES(raw_data), updated_at = NOW()
                """
                
                # 处理图片数据
                images = json.dumps(item.get("images", [])) if "images" in item else None
                
                cursor.execute(sql, (
                    item.get("item_id"),
                    item.get("title"),
                    item.get("price"),
                    item.get("quantity"),
                    item.get("sale_count"),
                    item.get("detail_url"),
                    images,
                    json.dumps(product_data)  # 保存原始数据
                ))
            
            conn.commit()
            return True
            
        except Exception as e:
            print(f"存储失败: {str(e)}")
            if conn:
                conn.rollback()
            return False

5. 服务调度器

实现任务管理和流程控制:

class ProductCrawlService:
    def __init__(self):
        self.auth = Ali1688Auth()
        self.fetcher = ProductFetcher(self.auth)
        self.storage = ProductStorage()
        
    def process_product_list(self, product_ids):
        """处理商品ID列表,完成采集和存储"""
        print(f"开始处理 {len(product_ids)} 个商品...")
        
        # 批量采集
        products = self.fetcher.fetch_batch_products(product_ids)
        
        # 批量存储
        success_count = 0
        for product in products:
            if self.storage.save_product(product):
                success_count += 1
                
        print(f"处理完成: 成功 {success_count}/{len(product_ids)}")
        return success_count
        
# 使用示例
if __name__ == "__main__":
    service = ProductCrawlService()
    # 测试商品ID列表
    test_product_ids = ["61234567890", "69876543210", "65432109876"]
    service.process_product_list(test_product_ids)

四、性能优化策略

1.** 缓存机制 :使用 Redis 缓存已获取的商品数据,减少重复请求2. 并发控制 :根据 API 限流规则动态调整并发数,避免触发限制3. 批量处理 :采用分页和批量请求模式,减少网络往返4. 增量更新 :通过定时任务只更新有变化的商品数据5. 异常隔离 **:单个请求失败不影响整体任务,实现故障隔离

五、注意事项

1.** 合规性 :严格遵守 1688 协议,不超量请求2. 错误监控 :实现完善的日志记录和告警机制3. 令牌管理 :确保令牌安全存储,定期自动刷新4. 数据更新 :根据业务需求设置合理的缓存过期时间5. 接口变更 **:关注 1688 API 版本更新,及时适配接口变化

通过上述方案,可以构建一个高效、稳定、合规的 1688 商品详情采集服务。实际应用中可根据业务需求扩展功能,如增加数据清洗、增量同步、监控仪表盘等模块,进一步提升服务的实用性和可靠性。


少长咸集

群贤毕至

访客