×

api开发 电商平台 数据挖掘

Python 异步框架 (Async/Aiohttp) 调用淘宝 API:实现万级商品数据异步采集

admin admin 发表于2025-08-28 11:46:26 浏览24 评论0

抢沙发发表评论

在电商数据分析、竞品监控等业务场景中,需要高效采集大量商品数据。传统的同步采集方式受限于网络 IO 等待,采集效率极低,难以满足万级甚至十万级商品数据的采集需求。而 Python 的异步编程(基于 Asyncio)结合 Aiohttp 框架,能够通过并发处理网络请求大幅提升采集效率,完美解决大规模数据采集的性能瓶颈。本文将详细讲解如何使用 Async/Aiohttp 调用淘宝平台 API,实现万级商品数据的异步采集,并提供完整可运行的代码示例。

一、核心技术原理与优势

在开始实战前,我们需要先理解异步采集的核心优势,以及淘宝 API 调用的关键注意事项,为后续开发奠定基础。

1. 同步采集 vs 异步采集的效率差异

  • 同步采集:每次请求必须等待前一个请求完成(包括网络连接、数据返回的 IO 等待)才能发起下一个请求。假设单个请求平均耗时 1 秒,采集 10000 条数据需要约 2.7 小时(10000 秒)。

  • 异步采集:通过事件循环(Event Loop)管理请求,当一个请求处于 IO 等待时,立即切换到其他请求继续执行,无需等待。在合理的并发数下(如 100-200),采集 10000 条数据仅需 1-2 分钟,效率提升超 100 倍。

2. 淘宝 API 调用的关键前提

要调用淘宝 API,需先完成以下准备工作,否则会导致接口调用失败:

  1. 注册淘宝账号:访问注册开发者账号。

  2. 创建应用并获取密钥:获取Api Key和Api Secret(接口调用的身份凭证)。

  3. 申请 API 权限:本文使用 “taobao.item.search” 接口(商品搜索接口),需在应用中申请该接口的调用权限(个人开发者可申请 “测试权限”,支持少量数据采集;企业开发者可申请正式权限)。

  4. 获取 Access Token:部分接口需要 Access Token(用户授权凭证),若仅采集公开商品数据,可使用 “应用级 Access Token”(通过 App Key 和 App Secret 获取)。

二、环境搭建

1. 安装依赖库

本文需要用到以下库:

  • asyncio:Python 内置的异步编程框架,用于管理事件循环和协程。

  • aiohttp:异步 HTTP 客户端库,用于发起异步网络请求(替代同步的requests库)。

  • pycryptodome:用于淘宝 API 签名(淘宝 API 要求请求参数需进行 HMAC-SHA256 签名)。

  • pandas:用于数据处理和存储(将采集到的商品数据保存为 Excel 或 CSV 文件)。

通过 pip 安装依赖:

pip install aiohttp pycryptodome pandas

2. 淘宝 API 签名工具函数

淘宝 API 要求所有请求参数(包括公共参数和业务参数)需按 “参数名 ASCII 排序” 后,用 App Secret 进行 HMAC-SHA256 签名,生成sign参数(签名步骤参考平台文档)。

我们先实现签名工具函数,方便后续调用:

import hmac
import hashlib
import urllib.parse

def generate_taobao_sign(params: dict, app_secret: str) -> str:
    """
    生成淘宝API请求的签名(HMAC-SHA256)
    :param params: 请求参数(包含公共参数和业务参数)
    :param app_secret: 应用的App Secret
    :return: 签名字符串(小写)
    """
    # 1. 按参数名ASCII升序排序
    sorted_params = sorted(params.items(), key=lambda x: x[0])
    # 2. 拼接为“key=value”格式的字符串(无分隔符)
    canonical_str = ''.join([f"{k}{v}" for k, v in sorted_params])
    # 3. 在字符串前后添加App Secret
    sign_str = app_secret + canonical_str + app_secret
    # 4. HMAC-SHA256签名并转为小写
    sign = hmac.new(
        app_secret.encode('utf-8'),
        sign_str.encode('utf-8'),
        hashlib.sha256
    ).hexdigest().lower()
    return sign

三、异步采集核心代码实现

1. 核心思路

  1. 构造请求参数:包含淘宝 API 的公共参数(如app_key、method、timestamp等)和业务参数(如keyword、page_no、page_size等)。

  2. 异步请求协程:定义fetch_taobao_items协程函数,负责单页商品数据的异步请求、响应解析。

  3. 并发控制:使用asyncio.Semaphore控制并发数(避免并发过高导致 IP 被封禁或接口限流)。

  4. 数据收集与存储:通过列表收集所有页的商品数据,最终用pandas保存为 Excel 文件。

2. 完整代码实现

import asyncio
import aiohttp
import time
import random
from datetime import datetime
import pandas as pd
from typing import List, Dict

# -------------------------- 配置参数(需根据自身情况修改) --------------------------
APP_KEY = "你的App Key"          # 替换为你的App Key
APP_SECRET = "你的App Secret"    # 替换为你的App Secret
ACCESS_TOKEN = "你的Access Token"# 替换为你的Access Token(若接口需要)
KEYWORD = "手机"                 # 搜索关键词(可替换为其他商品类别,如“连衣裙”“笔记本电脑”)
PAGE_SIZE = 100                  # 每页商品数量(淘宝API最大支持100条/页)
TOTAL_PAGES = 100                # 总采集页数(100页 × 100条/页 = 10000条数据)
MAX_CONCURRENCY = 50             # 最大并发数(建议50-200,过高易被限流)
API_URL = "https://eco.taobao.com/router/rest"  # 淘宝API网关地址
# -----------------------------------------------------------------------------------

async def fetch_taobao_items(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    page_no: int
) -> List[Dict]:
    """
    异步采集单页商品数据
    :param session: aiohttp客户端会话(复用连接,提升效率)
    :param semaphore: 信号量(控制并发数)
    :param page_no: 当前页码
    :return: 单页商品数据列表(若失败返回空列表)
    """
    async with semaphore:  # 控制并发
        try:
            # 1. 构造公共参数(淘宝API必填)
            public_params = {
                "app_key": APP_KEY,
                "method": "taobao.item.search",  # 商品搜索接口
                "format": "json",               # 响应格式
                "v": "2.0",                     # API版本
                "sign_method": "hmac-sha256",   # 签名方式
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # 时间戳
                "access_token": ACCESS_TOKEN    # 若接口需要,否则可删除
            }

            # 2. 构造业务参数(商品搜索条件)
            business_params = {
                "q": KEYWORD,          # 搜索关键词
                "page_no": page_no,    # 当前页码
                "page_size": PAGE_SIZE # 每页数量
                # 可选参数:添加筛选条件(如价格区间、销量排序等)
                # "start_price": 1000,  # 最低价格
                # "end_price": 5000,    # 最高价格
                # "sort": "sales_desc"  # 按销量降序(默认按综合排序)
            }

            # 3. 合并参数并生成签名
            all_params = {**public_params, **business_params}
            all_params["sign"] = generate_taobao_sign(all_params, APP_SECRET)

            # 4. 发起异步GET请求(添加随机延迟,避免被限流)
            await asyncio.sleep(random.uniform(0.1, 0.5))  # 0.1-0.5秒随机延迟
            async with session.get(API_URL, params=all_params) as response:
                if response.status != 200:
                    print(f"页码{page_no}请求失败,HTTP状态码:{response.status}")
                    return []

                # 5. 解析响应数据
                result = await response.json()
                # 检查API返回是否成功(淘宝API错误码参考:https://open.taobao.com/doc.htm?docId=101370&docType=1)
                if "error_response" in result:
                    error_msg = result["error_response"]["msg"]
                    print(f"页码{page_no}API调用失败:{error_msg}")
                    return []

                # 6. 提取商品核心字段(根据需求调整字段)
                items = result["item_search_response"]["items"]["item"]
                item_list = []
                for item in items:
                    item_list.append({
                        "商品ID": item.get("num_iid", ""),
                        "商品标题": item.get("title", "").replace("<span class='H'>", "").replace("</span>", ""),  # 去除标题中的高亮标签
                        "售价": item.get("price", ""),
                        "销量": item.get("sales", ""),
                        "卖家昵称": item.get("nick", ""),
                        "店铺名称": item.get("shop_name", ""),
                        "商品链接": item.get("detail_url", ""),
                        "图片链接": item.get("pic_url", "")
                    })

                print(f"页码{page_no}采集成功,获取{len(item_list)}条商品数据")
                return item_list

        except Exception as e:
            print(f"页码{page_no}采集异常:{str(e)}")
            return []

async def main():
    """
    主函数:创建异步会话、调度协程、收集数据并存储
    """
    start_time = time.time()
    all_items = []  # 存储所有商品数据

    # 1. 创建aiohttp客户端会话(复用TCP连接,提升效率)
    timeout = aiohttp.ClientTimeout(total=30)  # 超时时间30秒
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # 2. 创建信号量(控制并发数)
        semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

        # 3. 生成所有页码的协程任务
        tasks = [
            fetch_taobao_items(session, semaphore, page_no)
            for page_no in range(1, TOTAL_PAGES + 1)
        ]

        # 4. 并发执行所有任务并收集结果
        results = await asyncio.gather(*tasks)

        # 5. 合并所有页的商品数据
        for page_items in results:
            if page_items:
                all_items.extend(page_items)

    # 6. 数据存储(保存为Excel文件)
    if all_items:
        df = pd.DataFrame(all_items)
        output_file = f"淘宝商品数据_{KEYWORD}_{datetime.now().strftime('%Y%m%d%H%M%S')}.xlsx"
        df.to_excel(output_file, index=False, engine="openpyxl")
        print(f"\n采集完成!共获取{len(all_items)}条商品数据,已保存至:{output_file}")
    else:
        print("\n采集失败,未获取到任何商品数据")

    # 7. 输出采集耗时
    end_time = time.time()
    print(f"总耗时:{round(end_time - start_time, 2)}秒")

if __name__ == "__main__":
    # 运行异步主函数(Python 3.7+支持asyncio.run())
    asyncio.run(main())

四、代码说明与注意事项

1. 关键参数配置

  • APP_KEY/APP_SECRET/ACCESS_TOKEN:必须替换为你在淘宝平台申请的真实凭证,否则接口调用会提示 “身份验证失败”。

  • KEYWORD:搜索关键词,可根据需求修改(如 “运动鞋”“家电” 等)。

  • PAGE_SIZE:淘宝 API 限制最大为 100 条 / 页,无需修改( smaller 页码会增加请求次数,降低效率)。

  • TOTAL_PAGES:总采集页数,100 页对应 10000 条数据(若需采集更多数据,需申请更大的接口调用配额)。

  • MAX_CONCURRENCY:并发数建议设置为 50-200(过低效率低,过高易触发淘宝 API 的限流机制,导致 IP 被临时封禁)。

2. 淘宝 API 限流与反爬注意事项

淘宝平台对 API 调用有严格的限流策略(如个人开发者测试权限可能限制 “100 次 / 小时”),若需大规模采集,需注意:

  1. 控制并发数:通过MAX_CONCURRENCY参数限制并发,避免短时间内发起大量请求。

  2. 添加随机延迟:代码中已通过random.uniform(0.1, 0.5)添加随机延迟,模拟人工操作,降低被限流概率。

  3. 使用代理 IP:若采集量极大(如超 10 万条),建议使用代理 IP 池(如aiohttp-socks库),避免单个 IP 被封禁。

  4. 查看接口配额:在淘宝平台 “应用管理” 中查看接口调用配额,避免超出配额导致调用失败。

3. 数据字段扩展

代码中仅提取了商品的核心字段(如商品 ID、标题、售价等),若需采集更多字段(如商品属性、评价数、库存等),可参考淘宝 “taobao.item.search” 接口文档,在item_list的字典中添加对应字段(如"评价数": item.get("comment_count", ""))。

五、效果验证与性能优化

1. 采集效果验证

运行代码后,控制台会输出每一页的采集状态(如 “页码 1 采集成功,获取 100 条商品数据”),采集完成后会生成 Excel 文件,打开文件可查看结构化的商品数据(包含商品 ID、标题、售价等字段)。

以 “手机” 为关键词,采集 100 页(10000 条数据)的耗时约为 30-60 秒(取决于网络速度和并发数),远快于同步采集的 2.7 小时。

2. 性能优化方向

  • 连接复用:代码中使用aiohttp.ClientSession复用 TCP 连接,避免每次请求都建立新连接,提升效率。

  • 批量存储:若采集数据量超 10 万条,建议使用数据库(如 MySQL、MongoDB)替代 Excel,通过批量插入(如pandas.to_sql)提升存储效率。

  • 任务拆分:若需采集多个关键词(如 “手机”“电脑”“服装”),可将不同关键词的采集任务拆分为独立协程,进一步提升并发效率。

  • 异常重试:在fetch_taobao_items函数中添加异常重试机制(如使用tenacity库),对临时失败的请求进行重试,提高采集成功率。

六、总结

本文通过 Asyncio+Aiohttp 实现了淘宝 API 的异步调用,成功解决了万级商品数据采集的效率问题。核心亮点包括:

  1. 异步并发:通过事件循环和信号量,实现高效的并发请求,大幅降低采集耗时。

  2. 签名合规:严格按照淘宝 API 要求实现 HMAC-SHA256 签名,确保接口调用合法。

  3. 可扩展性:代码结构清晰,支持字段扩展、代理 IP 集成、数据库存储等二次开发需求。

若你需要进一步扩展功能(如添加商品详情页采集、实时数据监控等),可基于本文代码进行迭代。同时,需注意遵守淘宝平台的使用规范,避免过度采集导致账号或 IP 被封禁。


少长咸集

群贤毕至

访客