剖析 OTel Collector Delta To Cumulative Processor

這篇筆記主要記錄我在研究 OpenTelemetry Collector Contrib 中 deltatocumulative Processor 的心得。除了基本的配置,我們直接從 Source Code 層級來看看它是怎麼運作的,特別是它在狀態管理上的設計,以及我們在生產環境踩過的那些「坑」。
1. 為什麼需要這個組件?
簡單來說,deltatocumulativeprocessor 的工作就是把 Delta (增量) 指標轉成 Cumulative (累積) 指標。
聽起來很簡單?但這是一個 Stateful (有狀態) 的操作。這意味著 Processor 必須在記憶體裡「記住」所有 Time Series 當前的數值。一旦流量大起來,這裡就是記憶體洩漏或是數據遺失的高風險區。
2. 核心架構:它是如何撐住高併發的?
為了不讓這個 Processor 成為效能瓶頸,此 Processor 的設計核心在於「高效的狀態管理」與「嚴格的時間序驗證」。為了在高併發下維持準確性,它採用了細粒度的鎖定策略與強型別的狀態存儲。
2.1 關鍵資料結構 (Key Data Structures)
核心邏輯位於 processor/deltatocumulativeprocessor,主要由以下結構支撐:
A. 唯一識別 identity.Stream
Processor 怎麼知道哪些數據屬於同一個 Time Series?它依賴 identity.Stream。這不光是看 Metric Name,它會把 Name、Unit、Type 甚至所有的 Label (Attribute Hash) 組合起來當作唯一的 Key。所以,只要 Label 變了,對它來說就是一個全新的 Stream。
Metric Signature: Name, Unit, Type, Monotonicity, Temporality.
Attributes Hash: 所有 DataPoint 屬性 (Labels) 的雜湊值。 這確保了即使是同一個 Metric Name,不同的 Label 組合也會被視為獨立的 Stream。
B. 狀態儲存 state
在儲存方面,它用了 maps.Parallel (底層是 xsync.MapOf)。為了避開 Golang interface 轉換的開銷並確保型別安全,它很「搞剛」地把 Number (Sum/Gauge)、Histogram 和 ExponentialHistogram 拆成三個獨立的 Map 來存。:
nums: 存儲NumberDataPoint(Sum/Gauge)hist: 存儲HistogramDataPointexpo: 存儲ExponentialHistogramDataPoint
C. 鎖的策略、併發控制 mutex[T]
這是效能的關鍵點。Processor 沒有使用全域鎖 (Global Lock)。 如果每次處理數據都要鎖住整個 Map,那吞吐量肯定上不去。它為每一個獨立的 Stream 分配了一個專屬的 mutex。這意味著,除非多個請求同時更新「同一個 Metric 的同一個 Label 組合」,否則大家的更新操作是完全平行、互不卡頓的。
2.2 數據流經的旅程 (ConsumeMetrics)
當一筆 Metrics 進入 Processor 時,數據流經以下嚴格步驟:
過濾 (Filter):
- 檢查
AggregationTemporality。只有 Delta 類型的指標會被處理;Cumulative 指標直接透傳 (Pass-through)。
- 檢查
識別與查找 (Identify & Lookup):
計算 DataPoint 的
identity.Stream。嘗試從
stateMap 中撈取現有累積值。注意:如果是新 Stream 且總數超過
max_streams,它會直接標記error="limit"然後丟棄。這在除錯時很容易被忽略。
聚合運算 (
delta.Aggregate):在 Stream 級別的鎖保護下執行。
邏輯:
New_Cumulative = Old_Cumulative + New_Delta。
時間序驗證 (Validation): 在聚合前,必須通過兩項關鍵檢查 (位於
internal/delta/delta.go):亂序檢測 (
ErrOutOfOrder):條件:
New.Time <= Stored.Time結果: 丟棄數據。這通常發生在發送端重試或網路亂序時。
重啟檢測 (
ErrOlderStart):條件:
New.Start < Stored.Start結果: 丟棄數據。這代表來源進程可能已重啟,發送了屬於「上一代」的數據,或是時間戳生成有誤。
寫回與標記:
將計算出的 Cumulative 值寫回原始 DataPoint。
將 Temporality 修改為
Cumulative。更新
stalemap 中的最後活躍時間 (Last Seen)。
2.3 垃圾回收機制 (GC)
為了避免記憶體爆炸(例如 Pod 重啟頻繁導致 Stream 無限增長),這裡有一個背景 Goroutine,每分鐘會巡一次。只要發現某個 Stream 超過 max_stale 沒更新,就會把它從記憶體中清掉。
- 邏輯: 遍歷
staleMap。如果now - last_seen > max_stale,則從記憶體中刪除該 Stream 的所有狀態。
3. 設定檔該怎麼調? (Configuration)
只有兩個參數,但都很致命:
processors:
deltatocumulative:
# Stream 多久沒動靜就清掉?預設 5m。
# 坑點:設太短會導致狀態頻繁重置(數據斷層);設太長記憶體會爆
max_stale: 5m
# 允許追蹤的最大 Stream 數量 (預設為 Max Int)
# 影響: 這是保護 Collector 不被 High Cardinality 數據撐爆的最後防線。
# 這是防線。一旦爆了,超出的數據會被「無情丟棄」。
max_streams: 9223372036854775807
4. 監控指標詳解 (Observability)
Processor 透過 internal/telemetry 暴露了自我監控指標 (Self-monitoring Metrics),這是排查數據丟失問題的首要依據。當你懷疑數據掉了,請先看這些自我監控指標 (internal/telemetry):
4.1 核心指標列表
deltatocumulative_datapoints:這是最重要的 Counter。- 看
error="limit":是不是max_streams設太小了? - 看
error="delta.ErrOutOfOrder":發送端是不是時間戳亂跳?
- 看
deltatocumulative_streams_tracked:目前記憶體裡到底存了多少 Stream。
| 指標名稱 (Metric Name) | 類型 | 說明 | 關鍵標籤 (Labels) |
deltatocumulative_datapoints | Counter | 處理的數據點總數。請密切關注 error 標籤。 | error: |
- (missing): 處理成功 | |||
- limit: 觸發 max_streams 上限而丟棄 | |||
- delta.ErrOutOfOrder: 因時間戳亂序而丟棄 | |||
- delta.ErrOlderStart: 因起始時間異常而丟棄 | |||
deltatocumulative_streams_tracked | Gauge | 當前記憶體中活躍追蹤的 Stream 總數。 | 無 |
deltatocumulative_streams_limit | Gauge | 配置的 max_streams 值。 | 無 |
deltatocumulative_streams_max_stale | Gauge | 配置的 max_stale 值 (秒)。 | 無 |
線上鬼故事:常見問題剖析
案例一:狀態丟失與 streams_tracked 的鋸齒狀波動
現象: 監控圖表顯示 streams_tracked 呈現週期性的鋸齒狀下跌,或者劇烈震盪。同時下游看到的數值可能突然歸零或重置。
原因: 這通常與 GC 機制 (stale check) 有關。
間歇性流量: 如果某個指標每 6 分鐘才送一次,而
max_stale設為 5 分鐘。Processor 會在第 5 分鐘刪除狀態。第 6 分鐘數據進來時,被視為全新的 Stream,累積值從 0 (或當前 Delta) 開始計算,導致狀態丟失。GC 運作: 背景 Goroutine 每分鐘一次的清理動作,會導致
streams_tracked出現階梯式下降。
解法:
- 確保
max_stale顯著大於 metrics 的 scrape interval 或 push interval (建議至少 2-3 倍)。
案例二:消失的數據與 Pipeline 順序之謎
現象:Batch Processor 報告發送了 2.6k 點,但 exporter 報告只發送了 2.0k 點。中間的 0.6k 憑空消失,且沒有任何 Error Log。
源碼級原因: 這是 deltatocumulative 的 max_streams 限制與 Pipeline 順序共同作用的結果。
// processor.go 片段
if maps.Exceeded(last, loaded) {
attrs.Set(telemetry.Error("limit"))
return drop // 靜默丟棄,只會標記 error label
}
如果 Pipeline 配置為 [batch, deltatocumulative]:
Batch: 收到數據,計數器
batch_send_size+2.6k。DeltaToCumulative: 發現 Stream 總數超標,靜默丟棄 0.6k 數據點 (僅增加
deltatocumulative_datapoints{error="limit"})。Exporter: 收到剩餘的 2.0k,計數器
sent_metric_points+2.0k。
解法:
調整順序: 改為
[deltatocumulative, batch]。讓過濾發生在打包之前。監控 Drop: 設置告警監控
sum(rate(otelcol_deltatocumulative_datapoints{error="limit"}[2m])) > 0。
案例三:亂序數據 (Out of Order)
現象: 數據偶爾丟失,deltatocumulative_datapoints 出現 error="out_of_order"。
原因 (internal/delta/delta.go):
case dp.Timestamp() <= state.Timestamp():
return ErrOutOfOrder{...}
Processor 為了保證累積值的單調遞增,對時間戳要求非常嚴格 (New.Time > Stored.Time)。如果發送端 (如 Prometheus Remote Write 或某些 SDK) 因重試邏輯發送了重複或舊的時間戳,Processor 會為了保護累積值的單調性而拒絕該數據。
解法: 檢查發送端的 Retry 策略或時鐘同步狀態。
6. 本地重現 (PoC)
為了驗證那個「Pipeline 順序導致數據消失」的鬼故事,我用 Docker Compose 搞了個實驗。 (詳細配置略,核心概念是用 60 個 telemetrygen 實例去衝撞 max_streams: 50 的限制)
6.1 環境架構
┌─────────────────┐ ┌─────────────────────────────────────┐ ┌────────────┐
│ telemetrygen │────▶│ OTel Collector │────▶│ Prometheus │
│ (60 instances) │ │ ┌─────────────────────────────┐ │ │ :9090 │
│ app-01 ~ 60 │ │ │ Pipeline: │ │ └────────────┘
└─────────────────┘ │ │ receiver → cumulativetodelta│ │
│ │ → batch │ │
│ │ → deltatocumulative│ │
│ │ → exporter │ │
│ │ │ │
│ │ max_streams: 50 (< 60) │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
設計理念:
60 個 telemetrygen 實例,每個發送不同的
service.name(app-01 ~ app-60)max_streams設為 50,刻意製造 Stream 超限Pipeline 順序為
[cumulativetodelta, batch, deltatocumulative],重現「先 batch 後過濾」的問題
6.2 檔案結構
poc-lab/
├── docker-compose.yaml # Docker Compose 配置
├── otel-config.yaml # OTel Collector 配置
└── prometheus.yaml # Prometheus scrape 配置
6.3 配置檔案
otel-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
# 將 telemetrygen 產生的 Cumulative 轉成 Delta
cumulativetodelta:
batch:
send_batch_size: 100
timeout: 1s
deltatocumulative:
max_stale: 1m
# 【關鍵設定】設定極低的上限,強迫發生 Drop
max_streams: 50
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
namespace: "poc_app"
debug:
verbosity: normal
service:
telemetry:
metrics:
readers:
- pull:
exporter:
prometheus:
host: "0.0.0.0"
port: 8888
pipelines:
metrics:
receivers: [otlp]
# 【關鍵錯誤順序】重現問題
processors: [cumulativetodelta, batch, deltatocumulative]
exporters: [prometheus, debug]
prometheus.yaml
global:
scrape_interval: 5s
evaluation_interval: 5s
scrape_configs:
# Job 1: 監控 Collector 本身
- job_name: 'otel-collector-internal'
static_configs:
- targets: ['otel-collector:8888']
# Job 2: 監控實際輸出的數據
- job_name: 'app-metrics'
static_configs:
- targets: ['otel-collector:8889']
docker-compose.yaml (精簡版)
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.142.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-config.yaml:/etc/otel-collector-config.yaml:ro
ports:
- "4317:4317"
- "8888:8888" # Collector Internal Metrics
- "8889:8889" # Prometheus Exporter
networks:
- otel-network
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro
ports:
- "9090:9090"
depends_on:
- otel-collector
networks:
- otel-network
# 使用 YAML anchor 定義 60 個 telemetrygen 實例
telemetrygen-01: &telemetrygen-base
image: ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen:latest
command: ["metrics", "--otlp-insecure", "--otlp-endpoint=otel-collector:4317",
"--rate=5", "--duration=1000h", "--metric-type=Sum", "--service=app-01"]
depends_on: [otel-collector]
networks: [otel-network]
telemetrygen-02: { <<: *telemetrygen-base, command: [..., "--service=app-02"] }
# ... (app-03 ~ app-60)
networks:
otel-network:
driver: bridge
6.4 運行與驗證
步驟 1: 啟動環境
cd poc-lab
docker compose up -d
步驟 2: 等待數據累積 (約 30-60 秒)
docker compose ps # 確認所有容器運行中
步驟 3: 驗證查詢
打開 Prometheus UI (http://localhost:9090) 或使用 curl:
查詢 1: Streams 追蹤數量 (應該達到上限 50)
otelcol_deltatocumulative_streams_tracked
查詢 2: 數據點處理結果 (按 error 分類)
otelcol_deltatocumulative_datapoints_total
查詢 3: 比較 Batch vs Exporter
# Batch 處理量
otelcol_processor_batch_batch_send_size_sum
# Exporter 發送量
otelcol_exporter_sent_metric_points_total
6.5 驗證結果 - 問題重現 (錯誤順序)
使用錯誤的 Pipeline 順序 [cumulativetodelta, batch, deltatocumulative] 運行後:
| 指標 | 數值 | 說明 |
streams_tracked | 50 | 達到 max_streams 上限 |
streams_limit | 50 | 配置值 |
receiver_accepted | 13,000 | Receiver 接收的總數據點 |
batch_send_size_sum | 13,000 | Batch 處理的數據點 |
exporter_sent | 11,000 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
error="none" | 16,000 | 成功處理 |
error="limit" | 3,000 | 因達到 Stream 上限而丟棄 |
關鍵發現:
發送端: 60 個 telemetrygen 實例
限制: max_streams = 50
被完全丟棄的實例: 10 個 (60 - 50)
Batch (13,000) ≠ Exporter (11,000):差異 2,000 個數據點「憑空消失」
丟棄僅標記
error="limit",無 Error Log,難以察覺
6.6 修正驗證 - 正確順序
修改 otel-config.yaml 中的 processors 順序:
# 修正前 (問題配置)
processors: [cumulativetodelta, batch, deltatocumulative]
# 修正後 (正確配置)
processors: [cumulativetodelta, deltatocumulative, batch]
修正後驗證結果:
| 指標 | 數值 | 說明 |
streams_tracked | 50 | 仍達到 max_streams 上限 |
receiver_accepted | 17,500 | Receiver 接收的總數據點 |
batch_send_size_sum | 14,950 | Batch 處理的數據點 |
exporter_sent | 14,950 | Exporter 實際發送的數據點 |
數據點處理詳情:
| error 標籤 | 數值 | 說明 |
error="none" | 14,950 | 成功處理 |
error="limit" | 2,490 | 因達到 Stream 上限而丟棄 |
6.7 修正前後對比
| 指標 | 修正前 | 修正後 | 結論 |
| Batch 處理量 | 13,000 | 14,950 | - |
| Exporter 發送量 | 11,000 | 14,950 | - |
| Batch = Exporter? | 否 (差 2,000) | 是 (完全一致) | 問題解決 |
| error="limit" | 3,000 | 2,490 | 仍有丟棄 (預期行為) |
實驗結論:
修正前: Batch (13,000) ≠ Exporter (11,000) ← 數據「消失」,難以排查
修正後: Batch (14,950) = Exporter (14,950) ← 指標一致,問題可追溯
Pipeline 順序修正有效 - Batch 和 Exporter 的數值現在完全一致
丟棄仍然發生 (error="limit"),但這是預期行為,因為來源數 (60) > max_streams (50)
監控指標正確反映實際狀態 - 運維人員可直接從
error="limit"指標看到丟棄量
6.8 清理環境
docker compose down
7. 番外篇:為什麼 PHP 開發者最需要它?
7.1 為什麼是 PHP? (The "Share-Nothing" Architecture)
你可能會問,為什麼我們需要这么麻煩的轉 Delta?
對於 Java、Go 這種 Long-running process 來說,維護一個全域計數器(Cumulative)是輕而易舉的事。它們在記憶體中有一個全域的計數器,可以一直累加數值:
00:01: 累計 10 次
00:02: 累計 15 次 (累加了 5 次)
00:03: 累計 20 次 (又累加了 5 次)
這就是 Cumulative (累積) 模式,也是 Prometheus 等後端系統最喜歡的格式。
但是,PHP 通常運行在 PHP-FPM 或 CGI 模式下。其生命週期是「一個請求一個進程」 (Per-request process):
- 收到請求 -> 啟動 (或重用) PHP worker。
- 執行腳本 -> 處理指標 (Metrics)。
- 請求結束 -> 記憶體釋放/重置。
PHP (在 FPM 模式下) 是 Share-Nothing 架構。一個請求進來,Process 啟動,處理完,記憶體釋放。 PHP 進程沒辦法簡單地告訴下一個進程:「嘿,我剛剛處理了 1 個,現在總數是 100 喔。」(除非你用 Redis or SharedMemory等外部儲存)。
因此,PHP 最自然的做法是只回報「這一次請求發生了什麼」:
- 請求 A: 我處理了 1 個 DB 查詢 (Delta) -> 結束
請求 B: 我處理了 1 個 DB 查詢 (Delta) -> 結束
所以 PHP 最自然的行為是:「這次請求我處理了 1 個 DB Query (Delta)」,這就是
Delta (增量)模式。 至於加總的工作?就交給 OTel Collector 的 deltatocumulative processor 來扛吧。這就是它存在的最大意義。
7.2 Delta to Cumulative Processor 的角色
當您的後端資料庫(如 Prometheus)只接受 Cumulative 資料,但您的應用程式(如 PHP 或 Serverless Functions)只能提供 Delta 資料時,就會發生格式不相容。
這時候就需要 deltatocumulative processor 擔任「狀態管理者」 (Stateful Intermediary) 的角色:
- 接收 (Receive): 它接收來自 PHP 的無數個小 Delta (例如:+1, +1, +1)。
- 記憶 (Remember): 它在 Collector 的記憶體中維護一個對應的 Stream,並幫忙做加法運算 (State Management)。
- 轉換 (Convert): 它算出累積值 (例如:目前總共是 3),並將其轉換為 Cumulative 格式。
- 輸出 (Export): 發送給 Prometheus。
雖然 Serverless (如 AWS Lambda) 或 CLI 工具也有類似需求,但 PHP的廣泛使用以及其標準的運行模式,使其成為這個 Processor 最常見的使用案例。
8. 結論與最佳實踐
Pipeline 順序至關重要:請務必把 deltatocumulative 放在 batch 之前。
監控不能少:一定要針對 error="limit" 和 error="out_of_order" 設告警。
容量規劃: 根據預期的 Stream 數量合理設置
max_streamsStale 設定:
max_stale應大於最大的 push/scrape interval (建議 2-3 倍)



