在电商业务中,实时、准确的商品数据是数据中台的核心资产。本文将详细介绍如何通过淘宝平台 API 构建商品实时数据源,为电商数据中台提供基础支撑,并提供完整的实现代码示例。
一、淘宝平台 API 概述
淘宝平台(Taobao Platform,简称 TOP)提供了丰富的 API 接口,涵盖商品、订单、用户等多个维度。对于商品数据获取,核心 API 包括:
taobao.item.get:获取单个商品详情taobao.items.search:搜索商品列表taobao.itemcats.get:获取商品分类taobao.item.sku.get:获取商品 SKU 信息
这些 API 构成了商品数据采集的基础,通过合理调用可以构建完整的商品数据链路。
二、系统架构设计
构建淘宝商品实时数据源的系统架构主要包含以下组件:
API 接入层:负责与淘宝平台交互,处理认证、签名和请求发送
数据转换层:将 API 返回的原始数据转换为标准化格式
存储层:存储实时采集的商品数据(可选用 MySQL、MongoDB 等)
监控层:监控 API 调用状态、数据完整性和系统健康度
三、核心实现代码
1. 环境准备
首先需要安装必要的依赖库:
pip install top-api-sdk-python requests pymysql python-dotenv
2. 配置文件
创建.env配置文件存储关键信息:
APP_KEY=你的淘宝开放平台APP_KEY APP_SECRET=你的淘宝开放平台APP_SECRET ACCESS_TOKEN=你的访问令牌 MYSQL_HOST=localhost MYSQL_PORT=3306 MYSQL_USER=root MYSQL_PASSWORD=password MYSQL_DB=taobao_product
3. API 调用工具类
import os
import time
import hmac
import hashlib
import base64
import requests
from urllib.parse import urlencode
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class TaobaoAPI:
def __init__(self):
self.app_key = os.getenv('APP_KEY')
self.app_secret = os.getenv('APP_SECRET')
self.access_token = os.getenv('ACCESS_TOKEN')
self.gateway_url = "http://gw.api.taobao.com/router/rest"
def _sign(self, params):
"""生成签名"""
sorted_params = sorted(params.items(), key=lambda x: x[0])
query_string = urlencode(sorted_params)
sign_str = self.app_secret + query_string + self.app_secret
sign = hmac.new(sign_str.encode('utf-8'), digestmod=hashlib.sha1).digest()
return base64.b64encode(sign).decode('utf-8')
def call(self, method, params=None):
"""调用API"""
if params is None:
params = {}
# 公共参数
public_params = {
"app_key": self.app_key,
"format": "json",
"method": method,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"v": "2.0",
"sign_method": "hmac-sha1",
"session": self.access_token
}
# 合并参数
all_params = {**public_params,** params}
# 生成签名
all_params["sign"] = self._sign(all_params)
try:
response = requests.get(self.gateway_url, params=all_params, timeout=10)
response.raise_for_status()
result = response.json()
# 检查是否有错误
if "error_response" in result:
error = result["error_response"]
raise Exception(f"API Error: {error['msg']} (code: {error['code']})")
return result
except Exception as e:
print(f"API调用失败: {str(e)}")
return None4. 数据存储工具类
import pymysql
from pymysql.cursors import DictCursor
import os
class DBHelper:
def __init__(self):
self.connection = pymysql.connect(
host=os.getenv('MYSQL_HOST'),
port=int(os.getenv('MYSQL_PORT')),
user=os.getenv('MYSQL_USER'),
password=os.getenv('MYSQL_PASSWORD'),
db=os.getenv('MYSQL_DB'),
charset='utf8mb4',
cursorclass=DictCursor
)
def create_table(self):
"""创建商品表"""
with self.connection.cursor() as cursor:
sql = """
CREATE TABLE IF NOT EXISTS taobao_products (
num_iid BIGINT PRIMARY KEY,
title VARCHAR(255) NOT NULL,
nick VARCHAR(100),
price DECIMAL(10,2),
sales INT,
stock INT,
category_id BIGINT,
created_time DATETIME,
modified_time DATETIME,
raw_data TEXT,
last_sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
cursor.execute(sql)
self.connection.commit()
def save_product(self, product_data):
"""保存商品数据"""
try:
with self.connection.cursor() as cursor:
# 检查商品是否已存在
cursor.execute("SELECT num_iid FROM taobao_products WHERE num_iid = %s",
(product_data['num_iid'],))
exists = cursor.fetchone()
if exists:
# 更新商品
sql = """
UPDATE taobao_products
SET title=%s, nick=%s, price=%s, sales=%s, stock=%s,
category_id=%s, modified_time=%s, raw_data=%s
WHERE num_iid=%s
"""
else:
# 插入新商品
sql = """
INSERT INTO taobao_products
(num_iid, title, nick, price, sales, stock, category_id,
created_time, modified_time, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# 提取需要的字段
params = (
product_data.get('title', ''),
product_data.get('nick', ''),
product_data.get('price', 0),
product_data.get('sale_count', 0),
product_data.get('stock', 0),
product_data.get('cid', 0),
product_data.get('modified', None),
str(product_data), # 存储原始数据
product_data.get('num_iid')
)
if not exists:
params = (product_data.get('num_iid'),) + params[:-1]
cursor.execute(sql, params)
self.connection.commit()
return True
except Exception as e:
print(f"保存商品失败: {str(e)}")
self.connection.rollback()
return False
def close(self):
"""关闭数据库连接"""
self.connection.close()5. 商品数据同步服务
import json
import time
from taobao_api import TaobaoAPI
from db_helper import DBHelper
class ProductSyncService:
def __init__(self):
self.api = TaobaoAPI()
self.db = DBHelper()
# 初始化数据库表
self.db.create_table()
def sync_product(self, num_iid):
"""同步单个商品数据"""
print(f"开始同步商品: {num_iid}")
# 调用淘宝API获取商品详情
result = self.api.call(
"taobao.item.get",
{
"num_iid": num_iid,
"fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"
}
)
if result and "item_get_response" in result:
product_data = result["item_get_response"]["item"]
# 保存商品数据到数据库
success = self.db.save_product(product_data)
if success:
print(f"商品 {num_iid} 同步成功")
return True
print(f"商品 {num_iid} 同步失败")
return False
def batch_sync_products(self, num_iids, interval=2):
"""批量同步商品数据"""
success_count = 0
for num_iid in num_iids:
if self.sync_product(num_iid):
success_count += 1
# 控制API调用频率,避免触发限流
time.sleep(interval)
print(f"批量同步完成,成功 {success_count}/{len(num_iids)}")
return success_count
def search_and_sync(self, keyword, page=1, page_size=20):
"""搜索商品并同步"""
print(f"搜索商品: {keyword}, 第 {page} 页")
result = self.api.call(
"taobao.items.search",
{
"q": keyword,
"page_no": page,
"page_size": page_size,
"fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"
}
)
if result and "items_search_response" in result:
items = result["items_search_response"]["items"]["item"]
num_iids = [item["num_iid"] for item in items]
return self.batch_sync_products(num_iids)
return 0
def close(self):
"""关闭资源"""
self.db.close()
if __name__ == "__main__":
sync_service = ProductSyncService()
try:
# 示例1:同步单个商品
sync_service.sync_product(572060726845)
# 示例2:批量同步商品
# sync_service.batch_sync_products([572060726845, 614662268993, 634862718573])
# 示例3:搜索并同步商品
# sync_service.search_and_sync("手机", page=1, page_size=10)
finally:
sync_service.close()四、数据中台集成建议
增量同步策略:通过维护商品最后更新时间,实现增量同步,减少 API 调用量
缓存机制:对频繁访问的商品数据进行缓存,提高查询性能
限流控制:严格遵守淘宝 API 的调用频率限制,避免账号被封
异常重试:实现失败重试机制,保证数据完整性
监控告警:建立 API 调用成功率、数据同步延迟等指标的监控告警
五、总结
通过淘宝平台 API 构建商品实时数据源,是电商数据中台建设的重要基础。本文提供的方案实现了从 API 调用、数据转换到存储的完整链路,可根据实际业务需求进行扩展。
在实际应用中,还需要考虑数据安全、API 权限管理、数据清洗等问题,以确保数据源的稳定、可靠和高质量,为后续的数据分析、业务决策提供有力支持。