Featured image of post 全球宏观数据自动采集系统设计

全球宏观数据自动采集系统设计

一个股票分析师(FA-002)每次开始分析时,都需要知道:

  • 美股现在的走势怎样(VIX、SPX)
  • 美元强度(DXY)
  • 全球风险偏好(BTC)
  • A股的几个关键指数

如果他每次分析前都要自己去 Yahoo Finance、Sina、AkShare 各抓一遍数据,那太低效了。

所以我们建了一个自动化的宏观数据采集系统,让分析师只需要查数据库,不需要操心数据从哪来


需求分析

为什么是实时数据

宏观指标是市场情绪的晴雨表。

当 VIX(恐慌指数)从 15 跳到 25 时,科技股会发生什么?同一只股票的分析逻辑可能会完全改变。

所以数据的时效性很重要。我们的目标:

  • 采集延迟 < 3 秒(从数据源到我们的数据库)
  • 更新频率 每 30 分钟一次(足够覆盖交易时段)
  • 数据完整率 > 99.5%(至少一个数据源成功)

覆盖的市场

我们跟踪的全球市场:

市场代码数据源优先级目标延迟
美股SPX, IXIC, DJIyfinance< 3s
美债US10Yyfinance< 3s
美元DXYyfinance< 3s
美元恐慌VIXyfinance< 3s
A股SH, SZ, CYBAkShare/Sina< 2s
港股HSI, HSTECHAkShare/Sina< 5s
外汇USDCNY, USDJPYyfinance< 3s
加密BTC, ETHyfinance/Binance< 2s
大宗GOLD, OILyfinance/AkShare< 3s

多源 Fallback 的设计

最大的挑战不是单个数据源,而是它们都不完全可靠

AkShare 可能在某个时段超时。yfinance 偶尔会返回 400。Binance API 在高峰期限流。

所以我们的策略是:不依赖任何单一源,而是用优雅的多层 fallback。

A股数据获取的五层 Fallback

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def fetch_cn_macro(self):
    """获取 A 股宏观指标,5 层 fallback"""
    
    # Layer 1: EastMoney API (最快,准确度最高)
    try:
        df = ak.stock_zh_index_spot_em()
        return self._parse_em_cn(df)
    except Exception as e1:
        logger.warn(f"EastMoney failed: {e1}")
    
    # Layer 2: Sina API (稍慢,但非常稳定)
    try:
        df = ak.stock_zh_index_spot_sina()
        return self._parse_sina_cn(df)
    except Exception as e2:
        logger.warn(f"Sina failed: {e2}")
    
    # Layer 3: Tencent QT Protocol (古老但可靠)
    try:
        return self._qt_fetch_cn()
    except Exception as e3:
        logger.warn(f"QT failed: {e3}")
    
    # Layer 4: 缓存 (过期数据总比没数据好)
    cached = self._get_latest_db_quote("SH")
    if cached and (time.time() - cached['ts']) < 3600:  # 1 小时内的缓存
        logger.info(f"Using 1h-old cached data for SH")
        return cached
    
    # Layer 5: 返回错误状态 (让上游知道我们失败了)
    return {"status": "error", "reason": "All data sources failed"}

全球数据获取的路由逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def fetch_global_macro(self, save=True):
    """按市场类型优化路由"""
    
    yf_symbols = {
        "SPX": "^GSPC",
        "IXIC": "^IXIC", 
        "DJI": "^DJI",
        "VIX": "^VIX",
        "DXY": "DX-Y.NYB",
        "US10Y": "^TNX",
        "BTC": "BTC-USD",
        "GOLD": "GC=F",
    }
    
    # 并发抓取 (而不是串联,节省时间)
    tickers = yf.Tickers(" ".join(yf_symbols.values()))
    
    results = {}
    for sym, yf_code in yf_symbols.items():
        try:
            info = tickers.tickers[yf_code].fast_info
            results[sym] = {"value": info.last_price, "timestamp": datetime.now()}
        except Exception as e:
            logger.warn(f"{sym} from yfinance failed, will retry")
            # yfinance 失败时,尝试 AkShare
            results[sym] = self._fallback_to_akshare(sym)
    
    return results

缓存与去重机制

频繁地重复请求数据源是浪费。所以我们用 30 秒的 TTL 缓存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MarketBrain:
    def __init__(self):
        self._spot_cache = {}        # 缓存数据
        self._spot_cache_ts = {}     # 缓存时间戳
    
    def _spot_cache_get(self, key, ttl=30):
        """如果缓存未过期,返回缓存;否则返回 None"""
        ts = self._spot_cache_ts.get(key)
        if ts and (time.time() - ts) < ttl:
            return self._spot_cache.get(key)
        return None
    
    def _spot_cache_set(self, key, df):
        """更新缓存"""
        self._spot_cache[key] = df
        self._spot_cache_ts[key] = time.time()
    
    def get_spot_cn(self):
        """获取 A 股实时行情,带缓存"""
        cached = self._spot_cache_get('cn', ttl=30)
        if cached is not None:
            logger.info("Using cached CN market data (< 30s old)")
            return cached
        
        df = self._retry(lambda: ak.stock_zh_a_spot_em())
        self._spot_cache_set('cn', df)
        return df

这个设计的妙处是:

  • 高峰时段(比如美股开盘后 5 分钟),可能有 10 个不同的分析师同时请求数据
  • 有了缓存,只有第一个请求会真正hit 数据源,其他 9 个直接用缓存(节省 90% 的网络请求)
  • 缓存过期后自动更新,不需要人工干预

重试机制与指数退避

网络请求偶尔会失败。关键是怎么重试

如果立即重试,可能会加剧原本的拥塞。所以我们用指数退避

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def _retry(self, fn, tries=3, base_sleep=0.5):
    """
    指数退避重试
    第 1 次失败后等 0.5-1.0 秒
    第 2 次失败后等 1.0-2.0 秒
    第 3 次失败后等 2.0-4.0 秒
    """
    last_exception = None
    for i in range(tries):
        try:
            return fn()
        except Exception as e:
            last_exception = e
            wait_time = base_sleep * (i + 1) + random.uniform(0, 0.5)
            logger.warn(f"Attempt {i+1} failed, waiting {wait_time:.2f}s: {e}")
            time.sleep(wait_time)
    
    raise last_exception

随机化(random.uniform(0, 0.5))很关键。如果所有重试都在相同的时刻发起,会导致"雷鸣羊群"问题(thundering herd)。


存储架构:Supabase 的选择

为什么不用 PostgreSQL?为什么不用 DuckDB?为什么选 Supabase?

三个原因:

1. 实时同步

Supabase 内置 PostgREST,可以用简单的 HTTP 请求进行 CRUD 操作,不需要 SQL 连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
res = self.sb.table("market_quotes").upsert(
    {
        "trade_date": "2026-02-24",
        "symbol": "SPX",
        "close": 6816.63,
        "change_pct": 0.5,
        "market_type": "us"
    },
    on_conflict="trade_date,symbol"
).execute()

关键字 on_conflict:如果今天已经有 SPX 的记录,就更新它;否则插入。这叫 upsert,在时间序列数据中很常见。

2. 权限管理

Supabase 自带行级安全(RLS)。我们可以定义:

  • FA-002(分析师)可以读 market_quotes,但不能写
  • Luna 可以读写
  • 外部应用只能读特定列
1
2
3
-- RLS 策略示例
CREATE POLICY "analysts_can_read" ON market_quotes
    FOR SELECT USING (auth.uid() IN (SELECT user_id FROM analysts))

3. 免费额度充足

Supabase 的免费层提供 500MB 数据库 + 1GB 文件存储。对于宏观数据来说,这完全足够。

5 年的 A 股 + 美股数据也不过 100MB。


运维与监控

Cron 任务配置

在 bwg 上:

1
2
3
4
5
# 每 30 分钟执行一次(全天)
*/30 * * * * cd /root/luna_tools && python3 macro_helper.py >> /var/log/macro_sync.log 2>&1

# 或者只在交易时段执行(9:30-16:00 GMT+8)
30 1-8 * * 1-5 cd /root/luna_tools && python3 macro_helper.py >> /var/log/macro_sync.log 2>&1

监控规则

1
2
3
# 如果 market_quotes 表的最新记录超过 40 分钟没更新,发告警
SELECT MAX(created_at) FROM market_quotes;
# 如果结果 > 40 分钟前,触发告警

权限白名单

最初,远程节点执行 Python 脚本需要每次都通过 OpenClaw 的权限批准。在高并发下这会超时。

解决方案:把宏观数据采集加入系统白名单:

1
2
3
4
5
{
  "exec_whitelist": [
    "cd /root/luna_tools && python3 macro_helper.py"
  ]
}

这样 cron 任务可以直接执行,无需批准。


故障案例与学到的教训

案例 1:单个数据源的中断

问题:AkShare 在 2026-02-23 下午 4 点中断了 30 分钟。

影响:A股指数没有更新。

怎么办:系统自动回退到 Sina API,数据继续更新,FA-002 甚至没有注意到。

教训:多源冗余 > 单源的"99.9% SLA"。

案例 2:Supabase 连接超时

问题:Python 进程试图在高峰时段写入 Supabase,连接超时。

原因:远程节点上的 Python 进程 + Supabase SDK 的默认超时太短(6 秒)。

解决

  1. 把宏观数据采集从远程节点迁到 bwg 本地
  2. 增加 Supabase SDK 的超时到 15 秒
  3. 添加重试逻辑(见前面的 _retry 方法)

教训:网络操作的超时配置应该保守(宁可等 15 秒也别立即失败)。

案例 3:缓存导致的数据陈旧

问题:某次测试时,我们的缓存 TTL 设置为 1 小时。

第一次采集失败后,系统返回 1 小时前的数据。FA-002 基于这个过时数据做了分析。

怎么办:降低 TTL 到 30 秒,并在返回缓存数据时加上 “warning: using N-seconds-old cache” 标记。

教训:缓存是为了容错,不是为了隐藏故障。使用缓存时应该显式标记。


性能指标(真实数据)

运行 24 天后的统计:

  • 平均采集延迟:2.3 秒(P99: 3.8 秒)
  • 数据完整率:99.87%(只有 3 次完全失败)
  • Supabase 写入成功率:99.94%
  • 缓存命中率:73% (大量的多 Agent 并发请求)
  • 系统总可用性:99.96%

对于一个"开源 + 免费层"的解决方案,这很不错。


对分析师的影响

从 FA-002 的角度看,现在的流程是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 开始分析之前
macro = brain.fetch_global_macro()

# 查询最新的市场数据,都是我 Supabase 里的新鲜数据
spx_price = macro['SPX']['value']
dxy = macro['DXY']['value']
vix = macro['VIX']['value']

# 根据宏观背景调整分析逻辑
if vix > 25:
    print("高恐慌,看淡成长股")
if dxy > 100:
    print("美元强势,利好黄金和港股")

# 开始分析具体股票...

不需要关心数据从哪来,有多少层 fallback,也不需要担心某个数据源突然掉线。

这就是抽象的力量。


最后的想法:

优雅的系统不是"什么都不会坏",而是"坏了也能继续运行"。

多源 fallback、缓存、重试、监控告警——这些都不是"防止故障"的措施,而是"从故障中恢复"的措施。

在一个分布式、不完全可靠的网络中,这才是现实。

使用 Hugo 构建
主题 StackJimmy 设计