×

api开发 电商平台 数据挖掘

电商数据中台基石:通过 API 构建淘宝商品实时数据源

admin admin 发表于2025-10-28 16:50:22 浏览36 评论0

抢沙发发表评论

在电商业务中,实时、准确的商品数据是数据中台的核心资产。本文将详细介绍如何通过淘宝平台 API 构建商品实时数据源,为电商数据中台提供基础支撑,并提供完整的实现代码示例。

一、淘宝平台 API 概述

淘宝平台(Taobao  Platform,简称 TOP)提供了丰富的 API 接口,涵盖商品、订单、用户等多个维度。对于商品数据获取,核心 API 包括:

  • taobao.item.get:获取单个商品详情

  • taobao.items.search:搜索商品列表

  • taobao.itemcats.get:获取商品分类

  • taobao.item.sku.get:获取商品 SKU 信息

这些 API 构成了商品数据采集的基础,通过合理调用可以构建完整的商品数据链路。

二、系统架构设计

构建淘宝商品实时数据源的系统架构主要包含以下组件:

  1. API 接入层:负责与淘宝平台交互,处理认证、签名和请求发送

  2. 数据转换层:将 API 返回的原始数据转换为标准化格式

  3. 存储层:存储实时采集的商品数据(可选用 MySQL、MongoDB 等)

  4. 监控层:监控 API 调用状态、数据完整性和系统健康度

三、核心实现代码

1. 环境准备

首先需要安装必要的依赖库:

pip install top-api-sdk-python requests pymysql python-dotenv

2. 配置文件

创建.env配置文件存储关键信息:

APP_KEY=你的淘宝开放平台APP_KEY
APP_SECRET=你的淘宝开放平台APP_SECRET
ACCESS_TOKEN=你的访问令牌
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=password
MYSQL_DB=taobao_product

3. API 调用工具类

import os
import time
import hmac
import hashlib
import base64
import requests
from urllib.parse import urlencode
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

class TaobaoAPI:
    def __init__(self):
        self.app_key = os.getenv('APP_KEY')
        self.app_secret = os.getenv('APP_SECRET')
        self.access_token = os.getenv('ACCESS_TOKEN')
        self.gateway_url = "http://gw.api.taobao.com/router/rest"
        
    def _sign(self, params):
        """生成签名"""
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        query_string = urlencode(sorted_params)
        sign_str = self.app_secret + query_string + self.app_secret
        sign = hmac.new(sign_str.encode('utf-8'), digestmod=hashlib.sha1).digest()
        return base64.b64encode(sign).decode('utf-8')
        
    def call(self, method, params=None):
        """调用API"""
        if params is None:
            params = {}
            
        # 公共参数
        public_params = {
            "app_key": self.app_key,
            "format": "json",
            "method": method,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "v": "2.0",
            "sign_method": "hmac-sha1",
            "session": self.access_token
        }
        
        # 合并参数
        all_params = {**public_params,** params}
        # 生成签名
        all_params["sign"] = self._sign(all_params)
        
        try:
            response = requests.get(self.gateway_url, params=all_params, timeout=10)
            response.raise_for_status()
            result = response.json()
            
            # 检查是否有错误
            if "error_response" in result:
                error = result["error_response"]
                raise Exception(f"API Error: {error['msg']} (code: {error['code']})")
                
            return result
        except Exception as e:
            print(f"API调用失败: {str(e)}")
            return None

4. 数据存储工具类

import pymysql
from pymysql.cursors import DictCursor
import os

class DBHelper:
    def __init__(self):
        self.connection = pymysql.connect(
            host=os.getenv('MYSQL_HOST'),
            port=int(os.getenv('MYSQL_PORT')),
            user=os.getenv('MYSQL_USER'),
            password=os.getenv('MYSQL_PASSWORD'),
            db=os.getenv('MYSQL_DB'),
            charset='utf8mb4',
            cursorclass=DictCursor
        )
        
    def create_table(self):
        """创建商品表"""
        with self.connection.cursor() as cursor:
            sql = """
            CREATE TABLE IF NOT EXISTS taobao_products (
                num_iid BIGINT PRIMARY KEY,
                title VARCHAR(255) NOT NULL,
                nick VARCHAR(100),
                price DECIMAL(10,2),
                sales INT,
                stock INT,
                category_id BIGINT,
                created_time DATETIME,
                modified_time DATETIME,
                raw_data TEXT,
                last_sync_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
            """
            cursor.execute(sql)
        self.connection.commit()
        
    def save_product(self, product_data):
        """保存商品数据"""
        try:
            with self.connection.cursor() as cursor:
                # 检查商品是否已存在
                cursor.execute("SELECT num_iid FROM taobao_products WHERE num_iid = %s", 
                              (product_data['num_iid'],))
                exists = cursor.fetchone()
                
                if exists:
                    # 更新商品
                    sql = """
                    UPDATE taobao_products 
                    SET title=%s, nick=%s, price=%s, sales=%s, stock=%s, 
                        category_id=%s, modified_time=%s, raw_data=%s
                    WHERE num_iid=%s
                    """
                else:
                    # 插入新商品
                    sql = """
                    INSERT INTO taobao_products 
                    (num_iid, title, nick, price, sales, stock, category_id, 
                     created_time, modified_time, raw_data)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """
                
                # 提取需要的字段
                params = (
                    product_data.get('title', ''),
                    product_data.get('nick', ''),
                    product_data.get('price', 0),
                    product_data.get('sale_count', 0),
                    product_data.get('stock', 0),
                    product_data.get('cid', 0),
                    product_data.get('modified', None),
                    str(product_data),  # 存储原始数据
                    product_data.get('num_iid')
                )
                
                if not exists:
                    params = (product_data.get('num_iid'),) + params[:-1]
                
                cursor.execute(sql, params)
            self.connection.commit()
            return True
        except Exception as e:
            print(f"保存商品失败: {str(e)}")
            self.connection.rollback()
            return False
            
    def close(self):
        """关闭数据库连接"""
        self.connection.close()

5. 商品数据同步服务

import json
import time
from taobao_api import TaobaoAPI
from db_helper import DBHelper

class ProductSyncService:
    def __init__(self):
        self.api = TaobaoAPI()
        self.db = DBHelper()
        # 初始化数据库表
        self.db.create_table()
        
    def sync_product(self, num_iid):
        """同步单个商品数据"""
        print(f"开始同步商品: {num_iid}")
        # 调用淘宝API获取商品详情
        result = self.api.call(
            "taobao.item.get",
            {
                "num_iid": num_iid,
                "fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"
            }
        )
        
        if result and "item_get_response" in result:
            product_data = result["item_get_response"]["item"]
            # 保存商品数据到数据库
            success = self.db.save_product(product_data)
            if success:
                print(f"商品 {num_iid} 同步成功")
                return True
        print(f"商品 {num_iid} 同步失败")
        return False
        
    def batch_sync_products(self, num_iids, interval=2):
        """批量同步商品数据"""
        success_count = 0
        for num_iid in num_iids:
            if self.sync_product(num_iid):
                success_count += 1
            # 控制API调用频率,避免触发限流
            time.sleep(interval)
        print(f"批量同步完成,成功 {success_count}/{len(num_iids)}")
        return success_count
        
    def search_and_sync(self, keyword, page=1, page_size=20):
        """搜索商品并同步"""
        print(f"搜索商品: {keyword}, 第 {page} 页")
        result = self.api.call(
            "taobao.items.search",
            {
                "q": keyword,
                "page_no": page,
                "page_size": page_size,
                "fields": "num_iid,title,nick,price,sale_count,stock,cid,created,modified"
            }
        )
        
        if result and "items_search_response" in result:
            items = result["items_search_response"]["items"]["item"]
            num_iids = [item["num_iid"] for item in items]
            return self.batch_sync_products(num_iids)
        return 0
        
    def close(self):
        """关闭资源"""
        self.db.close()

if __name__ == "__main__":
    sync_service = ProductSyncService()
    
    try:
        # 示例1:同步单个商品
        sync_service.sync_product(572060726845)
        
        # 示例2:批量同步商品
        # sync_service.batch_sync_products([572060726845, 614662268993, 634862718573])
        
        # 示例3:搜索并同步商品
        # sync_service.search_and_sync("手机", page=1, page_size=10)
    finally:
        sync_service.close()

四、数据中台集成建议

  1. 增量同步策略:通过维护商品最后更新时间,实现增量同步,减少 API 调用量

  2. 缓存机制:对频繁访问的商品数据进行缓存,提高查询性能

  3. 限流控制:严格遵守淘宝 API 的调用频率限制,避免账号被封

  4. 异常重试:实现失败重试机制,保证数据完整性

  5. 监控告警:建立 API 调用成功率、数据同步延迟等指标的监控告警

五、总结

通过淘宝平台 API 构建商品实时数据源,是电商数据中台建设的重要基础。本文提供的方案实现了从 API 调用、数据转换到存储的完整链路,可根据实际业务需求进行扩展。

在实际应用中,还需要考虑数据安全、API 权限管理、数据清洗等问题,以确保数据源的稳定、可靠和高质量,为后续的数据分析、业务决策提供有力支持。


少长咸集

群贤毕至

访客