
Overview

Blocklight outputs findings in NDJSON (Newline Delimited JSON) format via the file alert channel, making it compatible with virtually any log aggregation and SIEM platform. This guide shows how to integrate Blocklight’s NDJSON output with popular data pipeline tools like Vector, Logstash, and Fluentd, which can then route findings to Elasticsearch, Splunk, cloud services, and other destinations.
Important: Blocklight does not have native exporters for Elasticsearch, Splunk, or cloud services. Instead, it outputs NDJSON to a file, which you then process with external tools (Vector, Logstash, Fluentd) to route to your desired destinations.

Output Formats

NDJSON (Newline Delimited JSON)

Optimized for streaming and log aggregation:
{"timestamp":"2024-01-15T10:30:00Z","severity":"CRITICAL","rule_name":"high_value_transfer","chain":"ethereum","block_number":18500000,"tx_hash":"0x...","contract_address":"0x...","tags":["defi","high-value"],"output":"High-value transfer detected: 1000 ETH"}
{"timestamp":"2024-01-15T10:30:15Z","severity":"WARNING","rule_name":"flash_loan_pattern","chain":"ethereum","block_number":18500001,"tx_hash":"0x...","tags":["defi","flash-loan"],"output":"Potential flash loan attack pattern"}
Key Fields:
  • timestamp: ISO 8601 timestamp
  • severity: CRITICAL, WARNING, NOTICE
  • rule_name: Detection rule identifier
  • chain: Blockchain name
  • block_number: Block number
  • tx_hash: Transaction hash
  • contract_address: Contract address (if applicable)
  • tags: Array of tags
  • references: Array of threat intelligence URLs
  • metadata: Additional context
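
Because each finding is a single JSON object per line, any consumer can process the stream with a few lines of code. A minimal sketch in Python (the output path is illustrative; point it at whatever your file alert channel writes):
import glob
import json

# Scan Blocklight's NDJSON output and surface CRITICAL findings (path is illustrative)
for path in glob.glob("/app/output/findings-*.jsonl"):
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            finding = json.loads(line)
            if finding.get("severity") == "CRITICAL":
                print(finding["rule_name"], finding.get("tx_hash"), finding["output"])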

SARIF (Static Analysis Results Interchange Format)

For CI/CD integration:
{
  "version": "2.1.0",
  "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
  "runs": [{
    "tool": {
      "driver": {
        "name": "Blocklight",
        "version": "1.0.0"
      }
    },
    "results": [...]
  }]
}
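
If your CI system expects SARIF but your pipeline only has the NDJSON stream, the conversion is mechanical. A hedged sketch in Python (the CRITICAL→error, WARNING→warning, NOTICE→note level mapping is an assumption, not documented Blocklight behavior):
import json
import sys

# Assumed severity-to-SARIF-level mapping; adjust to your conventions
LEVELS = {"CRITICAL": "error", "WARNING": "warning", "NOTICE": "note"}

def ndjson_to_sarif(lines):
    results = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        finding = json.loads(line)
        results.append({
            "ruleId": finding["rule_name"],
            "level": LEVELS.get(finding.get("severity"), "note"),
            "message": {"text": finding.get("output", "")},
            # Keep chain context in the SARIF property bag
            "properties": {"chain": finding.get("chain"), "tx_hash": finding.get("tx_hash")},
        })
    return {
        "version": "2.1.0",
        "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
        "runs": [{"tool": {"driver": {"name": "Blocklight"}}, "results": results}],
    }

if __name__ == "__main__":
    print(json.dumps(ndjson_to_sarif(sys.stdin), indent=2))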

Vector Integration

Vector is a high-performance observability data pipeline.

Configuration

Create vector.toml (this example reads the Blocklight container's log stream via the docker_logs source; to tail the NDJSON output file instead, use Vector's file source):
# Source: Read from Blocklight stdout
[sources.blocklight]
type = "docker_logs"
include_containers = ["blocklight-core"]

# Transform: Parse JSON and extract fields
[transforms.parse]
type = "remap"
inputs = ["blocklight"]
source = '''
  . = parse_json!(.message)
  .severity_level = if .severity == "CRITICAL" { 1 } 
                    else if .severity == "WARNING" { 2 } 
                    else { 3 }
'''

# Transform: Enrich with metadata
[transforms.enrich]
type = "remap"
inputs = ["parse"]
source = '''
  .environment = "production"
  .source = "blocklight"
  .indexed = {
    "severity": .severity,
    "chain": .chain,
    "rule": .rule_name
  }
'''

# Sink: Send to Loki
[sinks.loki]
type = "loki"
inputs = ["enrich"]
endpoint = "http://loki:3100"
encoding.codec = "json"
labels.job = "blocklight"
labels.severity = "{{ severity }}"
labels.chain = "{{ chain }}"

# Sink: Send to Elasticsearch
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["enrich"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "blocklight-findings-%Y.%m.%d"

# Sink: Send to S3 for archival
[sinks.s3]
type = "aws_s3"
inputs = ["enrich"]
bucket = "blocklight-findings"
key_prefix = "year=%Y/month=%m/day=%d/"
encoding.codec = "json"
compression = "gzip"

Deploy with Docker Compose

services:
  vector:
    image: timberio/vector:latest-alpine
    volumes:
      - ./vector.toml:/etc/vector/vector.toml:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports:
      - "8686:8686"
    networks:
      - blocklight-network
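
The 8686 port mapping is only useful if Vector's API is enabled; add [api] enabled = true and address = "0.0.0.0:8686" to vector.toml. With that in place, a simple health probe confirms the pipeline is running (a sketch; host and port per the compose file above):
import urllib.request

# Probe Vector's health endpoint (requires the [api] section to be enabled)
with urllib.request.urlopen("http://localhost:8686/health", timeout=5) as resp:
    print("vector health:", resp.status, resp.read().decode())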

Logstash Integration

Logstash is part of the Elastic Stack.

Configuration

Create logstash.conf:
input {
  # Read from Blocklight file output
  file {
    path => "/app/output/findings-*.jsonl"
    start_position => "beginning"
    codec => "json"
    tags => ["blocklight"]
  }
  
  # Or read from TCP
  tcp {
    port => 5000
    codec => "json"
    tags => ["blocklight"]
  }
}

filter {
  # Parse timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }
  
  # Add numeric severity level for sorting
  if [severity] == "CRITICAL" {
    mutate { add_field => { "severity_level" => "1" } }
  } else if [severity] == "WARNING" {
    mutate { add_field => { "severity_level" => "2" } }
  } else {
    mutate { add_field => { "severity_level" => "3" } }
  }
  mutate { convert => { "severity_level" => "integer" } }
  
  # Extract contract address from metadata
  if [metadata][contract_address] {
    mutate {
      add_field => { "contract_address" => "%{[metadata][contract_address]}" }
    }
  }
  
  # Tag chain type (optional)
  mutate {
    add_field => { "chain_type" => "evm" }
  }
}

output {
  # Send to Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "blocklight-findings-%{+YYYY.MM.dd}"
  }
  
  # Send to Kafka for downstream processing
  kafka {
    topic_id => "blocklight-findings"
    bootstrap_servers => "kafka:9092"
    codec => "json"
  }
  
  # Debug output
  stdout {
    codec => rubydebug
  }
}

Deploy with Docker Compose

services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - blocklight-output:/app/output:ro
    ports:
      - "5000:5000"
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    networks:
      - blocklight-network
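
To smoke-test the TCP input end to end, send one sample finding and confirm it shows up in Elasticsearch (or in the rubydebug output). A sketch using the port published in the compose file above:
import json
import socket

# A sample finding shaped like Blocklight's NDJSON output
finding = {
    "timestamp": "2024-01-15T10:30:00Z",
    "severity": "CRITICAL",
    "rule_name": "high_value_transfer",
    "chain": "ethereum",
    "block_number": 18500000,
    "output": "High-value transfer detected: 1000 ETH",
}

# Send it to the Logstash tcp input (json codec) and close the connection
with socket.create_connection(("localhost", 5000), timeout=5) as sock:
    sock.sendall((json.dumps(finding) + "\n").encode("utf-8"))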

Fluentd Integration

Fluentd is an open-source data collector.

Configuration

Create fluent.conf:
<source>
  @type tail
  path /app/output/findings-*.jsonl
  pos_file /var/log/fluentd/blocklight.pos
  tag blocklight.findings
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S%z
  </parse>
</source>

# Filter: Add metadata
<filter blocklight.**>
  @type record_transformer
  enable_ruby true
  <record>
    source blocklight
    environment production
    severity_level ${record["severity"] == "CRITICAL" ? 1 : (record["severity"] == "WARNING" ? 2 : 3)}
  </record>
</filter>

# Output: Send to multiple destinations
<match blocklight.**>
  @type copy
  
  # Send to Elasticsearch
  <store>
    @type elasticsearch
    host elasticsearch
    port 9200
    logstash_format true
    logstash_prefix blocklight-findings
  </store>
  
  # Send to S3
  <store>
    @type s3
    aws_key_id YOUR_AWS_KEY
    aws_sec_key YOUR_AWS_SECRET
    s3_bucket blocklight-findings
    s3_region us-east-1
    path findings/%Y/%m/%d/
    <buffer>
      @type file
      path /var/log/fluentd/s3
      timekey 3600
      timekey_wait 10m
      chunk_limit_size 256m
    </buffer>
  </store>
  
  # Send to Splunk HEC
  <store>
    @type splunk_hec
    host splunk
    port 8088
    token YOUR_HEC_TOKEN
    index blocklight
    source blocklight-engine
    sourcetype _json
  </store>
</match>

Deploy with Docker Compose

services:
  fluentd:
    image: fluent/fluentd:v1.16-1
    volumes:
      - ./fluent.conf:/fluentd/etc/fluent.conf:ro
      - blocklight-output:/app/output:ro
    ports:
      - "24224:24224"
    networks:
      - blocklight-network

Splunk Integration

Splunk HTTP Event Collector (HEC)

Blocklight has no native Splunk exporter (see the note in the Overview), so findings reach Splunk through a pipeline tool's HEC output: the Fluentd splunk_hec store shown above, Vector's splunk_hec_logs sink, or a Logstash http output pointed at your HEC endpoint.
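
Before wiring up a pipeline, it helps to verify the HEC token and target index with a one-off test event. A sketch (endpoint and token are placeholders; drop verify=False unless HEC presents a self-signed certificate):
import requests

HEC_URL = "https://splunk:8088/services/collector/event"  # placeholder endpoint
HEC_TOKEN = "YOUR_HEC_TOKEN"                               # placeholder token

event = {
    "event": {
        "severity": "NOTICE",
        "rule_name": "hec_connectivity_check",
        "output": "Blocklight HEC test event",
    },
    "index": "blocklight",
    "source": "blocklight-engine",
    "sourcetype": "_json",
}

# HEC authenticates with a "Splunk <token>" Authorization header
resp = requests.post(
    HEC_URL,
    json=event,
    headers={"Authorization": f"Splunk {HEC_TOKEN}"},
    verify=False,  # only if HEC presents a self-signed certificate
)
print(resp.status_code, resp.text)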

Splunk Search Queries

# All critical findings
index=blocklight severity=CRITICAL

# Findings by chain
index=blocklight | stats count by chain

# Top triggered rules
index=blocklight | top rule_name

# High-value transfers
index=blocklight rule_name="high_value_transfer" | table timestamp, tx_hash, metadata.value

# Flash loan attacks
index=blocklight tags="flash-loan" | timechart count by severity

Elasticsearch Integration

Blocklight outputs NDJSON to a file. Use Vector, Logstash, or Fluentd (configured above) to send findings to Elasticsearch.

Index Template

Create an index template for optimized storage:
{
  "index_patterns": ["blocklight-findings-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "blocklight-policy"
    },
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" },
        "severity": { "type": "keyword" },
        "severity_level": { "type": "integer" },
        "rule_name": { "type": "keyword" },
        "chain": { "type": "keyword" },
        "block_number": { "type": "long" },
        "tx_hash": { "type": "keyword" },
        "contract_address": { "type": "keyword" },
        "tags": { "type": "keyword" },
        "output": { "type": "text" },
        "metadata": { "type": "object", "enabled": false }
      }
    }
  }
}
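
To install it, save the JSON above to a file (index-template.json here is an illustrative name) and PUT it to Elasticsearch's composable index template API. A sketch:
import json
import requests

ES_URL = "http://elasticsearch:9200"

# Load the composable index template shown above (filename is illustrative)
with open("index-template.json", encoding="utf-8") as fh:
    template = json.load(fh)

resp = requests.put(f"{ES_URL}/_index_template/blocklight-findings", json=template)
resp.raise_for_status()
print(resp.json())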

Kibana Dashboards

Import pre-built visualizations:
  1. Finding Timeline: Time series of detections
  2. Severity Distribution: Pie chart by severity
  3. Top Rules: Bar chart of most triggered rules
  4. Chain Activity: Heatmap by chain and time
  5. Contract Analysis: Table of flagged contracts

Cloud Service Integration

Blocklight outputs NDJSON to a file. Use Vector, Logstash, or Fluentd to route findings to cloud services:
  • AWS: Use Vector’s aws_s3 or aws_cloudwatch_logs sinks
  • Google Cloud: Use Vector’s gcp_stackdriver_logs or gcp_pubsub sinks
  • Azure: Use Vector’s azure_monitor_logs sink
  • Datadog: Use Vector’s datadog_logs sink
See the Vector/Logstash/Fluentd configurations above for examples. Configure these tools to read from Blocklight’s NDJSON output file and route to your cloud service.
Webhook Channel: For direct HTTP integration, use Blocklight’s webhook alert channel (configured in alerting.channels in config.yaml), not an exporter.
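
If you use the webhook channel, any small HTTP service can receive findings. A minimal receiver sketch (the payload is assumed to be the finding object described earlier; Blocklight's actual webhook body may differ):
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class FindingReceiver(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        finding = json.loads(self.rfile.read(length) or b"{}")
        # Assumed payload shape: the NDJSON finding object documented above
        print(finding.get("severity"), finding.get("rule_name"), finding.get("output"))
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("0.0.0.0", 8080), FindingReceiver).serve_forever()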

Performance Considerations

Configure your pipeline tool (Vector/Logstash/Fluentd) for optimal performance:
  • Batching: Configure batch sizes in your pipeline tool (e.g., Vector’s batch settings)
  • Buffering: Enable disk buffering in your pipeline tool for reliability
  • Compression: Enable compression in your pipeline tool’s sinks to reduce bandwidth

Monitoring Pipeline Health

Vector Metrics

Expose Vector's internal metrics with an internal_metrics source and a prometheus_exporter sink, then query:

# Throughput
rate(vector_component_sent_events_total[5m])

# Errors
rate(vector_component_errors_total[5m])

# Lag
vector_buffer_events

Logstash Metrics

curl http://localhost:9600/_node/stats/pipelines

Fluentd Metrics

<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>
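
With monitor_agent enabled, per-plugin health (buffer depth, retry counts) is available as JSON at /api/plugins.json. A small poller sketch (host and port per the source above; field names per Fluentd's monitor_agent output):
import json
import urllib.request

# monitor_agent exposes per-plugin metrics at /api/plugins.json
with urllib.request.urlopen("http://localhost:24220/api/plugins.json", timeout=5) as resp:
    plugins = json.loads(resp.read())["plugins"]

for plugin in plugins:
    if plugin.get("output_plugin"):
        print(plugin.get("type"),
              "queued:", plugin.get("buffer_queue_length"),
              "retries:", plugin.get("retry_count"))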

Troubleshooting

Issue: Data not appearing in destination

Check:
  1. Blocklight is outputting data: docker logs blocklight-core
  2. Pipeline is reading data: Check pipeline logs
  3. Destination is reachable: Test connectivity
  4. Credentials are correct: Verify API keys/tokens

Issue: High latency

Solutions:
  • Increase batch sizes
  • Enable compression
  • Add more pipeline workers
  • Use local buffering

Issue: Data loss

Solutions:
  • Enable persistent queues
  • Increase buffer sizes
  • Add retry logic
  • Use at-least-once delivery

Best Practices

  1. Use structured logging: NDJSON for easy parsing
  2. Enable compression: Reduce bandwidth and storage costs
  3. Implement buffering: Prevent data loss during outages
  4. Monitor pipeline health: Track throughput and errors
  5. Test failover: Ensure redundancy works
  6. Secure credentials: Use secret management
  7. Rotate logs: Prevent disk space issues
  8. Index optimization: Use proper data types in Elasticsearch
  9. Partition data: By chain, severity, or time
  10. Archive old data: Move to cold storage after 90 days (see the ILM sketch below)
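
Best practice 10 pairs with the blocklight-policy ILM policy referenced in the index template above. A minimal sketch that moves indices to the cold tier at 90 days and deletes them later (retention periods are examples; add warm or searchable-snapshot phases as needed):
import requests

ES_URL = "http://elasticsearch:9200"

# blocklight-policy is the ILM policy named in the index template above.
# Retention is illustrative: cold at 90 days, delete at 180 days.
policy = {
    "policy": {
        "phases": {
            "hot": {"actions": {}},
            "cold": {"min_age": "90d", "actions": {"set_priority": {"priority": 0}}},
            "delete": {"min_age": "180d", "actions": {"delete": {}}},
        }
    }
}

resp = requests.put(f"{ES_URL}/_ilm/policy/blocklight-policy", json=policy)
resp.raise_for_status()
print(resp.json())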

Next Steps