×

api开发 电商平台 数据挖掘

从原始 JSON 到结构化商品数据库:淘宝 API 数据清洗与入库完全指南

admin admin 发表于2025-10-16 17:10:05 浏览33 评论0

抢沙发发表评论

在电商数据分析和应用开发中,从 API 获取原始数据并将其转化为结构化数据库是至关重要的一步。本文将详细介绍如何从淘宝 API 获取商品数据,进行数据清洗,并最终存储到数据库中,为后续的数据分析和应用开发奠定基础。

1. 淘宝 API 数据获取

首先,我们需要从淘宝API 获取商品数据。淘宝提供了丰富的 API 接口,如商品搜索、商品详情等,我们可以通过这些接口获取原始 JSON 格式的数据。

import requests
import json
import time

# 淘宝API配置
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
API_URL = "https://eco.taobao.com/router/rest"

def get_taobao_products(keyword, page=1, page_size=40):
    """
    调用淘宝API获取商品数据
    """
    params = {
        "method": "taobao.tbk.item.get",
        "app_key": APP_KEY,
        "format": "json",
        "v": "2.0",
        "sign_method": "md5",
        "q": keyword,
        "page_no": page,
        "page_size": page_size,
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
    }
    
    # 这里需要根据淘宝API的签名规则生成签名
    # 实际应用中需要实现签名算法
    params["sign"] = generate_sign(params, APP_SECRET)
    
    try:
        response = requests.get(API_URL, params=params)
        result = json.loads(response.text)
        
        if "error_response" in result:
            print(f"API调用错误: {result['error_response']['msg']}")
            return None
            
        return result["tbk_item_get_response"]["results"]["n_tbk_item"]
        
    except Exception as e:
        print(f"获取数据出错: {str(e)}")
        return None

# 注意:实际使用时需要实现generate_sign函数
# 签名生成方法请参考淘宝开放平台文档

2. 数据结构分析

淘宝 API 返回的 JSON 数据结构通常比较复杂,包含了大量的字段。我们需要先分析这些字段,确定哪些是我们需要的核心信息。

典型的商品数据结构可能包含以下关键信息:

  • 商品 ID (item_id)

  • 商品标题 (title)

  • 商品价格 (price)

  • 商品销量 (sales)

  • 商品图片 (pict_url)

  • 店铺信息 (shop_info)

  • 类目信息 (category)

  • 佣金信息 (commission)

3. 数据清洗与转换

原始 API 数据往往存在一些问题,如格式不一致、冗余信息、缺失值等,需要进行清洗和转换才能用于后续的存储和分析。

import re
import pandas as pd
from datetime import datetime

def clean_price(price_str):
    """清洗价格数据,转换为浮点数"""
    try:
        # 移除所有非数字和非小数点字符
        cleaned = re.sub(r'[^\d.]', '', price_str)
        return float(cleaned) if cleaned else None
    except:
        return None

def clean_sales(sales_str):
    """清洗销量数据,转换为整数"""
    try:
        if not sales_str:
            return 0
            
        # 处理带万的销量,如"1.2万"
        if '万' in sales_str:
            return int(float(sales_str.replace('万', '')) * 10000)
            
        # 提取数字部分
        cleaned = re.sub(r'\D', '', sales_str)
        return int(cleaned) if cleaned else 0
    except:
        return 0

def extract_category(cat_str):
    """从分类字符串中提取主要分类"""
    if not cat_str:
        return ""
        
    # 假设分类以"|"分隔,取最后一级分类
    categories = cat_str.split('|')
    return categories[-1] if categories else cat_str

def clean_product_data(raw_data):
    """清洗单个商品数据"""
    if not raw_data:
        return None
        
    # 提取并清洗需要的字段
    cleaned = {
        "item_id": raw_data.get("item_id"),
        "title": raw_data.get("title", "").strip(),
        "price": clean_price(raw_data.get("zk_final_price", "")),
        "original_price": clean_price(raw_data.get("reserve_price", "")),
        "sales": clean_sales(raw_data.get("sell_num", "0")),
        "pict_url": raw_data.get("pict_url", ""),
        "provcity": raw_data.get("provcity", ""),
        "category": extract_category(raw_data.get("category", "")),
        "shop_title": raw_data.get("shop_title", "").strip(),
        "user_type": raw_data.get("user_type", 0),  # 0表示淘宝店,1表示天猫店
        "commission_rate": clean_price(raw_data.get("commission_rate", "0")),
        "created_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # 数据入库时间
    }
    
    # 处理标题中的特殊字符
    if cleaned["title"]:
        cleaned["title"] = re.sub(r'[^\w\s,。,.-]', '', cleaned["title"])
        
    return cleaned

def batch_clean_products(raw_products):
    """批量清洗商品数据"""
    if not raw_products:
        return []
        
    cleaned_products = []
    for product in raw_products:
        cleaned = clean_product_data(product)
        if cleaned:
            cleaned_products.append(cleaned)
            
    return cleaned_products

# 示例用法
if __name__ == "__main__":
    # 假设这是从API获取的原始数据
    sample_raw_data = [
        {
            "item_id": "123456",
            "title": "【正品保障】新款夏季连衣裙 显瘦百搭",
            "zk_final_price": "129.00",
            "reserve_price": "199.00",
            "sell_num": "1.2万",
            "pict_url": "https://example.com/image1.jpg",
            "provcity": "浙江杭州",
            "category": "女装|连衣裙",
            "shop_title": "时尚女装旗舰店",
            "user_type": 1,
            "commission_rate": "15.00"
        },
        # 更多商品数据...
    ]
    
    # 清洗数据
    cleaned_data = batch_clean_products(sample_raw_data)
    
    # 转换为DataFrame便于查看和处理
    df = pd.DataFrame(cleaned_data)
    print("清洗后的数据:")
    print(df.head())

4. 数据库设计与连接

接下来,我们需要设计适合存储商品数据的数据库表结构,并建立与数据库的连接。这里以 MySQL 为例:

import mysql.connector
from mysql.connector import Error
import pandas as pd

def create_db_connection(host_name, user_name, user_password, db_name):
    """创建数据库连接"""
    connection = None
    try:
        connection = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password,
            database=db_name
        )
        print("MySQL数据库连接成功")
    except Error as err:
        print(f"错误: '{err}'")
    
    return connection

def create_table(connection):
    """创建商品数据表"""
    create_table_query = """
    CREATE TABLE IF NOT EXISTS taobao_products (
        id INT AUTO_INCREMENT PRIMARY KEY,
        item_id BIGINT NOT NULL UNIQUE,
        title VARCHAR(255) NOT NULL,
        price DECIMAL(10, 2),
        original_price DECIMAL(10, 2),
        sales INT DEFAULT 0,
        pict_url TEXT,
        provcity VARCHAR(100),
        category VARCHAR(100),
        shop_title VARCHAR(255),
        user_type TINYINT DEFAULT 0,
        commission_rate DECIMAL(5, 2),
        created_time DATETIME,
        updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    );
    """
    
    try:
        cursor = connection.cursor()
        cursor.execute(create_table_query)
        connection.commit()
        print("商品数据表创建成功")
    except Error as err:
        print(f"错误: '{err}'")

# 示例用法
if __name__ == "__main__":
    # 数据库配置
    db_config = {
        "host": "localhost",
        "user": "your_username",
        "password": "your_password",
        "database": "taobao_products_db"
    }
    
    # 创建数据库连接
    connection = create_db_connection(
        db_config["host"],
        db_config["user"],
        db_config["password"],
        db_config["database"]
    )
    
    # 创建数据表
    if connection:
        create_table(connection)
        connection.close()

5. 数据入库操作

清洗好的数据需要存入数据库中,以便后续查询和分析。我们可以实现一个数据入库的函数,支持批量插入和更新操作。

import mysql.connector
from mysql.connector import Error
import pandas as pd

def insert_or_update_products(connection, products):
    """
    批量插入或更新商品数据
    如果商品已存在则更新,不存在则插入
    """
    if not connection or not products:
        return False
        
    try:
        cursor = connection.cursor()
        
        # 准备插入或更新的SQL语句
        query = """
        INSERT INTO taobao_products (
            item_id, title, price, original_price, sales, 
            pict_url, provcity, category, shop_title, 
            user_type, commission_rate, created_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            title = VALUES(title),
            price = VALUES(price),
            original_price = VALUES(original_price),
            sales = VALUES(sales),
            pict_url = VALUES(pict_url),
            provcity = VALUES(provcity),
            category = VALUES(category),
            shop_title = VALUES(shop_title),
            user_type = VALUES(user_type),
            commission_rate = VALUES(commission_rate)
        """
        
        # 准备数据
        data = [
            (
                p["item_id"], p["title"], p["price"], p["original_price"], p["sales"],
                p["pict_url"], p["provcity"], p["category"], p["shop_title"],
                p["user_type"], p["commission_rate"], p["created_time"]
            ) for p in products
        ]
        
        # 执行批量插入
        cursor.executemany(query, data)
        connection.commit()
        
        print(f"成功插入/更新 {cursor.rowcount} 条商品数据")
        return True
        
    except Error as err:
        print(f"插入数据错误: '{err}'")
        connection.rollback()
        return False
    finally:
        if cursor:
            cursor.close()

def export_to_csv(products, filename="taobao_products.csv"):
    """将清洗后的数据导出为CSV文件作为备份"""
    if not products:
        print("没有数据可导出")
        return
        
    df = pd.DataFrame(products)
    df.to_csv(filename, index=False, encoding="utf-8-sig")
    print(f"数据已导出到 {filename}")

# 示例用法
if __name__ == "__main__":
    from db_operations import create_db_connection
    
    # 数据库配置
    db_config = {
        "host": "localhost",
        "user": "your_username",
        "password": "your_password",
        "database": "taobao_products_db"
    }
    
    # 假设这是清洗后的商品数据
    cleaned_products = [
        {
            "item_id": "123456",
            "title": "正品保障 新款夏季连衣裙 显瘦百搭",
            "price": 129.00,
            "original_price": 199.00,
            "sales": 12000,
            "pict_url": "https://example.com/image1.jpg",
            "provcity": "浙江杭州",
            "category": "连衣裙",
            "shop_title": "时尚女装旗舰店",
            "user_type": 1,
            "commission_rate": 15.00,
            "created_time": "2023-07-15 10:30:00"
        },
        # 更多商品数据...
    ]
    
    # 连接数据库并插入数据
    connection = create_db_connection(
        db_config["host"],
        db_config["user"],
        db_config["password"],
        db_config["database"]
    )
    
    if connection:
        insert_or_update_products(connection, cleaned_products)
        connection.close()
    
    # 导出CSV备份
    export_to_csv(cleaned_products)

6. 完整流程整合

最后,我们将上述各个步骤整合起来,形成一个完整的从 API 获取数据、清洗转换到入库的流程。

import time
from datetime import datetime
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='taobao_data_pipeline.log'
)

def main():
    """淘宝商品数据处理主流程"""
    from api_client import get_taobao_products
    from data_cleaning import batch_clean_products
    from db_operations import create_db_connection
    from data_insertion import insert_or_update_products, export_to_csv
    
    # 配置参数
    keywords = ["夏季连衣裙", "男士T恤", "运动鞋"]  # 要搜索的关键词
    max_pages = 5  # 每个关键词获取的最大页数
    db_config = {
        "host": "localhost",
        "user": "your_username",
        "password": "your_password",
        "database": "taobao_products_db"
    }
    
    # 连接数据库
    connection = create_db_connection(
        db_config["host"],
        db_config["user"],
        db_config["password"],
        db_config["database"]
    )
    
    if not connection:
        logging.error("无法连接到数据库,程序退出")
        return
    
    try:
        total_processed = 0
        
        # 遍历每个关键词
        for keyword in keywords:
            logging.info(f"开始处理关键词: {keyword}")
            
            # 分页获取数据
            for page in range(1, max_pages + 1):
                logging.info(f"获取第 {page} 页数据,关键词: {keyword}")
                
                # 调用API获取原始数据
                raw_products = get_taobao_products(keyword, page=page)
                
                if not raw_products:
                    logging.warning(f"未能获取第 {page} 页数据,关键词: {keyword}")
                    continue
                
                # 清洗数据
                cleaned_products = batch_clean_products(raw_products)
                logging.info(f"清洗完成,共 {len(cleaned_products)} 条有效数据")
                
                if not cleaned_products:
                    continue
                
                # 数据入库
                if insert_or_update_products(connection, cleaned_products):
                    total_processed += len(cleaned_products)
                
                # 每获取一页数据后休眠一段时间,避免触发API频率限制
                time.sleep(2)
            
            # 每个关键词处理完毕后休眠一段时间
            time.sleep(5)
        
        logging.info(f"数据处理完成,共处理 {total_processed} 条商品数据")
        
        # 可以在这里添加数据导出备份的逻辑
        # export_to_csv(cleaned_products, f"backup_{datetime.now().strftime('%Y%m%d')}.csv")
        
    except Exception as e:
        logging.error(f"程序运行出错: {str(e)}", exc_info=True)
    finally:
        if connection:
            connection.close()
            logging.info("数据库连接已关闭")

if __name__ == "__main__":
    main()

7. 注意事项与优化建议

  1. API 调用限制:淘宝 API 有调用频率限制,实际使用时需要合理设置请求间隔,避免触发限制。

  2. 错误处理:在实际应用中,应增强错误处理机制,特别是网络错误、API 返回错误等情况的处理。

  3. 数据增量更新:对于大规模数据,可以实现增量更新机制,只更新有变化的数据,提高效率。

  4. 数据去重:虽然我们在数据库中设置了 item_id 为唯一键,但在数据清洗阶段也可以先进行去重处理。

  5. 日志记录:完善的日志记录有助于排查问题和监控数据处理流程。

  6. 分布式处理:如果需要处理海量数据,可以考虑使用分布式爬虫和处理框架,如 Scrapy、Celery 等。

通过以上步骤,我们可以构建一个稳定、高效的淘宝商品数据处理 pipeline,从原始的 API JSON 数据到结构化的数据库存储,为后续的数据分析、商品推荐、市场趋势预测等应用提供坚实的数据基础。

少长咸集

群贤毕至

访客