×

api开发 电商平台 数据挖掘

使用 Go 语言高性能并发采集 1688 平台商品实时数据

admin admin 发表于2025-09-18 17:32:10 浏览7 评论0

抢沙发发表评论

在电商数据分析领域,高效采集商品实时数据是进行市场分析、价格监控和竞品研究的基础。1688 作为国内领先的 B2B 电商平台,其商品数据具有极高的商业价值。本文将介绍如何利用 Go 语言的并发特性,构建一个高性能的 1688 商品数据采集系统,实现高效、稳定的实时数据获取。

Go 语言在数据采集中的优势

Go 语言(Golang)由 Google 开发,专为并发编程设计,非常适合构建高性能的数据采集系统:


  • 原生并发支持:通过 goroutine 和 channel 实现轻量级并发,资源消耗低,可同时发起大量请求

  • 优秀的性能:编译型语言,执行效率接近 C/C++

  • 丰富的标准库:内置强大的网络库、JSON 处理和并发控制工具

  • 简洁的语法:易于编写和维护,降低开发复杂度

  • 内存管理:自动垃圾回收,减少内存泄漏风险


这些特性使 Go 成为构建高性能网络爬虫和数据采集系统的理想选择。

系统设计架构

我们的 1688 商品数据采集系统将采用以下架构:


  1. 任务调度层:负责管理采集任务队列,分配任务到工作协程

  2. 并发请求层:使用 goroutine 池并发发起 API 请求

  3. 数据解析层:处理 API 返回结果,提取关键信息

  4. 数据存储层:将清洗后的数据存储到目标介质

  5. 监控与控制层:监控系统运行状态,实现流量控制和错误重试


![系统架构示意图]

实现方案

1. 核心配置与常量定义

首先定义系统的核心配置,包括 API 密钥、请求频率限制、并发数控制等:

package main

import (
	"time"
)

// API配置
type APIConfig struct {
	AppKey    string        // 1688开放平台AppKey
	AppSecret string        // 1688开放平台AppSecret
	BaseURL   string        // API基础地址
	Timeout   time.Duration // 请求超时时间
}

// 爬虫配置
type CrawlerConfig struct {
	MaxConcurrency     int           // 最大并发数
	RequestInterval    time.Duration // 请求间隔
	MaxRetries         int           // 最大重试次数
	RetryDelay         time.Duration // 重试延迟
	MaxPageSize        int           // 每页最大数量
	EnableProxy        bool          // 是否启用代理
	ProxyURL           string        // 代理地址
	LogLevel           string        // 日志级别
	OutputDir          string        // 输出目录
	EnableRateLimiting bool          // 是否启用速率限制
}

// 全局配置实例
var (
	APIConf = APIConfig{
		BaseURL: "https://api.1688.com/router/json",
		Timeout: 10 * time.Second,
	}

	CrawlerConf = CrawlerConfig{
		MaxConcurrency:     50,
		RequestInterval:    200 * time.Millisecond,
		MaxRetries:         3,
		RetryDelay:         1 * time.Second,
		MaxPageSize:        100,
		EnableProxy:        false,
		LogLevel:           "info",
		OutputDir:          "./output",
		EnableRateLimiting: true,
	}
)

2. 签名与 API 请求工具

1688 API 需要特殊的签名机制,我们实现一个工具类来处理签名和 API 请求:

package main

import (
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"sort"
	"strings"
	"time"
)

// 1688 API客户端
type APIClient struct {
	config    APIConfig
	client    *http.Client
	rateLimiter chan struct{}
}

// 创建新的API客户端
func NewAPIClient(conf APIConfig) *APIClient {
	client := &http.Client{
		Timeout: conf.Timeout,
	}
	
	// 如果启用速率限制,创建一个信号量通道
	var rateLimiter chan struct{}
	if CrawlerConf.EnableRateLimiting {
		rateLimiter = make(chan struct{}, CrawlerConf.MaxConcurrency)
	}
	
	return &APIClient{
		config:    conf,
		client:    client,
		rateLimiter: rateLimiter,
	}
}

// 生成1688 API签名
func (c *APIClient) generateSign(params map[string]string) string {
	// 1. 按参数名ASCII排序
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	
	// 2. 拼接参数
	var signStr strings.Builder
	signStr.WriteString(c.config.AppSecret)
	for _, k := range keys {
		signStr.WriteString(k)
		signStr.WriteString(params[k])
	}
	signStr.WriteString(c.config.AppSecret)
	
	// 3. 计算MD5
	h := md5.New()
	h.Write([]byte(signStr.String()))
	return strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
}

// 执行API请求
func (c *APIClient) DoRequest(method string, params map[string]string) (map[string]interface{}, error) {
	// 如果启用了速率限制,获取信号量
	if c.rateLimiter != nil {
		c.rateLimiter <- struct{}{}
		defer func() { <-c.rateLimiter }()
	}
	
	// 基础参数
	reqParams := map[string]string{
		"app_key":  c.config.AppKey,
		"format":   "json",
		"v":        "2.0",
		"method":   method,
		"timestamp": time.Now().Format("2006-01-02 15:04:05"),
	}
	
	// 合并自定义参数
	for k, v := range params {
		reqParams[k] = v
	}
	
	// 生成签名
	reqParams["sign"] = c.generateSign(reqParams)
	
	// 构建请求URL
	urlValues := url.Values{}
	for k, v := range reqParams {
		urlValues.Set(k, v)
	}
	
	fullURL := fmt.Sprintf("%s?%s", c.config.BaseURL, urlValues.Encode())
	
	// 发起请求
	resp, err := c.client.Get(fullURL)
	if err != nil {
		return nil, fmt.Errorf("请求失败: %v", err)
	}
	defer resp.Body.Close()
	
	// 读取响应
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("读取响应失败: %v", err)
	}
	
	// 解析JSON
	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("解析JSON失败: %v, 响应内容: %s", err, string(body))
	}
	
	// 检查错误响应
	if errorResp, ok := result["error_response"].(map[string]interface{}); ok {
		return nil, fmt.Errorf("API错误: %s (错误码: %v)", 
			errorResp["msg"], errorResp["code"])
	}
	
	return result, nil
}

// 带重试的API请求
func (c *APIClient) DoRequestWithRetry(method string, params map[string]string) (map[string]interface{}, error) {
	var lastErr error
	
	for i := 0; i < CrawlerConf.MaxRetries; i++ {
		resp, err := c.DoRequest(method, params)
		if err == nil {
			return resp, nil
		}
		
		lastErr = err
		// 如果不是最后一次重试,等待后再试
		if i < CrawlerConf.MaxRetries-1 {
			time.Sleep(CrawlerConf.RetryDelay * time.Duration(i+1)) // 指数退避
		}
	}
	
	return nil, lastErr
}

3. 并发任务调度器

实现一个高效的任务调度器,管理并发的采集任务:

package main

import (
	"sync"
	"log"
	"time"
)

// 商品采集任务
type ProductTask struct {
	Keyword    string // 搜索关键词
	Page       int    // 页码
	PageSize   int    // 每页数量
	CategoryID string // 分类ID
	PriceMin   float64 // 最低价格
	PriceMax   float64 // 最高价格
	ResultChan chan<- []Product // 结果通道
}

// 商品信息结构
type Product struct {
	ID          string  `json:"id"`
	Title       string  `json:"title"`
	Price       float64 `json:"price"`
	SaleCount   int     `json:"sale_count"`
	ImageURL    string  `json:"image_url"`
	SellerID    string  `json:"seller_id"`
	SellerName  string  `json:"seller_name"`
	Province    string  `json:"province"`
	CreateTime  string  `json:"create_time"`
}

// 任务调度器
type Scheduler struct {
	client      *APIClient
	taskQueue   chan ProductTask
	wg          sync.WaitGroup
	workerCount int
}

// 创建新的调度器
func NewScheduler(client *APIClient, workerCount int) *Scheduler {
	if workerCount <= 0 {
		workerCount = CrawlerConf.MaxConcurrency
	}
	
	return &Scheduler{
		client:      client,
		taskQueue:   make(chan ProductTask, workerCount*2), // 缓冲队列
		workerCount: workerCount,
	}
}

// 启动工作协程
func (s *Scheduler) Start() {
	for i := 0; i < s.workerCount; i++ {
		s.wg.Add(1)
		go s.worker(i)
	}
	log.Printf("调度器启动,工作协程数量: %d", s.workerCount)
}

// 工作协程,处理任务
func (s *Scheduler) worker(id int) {
	defer s.wg.Done()
	log.Printf("工作协程 #%d 启动", id)
	
	for task := range s.taskQueue {
		log.Printf("工作协程 #%d 处理任务: 关键词=%s, 页码=%d", id, task.Keyword, task.Page)
		
		// 执行采集任务
		products, err := s.fetchProducts(task)
		if err != nil {
			log.Printf("工作协程 #%d 任务失败: %v", id, err)
			continue
		}
		
		// 发送结果
		if len(products) > 0 && task.ResultChan != nil {
			task.ResultChan <- products
		}
		
		// 控制请求频率
		time.Sleep(CrawlerConf.RequestInterval)
	}
	
	log.Printf("工作协程 #%d 退出", id)
}

// 提交任务到队列
func (s *Scheduler) Submit(task ProductTask) {
	s.taskQueue <- task
}

// 停止调度器
func (s *Scheduler) Stop() {
	close(s.taskQueue)
	s.wg.Wait()
	log.Println("调度器已停止")
}

// 执行商品采集
func (s *Scheduler) fetchProducts(task ProductTask) ([]Product, error) {
	// 构建请求参数
	params := map[string]string{
		"q":        task.Keyword,
		"page":     fmt.Sprintf("%d", task.Page),
		"pageSize": fmt.Sprintf("%d", task.PageSize),
	}
	
	// 添加可选参数
	if task.CategoryID != "" {
		params["categoryId"] = task.CategoryID
	}
	if task.PriceMin > 0 {
		params["priceStart"] = fmt.Sprintf("%.2f", task.PriceMin)
	}
	if task.PriceMax > 0 {
		params["priceEnd"] = fmt.Sprintf("%.2f", task.PriceMax)
	}
	
	// 调用1688商品搜索API
	resp, err := s.client.DoRequestWithRetry("alibaba.icbu.product.search", params)
	if err != nil {
		return nil, err
	}
	
	// 解析响应数据
	return s.parseProducts(resp)
}

// 解析商品数据
func (s *Scheduler) parseProducts(resp map[string]interface{}) ([]Product, error) {
	var products []Product
	
	// 提取商品列表
	respData, ok := resp["result"].(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("无法解析响应结果")
	}
	
	productList, ok := respData["products"].([]interface{})
	if !ok {
		return nil, fmt.Errorf("无法提取商品列表")
	}
	
	// 解析每个商品
	for _, item := range productList {
		productItem, ok := item.(map[string]interface{})
		if !ok {
			continue
		}
		
		// 解析价格
		price, _ := productItem["price"].(float64)
		
		// 解析销量
		saleCount := 0
		if sc, ok := productItem["saleCount"].(float64); ok {
			saleCount = int(sc)
		}
		
		// 解析卖家信息
		sellerID := ""
		sellerName := ""
		province := ""
		
		if sellerInfo, ok := productItem["sellerInfo"].(map[string]interface{}); ok {
			if id, ok := sellerInfo["userId"].(string); ok {
				sellerID = id
			}
			if name, ok := sellerInfo["companyName"].(string); ok {
				sellerName = name
			}
			if p, ok := sellerInfo["province"].(string); ok {
				province = p
			}
		}
		
		// 构建商品对象
		product := Product{
			ID:         getStringField(productItem, "id"),
			Title:      getStringField(productItem, "title"),
			Price:      price,
			SaleCount:  saleCount,
			ImageURL:   getStringField(productItem, "imageUrl"),
			SellerID:   sellerID,
			SellerName: sellerName,
			Province:   province,
			CreateTime: time.Now().Format("2006-01-02 15:04:05"),
		}
		
		products = append(products, product)
	}
	
	return products, nil
}

// 辅助函数:安全获取字符串字段
func getStringField(data map[string]interface{}, key string) string {
	if val, ok := data[key].(string); ok {
		return val
	}
	return ""
}

4. 主程序与数据处理

实现主程序,协调各组件工作,并处理采集到的数据:

package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"path/filepath"
	"sync"
	"time"
)

func main() {
	// 解析命令行参数
	appKey := flag.String("appkey", "", "1688开放平台AppKey")
	appSecret := flag.String("secret", "", "1688开放平台AppSecret")
	keyword := flag.String("keyword", "女装", "搜索关键词")
	pages := flag.Int("pages", 5, "采集页数")
	concurrency := flag.Int("concurrency", 10, "并发数")
	output := flag.String("output", "", "输出文件路径")
	flag.Parse()
	
	// 验证必要参数
	if *appKey == "" || *appSecret == "" {
		log.Fatal("必须提供AppKey和AppSecret")
	}
	
	// 初始化配置
	APIConf.AppKey = *appKey
	APIConf.AppSecret = *appSecret
	if *concurrency > 0 {
		CrawlerConf.MaxConcurrency = *concurrency
	}
	
	// 创建输出目录
	if err := os.MkdirAll(CrawlerConf.OutputDir, 0755); err != nil {
		log.Fatalf("创建输出目录失败: %v", err)
	}
	
	// 创建API客户端
	client := NewAPIClient(APIConf)
	
	// 创建结果通道
	resultChan := make(chan []Product, *pages)
	
	// 创建并启动调度器
	scheduler := NewScheduler(client, CrawlerConf.MaxConcurrency)
	scheduler.Start()
	
	// 提交采集任务
	log.Printf("开始采集关键词: %s, 总页数: %d", *keyword, *pages)
	for page := 1; page <= *pages; page++ {
		task := ProductTask{
			Keyword:  *keyword,
			Page:     page,
			PageSize: CrawlerConf.MaxPageSize,
			ResultChan: resultChan,
		}
		scheduler.Submit(task)
	}
	
	// 关闭调度器(等待所有任务完成)
	go func() {
		// 等待一段时间确保所有任务都已提交
		time.Sleep(2 * time.Second)
		scheduler.Stop()
		close(resultChan)
	}()
	
	// 收集结果
	var allProducts []Product
	var wg sync.WaitGroup
	
	// 启动结果处理协程
	wg.Add(1)
	go func() {
		defer wg.Done()
		for products := range resultChan {
			log.Printf("收到 %d 条商品数据", len(products))
			allProducts = append(allProducts, products...)
		}
	}()
	
	// 等待结果处理完成
	wg.Wait()
	
	// 输出结果
	log.Printf("采集完成,共获取 %d 条商品数据", len(allProducts))
	
	// 保存到文件
	outputPath := *output
	if outputPath == "" {
		timestamp := time.Now().Format("20060102150405")
		outputPath = filepath.Join(CrawlerConf.OutputDir, 
			fmt.Sprintf("products_%s_%s.json", *keyword, timestamp))
	}
	
	if err := saveProductsToFile(allProducts, outputPath); err != nil {
		log.Printf("保存文件失败: %v", err)
	} else {
		log.Printf("数据已保存到: %s", outputPath)
	}
}

// 保存商品数据到文件
func saveProductsToFile(products []Product, filename string) error {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer file.Close()
	
	encoder := json.NewEncoder(file)
	encoder.SetIndent("", "  ") // 格式化输出
	return encoder.Encode(products)
}

性能优化策略

为了进一步提升系统性能,我们可以采取以下优化策略:


  1. 连接池管理:复用 HTTP 连接,减少 TCP 握手开销

  2. 智能限流:根据 API 响应时间动态调整请求频率

  3. 分布式部署:在大规模采集时,可将任务分配到多个节点

  4. 数据缓存:对热门关键词和分类的结果进行缓存

  5. 增量采集:只采集新增或更新的商品数据

  6. 优先级队列:为重要任务设置更高优先级

异常处理与容错机制

一个健壮的采集系统必须具备完善的异常处理能力:


  1. 网络异常处理:实现自动重试和指数退避策略

  2. API 错误码处理:针对不同错误码(如限流、权限不足)采取特定措施

  3. 数据校验:对采集到的数据进行验证,过滤异常值

  4. 监控告警:实时监控系统状态,异常时及时告警

  5. 优雅退出:收到终止信号时,完成当前任务再退出,避免数据丢失

合规性与反爬策略

在采集 1688 平台数据时,必须遵守平台规定和相关法律法规:


  1. 遵守 API 协议:严格按照 1688 平台的规定使用 API

  2. 控制请求频率:不发送超出限制的请求,避免给服务器造成负担

  3. 尊重 robots 协议:对于网页采集,遵守网站的 robots.txt 规则

  4. 数据使用合规:采集的数据不得用于非法用途

  5. 用户代理标识:明确标识自己的爬虫身份和联系方式

总结

本文介绍了如何使用 Go 语言构建一个高性能的 1688 商品数据采集系统。通过利用 Go 的并发特性,我们实现了可以同时处理大量 API 请求的采集系统,大幅提高了数据获取效率。


系统的核心优势在于:


  • 基于 goroutine 的轻量级并发,资源利用率高

  • 完善的任务调度和负载均衡

  • 健壮的错误处理和重试机制

  • 灵活的配置和扩展能力


在实际应用中,可以根据具体需求进一步扩展系统功能,如增加数据清洗、分析和可视化模块,使其成为一个完整的电商数据分析平台。


少长咸集

群贤毕至

访客