在电商数据分析和应用开发中,从 API 获取原始数据并将其转化为结构化数据库是至关重要的一步。本文将详细介绍如何从淘宝 API 获取商品数据,进行数据清洗,并最终存储到数据库中,为后续的数据分析和应用开发奠定基础。
1. 淘宝 API 数据获取
首先,我们需要从淘宝API 获取商品数据。淘宝提供了丰富的 API 接口,如商品搜索、商品详情等,我们可以通过这些接口获取原始 JSON 格式的数据。
import requests
import json
import time
# 淘宝API配置
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
API_URL = "https://eco.taobao.com/router/rest"
def get_taobao_products(keyword, page=1, page_size=40):
"""
调用淘宝API获取商品数据
"""
params = {
"method": "taobao.tbk.item.get",
"app_key": APP_KEY,
"format": "json",
"v": "2.0",
"sign_method": "md5",
"q": keyword,
"page_no": page,
"page_size": page_size,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
# 这里需要根据淘宝API的签名规则生成签名
# 实际应用中需要实现签名算法
params["sign"] = generate_sign(params, APP_SECRET)
try:
response = requests.get(API_URL, params=params)
result = json.loads(response.text)
if "error_response" in result:
print(f"API调用错误: {result['error_response']['msg']}")
return None
return result["tbk_item_get_response"]["results"]["n_tbk_item"]
except Exception as e:
print(f"获取数据出错: {str(e)}")
return None
# 注意:实际使用时需要实现generate_sign函数
# 签名生成方法请参考淘宝开放平台文档2. 数据结构分析
淘宝 API 返回的 JSON 数据结构通常比较复杂,包含了大量的字段。我们需要先分析这些字段,确定哪些是我们需要的核心信息。
典型的商品数据结构可能包含以下关键信息:
商品 ID (item_id)
商品标题 (title)
商品价格 (price)
商品销量 (sales)
商品图片 (pict_url)
店铺信息 (shop_info)
类目信息 (category)
佣金信息 (commission)
3. 数据清洗与转换
原始 API 数据往往存在一些问题,如格式不一致、冗余信息、缺失值等,需要进行清洗和转换才能用于后续的存储和分析。
import re
import pandas as pd
from datetime import datetime
def clean_price(price_str):
"""清洗价格数据,转换为浮点数"""
try:
# 移除所有非数字和非小数点字符
cleaned = re.sub(r'[^\d.]', '', price_str)
return float(cleaned) if cleaned else None
except:
return None
def clean_sales(sales_str):
"""清洗销量数据,转换为整数"""
try:
if not sales_str:
return 0
# 处理带万的销量,如"1.2万"
if '万' in sales_str:
return int(float(sales_str.replace('万', '')) * 10000)
# 提取数字部分
cleaned = re.sub(r'\D', '', sales_str)
return int(cleaned) if cleaned else 0
except:
return 0
def extract_category(cat_str):
"""从分类字符串中提取主要分类"""
if not cat_str:
return ""
# 假设分类以"|"分隔,取最后一级分类
categories = cat_str.split('|')
return categories[-1] if categories else cat_str
def clean_product_data(raw_data):
"""清洗单个商品数据"""
if not raw_data:
return None
# 提取并清洗需要的字段
cleaned = {
"item_id": raw_data.get("item_id"),
"title": raw_data.get("title", "").strip(),
"price": clean_price(raw_data.get("zk_final_price", "")),
"original_price": clean_price(raw_data.get("reserve_price", "")),
"sales": clean_sales(raw_data.get("sell_num", "0")),
"pict_url": raw_data.get("pict_url", ""),
"provcity": raw_data.get("provcity", ""),
"category": extract_category(raw_data.get("category", "")),
"shop_title": raw_data.get("shop_title", "").strip(),
"user_type": raw_data.get("user_type", 0), # 0表示淘宝店,1表示天猫店
"commission_rate": clean_price(raw_data.get("commission_rate", "0")),
"created_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 数据入库时间
}
# 处理标题中的特殊字符
if cleaned["title"]:
cleaned["title"] = re.sub(r'[^\w\s,。,.-]', '', cleaned["title"])
return cleaned
def batch_clean_products(raw_products):
"""批量清洗商品数据"""
if not raw_products:
return []
cleaned_products = []
for product in raw_products:
cleaned = clean_product_data(product)
if cleaned:
cleaned_products.append(cleaned)
return cleaned_products
# 示例用法
if __name__ == "__main__":
# 假设这是从API获取的原始数据
sample_raw_data = [
{
"item_id": "123456",
"title": "【正品保障】新款夏季连衣裙 显瘦百搭",
"zk_final_price": "129.00",
"reserve_price": "199.00",
"sell_num": "1.2万",
"pict_url": "https://example.com/image1.jpg",
"provcity": "浙江杭州",
"category": "女装|连衣裙",
"shop_title": "时尚女装旗舰店",
"user_type": 1,
"commission_rate": "15.00"
},
# 更多商品数据...
]
# 清洗数据
cleaned_data = batch_clean_products(sample_raw_data)
# 转换为DataFrame便于查看和处理
df = pd.DataFrame(cleaned_data)
print("清洗后的数据:")
print(df.head())4. 数据库设计与连接
接下来,我们需要设计适合存储商品数据的数据库表结构,并建立与数据库的连接。这里以 MySQL 为例:
import mysql.connector
from mysql.connector import Error
import pandas as pd
def create_db_connection(host_name, user_name, user_password, db_name):
"""创建数据库连接"""
connection = None
try:
connection = mysql.connector.connect(
host=host_name,
user=user_name,
passwd=user_password,
database=db_name
)
print("MySQL数据库连接成功")
except Error as err:
print(f"错误: '{err}'")
return connection
def create_table(connection):
"""创建商品数据表"""
create_table_query = """
CREATE TABLE IF NOT EXISTS taobao_products (
id INT AUTO_INCREMENT PRIMARY KEY,
item_id BIGINT NOT NULL UNIQUE,
title VARCHAR(255) NOT NULL,
price DECIMAL(10, 2),
original_price DECIMAL(10, 2),
sales INT DEFAULT 0,
pict_url TEXT,
provcity VARCHAR(100),
category VARCHAR(100),
shop_title VARCHAR(255),
user_type TINYINT DEFAULT 0,
commission_rate DECIMAL(5, 2),
created_time DATETIME,
updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
"""
try:
cursor = connection.cursor()
cursor.execute(create_table_query)
connection.commit()
print("商品数据表创建成功")
except Error as err:
print(f"错误: '{err}'")
# 示例用法
if __name__ == "__main__":
# 数据库配置
db_config = {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "taobao_products_db"
}
# 创建数据库连接
connection = create_db_connection(
db_config["host"],
db_config["user"],
db_config["password"],
db_config["database"]
)
# 创建数据表
if connection:
create_table(connection)
connection.close()5. 数据入库操作
清洗好的数据需要存入数据库中,以便后续查询和分析。我们可以实现一个数据入库的函数,支持批量插入和更新操作。
import mysql.connector
from mysql.connector import Error
import pandas as pd
def insert_or_update_products(connection, products):
"""
批量插入或更新商品数据
如果商品已存在则更新,不存在则插入
"""
if not connection or not products:
return False
try:
cursor = connection.cursor()
# 准备插入或更新的SQL语句
query = """
INSERT INTO taobao_products (
item_id, title, price, original_price, sales,
pict_url, provcity, category, shop_title,
user_type, commission_rate, created_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
price = VALUES(price),
original_price = VALUES(original_price),
sales = VALUES(sales),
pict_url = VALUES(pict_url),
provcity = VALUES(provcity),
category = VALUES(category),
shop_title = VALUES(shop_title),
user_type = VALUES(user_type),
commission_rate = VALUES(commission_rate)
"""
# 准备数据
data = [
(
p["item_id"], p["title"], p["price"], p["original_price"], p["sales"],
p["pict_url"], p["provcity"], p["category"], p["shop_title"],
p["user_type"], p["commission_rate"], p["created_time"]
) for p in products
]
# 执行批量插入
cursor.executemany(query, data)
connection.commit()
print(f"成功插入/更新 {cursor.rowcount} 条商品数据")
return True
except Error as err:
print(f"插入数据错误: '{err}'")
connection.rollback()
return False
finally:
if cursor:
cursor.close()
def export_to_csv(products, filename="taobao_products.csv"):
"""将清洗后的数据导出为CSV文件作为备份"""
if not products:
print("没有数据可导出")
return
df = pd.DataFrame(products)
df.to_csv(filename, index=False, encoding="utf-8-sig")
print(f"数据已导出到 {filename}")
# 示例用法
if __name__ == "__main__":
from db_operations import create_db_connection
# 数据库配置
db_config = {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "taobao_products_db"
}
# 假设这是清洗后的商品数据
cleaned_products = [
{
"item_id": "123456",
"title": "正品保障 新款夏季连衣裙 显瘦百搭",
"price": 129.00,
"original_price": 199.00,
"sales": 12000,
"pict_url": "https://example.com/image1.jpg",
"provcity": "浙江杭州",
"category": "连衣裙",
"shop_title": "时尚女装旗舰店",
"user_type": 1,
"commission_rate": 15.00,
"created_time": "2023-07-15 10:30:00"
},
# 更多商品数据...
]
# 连接数据库并插入数据
connection = create_db_connection(
db_config["host"],
db_config["user"],
db_config["password"],
db_config["database"]
)
if connection:
insert_or_update_products(connection, cleaned_products)
connection.close()
# 导出CSV备份
export_to_csv(cleaned_products)6. 完整流程整合
最后,我们将上述各个步骤整合起来,形成一个完整的从 API 获取数据、清洗转换到入库的流程。
import time
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='taobao_data_pipeline.log'
)
def main():
"""淘宝商品数据处理主流程"""
from api_client import get_taobao_products
from data_cleaning import batch_clean_products
from db_operations import create_db_connection
from data_insertion import insert_or_update_products, export_to_csv
# 配置参数
keywords = ["夏季连衣裙", "男士T恤", "运动鞋"] # 要搜索的关键词
max_pages = 5 # 每个关键词获取的最大页数
db_config = {
"host": "localhost",
"user": "your_username",
"password": "your_password",
"database": "taobao_products_db"
}
# 连接数据库
connection = create_db_connection(
db_config["host"],
db_config["user"],
db_config["password"],
db_config["database"]
)
if not connection:
logging.error("无法连接到数据库,程序退出")
return
try:
total_processed = 0
# 遍历每个关键词
for keyword in keywords:
logging.info(f"开始处理关键词: {keyword}")
# 分页获取数据
for page in range(1, max_pages + 1):
logging.info(f"获取第 {page} 页数据,关键词: {keyword}")
# 调用API获取原始数据
raw_products = get_taobao_products(keyword, page=page)
if not raw_products:
logging.warning(f"未能获取第 {page} 页数据,关键词: {keyword}")
continue
# 清洗数据
cleaned_products = batch_clean_products(raw_products)
logging.info(f"清洗完成,共 {len(cleaned_products)} 条有效数据")
if not cleaned_products:
continue
# 数据入库
if insert_or_update_products(connection, cleaned_products):
total_processed += len(cleaned_products)
# 每获取一页数据后休眠一段时间,避免触发API频率限制
time.sleep(2)
# 每个关键词处理完毕后休眠一段时间
time.sleep(5)
logging.info(f"数据处理完成,共处理 {total_processed} 条商品数据")
# 可以在这里添加数据导出备份的逻辑
# export_to_csv(cleaned_products, f"backup_{datetime.now().strftime('%Y%m%d')}.csv")
except Exception as e:
logging.error(f"程序运行出错: {str(e)}", exc_info=True)
finally:
if connection:
connection.close()
logging.info("数据库连接已关闭")
if __name__ == "__main__":
main()7. 注意事项与优化建议
API 调用限制:淘宝 API 有调用频率限制,实际使用时需要合理设置请求间隔,避免触发限制。
错误处理:在实际应用中,应增强错误处理机制,特别是网络错误、API 返回错误等情况的处理。
数据增量更新:对于大规模数据,可以实现增量更新机制,只更新有变化的数据,提高效率。
数据去重:虽然我们在数据库中设置了 item_id 为唯一键,但在数据清洗阶段也可以先进行去重处理。
日志记录:完善的日志记录有助于排查问题和监控数据处理流程。
分布式处理:如果需要处理海量数据,可以考虑使用分布式爬虫和处理框架,如 Scrapy、Celery 等。
通过以上步骤,我们可以构建一个稳定、高效的淘宝商品数据处理 pipeline,从原始的 API JSON 数据到结构化的数据库存储,为后续的数据分析、商品推荐、市场趋势预测等应用提供坚实的数据基础。