电商数据分析、竞品监控和供应链管理等场景中,高效采集 1688 平台的商品详情数据具有重要价值。本文将介绍如何基于 1688 平台 API 构建专业的数据采集服务,涵盖系统设计、接口调用、数据处理及性能优化等关键环节,并提供可落地的代码实现。
一、方案设计思路
1688 提供了标准化的 API 接口,相比爬虫方案具有稳定性高、合规性强、数据结构统一等优势。构建高效采集服务需重点解决以下问题:
接口权限与认证管理
批量请求的并发控制
数据解析与结构化存储
异常处理与重试机制
性能优化与资源控制
整体架构采用 "调度层 - 采集层 - 存储层 - 应用层" 四层设计:
调度层:负责任务分发与优先级管理
采集层:处理 API 调用、数据解析与异常重试
存储层:实现结构化数据持久化
应用层:提供查询与数据导出能力
二、开发准备
1. 环境配置
开发语言:Python 3.8+
核心库:requests(HTTP 请求)、pymysql(数据库连接)、redis(缓存)、concurrent.futures(并发控制)
依赖安装:
pip install requests pymysql redis python-dotenv
2. 1688 开放准备
注册开发者账号并获取
和apikeyapisecret申请商品详情相关接口权限(如
alibaba.item.get)配置 IP 白名单及回调地址
了解接口限流规则(通常为 QPS=10-30)
三、核心功能实现
1. 认证模块
1688 API 采用 OAuth2.0 认证机制,需要先获取访问令牌:
import time
import hashlib
import requests
from dotenv import load_dotenv
import os
load_dotenv() # 加载环境变量
class Ali1688Auth:
def __init__(self):
self.appkey = os.getenv("ALI_APPKEY")
self.appsecret = os.getenv("ALI_APPSECRET")
self.token_url = "https://open.1688.com/auth/token"
def get_access_token(self, refresh_token=None):
"""获取或刷新访问令牌"""
params = {
"grant_type": "client_credentials" if not refresh_token else "refresh_token",
"client_id": self.appkey,
"client_secret": self.appsecret
}
if refresh_token:
params["refresh_token"] = refresh_token
try:
response = requests.get(self.token_url, params=params, timeout=10)
result = response.json()
if "error" in result:
raise Exception(f"认证失败: {result['error_description']}")
return {
"access_token": result["access_token"],
"expires_in": result["expires_in"],
"refresh_token": result.get("refresh_token"),
"expires_time": time.time() + result["expires_in"] - 300 # 提前5分钟过期
}
except Exception as e:
print(f"获取令牌失败: {str(e)}")
return None2. 签名工具
1688 API 要求对请求参数进行签名验证:
class Ali1688Signer:
@staticmethod
def sign(params, appsecret):
"""生成API请求签名"""
# 按参数名ASCII排序
sorted_params = sorted(params.items(), key=lambda x: x[0])
# 拼接参数
sign_str = appsecret + "".join([f"{k}{v}" for k, v in sorted_params]) + appsecret
# SHA1加密并转为大写
return hashlib.sha1(sign_str.encode("utf-8")).hexdigest().upper()3. 商品详情采集器
实现核心的数据采集逻辑,包含并发控制和重试机制:
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
import redis
class ProductFetcher:
def __init__(self, auth_client, max_workers=5):
self.auth_client = auth_client
self.token_info = auth_client.get_access_token()
self.api_url = "https://gw.open.1688.com/openapi/param2/1/com.alibaba.product/alibaba.item.get"
self.redis_client = redis.Redis(host="localhost", port=6379, db=0)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.retry_limit = 3 # 最大重试次数
def _check_token(self):
"""检查令牌有效性,过期则刷新"""
if not self.token_info or time.time() >= self.token_info["expires_time"]:
self.token_info = self.auth_client.get_access_token(
self.token_info.get("refresh_token") if self.token_info else None
)
return self.token_info is not None
def fetch_single_product(self, product_id):
"""获取单个商品详情"""
if not self._check_token():
return None
# 检查缓存,避免重复请求
cache_key = f"ali1688:product:{product_id}"
cached_data = self.redis_client.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 构建请求参数
params = {
"app_key": self.auth_client.appkey,
"access_token": self.token_info["access_token"],
"method": "alibaba.item.get",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"format": "json",
"v": "2.0",
"sign_method": "sha1",
"item_id": product_id,
"fields": "item_id,title,price,quantity,sale_count,detail_url,images" # 需要的字段
}
# 生成签名
params["sign"] = Ali1688Signer.sign(params, self.auth_client.appsecret)
# 带重试的请求
for retry in range(self.retry_limit):
try:
response = requests.get(self.api_url, params=params, timeout=15)
result = response.json()
if "error_response" in result:
error = result["error_response"]
print(f"请求错误 {product_id}: {error['msg']} (重试 {retry+1}/{self.retry_limit})")
continue
# 缓存结果(有效期1小时)
self.redis_client.setex(cache_key, 3600, json.dumps(result))
return result
except Exception as e:
print(f"请求异常 {product_id}: {str(e)} (重试 {retry+1}/{self.retry_limit})")
time.sleep(0.5 * (retry + 1)) # 指数退避
return None
def fetch_batch_products(self, product_ids, max_concurrent=10):
"""批量获取商品详情"""
results = []
futures = []
# 控制并发数
with ThreadPoolExecutor(max_workers=min(max_concurrent, len(product_ids))) as executor:
for pid in product_ids:
futures.append(executor.submit(self.fetch_single_product, pid))
for future in as_completed(futures):
result = future.result()
if result:
results.append(result)
return results4. 数据存储模块
将采集到的商品数据结构化存储到 MySQL:
import pymysql
from pymysql.cursors import DictCursor
class ProductStorage:
def __init__(self):
self.db_config = {
"host": os.getenv("DB_HOST"),
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"db": os.getenv("DB_NAME"),
"charset": "utf8mb4"
}
self.connection = None
def connect(self):
"""建立数据库连接"""
if not self.connection or self.connection closed:
self.connection = pymysql.connect(**self.db_config, cursorclass=DictCursor)
return self.connection
def save_product(self, product_data):
"""保存商品数据到数据库"""
try:
item = product_data.get("item", {})
conn = self.connect()
with conn.cursor() as cursor:
sql = """
INSERT INTO ali1688_products
(item_id, title, price, quantity, sale_count, detail_url, images, raw_data, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
title = VALUES(title), price = VALUES(price), quantity = VALUES(quantity),
sale_count = VALUES(sale_count), detail_url = VALUES(detail_url),
images = VALUES(images), raw_data = VALUES(raw_data), updated_at = NOW()
"""
# 处理图片数据
images = json.dumps(item.get("images", [])) if "images" in item else None
cursor.execute(sql, (
item.get("item_id"),
item.get("title"),
item.get("price"),
item.get("quantity"),
item.get("sale_count"),
item.get("detail_url"),
images,
json.dumps(product_data) # 保存原始数据
))
conn.commit()
return True
except Exception as e:
print(f"存储失败: {str(e)}")
if conn:
conn.rollback()
return False5. 服务调度器
实现任务管理和流程控制:
class ProductCrawlService:
def __init__(self):
self.auth = Ali1688Auth()
self.fetcher = ProductFetcher(self.auth)
self.storage = ProductStorage()
def process_product_list(self, product_ids):
"""处理商品ID列表,完成采集和存储"""
print(f"开始处理 {len(product_ids)} 个商品...")
# 批量采集
products = self.fetcher.fetch_batch_products(product_ids)
# 批量存储
success_count = 0
for product in products:
if self.storage.save_product(product):
success_count += 1
print(f"处理完成: 成功 {success_count}/{len(product_ids)}")
return success_count
# 使用示例
if __name__ == "__main__":
service = ProductCrawlService()
# 测试商品ID列表
test_product_ids = ["61234567890", "69876543210", "65432109876"]
service.process_product_list(test_product_ids)四、性能优化策略
1.** 缓存机制 :使用 Redis 缓存已获取的商品数据,减少重复请求2. 并发控制 :根据 API 限流规则动态调整并发数,避免触发限制3. 批量处理 :采用分页和批量请求模式,减少网络往返4. 增量更新 :通过定时任务只更新有变化的商品数据5. 异常隔离 **:单个请求失败不影响整体任务,实现故障隔离
五、注意事项
1.** 合规性 :严格遵守 1688 协议,不超量请求2. 错误监控 :实现完善的日志记录和告警机制3. 令牌管理 :确保令牌安全存储,定期自动刷新4. 数据更新 :根据业务需求设置合理的缓存过期时间5. 接口变更 **:关注 1688 API 版本更新,及时适配接口变化
通过上述方案,可以构建一个高效、稳定、合规的 1688 商品详情采集服务。实际应用中可根据业务需求扩展功能,如增加数据清洗、增量同步、监控仪表盘等模块,进一步提升服务的实用性和可靠性。