在电商运营和数据分析领域,实时获取商品价格与库存信息具有重要价值。本文将详细介绍如何开发一套基于淘宝 API 的实时价格与库存监控系统,并深入解读核心代码实现。
系统架构设计
实时价格与库存监控系统主要由以下几个核心模块组成:
API 接入模块:负责与淘宝接口进行通信
数据解析模块:处理 API 返回的 JSON 数据
存储模块:保存监控数据用于分析和历史对比
监控告警模块:当价格或库存变动达到阈值时触发通知
前端展示模块:可视化展示监控数据
淘宝 API 接入准备工作
在开始开发前,需要完成开发者认证,并获取 API 调用所需的 apikey 和 apisecret。主要涉及的 API 包括:
商品详情 API:获取商品基本信息、价格
库存查询 API:获取商品实时库存状态
商品上下架 API:监控商品状态变化
核心代码实现
1. API 接入认证模块
淘宝 API 采用 OAuth2.0 认证方式,以下是获取访问令牌的核心代码:
import requests import time import hashlib import json class TaobaoAuth: def __init__(self, app_key, app_secret, redirect_uri): self.app_key = app_key self.app_secret = app_secret self.redirect_uri = redirect_uri self.access_token = None self.refresh_token = None self.expires_at = 0 def get_auth_url(self): """生成授权URL""" return f"https://oauth.taobao.com/authorize?response_type=code&client_id={self.app_key}&redirect_uri={self.redirect_uri}&state=STATE" def get_access_token(self, code): """通过授权码获取访问令牌""" url = "https://oauth.taobao.com/token" params = { "grant_type": "authorization_code", "code": code, "client_id": self.app_key, "client_secret": self.app_secret, "redirect_uri": self.redirect_uri } response = requests.post(url, params=params) result = response.json() if "access_token" in result: self.access_token = result["access_token"] self.refresh_token = result["refresh_token"] self.expires_at = time.time() + result["expires_in"] return True return False def refresh_access_token(self): """刷新访问令牌""" if not self.refresh_token: return False url = "https://oauth.taobao.com/token" params = { "grant_type": "refresh_token", "refresh_token": self.refresh_token, "client_id": self.app_key, "client_secret": self.app_secret } response = requests.post(url, params=params) result = response.json() if "access_token" in result: self.access_token = result["access_token"] self.refresh_token = result["refresh_token"] self.expires_at = time.time() + result["expires_in"] return True return False def is_token_valid(self): """检查令牌是否有效""" return self.access_token and time.time() < self.expires_at - 60 # 提前60秒过期
2. 商品价格与库存获取模块
以下代码实现了通过淘宝 API 获取商品价格和库存信息的核心功能:
import requests import time import hashlib from taobao_auth import TaobaoAuth class TaobaoProductMonitor: def __init__(self, app_key, app_secret, redirect_uri): self.auth = TaobaoAuth(app_key, app_secret, redirect_uri) self.api_url = "https://eco.taobao.com/router/rest" def _generate_sign(self, params): """生成API签名""" sorted_params = sorted(params.items()) sign_str = self.auth.app_secret for key, value in sorted_params: sign_str += f"{key}{value}" sign_str += self.auth.app_secret return hashlib.md5(sign_str.encode()).hexdigest().upper() def _call_api(self, method, params): """调用淘宝API""" # 检查令牌有效性,无效则刷新 if not self.auth.is_token_valid(): if not self.auth.refresh_access_token(): raise Exception("无法获取有效的访问令牌,请重新授权") # 构建公共参数 common_params = { "app_key": self.auth.app_key, "format": "json", "method": method, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "v": "2.0", "sign_method": "md5", "session": self.auth.access_token } # 合并参数 all_params = {**common_params,** params} # 生成签名 all_params["sign"] = self._generate_sign(all_params) # 调用API response = requests.get(self.api_url, params=all_params) result = response.json() # 检查是否有错误 if "error_response" in result: error = result["error_response"] raise Exception(f"API调用错误: {error['msg']} (错误码: {error['code']})") return result def get_product_info(self, num_iid): """获取商品基本信息和价格""" try: result = self._call_api( "taobao.item.get", { "fields": "num_iid,title,price,stock,status", "num_iid": num_iid } ) return result["item_get_response"]["item"] except Exception as e: print(f"获取商品信息失败: {str(e)}") return None def get_product_stock(self, num_iid, sku_id=None): """获取商品库存信息,支持指定SKU""" params = {"num_iid": num_iid} if sku_id: params["sku_id"] = sku_id try: result = self._call_api( "taobao.item.quantity.get", params ) return result["item_quantity_get_response"]["item_quantity"] except Exception as e: print(f"获取商品库存失败: {str(e)}") return None def batch_get_products(self, num_iids): """批量获取商品信息""" if not num_iids or len(num_iids) > 40: raise ValueError("商品ID列表不能为空且数量不能超过40个") try: result = self._call_api( "taobao.items.get", { "fields": "num_iid,title,price,stock,status", "num_iids": ",".join(map(str, num_iids)) } ) return result["items_get_response"]["items"]["item"] except Exception as e: print(f"批量获取商品信息失败: {str(e)}") return None
3. 监控与存储模块
以下代码实现了定时监控商品价格和库存变化,并将数据存储到数据库的功能:
import time import sqlite3 from datetime import datetime from taobao_product_monitor import TaobaoProductMonitor import threading class PriceStockMonitor: def __init__(self, app_key, app_secret, redirect_uri, db_name="product_monitor.db"): self.monitor = TaobaoProductMonitor(app_key, app_secret, redirect_uri) self.db_name = db_name self.monitored_products = set() self.alert_thresholds = {} # 存储每个商品的告警阈值 self.init_database() def init_database(self): """初始化数据库""" conn = sqlite3.connect(self.db_name) cursor = conn.cursor() # 创建商品信息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS products ( num_iid TEXT PRIMARY KEY, title TEXT, current_price REAL, current_stock INTEGER, status TEXT, last_updated TIMESTAMP ) ''') # 创建价格历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS price_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, num_iid TEXT, price REAL, record_time TIMESTAMP, FOREIGN KEY (num_iid) REFERENCES products(num_iid) ) ''') # 创建库存历史表 cursor.execute(''' CREATE TABLE IF NOT EXISTS stock_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, num_iid TEXT, stock INTEGER, record_time TIMESTAMP, FOREIGN KEY (num_iid) REFERENCES products(num_iid) ) ''') conn.commit() conn.close() def add_monitored_product(self, num_iid, price_threshold=None, stock_threshold=None): """添加需要监控的商品""" self.monitored_products.add(num_iid) self.alert_thresholds[num_iid] = { 'price': price_threshold, 'stock': stock_threshold } # 立即获取一次初始数据 self.fetch_and_save_product_data(num_iid) def remove_monitored_product(self, num_iid): """移除监控的商品""" if num_iid in self.monitored_products: self.monitored_products.remove(num_iid) del self.alert_thresholds[num_iid] return True return False def fetch_and_save_product_data(self, num_iid): """获取并保存商品数据""" # 获取商品信息 product = self.monitor.get_product_info(num_iid) if not product: return False # 获取库存信息 stock = self.monitor.get_product_stock(num_iid) if stock: product['stock'] = stock['quantity'] # 保存数据 conn = sqlite3.connect(self.db_name) cursor = conn.cursor() # 更新商品表 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') cursor.execute(''' INSERT OR REPLACE INTO products (num_iid, title, current_price, current_stock, status, last_updated) VALUES (?, ?, ?, ?, ?, ?) ''', ( product['num_iid'], product['title'], float(product['price']), int(product['stock']), product['status'], current_time )) # 记录价格历史 cursor.execute(''' INSERT INTO price_history (num_iid, price, record_time) VALUES (?, ?, ?) ''', (product['num_iid'], float(product['price']), current_time)) # 记录库存历史 cursor.execute(''' INSERT INTO stock_history (num_iid, stock, record_time) VALUES (?, ?, ?) ''', (product['num_iid'], int(product['stock']), current_time)) conn.commit() conn.close() # 检查是否需要告警 self.check_alert_conditions(product) return True def check_alert_conditions(self, product): """检查是否达到告警条件""" thresholds = self.alert_thresholds.get(product['num_iid'], {}) alerts = [] # 价格告警检查 if thresholds['price'] is not None: current_price = float(product['price']) if current_price <= thresholds['price']['min']: alerts.append(f"价格低于阈值: {current_price} <= {thresholds['price']['min']}") elif current_price >= thresholds['price']['max']: alerts.append(f"价格高于阈值: {current_price} >= {thresholds['price']['max']}") # 库存告警检查 if thresholds['stock'] is not None: current_stock = int(product['stock']) if current_stock <= thresholds['stock']: alerts.append(f"库存低于阈值: {current_stock} <= {thresholds['stock']}") # 发送告警 if alerts: self.send_alert(product['num_iid'], product['title'], alerts) def send_alert(self, num_iid, title, messages): """发送告警通知""" # 这里可以实现邮件、短信或其他方式的告警 print(f"\n【告警】商品 {title} ({num_iid})") for msg in messages: print(f"- {msg}") def start_monitoring(self, interval=300): """开始监控任务""" print(f"开始监控,刷新间隔: {interval}秒") print(f"监控商品数量: {len(self.monitored_products)}") def monitor_loop(): while True: current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"\n[{current_time}] 开始刷新商品数据...") for num_iid in list(self.monitored_products): print(f"处理商品 {num_iid}...") self.fetch_and_save_product_data(num_iid) print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 数据刷新完成,等待下一次刷新...") time.sleep(interval) # 启动监控线程 self.monitor_thread = threading.Thread(target=monitor_loop, daemon=True) self.monitor_thread.start() return self.monitor_thread
系统使用示例
以下是如何使用上述模块构建一个完整的监控系统的示例代码:
from monitor_service import PriceStockMonitor def main(): # 替换为你的淘宝开放平台应用信息 APP_KEY = "你的APP_KEY" APP_SECRET = "你的APP_SECRET" REDIRECT_URI = "你的回调地址" # 初始化监控器 monitor = PriceStockMonitor(APP_KEY, APP_SECRET, REDIRECT_URI) # 提示用户进行授权 print("请访问以下URL进行授权:") print(monitor.monitor.auth.get_auth_url()) auth_code = input("请输入授权后得到的code: ") # 获取访问令牌 if not monitor.monitor.auth.get_access_token(auth_code): print("授权失败,程序退出") return # 添加需要监控的商品 # 格式: (商品ID, 价格阈值{min, max}, 库存阈值) products_to_monitor = [ ("123456789", {"min": 99.0, "max": 199.0}, 10), # 示例商品1 ("987654321", {"min": 49.0, "max": 89.0}, 5) # 示例商品2 ] for num_iid, price_threshold, stock_threshold in products_to_monitor: monitor.add_monitored_product(num_iid, price_threshold, stock_threshold) # 开始监控,设置刷新间隔为300秒(5分钟) monitor.start_monitoring(interval=300) # 保持主线程运行 try: while True: time.sleep(1) except KeyboardInterrupt: print("用户中断,程序退出") if __name__ == "__main__": import time main()
代码关键技术点解读
API 签名机制:淘宝 API 采用 MD5 签名方式,通过将参数按字典序排序后拼接密钥进行加密,确保请求的安全性和完整性。
令牌管理:实现了自动刷新访问令牌的机制,避免因令牌过期导致监控中断。
数据存储设计:采用 SQLite 数据库存储商品信息、价格历史和库存历史,便于数据分析和趋势预测。
定时监控:使用线程实现定时任务,定期获取商品数据并检查是否触发告警条件。
模块化设计:将认证、API 调用、数据存储和监控告警功能拆分为独立模块,提高代码可维护性和可扩展性。
系统扩展建议
增加数据可视化功能,使用 Matplotlib 或 ECharts 绘制价格和库存变化趋势图。
实现更丰富的告警方式,如邮件、短信、企业微信 / 钉钉通知等。
增加 SKU 级别的监控,支持多规格商品的价格和库存跟踪。
加入数据分析功能,如价格波动分析、库存预警预测等。
开发 Web 管理界面,方便用户配置监控商品和查看监控数据。
通过以上实现,我们可以构建一个稳定、高效的淘宝商品实时价格与库存监控系统,为电商运营决策提供数据支持。