×

api开发 电商平台 数据挖掘

实时价格与库存监控系统开发:淘宝 API 接口接入核心代码解读

admin admin 发表于2025-08-31 15:01:47 浏览24 评论0

抢沙发发表评论

在电商运营和数据分析领域,实时获取商品价格与库存信息具有重要价值。本文将详细介绍如何开发一套基于淘宝 API 的实时价格与库存监控系统,并深入解读核心代码实现。

系统架构设计

实时价格与库存监控系统主要由以下几个核心模块组成:


  1. API 接入模块:负责与淘宝接口进行通信

  2. 数据解析模块:处理 API 返回的 JSON 数据

  3. 存储模块:保存监控数据用于分析和历史对比

  4. 监控告警模块:当价格或库存变动达到阈值时触发通知

  5. 前端展示模块:可视化展示监控数据

淘宝 API 接入准备工作

在开始开发前,需要完成开发者认证,并获取 API 调用所需的 apikey 和 apisecret。主要涉及的 API 包括:


  • 商品详情 API:获取商品基本信息、价格

  • 库存查询 API:获取商品实时库存状态

  • 商品上下架 API:监控商品状态变化

核心代码实现

1. API 接入认证模块

淘宝 API 采用 OAuth2.0 认证方式,以下是获取访问令牌的核心代码:

import requests
import time
import hashlib
import json

class TaobaoAuth:
    def __init__(self, app_key, app_secret, redirect_uri):
        self.app_key = app_key
        self.app_secret = app_secret
        self.redirect_uri = redirect_uri
        self.access_token = None
        self.refresh_token = None
        self.expires_at = 0
    
    def get_auth_url(self):
        """生成授权URL"""
        return f"https://oauth.taobao.com/authorize?response_type=code&client_id={self.app_key}&redirect_uri={self.redirect_uri}&state=STATE"
    
    def get_access_token(self, code):
        """通过授权码获取访问令牌"""
        url = "https://oauth.taobao.com/token"
        params = {
            "grant_type": "authorization_code",
            "code": code,
            "client_id": self.app_key,
            "client_secret": self.app_secret,
            "redirect_uri": self.redirect_uri
        }
        
        response = requests.post(url, params=params)
        result = response.json()
        
        if "access_token" in result:
            self.access_token = result["access_token"]
            self.refresh_token = result["refresh_token"]
            self.expires_at = time.time() + result["expires_in"]
            return True
        return False
    
    def refresh_access_token(self):
        """刷新访问令牌"""
        if not self.refresh_token:
            return False
            
        url = "https://oauth.taobao.com/token"
        params = {
            "grant_type": "refresh_token",
            "refresh_token": self.refresh_token,
            "client_id": self.app_key,
            "client_secret": self.app_secret
        }
        
        response = requests.post(url, params=params)
        result = response.json()
        
        if "access_token" in result:
            self.access_token = result["access_token"]
            self.refresh_token = result["refresh_token"]
            self.expires_at = time.time() + result["expires_in"]
            return True
        return False
    
    def is_token_valid(self):
        """检查令牌是否有效"""
        return self.access_token and time.time() < self.expires_at - 60  # 提前60秒过期

2. 商品价格与库存获取模块

以下代码实现了通过淘宝 API 获取商品价格和库存信息的核心功能:

import requests
import time
import hashlib
from taobao_auth import TaobaoAuth

class TaobaoProductMonitor:
    def __init__(self, app_key, app_secret, redirect_uri):
        self.auth = TaobaoAuth(app_key, app_secret, redirect_uri)
        self.api_url = "https://eco.taobao.com/router/rest"
    
    def _generate_sign(self, params):
        """生成API签名"""
        sorted_params = sorted(params.items())
        sign_str = self.auth.app_secret
        for key, value in sorted_params:
            sign_str += f"{key}{value}"
        sign_str += self.auth.app_secret
        return hashlib.md5(sign_str.encode()).hexdigest().upper()
    
    def _call_api(self, method, params):
        """调用淘宝API"""
        # 检查令牌有效性,无效则刷新
        if not self.auth.is_token_valid():
            if not self.auth.refresh_access_token():
                raise Exception("无法获取有效的访问令牌,请重新授权")
        
        # 构建公共参数
        common_params = {
            "app_key": self.auth.app_key,
            "format": "json",
            "method": method,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "v": "2.0",
            "sign_method": "md5",
            "session": self.auth.access_token
        }
        
        # 合并参数
        all_params = {**common_params,** params}
        # 生成签名
        all_params["sign"] = self._generate_sign(all_params)
        
        # 调用API
        response = requests.get(self.api_url, params=all_params)
        result = response.json()
        
        # 检查是否有错误
        if "error_response" in result:
            error = result["error_response"]
            raise Exception(f"API调用错误: {error['msg']} (错误码: {error['code']})")
        
        return result
    
    def get_product_info(self, num_iid):
        """获取商品基本信息和价格"""
        try:
            result = self._call_api(
                "taobao.item.get",
                {
                    "fields": "num_iid,title,price,stock,status",
                    "num_iid": num_iid
                }
            )
            return result["item_get_response"]["item"]
        except Exception as e:
            print(f"获取商品信息失败: {str(e)}")
            return None
    
    def get_product_stock(self, num_iid, sku_id=None):
        """获取商品库存信息,支持指定SKU"""
        params = {"num_iid": num_iid}
        if sku_id:
            params["sku_id"] = sku_id
            
        try:
            result = self._call_api(
                "taobao.item.quantity.get",
                params
            )
            return result["item_quantity_get_response"]["item_quantity"]
        except Exception as e:
            print(f"获取商品库存失败: {str(e)}")
            return None
    
    def batch_get_products(self, num_iids):
        """批量获取商品信息"""
        if not num_iids or len(num_iids) > 40:
            raise ValueError("商品ID列表不能为空且数量不能超过40个")
            
        try:
            result = self._call_api(
                "taobao.items.get",
                {
                    "fields": "num_iid,title,price,stock,status",
                    "num_iids": ",".join(map(str, num_iids))
                }
            )
            return result["items_get_response"]["items"]["item"]
        except Exception as e:
            print(f"批量获取商品信息失败: {str(e)}")
            return None

3. 监控与存储模块

以下代码实现了定时监控商品价格和库存变化,并将数据存储到数据库的功能:

import time
import sqlite3
from datetime import datetime
from taobao_product_monitor import TaobaoProductMonitor
import threading

class PriceStockMonitor:
    def __init__(self, app_key, app_secret, redirect_uri, db_name="product_monitor.db"):
        self.monitor = TaobaoProductMonitor(app_key, app_secret, redirect_uri)
        self.db_name = db_name
        self.monitored_products = set()
        self.alert_thresholds = {}  # 存储每个商品的告警阈值
        self.init_database()
        
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        
        # 创建商品信息表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            num_iid TEXT PRIMARY KEY,
            title TEXT,
            current_price REAL,
            current_stock INTEGER,
            status TEXT,
            last_updated TIMESTAMP
        )
        ''')
        
        # 创建价格历史表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            num_iid TEXT,
            price REAL,
            record_time TIMESTAMP,
            FOREIGN KEY (num_iid) REFERENCES products(num_iid)
        )
        ''')
        
        # 创建库存历史表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS stock_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            num_iid TEXT,
            stock INTEGER,
            record_time TIMESTAMP,
            FOREIGN KEY (num_iid) REFERENCES products(num_iid)
        )
        ''')
        
        conn.commit()
        conn.close()
    
    def add_monitored_product(self, num_iid, price_threshold=None, stock_threshold=None):
        """添加需要监控的商品"""
        self.monitored_products.add(num_iid)
        self.alert_thresholds[num_iid] = {
            'price': price_threshold,
            'stock': stock_threshold
        }
        # 立即获取一次初始数据
        self.fetch_and_save_product_data(num_iid)
    
    def remove_monitored_product(self, num_iid):
        """移除监控的商品"""
        if num_iid in self.monitored_products:
            self.monitored_products.remove(num_iid)
            del self.alert_thresholds[num_iid]
            return True
        return False
    
    def fetch_and_save_product_data(self, num_iid):
        """获取并保存商品数据"""
        # 获取商品信息
        product = self.monitor.get_product_info(num_iid)
        if not product:
            return False
            
        # 获取库存信息
        stock = self.monitor.get_product_stock(num_iid)
        if stock:
            product['stock'] = stock['quantity']
        
        # 保存数据
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        
        # 更新商品表
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute('''
        INSERT OR REPLACE INTO products 
        (num_iid, title, current_price, current_stock, status, last_updated)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            product['num_iid'],
            product['title'],
            float(product['price']),
            int(product['stock']),
            product['status'],
            current_time
        ))
        
        # 记录价格历史
        cursor.execute('''
        INSERT INTO price_history (num_iid, price, record_time)
        VALUES (?, ?, ?)
        ''', (product['num_iid'], float(product['price']), current_time))
        
        # 记录库存历史
        cursor.execute('''
        INSERT INTO stock_history (num_iid, stock, record_time)
        VALUES (?, ?, ?)
        ''', (product['num_iid'], int(product['stock']), current_time))
        
        conn.commit()
        conn.close()
        
        # 检查是否需要告警
        self.check_alert_conditions(product)
        
        return True
    
    def check_alert_conditions(self, product):
        """检查是否达到告警条件"""
        thresholds = self.alert_thresholds.get(product['num_iid'], {})
        alerts = []
        
        # 价格告警检查
        if thresholds['price'] is not None:
            current_price = float(product['price'])
            if current_price <= thresholds['price']['min']:
                alerts.append(f"价格低于阈值: {current_price} <= {thresholds['price']['min']}")
            elif current_price >= thresholds['price']['max']:
                alerts.append(f"价格高于阈值: {current_price} >= {thresholds['price']['max']}")
        
        # 库存告警检查
        if thresholds['stock'] is not None:
            current_stock = int(product['stock'])
            if current_stock <= thresholds['stock']:
                alerts.append(f"库存低于阈值: {current_stock} <= {thresholds['stock']}")
        
        # 发送告警
        if alerts:
            self.send_alert(product['num_iid'], product['title'], alerts)
    
    def send_alert(self, num_iid, title, messages):
        """发送告警通知"""
        # 这里可以实现邮件、短信或其他方式的告警
        print(f"\n【告警】商品 {title} ({num_iid})")
        for msg in messages:
            print(f"- {msg}")
    
    def start_monitoring(self, interval=300):
        """开始监控任务"""
        print(f"开始监控,刷新间隔: {interval}秒")
        print(f"监控商品数量: {len(self.monitored_products)}")
        
        def monitor_loop():
            while True:
                current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print(f"\n[{current_time}] 开始刷新商品数据...")
                
                for num_iid in list(self.monitored_products):
                    print(f"处理商品 {num_iid}...")
                    self.fetch_and_save_product_data(num_iid)
                
                print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 数据刷新完成,等待下一次刷新...")
                time.sleep(interval)
        
        # 启动监控线程
        self.monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        self.monitor_thread.start()
        return self.monitor_thread

系统使用示例

以下是如何使用上述模块构建一个完整的监控系统的示例代码:

from monitor_service import PriceStockMonitor

def main():
    # 替换为你的淘宝开放平台应用信息
    APP_KEY = "你的APP_KEY"
    APP_SECRET = "你的APP_SECRET"
    REDIRECT_URI = "你的回调地址"
    
    # 初始化监控器
    monitor = PriceStockMonitor(APP_KEY, APP_SECRET, REDIRECT_URI)
    
    # 提示用户进行授权
    print("请访问以下URL进行授权:")
    print(monitor.monitor.auth.get_auth_url())
    auth_code = input("请输入授权后得到的code: ")
    
    # 获取访问令牌
    if not monitor.monitor.auth.get_access_token(auth_code):
        print("授权失败,程序退出")
        return
    
    # 添加需要监控的商品
    # 格式: (商品ID, 价格阈值{min, max}, 库存阈值)
    products_to_monitor = [
        ("123456789", {"min": 99.0, "max": 199.0}, 10),  # 示例商品1
        ("987654321", {"min": 49.0, "max": 89.0}, 5)     # 示例商品2
    ]
    
    for num_iid, price_threshold, stock_threshold in products_to_monitor:
        monitor.add_monitored_product(num_iid, price_threshold, stock_threshold)
    
    # 开始监控,设置刷新间隔为300秒(5分钟)
    monitor.start_monitoring(interval=300)
    
    # 保持主线程运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("用户中断,程序退出")

if __name__ == "__main__":
    import time
    main()

代码关键技术点解读

  1. API 签名机制:淘宝 API 采用 MD5 签名方式,通过将参数按字典序排序后拼接密钥进行加密,确保请求的安全性和完整性。

  2. 令牌管理:实现了自动刷新访问令牌的机制,避免因令牌过期导致监控中断。

  3. 数据存储设计:采用 SQLite 数据库存储商品信息、价格历史和库存历史,便于数据分析和趋势预测。

  4. 定时监控:使用线程实现定时任务,定期获取商品数据并检查是否触发告警条件。

  5. 模块化设计:将认证、API 调用、数据存储和监控告警功能拆分为独立模块,提高代码可维护性和可扩展性。

系统扩展建议

  1. 增加数据可视化功能,使用 Matplotlib 或 ECharts 绘制价格和库存变化趋势图。

  2. 实现更丰富的告警方式,如邮件、短信、企业微信 / 钉钉通知等。

  3. 增加 SKU 级别的监控,支持多规格商品的价格和库存跟踪。

  4. 加入数据分析功能,如价格波动分析、库存预警预测等。

  5. 开发 Web 管理界面,方便用户配置监控商品和查看监控数据。


通过以上实现,我们可以构建一个稳定、高效的淘宝商品实时价格与库存监控系统,为电商运营决策提供数据支持。


少长咸集

群贤毕至

访客