跳转到内容

Vector 面试题

30 道题
分类
可观测性
子分类
logs
题目数
30 道
已阅读 0 / 30 题
1 Vector 的核心架构由哪些组件构成?

答案:

Vector 采用基于 DAG(有向无环图)的数据管道架构,核心概念包括 Source、Transform 和 Sink。

组件职责示例
Source数据输入file、kafka、syslog、docker、http、prometheus
Transform数据处理parse_json、filter、remap、aggregate、sample
Sink数据输出elasticsearch、loki、s3、clickhouse、datadog
Buffer缓冲机制内存缓冲、磁盘缓冲(性能/可靠性折中)

管道流程:

Source → Transform (可多个) → Buffer → Sink

例如:
File Source → Parse JSON → Filter (level=error) → Remap → Elasticsearch Sink

配置结构:

[sources.my_logs]
type = "file"
include = ["/var/log/app/*.log"]

[transforms.parse]
type = "remap"
inputs = ["my_logs"]
source = '''
. = parse_json!(.message)
'''

[sinks.es_output]
type = "elasticsearch"
inputs = ["parse"]
endpoint = "http://elasticsearch:9200"
2 Vector 与 Fluentd / Logstash 的核心区别是什么?

答案:

维度VectorFluentdLogstash
语言RustRuby + CJRuby
性能极高中等较低
内存占用极低(~10MB)中等(~100MB)高(~1GB+)
管道模型DAG(拓扑排序)线性线性
配置格式TOML / YAML / JSONRuby DSLYAML / JSON
扩展方式VRL(Vector Remap Language)Ruby PluginRuby Plugin
资源效率极高中-高
K8s 原生Helm Chart + DaemonSetHelm ChartHelm Chart
社区DatadogCNCFElastic
数据丢失风险支持磁盘缓冲 At-least-once内置 At-least-once需额外插件

选型建议:

  • 资源受限、需要高性能 → Vector
  • 已有 ELK 生态 → Logstash(兼容性最好)
  • 轻量级且插件丰富 → Fluentd
  • 新项目 / K8s 原生 → Vector(推荐)
3 VRL(Vector Remap Language)的核心功能和语法是什么?

答案:

VRL 是 Vector 内置的数据转换语言,用于对事件数据进行解析、过滤和转换。

核心语法:

# 基础赋值
.new_field = "hello"

# 字段重命名
del(.old_name)
.new_name = .old_name

# 类型转换
.parsed = parse_int!(.raw_count)

# JSON 解析
. = parse_json!(.message)

# 字符串拼接
.full_name = join([.first_name, .last_name], " ")

# 条件处理
if .status >= 500 {
  .severity = "error"
} else if .status >= 400 {
  .severity = "warning"
}

# 错误处理
.parsed = parse_json(.message) ?? null

内置函数:

# 字符串操作
contains(.message, "error")
starts_with(.host, "web")
split(.csv_line, ",")
truncate(.long_field, 100)

# 数值操作
to_int(.count)
to_float(.rate)
floor(.value)
ceil(.value)

# 时间操作
now()
format_timestamp(.timestamp, "%Y-%m-%d")
parse_timestamp(.time_str, "%d/%b/%Y:%H:%M:%S")

# IP 操作
parse_ip(.client_ip)
cidr_contains("10.0.0.0/8", .ip)

# 聚合
count(.group_id)
sum(.bytes)
avg(.latency)
4 Vector 如何配置数据缓冲(Buffer)策略?

答案:

Vector 支持多种缓冲策略,平衡数据安全性和性能。

# Sink 级别的缓冲配置
[sinks.my_sink]
type = "elasticsearch"
inputs = ["transformed_logs"]

# 缓冲配置
buffer.type = "memory"       # 内存缓冲(默认)
buffer.max_events = 500       # 最大事件数
buffer.when_full = "block"    # 满时行为(block / drop_newest)

# 磁盘缓冲(更可靠,防止数据丢失)
buffer.type = "disk"
buffer.max_size = 1073741824  # 1GB
buffer.when_full = "block"

缓冲策略对比:

策略类型优势劣势适用场景
内存缓冲memory性能高、延迟低崩溃丢失数据可容忍少量丢失
磁盘缓冲disk崩溃后恢复、at-least-once性能低于内存关键数据不丢失
自适应adaptive自动切换模式2.0+ 新特性推荐生产

Buffer 行为:

Sink 写入失败
  → 数据进入 Buffer
  → Buffer 满 → 根据 when_full 策略处理
  → block:阻塞 Source 读取(背压)
  → drop_newest:丢弃新事件
  → Sink 恢复 → Buffer 数据继续发送
5 Vector 如何通过 Kubernetes DaemonSet 采集日志?

答案:

Vector 官方提供 Helm Chart,以 DaemonSet 模式采集每个节点上的容器日志。

# Helm values.yaml
vector:
  role: Agent               # DaemonSet 模式
  image: timberio/vector:0.37.0

  # K8s 日志采集配置
  config:
    data_dir: /vector-data-dir

    # 采集节点容器日志(标准输出)
    sources:
      kubernetes_logs:
        type: kubernetes_logs
        max_read_bytes: 102400
        auto_partial_merge: true

      # 采集节点系统日志
      journald:
        type: journald
        current_boot_only: true

    # 数据处理
    transforms:
      parse_json:
        type: remap
        inputs: ["kubernetes_logs"]
        source: |
          if exists(.message) {
            parsed = parse_json(.message) ?? {}
            . = merge(., parsed)
          }
          # 删除不必要的元数据
          del(.source_type)
          del(.stream)          

      enrich:
        type: remap
        inputs: ["parse_json"]
        source: |
          # 添加集群标识
          .cluster = "prod-cluster-1"          

    # 输出到 Loki
    sinks:
      loki:
        type: loki
        inputs: ["enrich"]
        endpoint: "http://loki:3100"
        encoding:
          codec: json

      # 输出到 S3 归档
      s3_archive:
        type: aws_s3
        inputs: ["enrich"]
        region: us-east-1
        bucket: my-logs-archive
        key_prefix: "logs/XQOPEN%Y/%m/%d/XQCLOSE"
        compression: gzip
        buffer:
          type: disk
          max_size: 1073741824

DaemonSet 配置:

daemonSet:
  extraVolumes:
  - name: varlog
    hostPath:
      path: /var/log
  - name: varlibdocker
    hostPath:
      path: /var/lib/docker/containers
  - name: journal
    hostPath:
      path: /var/log/journal
  extraVolumeMounts:
  - name: varlog
    mountPath: /var/log
  - name: varlibdocker
    mountPath: /var/lib/docker/containers
    readOnly: true
  - name: journal
    mountPath: /var/log/journal
    readOnly: true
6 Vector 的自动重试和故障恢复机制是怎样的?

答案:

Vector 在内核层内置重试和背压机制,保障管道可靠性。

重试策略:

# 每个 Sink 自动启用退避重试
[sinks.es]
type = "elasticsearch"
inputs = ["logs"]

# Vector 默认使用指数退避重试(不可配置,硬编码)
# 初始间隔:0.5s
# 最大间隔:60s
# 最大重试次数:无限

背压机制:

Sink 写入失败 → Vector 自动重试
  → 如果持续失败 → Buffer 满
  → 背压传递到 Source → Source 停止读取
  → Sink 恢复 → Buffer 释放 → Source 继续读取

数据丢失防护:

机制说明
磁盘缓冲崩溃后恢复,防止内存数据丢失
At-least-once磁盘缓冲模式下保证至少一次送达
确认机制Sink 收到目标确认后才从 Buffer 移除
自动恢复Vector 崩溃重启后从磁盘 Buffer 继续处理

验证可靠性:

# 查看管道状态
vector top

# 查看事件数量
vector tap --component-id my_source

# 检查错误
grep -i error /var/log/vector/vector.log
7 Vector 的 Aggregator 模式和 Agent 模式有什么区别?

答案:

Vector 支持两种运行模式,Agent 部署在数据源端,Aggregator 作为中心处理节点。

维度AgentAggregator
部署方式DaemonSet / Sidecar / 单机Deployment / StatefulSet
角色数据采集和初处理数据聚合、二次处理、路由分发
数据链路数据源 → Agent → AggregatorAggregator → 多个 Sink
资源配置低(节点级)高(中心级)
配置复杂度简单(采集 + 转发)复杂(多路聚合、转换)

Agent + Aggregator 架构:

# Agent 配置(节点上)
[sources.kubernetes_logs]
type = "kubernetes_logs"

[transforms.add_cluster]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
.cluster = "prod"
.host = get_hostname!()
'''

[sinks.aggregator]
type = "vector"
inputs = ["add_cluster"]
address = "vector-aggregator:6000"
# 压缩传输
compression = "gzip"
buffer.type = "disk"
# Aggregator 配置(中心节点)
[sources.from_agents]
type = "vector"
address = "0.0.0.0:6000"

[transforms.aggregate]
type = "remap"
inputs = ["from_agents"]
source = '''
# 统一处理所有 Agent 发来的数据
if !exists(.timestamp) {
  .timestamp = now()
}
'''

[sinks.loki]
type = "loki"
inputs = ["aggregate"]
endpoint = "http://loki:3100"

[sinks.s3]
type = "aws_s3"
inputs = ["aggregate"]
bucket = "central-logs-archive"
8 Vector 如何从 Kafka Source 消费数据?

答案:

[sources.kafka_input]
type = "kafka"
inputs = []
bootstrap_servers = "kafka:9092"
group_id = "vector-consumer"
topics = ["app-logs", "audit-logs"]

# 自动偏移重置
auto_offset_reset = "earliest"

# 会话超时
session_timeout_ms = 10000

# 解码
decoding.codec = "json"

# 多主题
topics_pattern = "logs-.*"    # 也可以用正则匹配主题

Kafka Source 说明:

参数说明默认值
bootstrap_serversKafka Broker 列表必填
group_idConsumer Group ID必填
topics / topics_pattern消费的主题必填
auto_offset_reset偏移重置策略largest
session_timeout_ms会话超时10000ms
decoding.codec解码方式bytes

Kafka Sink 示例:

[sinks.kafka_output]
type = "kafka"
inputs = ["transformed_logs"]
bootstrap_servers = "kafka:9092"
topic = "processed-logs"
encoding.codec = "json"

# 压缩
compression = "gzip"

# 批处理
batch.max_bytes = 1048576
batch.timeout_secs = 10
9 Vector 的条件路由(Conditional Routing)如何配置?

答案:

Vector 通过 swimlanes 或多个 Sink 的不同 inputs 实现条件路由。

# 方式一:Transform 条件分支
[transforms.classify]
type = "remap"
inputs = ["logs"]
source = '''
if .level == "error" || .level == "critical" {
  .route = "error"
} else if .level == "warning" {
  .route = "warning"
} else {
  .route = "info"
}
'''

# 错误日志 → Elasticsearch + 告警
[sinks.es_error]
type = "elasticsearch"
inputs = ["classify"]
# 路由过滤
inputs = ["classify"]
# 只输出 route=error 的事件
# 通过条件过滤实现
[sinks.es_error]
type = "elasticsearch"
inputs = ["classify"]

# 方式二:Sink 级别的过滤
[sinks.es_error]
type = "elasticsearch"
inputs = ["classify"]
# 可以通过 VRL 在 Transform 层提前打标

# 方式三:多个 Sink
[sinks.es_all]
type = "elasticsearch"
inputs = ["classify"]

[sinks.s3_archive]
type = "aws_s3"
inputs = ["classify"]

说明:

  • Vector 没有内置条件路由语法,通过 Transform 设置路由标签
  • 多个 Sink 可以消费同一 Transform 输出
  • Sink 层根据标签写入不同目标
10 Vector 的健康检查和可观测性如何配置?

答案:

Vector 内置健康检查、指标暴露和内部遥测。

# API 和健康检查
[api]
enabled = true
address = "0.0.0.0:8686"
playground = true          # VRL Playground(开发用)

# Prometheus 指标
[sinks.prometheus]
type = "prometheus_exporter"
inputs = []                # 空 = 输出 Vector 内部指标
address = "0.0.0.0:9090"

# 内部日志
[log_schema]
host_key = "host"
timestamp_key = "timestamp"
message_key = "message"

K8s 探针配置:

livenessProbe:
  httpGet:
    path: /health
    port: 8686
readinessProbe:
  httpGet:
    path: /health
    port: 8686

核心指标:

# 事件处理速率
vector_events_received_total
vector_events_sent_total
vector_events_discarded_total

# 组件状态
vector_component_received_events_total
vector_component_sent_events_total
vector_component_errors_total

# 缓冲状态
vector_buffer_events_count
vector_buffer_byte_size

# 处理延迟
vector_processing_time_seconds

命令行工具:

# 实时监控
vector top

# 事件流查看
vector tap --component-id my_source

# 配置验证
vector validate /etc/vector/vector.toml

# 性能测试
vector test /etc/vector/tests/*.toml
11 Vector 如何通过 `remap` Transform 实现复杂的数据转换?

答案:

[transforms.complex_transform]
type = "remap"
inputs = ["raw_logs"]
source = '''
# 1. 解析 JSON
if is_string(.message) {
  parsed = parse_json(.message) ?? {}
  . = merge(., parsed)
}

# 2. 类型转换
.timestamp = parse_timestamp!(.timestamp_str, "%+")
.response_time = to_float!(.latency_ms) / 1000

# 3. 条件标记
.status_class = if .status >= 500 { "server_error" }
  else if .status >= 400 { "client_error" }
  else if .status >= 300 { "redirect" }
  else { "success" }

# 4. 聚合和计算
.bytes_mb = to_float!(.bytes) / 1048576.0

# 5. 数组操作
.tags = ["production", .region]
.tags = push(.tags, .environment)

# 6. 对象合并
.metadata = {
  "source": "vector",
  "version": "1.0",
  "processed_at": now()
}

# 7. 错误处理
if !exists(.user_agent) {
  log("Missing user_agent", level: "warn")
  .user_agent = "unknown"
}

# 8. 删除不需要的字段
del(.raw_message)
del(.internal_field)
'''

VRL 核心特性:

特性说明示例
类型安全期望类型与函数签名严格匹配parse_int!() 强制转换
错误处理! 后缀或 ?? 操作符parse_int(.val) ?? 0
作用域局部变量和事件字段区分parsed = ... vs .field = ...
不可变事件对象可变,局部变量不可变局部变量赋值后不可修改
12 Vector 如何实现数据采样(Sampling)?

答案:

Vector 提供 sample Transform,支持按比例或字段值进行数据采样。

[transforms.sample_2percent]
type = "sample"
inputs = ["all_logs"]

# 保留 2% 的事件
rate = 50

# 按字段哈希采样(保证同一 key 的事件统一保留或丢弃)
key_field = "request_id"

配置说明:

# 按比例采样
[transforms.sample_ratio]
type = "sample"
inputs = ["all_logs"]
rate = 10               # 保留 10%(1/10)

# 高级采样
[transforms.sample_by_key]
type = "sample"
inputs = ["all_logs"]
rate = 100               # 保留 1%(1/100)
key_field = "user_id"    # 同一 user_id 的事件统一处理

采样策略:

全局采样:所有事件按比例保留
key 采样:根据 key_field 哈希值决定保留与否(同一 key 一致)
适用场景:
  - 调试日志大量输出、仅需采样
  - 高流量环境减少存储和传输
  - 统计分析类日志(不影响趋势)
13 Vector 如何集成 OpenTelemetry?

答案:

Vector 支持接收和发送 OpenTelemetry 格式的数据。

# 站收 OTel 数据
[sources.otel_logs]
type = "opentelemetry"
address = "0.0.0.0:4318"
# 支持 HTTP/gRPC
# HTTP: http://0.0.0.0:4318/v1/logs
# gRPC: 0.0.0.0:4317

[transforms.otel_transform]
type = "remap"
inputs = ["otel_logs"]
source = '''
# 提取 OTel 资源属性
.resource = .resource.attributes
.service_name = get!(.resource, ["service.name"])
'''

OTel 格式兼容性:

# 输出到 OTel Collector
[sinks.otel]
type = "opentelemetry"
inputs = ["transformed_logs"]
endpoint = "http://otel-collector:4317"
# 自动将 Vector 事件转换为 OTel 格式

支持的 OTel 信号:

信号Vector SourceVector Sink
Logs
Metrics
Traces
14 Vector 的配置检查和单元测试如何做?

答案:

Vector 提供内置的配置验证和单元测试框架。

# 配置语法检查
vector validate /etc/vector/vector.toml

# 检查所有 .toml 文件
vector validate --config-dir /etc/vector/

# 检查 + 环境验证(网络、权限等)
vector validate --deny-warnings /etc/vector/vector.toml

单元测试配置:

# tests/vector_tests.toml
[[tests]]
name = "test_json_parsing"

  [tests.input]
    source = '''
    {"message": "{\"level\":\"error\",\"status\":500}"}
    '''
    # 输入数据(原始事件)

  [[tests.outputs]]
    extract = "parsed_logs"
  
    [[tests.outputs.conditions]]
      type = "vrl"
      source = '''
      assert_eq!(.level, "error")
      assert_eq!(.status, 500)
      '''

运行测试:

# 运行所有测试
vector test /etc/vector/tests/*.toml

# 输出示例
# test_json_parsing ... passed
# test_log_filter ... passed
15 Vector 如何处理多行日志(如 Java 异常栈)?

答案:

[sources.java_logs]
type = "file"
include = ["/var/log/app/*.log"]

# 多行合并配置
auto_partial_merge = true          # 自动合并 K8s 截断的行
partial_event_marker_field = "_partial"  # 部分事件标记

# 自定义多行合并(2.0+)
[transforms.merge_multiline]
type = "reduce"
inputs = ["java_logs"]
# 合并多个事件为一个事件
merge_strategies.message = "concat"
starts_when = '''
  match(string!(.message), r'^\d{4}-\d{2}-\d{2}')
'''

配置说明:

starts_when 匹配新事件的开头
  匹配到开始标志 → 开始新事件
  未匹配到 → 合并到上一个事件
  merge_strategies.message = "concat" → 多行拼接

常见多行模式:

# Java 异常栈
starts_when = '''match(string!(.message), r'^\d{4}-\d{2}-\d{2}')'''

# Go panic
starts_when = '''match(string!(.message), r'^panic:')'''

# Python 回溯
starts_when = '''match(string!(.message), r'^Traceback')'''

# SQL 语句
starts_when = '''match(string!(.message), r'^\s*(SELECT|INSERT|UPDATE|DELETE)')'''
16 Vector 如何通过 HTTP Source 接收外部数据?

答案:

[sources.http_input]
type = "http_server"
address = "0.0.0.0:8080"
# 支持的路径
path = "/hooks/logs"

# 解码
decoding.codec = "json"

# 认证
# 可选 Basic Auth 或 Token
framing = "bytes"

# 限制
max_payload_size_bytes = 10485760    # 10MB

发送数据:

# 推送 JSON 事件
curl -X POST http://vector:8080/hooks/logs \
  -H "Content-Type: application/json" \
  -d '{"level": "info", "message": "test log", "app": "myapp"}'

# 批量推送
curl -X POST http://vector:8080/hooks/logs \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @logs.json

应用场景:

  • Webhook 接收(CI/CD 通知、GitLab/GitHub Webhook)
  • 自定义应用直接推送日志
  • 外部系统集成(CloudWatch、Datadog Webhook)
17 Vector 如何配置压缩传输?

答案:

# Source 到 Aggregator 的压缩传输
[sinks.aggregator]
type = "vector"
inputs = ["transformed_logs"]
address = "vector-aggregator:6000"

# 启用 gzip 压缩
compression = "gzip"
# 压缩等级(1-9)
compression_level = 6

Sink 输出压缩:

# S3 输出压缩
[sinks.s3_archive]
type = "aws_s3"
inputs = ["logs"]
bucket = "my-logs"
compression = "gzip"       # gzip / none

# Kafka 输出压缩
[sinks.kafka]
type = "kafka"
inputs = ["logs"]
bootstrap_servers = "kafka:9092"
topic = "logs"
compression = "snappy"     # snappy / gzip / lz4 / zstd

压缩比参考:

编码压缩比(文本日志)CPU 消耗
无压缩1:1
gzip 等级 13:1-5:1
gzip 等级 65:1-8:1
gzip 等级 96:1-10:1
snappy2:1-3:1极低
zstd4:1-7:1
18 Vector 如何实现事件过滤和丢弃?

答案:

# 方式一:VRL 条件过滤
[transforms.filter_errors]
type = "remap"
inputs = ["all_logs"]
source = '''
# 只保留 error 级别
if .level != "error" {
  abort
}

# 丢弃 healthcheck 日志
if contains(.message, "healthcheck") {
  abort
}
'''

# 方式二:filter Transform
[transforms.filter_important]
type = "filter"
inputs = ["all_logs"]
condition.type = "vrl"
condition.source = '''
  .level == "error" || .level == "critical"
'''

# 方式三:VRL 条件 ab_ort
[transforms.discard_noisy]
type = "remap"
inputs = ["all_logs"]
source = '''
# 丢弃心跳日志
if .message == "heartbeat" {
  abort
}
# 丢弃 debug 级别
if .level == "debug" {
  abort
}
# 丢弃健康检查路径
if exists(.path) && match(.path, r'^/health') {
  abort
}
'''

# 方式四:基于采样的丢弃
[transforms.sample_info]
type = "sample"
inputs = ["all_logs"]
rate = 100           # 只保留 1% 的 info 日志
19 Vector 如何处理字段类型转换?

答案:

[transforms.type_conversion]
type = "remap"
inputs = ["raw_logs"]
source = '''
# 字符串转数值
.count = to_int!(.count_str)
.latency_ms = to_float!(.latency_str)
.ratio = to_float!(.ratio_str)

# 字符串转布尔
.is_active = to_bool!(.active_str) ?? false

# 时间戳转换
.timestamp = parse_timestamp!(.time_string, "%Y-%m-%dT%H:%M:%S%z")

# 数值转字符串
.host = to_string!(.host_id)

# JSON 编码/解码
.parsed = parse_json!(.raw_json)
.raw_json = encode_json(.metadata)

# IP 类型
.ip = parse_ip!(.client_ip)

# 集合操作
.tags = to_array!(.tag_string)
if is_array(.tags) {
  .tags = compact(.tags)    # 去除空值
}

# 进制转换
.hex_value = encode_base64(.raw_bytes)
.decoded = decode_base64(.encoded_string)
'''

类型转换函数:

函数作用错误处理
to_int()转整数解析失败返回错误
to_float()转浮点解析失败返回错误
to_bool()转布尔只认 true/false/1/0
to_string()转字符串不会失败
parse_json()JSON 解析可加 ?? 提供默认值
encode_json()对象转 JSON不会失败
20 Vector 的 Enrichment Tables(富化表)是什么?

答案:

Enrichment Tables 允许 Vector 在管道中查询外部数据源来富化事件。

# 定义富化表
[enrichment_tables.geoip]
type = "geoip"
path = "/etc/vector/GeoLite2-City.mmdb"

[enrichment_tables.app_metadata]
type = "csv"
path = "/etc/vector/app_metadata.csv"
columns = ["app_name", "team", "owner", "pagerduty"]

在 Transform 中使用富化:

[transforms.enrich]
type = "remap"
inputs = ["parsed_logs"]
source = '''
# IP 地理位置富化
ip = parse_ip!(.client_ip)
geo = get_enrichment_table_record!("geoip", {"ip": ip})
.geo.city = geo.city_name
.geo.country = geo.country_name
.geo.latitude = geo.latitude
.geo.longitude = geo.longitude

# 应用元数据富化
app_info = get_enrichment_table_record!("app_metadata", {"app_name": .app})
.team = app_info.team
.owner = app_info.owner
.oncall = app_info.pagerduty
'''

支持的富化表类型:

类型说明查询方式
geoipGeoIP 地理 IP 库IP 地址
csvCSV 文件列值匹配
fileJSON/YAML 文件键值查找
21 Vector 如何配置磁盘队列(Disk Queue)保证 At-Least-Once 语义?

答案:

# 全局配置
data_dir = "/vector-data-dir"

# Sink 级磁盘队列
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["transformed_logs"]

buffer:
  type = "disk"
  # Buffer 大小限制
  max_size = 1073741824   # 1GB

  # 满时行为
  when_full = "block"     # 背压 Source

  # 磁盘队列文件前缀
  # 存储在 data_dir/vector-buffer/

磁盘队列工作方式:

事件到达 Source
  → Transform 处理后发给 Sink
  → `buffer.type = "disk"`
    事件先写入磁盘文件(WAL)
  → 确 Sink 已发送并确认
  → 从磁盘队列中删除

Vector 崩溃 → 重启后读取磁盘队列 → 继续发送

内存 vs 磁盘队列比较:

内存队列:
  速度:快,ns 级
  可靠性:崩溃丢失数据

磁盘队列:
  速度:中,μs 级
  可靠性:崩溃恢复,至少一次送达
22 Vector 如何处理日志中的敏感信息脱敏?

答案:

[transforms.redact_secrets]
type = "remap"
inputs = ["all_logs"]
source = '''
# 信用卡号(16 位数字)
.message = replace(.message, r'\d{4}-\d{4}-\d{4}-\d{4}', "****-****-****-****")

# 邮箱
.message = replace(.message, r'\b[\w\.-]+@[\w\.-]+\.\w{2,}\b', "***@***")

# IP 地址
.message = replace(.message, r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', "***.***.***.***")

# Token 和密码
if exists(.password) {
  del(.password)
}
if exists(.token) {
  .token = "***REDACTED***"
}

# 从嵌套对象中删除敏感字段
.token = if exists(.token) { "***REDACTED***" }
'''

高级脱敏:

# 根据字段路径删除
[transforms.remove_sensitive]
type = "remap"
inputs = ["all_logs"]
source = '''
# 遍历并删除敏感字段
paths = ["password", "secret", "token", "credit_card", "ssn"]
for path in paths {
  if exists("." + path) {
    del("." + path)
  }
}
'''
23 Vector 如何实现日志路由到多个目的地?

答案:

# 一个 Source 多个 Sink(广播)
[sinks.es_output]
type = "elasticsearch"
inputs = ["transformed_logs"]     # 从 Transforms 接收

[sinks.s3_archive]
type = "aws_s3"
inputs = ["transformed_logs"]     # 同一输入

[sinks.loki_output]
type = "loki"
inputs = ["transformed_logs"]

[sinks.kafka_output]
type = "kafka"
inputs = ["transformed_logs"]

# 并行发送到不同系统的示例:
# → Elasticsearch(近实时查询)
# → S3(长期归档)
# → Loki(Grafana 可视化)
# → Kafka(下游数据管道)

路由拓扑:

             → Elasticsearch(查询)
Source → Transform 
             → S3(归档)
             → Loki(可视化)
             → Kafka(下游)

多 Sink 运维要点:

Sink用途可靠性要求
Elasticsearch实时查询高(disk buffer)
S3 Archive长期存储最高(disk buffer + 重试)
Loki快速分析中(memory buffer)
Kafka下游消费高(disk buffer)
24 Vector 的 `metric_to_log` Transform 是什么?

答案:

metric_to_log 将指标事件转换为日志事件,便于将指标输出到日志系统。

[transforms.metrics_to_logs]
type = "metric_to_log"
inputs = ["internal_metrics"]
# 所有指标转为日志行
# 每个指标一个事件,包含 name, namespace, tags, value 等字段

输出日志格式:

{
  "name": "vector_events_received_total",
  "namespace": "vector",
  "tags": {
    "component_id": "my_source",
    "component_kind": "source",
    "component_type": "file"
  },
  "kind": "incremental",
  "value": {
    "counter": {
      "value": 100
    }
  },
  "timestamp": "2024-01-01T00:00:00Z"
}

应用场景:

  • 将 Vector 内部指标输出到 ELK/Loki 等日志系统
  • 统一指标和日志的可视化后端
  • 基于日志系统做指标分析(如长周期趋势)
25 Vector 如何实现日志内容增强(添加 K8s 元数据)?

答案:

[sources.kubernetes_logs]
type = "kubernetes_logs"

[transforms.enrich_k8s]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# Vector 自动添加的 K8s 元数据(在 kubernetes_logs Source 中)
# .kubernetes.pod_name
# .kubernetes.namespace
# .kubernetes.container_name
# .kubernetes.pod_labels
# .kubernetes.pod_annotations

# 添加业务友好的字段
.namespace = .kubernetes.namespace
.app = .kubernetes.pod_labels.app
.service = .kubernetes.pod_labels.service
.version = .kubernetes.pod_labels.version

# 添加集群标识(通过环境变量或配置注入)
.cluster = "prod-cluster-1"

# 添加环境标签
.environment = "production"

# 添加标准时间字段
.@timestamp = floor(timestamp())
'''

K8s Source 自动注入的元数据:

字段说明
.kubernetes.pod_namePod 名称
.kubernetes.pod_ipPod IP
.kubernetes.namespace命名空间
.kubernetes.container_name容器名称
.kubernetes.container_id容器 ID
.kubernetes.pod_labelsPod 标签(map)
.kubernetes.pod_annotationsPod 注解(map)
.kubernetes.node_name节点名称
26 Vector 的 Log 数据压缩和归档配置?

答案:

# 归档到 S3 的完整配置
[sinks.s3_archive]
type = "aws_s3"
inputs = ["all_logs"]

# S3 配置
region = "us-east-1"
bucket = "prod-logs-archive"

# 按时间组织目录
key_prefix = "logs/year=%Y/month=%m/day=%d/"

# 压缩
compression = "gzip"
compression_level = 6

# 批处理(减少 API 调用)
batch.max_bytes = 10485760      # 10MB
batch.timeout_secs = 300        # 5 分钟

# 编码
encoding.codec = "json"

# 文件命名
filename_time_format = "%H%M%S"

# 重试
request.retry_initial_backoff = 1
request.retry_max_duration_secs = 300

# Buffer(磁盘缓冲防止丢失)
buffer.type = "disk"
buffer.max_size = 10737418240   # 10GB

归档路径模板:

模板示例
/year=%Y/month=%m/day=%d//year=2024/month=01/day=15/
/cluster=%s/%Y-%m-%d//cluster=prod/2024-01-15/
/app=%s/%s//app=nginx/YYYY/MM/DD/
27 Vector 如何与 Elasticsearch 集成?

答案:

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["transformed_logs"]

# ES 端点
endpoint = "http://elasticsearch:9200"
# 多端点
# endpoint = ["http://es1:9200", "http://es2:9200"]

# 索引管理
index = "logs-%Y.%m.%d"           # 按日期分索引
# 或固定索引
# index = "application-logs"

# 文档 ID(可选)
doc_type = "_doc"
# id = "XQOPENid_fieldXQCLOSE"

# 编码
encoding.codec = "json"

# 认证
auth.strategy = "basic"
auth.user = "elastic"
auth.password = "${ES_PASSWORD}"

# TLS
tls.ca_file = "/etc/vector/ca.crt"
tls.verify_certificate = true

# 批量写入
batch.max_events = 1000
batch.timeout_secs = 10

# 路由
query."pipeline" = "my-pipeline"   # Elasticsearch Ingest Pipeline

# Buffer
buffer.type = "disk"
buffer.max_size = 10737418240

Elasticsearch Sink 说明:

参数说明默认值
index目标索引名(支持日期模板)vector-%Y.%m.%d
batch.max_events批量写入事件数1000
batch.timeout_secs批量超时1
auth.strategy认证方式basic
query.*URL 参数pipeline
28 Vector 如何配置 Runtime 资源限制?

答案:

# Vector 全局配置

# 数据目录
data_dir = "/vector-data-dir"

# 每个 Source 并发读取
[sources.file_source]
type = "file"
include = ["/var/log/*.log"]
max_read_bytes = 204800        # 每次读取最大字节
oldest_first = false           # 是否优先读取老的

# 全局并发限制
# 每个 Sink 线程
# 自动管理线程池大小

# 内存限制
# Dedicated 模式:
# vector 进程默认无上限(由系统资源限制)

# 推荐 K8s 资源限制
# memory: 256Mi-1Gi
# cpu: 200m-1000m

K8s 资源限制:

# DaemonSet
resources:
  requests:
    cpu: 200m
    memory: 256Mi
  limits:
    cpu: 1000m
    memory: 512Mi

# Aggregator
resources:
  requests:
    cpu: 1000m
    memory: 1Gi
  limits:
    cpu: 2000m
    memory: 2Gi

性能基准:

  • 单节点 Vector Agent 可以处理 10-50 MB/s 的日志(取决于 Transform 复杂度)
  • Aggregator 可处理 100+ MB/s
  • Vector 内存通常稳定在 10-100MB(Agent)/ 100-500MB(Aggregator)
29 Vector 如何集成 Datadog?

答案:

[sinks.datadog_logs]
type = "datadog_logs"
inputs = ["transformed_logs"]

# Datadog 站点
# us5.datadoghq.com / datadoghq.eu / ...
endpoint = "https://http-intake.logs.datadoghq.com"

# API Key
api_key = "${DATADOG_API_KEY}"

# 压缩
compression = "gzip"

# 编码
encoding.codec = "json"

# 批处理
batch.max_events = 1000
batch.timeout_secs = 5

# 日志标签
# Vector 自动将事件字段作为日志标签
# 也可通过 hostname / service / ddsource 字段控制
hostname = .host

# Datadog 指标
[sinks.datadog_metrics]
type = "datadog_metrics"
inputs = ["internal_metrics"]
api_key = "${DATADOG_API_KEY}"

Datadog 日志字段映射:

[transforms.enrich_datadog]
type = "remap"
inputs = ["logs"]
source = '''
# 标准 Datadog 字段
.hostname = .host
.service = .app
.ddsource = "vector"

# 添加 Datadog 标签
ddtags = ["env:production", "team:platform"]
'''
30 Vector 的生产最佳实践有哪些?

答案:

配置规范:

# 1. Source 配置
# - 优先使用 kubernetes_logs Source(自动丰富元数据)
# - 缓冲区设置 disk 模式保障可靠性
# - 配置 max_read_bytes 控制内存占用

# 2. Transform 配置
# - Transform 尽量在 Agent 端完成,减少 Aggregator 压力
# - VRL 脚本避免过度复杂(影响性能)
# - 使用 filter 提前丢弃无用日志

# 3. Sink 配置
# - 关键链路使用 disk buffer
# - 配置 batch 参数平衡延迟和吞吐
# - 设置合理的重试和超时

# 4. 监控
# - 暴露 Prometheus 指标
# - 配置 vector top 监控管道状态
# - 设置关键组件的告警

# 5. 可靠性
# - Agent + Aggregator 双层架构
# - Aggregator 部署多副本 + LB
# - 网络故障时磁盘缓冲保数据

部署架构建议:

规模AgentAggregatorBuffer
小(< 10 节点)DaemonSet可省略memory
中(10-100 节点)DaemonSet2 副本disk
大(100+ 节点)DaemonSet3+ 副本 + AutoScalingdisk

告警规则:

- alert: VectorHighErrorRate
  expr: rate(vector_component_errors_total[5m]) > 0.01
  for: 5m

- alert: VectorBufferUsageHigh
  expr: vector_buffer_byte_size / vector_buffer_max_size > 0.8
  for: 5m

- alert: VectorHighProcessingLatency
  expr: vector_processing_time_seconds{quantile="0.99"} > 1
  for: 5m