×

api开发 电商平台 数据挖掘

ERP/CRM 系统的 API 接入实践:集成 1688 实时商品数据

admin admin 发表于2025-09-26 16:19:40 浏览52 评论0

抢沙发发表评论

在当今数字化商业环境中,企业资源计划 (ERP) 和客户关系管理 (CRM) 系统已成为企业运营的核心。将这些系统与 1688 等 B2B 电商平台实时对接,能够极大提升供应链效率和客户响应速度。本文将详细介绍如何实现 ERP/CRM 系统与 1688 平台的 API 对接,实时获取和同步商品数据。

1688 平台简介

1688 平台提供了丰富的 API 接口,允许第三方系统获取商品信息、订单数据、物流信息等关键业务数据。要实现对接,首先需要完成以下准备工作:

  1. 在 1688 注册账号

  2. 获取 Api Key 和 Api Secret

  3. 完成 OAuth 授权流程,获取访问令牌 (Access Token)

  4. 了解 1688 API 的调用规范和限制

系统对接架构设计

ERP/CRM 与 1688 平台的对接架构通常包含以下几个核心组件:

  • 认证授权模块:处理 API 访问的身份验证

  • API 调用模块:负责与 1688 平台的 HTTP 通信

  • 数据转换模块:将 1688 数据格式转换为内部系统格式

  • 数据存储模块:缓存或持久化存储获取的数据

  • 定时任务模块:定期同步数据以保持时效性

代码实现:1688 商品数据接入

下面我们将通过具体代码实现 1688 商品数据的接入功能。我们将使用 Python 语言,采用 requests 库进行 HTTP 通信,使用 pandas 处理数据,使用 schedule 库实现定时任务。

import requests
import time
import hashlib
import json
import pandas as pd
from datetime import datetime
import schedule

class Ali1688APIClient:
    def __init__(self, app_key, app_secret, access_token=None):
        """
        初始化1688 API客户端
        :param app_key: 应用Key
        :param app_secret: 应用密钥
        :param access_token: 访问令牌
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.access_token = access_token
        self.base_url = "https://gw.open.1688.com/openapi/param2/1/"
        
    def set_access_token(self, access_token):
        """设置访问令牌"""
        self.access_token = access_token
        
    def generate_signature(self, params):
        """生成签名"""
        # 按参数名排序
        sorted_params = sorted(params.items(), key=lambda x: x[0])
        # 拼接参数
        sign_str = self.app_secret
        for k, v in sorted_params:
            sign_str += f"{k}{v}"
        sign_str += self.app_secret
        # 计算MD5
        return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
        
    def call_api(self, api_name, api_version, params=None):
        """
        调用1688 API
        :param api_name: API名称
        :param api_version: API版本
        :param params: API参数
        :return: API返回结果
        """
        if not self.access_token:
            raise Exception("未设置access_token,请先获取并设置")
            
        # 基础参数
        base_params = {
            "app_key": self.app_key,
            "access_token": self.access_token,
            "method": f"{api_name}",
            "v": api_version,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "format": "json",
            "sign_method": "md5"
        }
        
        # 合并参数
        all_params = {**base_params,**(params or {})}
        
        # 生成签名
        all_params["sign"] = self.generate_signature(all_params)
        
        # 发送请求
        url = f"{self.base_url}{api_name.replace('.', '/')}/{api_version}"
        try:
            response = requests.get(url, params=all_params, timeout=30)
            response.raise_for_status()
            result = json.loads(response.text)
            
            # 检查是否有错误
            if "error_response" in result:
                error = result["error_response"]
                raise Exception(f"API调用错误: {error.get('msg', '未知错误')} (code: {error.get('code')})")
                
            return result
        except Exception as e:
            print(f"API调用失败: {str(e)}")
            return None
    
    def get_products_by_category(self, category_id, page_no=1, page_size=40):
        """
        根据类目获取商品列表
        :param category_id: 类目ID
        :param page_no: 页码
        :param page_size: 每页条数
        :return: 商品列表
        """
        params = {
            "categoryId": category_id,
            "pageNo": page_no,
            "pageSize": page_size
        }
        
        result = self.call_api(
            "alibaba.icbu.category.product.search",
            "1.0",
            params
        )
        
        if result and "result" in result:
            return result["result"]
        return None
    
    def get_product_details(self, product_id):
        """
        获取商品详情
        :param product_id: 商品ID
        :return: 商品详情
        """
        params = {
            "productId": product_id,
            "fields": "product_id,subject,price,rprice,min_amount,detail_url,pic_url,attributes"
        }
        
        result = self.call_api(
            "alibaba.icbu.product.get",
            "1.0",
            params
        )
        
        if result and "product" in result:
            return result["product"]
        return None

class ERPIntegrator:
    def __init__(self, api_client):
        """
        初始化ERP集成器
        :param api_client: 1688 API客户端实例
        """
        self.api_client = api_client
        self.product_cache = {}
        
    def transform_product_data(self, raw_product):
        """
        转换商品数据格式为ERP系统所需格式
        :param raw_product: 原始商品数据
        :return: 转换后的商品数据
        """
        # 提取并转换关键信息
        transformed = {
            "external_id": raw_product.get("product_id"),
            "name": raw_product.get("subject", ""),
            "price": raw_product.get("price", {}).get("value", 0),
            "original_price": raw_product.get("rprice", {}).get("value", 0),
            "min_order_quantity": raw_product.get("min_amount", 1),
            "url": raw_product.get("detail_url", ""),
            "image_url": raw_product.get("pic_url", ""),
            "attributes": self._parse_attributes(raw_product.get("attributes", {})),
            "sync_time": datetime.now().isoformat()
        }
        
        return transformed
    
    def _parse_attributes(self, attributes):
        """解析商品属性"""
        attr_list = attributes.get("attribute", [])
        if not isinstance(attr_list, list):
            attr_list = [attr_list]
            
        parsed = {}
        for attr in attr_list:
            name = attr.get("name", "")
            value = attr.get("value", "")
            if name:
                parsed[name] = value
                
        return parsed
    
    def sync_category_products(self, category_id, max_pages=5):
        """
        同步指定类目的商品到ERP系统
        :param category_id: 类目ID
        :param max_pages: 最大同步页数
        :return: 同步结果
        """
        all_products = []
        
        for page in range(1, max_pages + 1):
            print(f"同步类目 {category_id} 第 {page} 页商品...")
            
            # 获取商品列表
            products_result = self.api_client.get_products_by_category(
                category_id, 
                page_no=page
            )
            
            if not products_result:
                print("获取商品列表失败")
                break
                
            product_ids = products_result.get("productIds", [])
            if not product_ids:
                print("没有更多商品")
                break
                
            # 获取每个商品的详情
            for product_id in product_ids:
                product_detail = self.api_client.get_product_details(product_id)
                if product_detail:
                    # 转换格式
                    transformed = self.transform_product_data(product_detail)
                    all_products.append(transformed)
                    
                    # 缓存商品数据
                    self.product_cache[product_id] = transformed
                    
                    # 避免请求过于频繁
                    time.sleep(1)
            
            # 检查是否还有更多页
            total_count = products_result.get("totalCount", 0)
            if page * 40 >= total_count:
                break
                
        print(f"同步完成,共获取 {len(all_products)} 个商品")
        
        # 可以在这里添加将数据写入ERP系统的逻辑
        self.save_to_erp(all_products)
        
        return all_products
    
    def save_to_erp(self, products):
        """
        将商品数据保存到ERP系统
        这里只是示例,实际应根据ERP系统的API或数据库接口进行实现
        """
        if not products:
            return
            
        # 转换为DataFrame便于处理
        df = pd.DataFrame(products)
        
        # 示例:保存到CSV
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"erp_products_{timestamp}.csv"
        df.to_csv(filename, index=False, encoding="utf-8-sig")
        print(f"商品数据已保存到 {filename}")
        
        # 实际应用中,这里应该是调用ERP系统的API或写入数据库
        # 例如:
        # erp_api = ERPAPI(...)
        # erp_api.batch_create_products(products)

# 使用示例
if __name__ == "__main__":
    # 配置信息(实际应用中应使用环境变量或配置文件)
    APP_KEY = "your_app_key"
    APP_SECRET = "your_app_secret"
    ACCESS_TOKEN = "your_access_token"
    
    # 初始化API客户端
    api_client = Ali1688APIClient(APP_KEY, APP_SECRET, ACCESS_TOKEN)
    
    # 初始化ERP集成器
    erp_integrator = ERPIntegrator(api_client)
    
    # 同步指定类目商品(例如:女装类目)
    CATEGORY_ID = 1036610  # 示例类目ID,实际需替换
    
    # 立即同步一次
    erp_integrator.sync_category_products(CATEGORY_ID)
    
    # 设置定时任务,每6小时同步一次
    def scheduled_sync():
        print(f"\n定时同步任务开始: {datetime.now()}")
        erp_integrator.sync_category_products(CATEGORY_ID)
        print(f"定时同步任务结束: {datetime.now()}")
    
    # 每6小时执行一次同步
    schedule.every(6).hours.do(scheduled_sync)
    
    print("开始运行定时同步任务...")
    print("按Ctrl+C退出")
    
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # 每分钟检查一次任务
    except KeyboardInterrupt:
        print("程序已退出")

代码解析

上述代码实现了一个完整的 1688 商品数据接入方案,主要包含以下几个部分:

  1. API 客户端封装Ali1688APIClient类封装了与 1688 开放平台的交互,包括签名生成、API 调用等核心功能。

  2. 数据转换ERPIntegrator类负责将 1688 返回的原始数据转换为 ERP/CRM 系统所需的格式,处理数据映射和清洗。

  3. 同步逻辑:实现了按类目批量获取商品数据的功能,并支持分页加载。

  4. 定时任务:使用 schedule 库实现了定时同步功能,确保数据的时效性。

  5. 数据持久化:示例中将数据保存为 CSV 文件,实际应用中可替换为直接写入 ERP/CRM 系统的数据库或调用其 API。

集成注意事项

  1. API 调用限制:1688 平台对 API 调用频率有一定限制,实际应用中需合理设置请求间隔,避免触发限流机制。

  2. 错误处理:应完善异常处理机制,对网络错误、API 返回错误等情况进行妥善处理,并实现重试机制。

  3. 数据安全:Access Token 等敏感信息不应硬编码在代码中,应使用环境变量或加密配置文件存储。

  4. 增量同步:对于大规模数据同步,应实现增量同步机制,只获取变更的数据,提高效率。

  5. 日志记录:完善的日志系统有助于问题排查和系统监控。

扩展与优化

  1. 可以实现商品价格变动监控,当 1688 商品价格发生变化时,及时更新 ERP/CRM 系统。

  2. 增加库存数据同步,实时获取 1688 供应商的库存状态。

  3. 实现订单数据对接,将 ERP/CRM 系统的订单自动同步到 1688 平台。

  4. 引入消息队列,处理高并发的数据同步需求,提高系统稳定性。

通过以上实现,企业可以将 1688 平台的商品数据实时集成到自身的 ERP/CRM 系统中,实现供应链数据的自动化流转,提高运营效率,降低人工操作成本,为业务决策提供数据支持。


少长咸集

群贤毕至

访客