Overview
Blocklight outputs findings in NDJSON (Newline Delimited JSON) format via the file alert channel, making it compatible with virtually any log aggregation and SIEM platform. This guide shows how to integrate Blocklight’s NDJSON output with popular data pipeline tools like Vector, Logstash, and Fluentd, which can then route findings to Elasticsearch, Splunk, cloud services, and other destinations.
Important: Blocklight does not have native exporters for Elasticsearch, Splunk, or cloud services. Instead, it outputs NDJSON to a file, which you then process with external tools (Vector, Logstash, Fluentd) to route to your desired destinations.
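For reference, a minimal sketch of enabling the file alert channel in config.yaml. The alerting.channels location comes from this guide, but the individual keys below are illustrative, not authoritative; check the Blocklight configuration reference for the exact schema:
# config.yaml — key names below are illustrative, not authoritative
alerting:
  channels:
    - type: file                         # NDJSON file alert channel
      path: /app/output/findings.jsonl   # one JSON object per line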
NDJSON (Newline Delimited JSON)
Optimized for streaming and log aggregation:
{"timestamp":"2024-01-15T10:30:00Z","severity":"CRITICAL","rule_name":"high_value_transfer","chain":"ethereum","block_number":18500000,"tx_hash":"0x...","contract_address":"0x...","tags":["defi","high-value"],"output":"High-value transfer detected: 1000 ETH"}
{"timestamp":"2024-01-15T10:30:15Z","severity":"WARNING","rule_name":"flash_loan_pattern","chain":"ethereum","block_number":18500001,"tx_hash":"0x...","tags":["defi","flash-loan"],"output":"Potential flash loan attack pattern"}
Key Fields:
timestamp: ISO 8601 timestamp
severity: CRITICAL, WARNING, NOTICE
rule_name: Detection rule identifier
chain: Blockchain name
block_number: Block number
tx_hash: Transaction hash
contract_address: Contract address (if applicable)
tags: Array of tags
references: Array of threat intelligence URLs
metadata: Additional context
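Because each finding is a standalone JSON object on its own line, the output file can be inspected directly with jq before any pipeline is involved (the path matches the one used in the pipeline examples below):
# Show only CRITICAL findings
jq -c 'select(.severity == "CRITICAL")' /app/output/findings-*.jsonl
# Count findings per rule
jq -r '.rule_name' /app/output/findings-*.jsonl | sort | uniq -c | sort -rn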
SARIF (Static Analysis Results Interchange Format)
For CI/CD integration:
{
  "version": "2.1.0",
  "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
  "runs": [{
    "tool": {
      "driver": {
        "name": "Blocklight",
        "version": "1.0.0"
      }
    },
    "results": [...]
  }]
}
Vector Integration
Vector is a high-performance observability data pipeline.
Configuration
Create vector.toml:
# Source: Read from Blocklight stdout
[sources.blocklight]
type = "docker_logs"
include_containers = ["blocklight-core"]
# Transform: Parse JSON and extract fields
[transforms.parse]
type = "remap"
inputs = ["blocklight"]
source = '''
. = parse_json!(.message)
.severity_level = if .severity == "CRITICAL" { 1 }
else if .severity == "WARNING" { 2 }
else { 3 }
'''
# Transform: Enrich with metadata
[transforms.enrich]
type = "remap"
inputs = ["parse"]
source = '''
.environment = "production"
.source = "blocklight"
.indexed = {
"severity": .severity,
"chain": .chain,
"rule": .rule_name
}
'''
# Sink: Send to Loki
[sinks.loki]
type = "loki"
inputs = ["enrich"]
endpoint = "http://loki:3100"
encoding.codec = "json"
labels.job = "blocklight"
labels.severity = "{{ severity }}"
labels.chain = "{{ chain }}"
# Sink: Send to Elasticsearch
# (recent Vector releases use `endpoints` and `bulk.index`; older releases used `endpoint` and `index`)
[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["enrich"]
endpoints = ["http://elasticsearch:9200"]
bulk.index = "blocklight-findings-%Y.%m.%d"
# Sink: Send to S3 for archival
[sinks.s3]
type = "aws_s3"
inputs = ["enrich"]
bucket = "blocklight-findings"
key_prefix = "year=%Y/month=%m/day=%d/"
encoding.codec = "json"
compression = "gzip"
Deploy with Docker Compose
services:
  vector:
    image: timberio/vector:latest-alpine
    volumes:
      - ./vector.toml:/etc/vector/vector.toml:ro
      - /var/run/docker.sock:/var/run/docker.sock:ro
    ports:
      - "8686:8686"
    networks:
      - blocklight-network
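Before starting the pipeline you can validate the configuration; the Vector image uses the vector binary as its entrypoint, so the built-in validate subcommand can be run through Compose:
docker compose run --rm vector validate /etc/vector/vector.toml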
Logstash Integration
Logstash is part of the Elastic Stack.
Configuration
Create logstash.conf:
input {
  # Read from Blocklight file output
  file {
    path => "/app/output/findings-*.jsonl"
    start_position => "beginning"
    codec => "json"
    tags => ["blocklight"]
  }
  # Or read from TCP
  tcp {
    port => 5000
    codec => "json"
    tags => ["blocklight"]
  }
}

filter {
  # Parse timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
    target => "@timestamp"
  }

  # Add severity level for sorting
  if [severity] == "CRITICAL" {
    mutate { add_field => { "severity_level" => 1 } }
  } else if [severity] == "WARNING" {
    mutate { add_field => { "severity_level" => 2 } }
  } else {
    mutate { add_field => { "severity_level" => 3 } }
  }

  # Extract contract address from metadata
  if [metadata][contract_address] {
    mutate {
      add_field => { "contract_address" => "%{[metadata][contract_address]}" }
    }
  }

  # Tag the chain family (optional)
  mutate {
    add_field => { "chain_type" => "evm" }
  }
}
output {
  # Send to Elasticsearch
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "blocklight-findings-%{+YYYY.MM.dd}"
  }

  # Send to Kafka for downstream processing
  kafka {
    topic_id => "blocklight-findings"
    bootstrap_servers => "kafka:9092"
    codec => "json"
  }

  # Debug output
  stdout {
    codec => rubydebug
  }
}
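To check the pipeline syntax before deploying, run Logstash once with --config.test_and_exit against the same file (the official image's working directory is /usr/share/logstash):
docker run --rm \
  -v "$(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro" \
  docker.elastic.co/logstash/logstash:8.11.0 \
  bin/logstash -f /usr/share/logstash/pipeline/logstash.conf --config.test_and_exit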
Deploy with Docker Compose
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      - blocklight-output:/app/output:ro
    ports:
      - "5000:5000"
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    networks:
      - blocklight-network
Fluentd Integration
Fluentd is an open-source data collector.
Configuration
Create fluent.conf:
<source>
  @type tail
  path /app/output/findings-*.jsonl
  pos_file /var/log/fluentd/blocklight.pos
  tag blocklight.findings
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S%z
  </parse>
</source>

# Filter: Add metadata
<filter blocklight.**>
  @type record_transformer
  enable_ruby true
  <record>
    source blocklight
    environment production
    severity_level ${record["severity"] == "CRITICAL" ? 1 : (record["severity"] == "WARNING" ? 2 : 3)}
  </record>
</filter>

# Output: Send to multiple destinations
<match blocklight.**>
  @type copy

  # Send to Elasticsearch
  <store>
    @type elasticsearch
    host elasticsearch
    port 9200
    index_name blocklight-findings
    logstash_format true
    logstash_prefix blocklight
  </store>

  # Send to S3
  <store>
    @type s3
    aws_key_id YOUR_AWS_KEY
    aws_sec_key YOUR_AWS_SECRET
    s3_bucket blocklight-findings
    s3_region us-east-1
    path findings/%Y/%m/%d/
    <buffer>
      @type file
      path /var/log/fluentd/s3
      timekey 3600
      timekey_wait 10m
      chunk_limit_size 256m
    </buffer>
  </store>

  # Send to Splunk HEC (option names for fluent-plugin-splunk-hec)
  <store>
    @type splunk_hec
    hec_host splunk
    hec_port 8088
    hec_token YOUR_HEC_TOKEN
    index blocklight
    source blocklight-engine
    sourcetype _json
  </store>
</match>
Deploy with Docker Compose
services:
  fluentd:
    image: fluent/fluentd:v1.16-1
    volumes:
      - ./fluent.conf:/fluentd/etc/fluent.conf:ro
      - blocklight-output:/app/output:ro
    ports:
      - "24224:24224"
    networks:
      - blocklight-network
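Note that the stock fluent/fluentd image ships only the built-in plugins; the elasticsearch, s3, and splunk_hec outputs used above come from separate gems. A minimal Dockerfile sketch for the Alpine-based image (build it and reference the resulting image from the Compose file instead of using fluent/fluentd:v1.16-1 directly):
FROM fluent/fluentd:v1.16-1
USER root
# Build dependencies are only needed while the gems are being installed
RUN apk add --no-cache --update --virtual .build-deps build-base ruby-dev \
 && gem install fluent-plugin-elasticsearch fluent-plugin-s3 fluent-plugin-splunk-hec --no-document \
 && gem sources --clear-all \
 && apk del .build-deps
USER fluent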
Splunk Integration
Splunk HTTP Event Collector (HEC)
Blocklight does not ship a native Splunk exporter (see the note in the Overview). For direct HTTP delivery, point Blocklight’s webhook alert channel (configured under alerting.channels in config.yaml) at your HEC endpoint, for example https://splunk:8088/services/collector/event. HEC expects an Authorization: Splunk <token> header, so check whether your webhook channel supports custom headers; if it does not, route the NDJSON file through Fluentd or Logstash as described next.
Via Fluentd/Logstash
Use the configurations above with Splunk HEC output.
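Whichever route you choose, you can confirm the HEC endpoint and token work with a plain curl request (hostname, index, and token placeholders match the examples above); a healthy collector answers with {"text":"Success","code":0}:
curl -k https://splunk:8088/services/collector/event \
  -H "Authorization: Splunk ${SPLUNK_HEC_TOKEN}" \
  -d '{"event": {"message": "Blocklight HEC connectivity test"}, "sourcetype": "_json", "index": "blocklight"}'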
Splunk Search Queries
# All critical findings
index=blocklight severity=CRITICAL
# Findings by chain
index=blocklight | stats count by chain
# Top triggered rules
index=blocklight | top rule_name
# High-value transfers
index=blocklight rule_name="high_value_transfer" | table timestamp, tx_hash, metadata.value
# Flash loan attacks
index=blocklight tags="flash-loan" | timechart count by severity
Elasticsearch Integration
Blocklight outputs NDJSON to a file. Use Vector, Logstash, or Fluentd (configured above) to send findings to Elasticsearch.
Index Template
Create an index template for optimized storage:
{
  "index_patterns": ["blocklight-findings-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "blocklight-policy"
    },
    "mappings": {
      "properties": {
        "timestamp": { "type": "date" },
        "severity": { "type": "keyword" },
        "severity_level": { "type": "integer" },
        "rule_name": { "type": "keyword" },
        "chain": { "type": "keyword" },
        "block_number": { "type": "long" },
        "tx_hash": { "type": "keyword" },
        "contract_address": { "type": "keyword" },
        "tags": { "type": "keyword" },
        "output": { "type": "text" },
        "metadata": { "type": "object", "enabled": false }
      }
    }
  }
}
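Apply it with the _index_template API (assuming the JSON above is saved as blocklight-template.json and Elasticsearch is reachable under the hostname used earlier). The template references the blocklight-policy ILM policy, so create that policy first; a sketch appears at the end of the Best Practices section below.
curl -X PUT "http://elasticsearch:9200/_index_template/blocklight-findings" \
  -H "Content-Type: application/json" \
  -d @blocklight-template.json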
Kibana Dashboards
Import pre-built visualizations:
- Finding Timeline: Time series of detections
- Severity Distribution: Pie chart by severity
- Top Rules: Bar chart of most triggered rules
- Chain Activity: Heatmap by chain and time
- Contract Analysis: Table of flagged contracts
Cloud Service Integration
Blocklight outputs NDJSON to a file. Use Vector, Logstash, or Fluentd to route findings to cloud services:
- AWS: Use Vector’s aws_s3 or aws_cloudwatch_logs sinks
- Google Cloud: Use Vector’s gcp_stackdriver_logs (Cloud Logging) or gcp_pubsub sinks
- Azure: Use Vector’s azure_monitor_logs sink
- Datadog: Use Vector’s datadog_logs sink
See the Vector/Logstash/Fluentd configurations above for examples. Configure these tools to read from Blocklight’s NDJSON output file and route to your cloud service.
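For example, a Datadog Logs sink can be appended to the vector.toml shown earlier; the API key is read from the environment, and the site option (not shown) defaults to datadoghq.com:
# Additional sink for vector.toml
[sinks.datadog]
type = "datadog_logs"
inputs = ["enrich"]
default_api_key = "${DATADOG_API_KEY}"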
Webhook Channel: For direct HTTP integration, use Blocklight’s webhook alert channel (configured in alerting.channels in config.yaml), not an exporter.
Performance Tuning
Configure your pipeline tool (Vector/Logstash/Fluentd) for optimal performance:
- Batching: Configure batch sizes in your pipeline tool (e.g., Vector’s batch settings; see the sketch below)
- Buffering: Enable disk buffering in your pipeline tool for reliability
- Compression: Enable compression in your pipeline tool’s sinks to reduce bandwidth
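As an illustration, the Elasticsearch sink from the Vector configuration above can be given explicit batch and disk-buffer settings; the values are starting points, not recommendations:
[sinks.elasticsearch.batch]
max_bytes = 10000000       # flush a bulk request at ~10 MB...
timeout_secs = 5           # ...or after 5 seconds, whichever comes first

[sinks.elasticsearch.buffer]
type = "disk"              # survives restarts and destination outages
max_size = 536870912       # 512 MiB of on-disk buffer
when_full = "block"        # apply backpressure instead of dropping events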
Monitoring Pipeline Health
Vector Metrics
# Throughput
rate(vector_component_sent_events_total[5m])
# Errors
rate(vector_component_errors_total[5m])
# Lag
vector_buffer_events
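These queries assume Vector’s internal metrics are exposed to Prometheus, which takes an internal_metrics source and a prometheus_exporter sink in vector.toml (port 9598 is an extra mapping not present in the Compose file above):
[sources.vector_metrics]
type = "internal_metrics"

[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["vector_metrics"]
address = "0.0.0.0:9598"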
Logstash Metrics
curl http://localhost:9600/_node/stats/pipelines
Fluentd Metrics
<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>
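With the monitor agent enabled (and port 24220 published, which the Compose file above does not yet do), per-plugin buffer and retry counters are available over HTTP:
curl http://localhost:24220/api/plugins.json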
Troubleshooting
Issue: Data not appearing in destination
Check:
- Blocklight is outputting data: docker logs blocklight-core
- Pipeline is reading data: Check pipeline logs
- Destination is reachable: Test connectivity
- Credentials are correct: Verify API keys/tokens
Issue: High latency
Solutions:
- Increase batch sizes
- Enable compression
- Add more pipeline workers
- Use local buffering
Issue: Data loss
Solutions:
- Enable persistent queues
- Increase buffer sizes
- Add retry logic
- Use at-least-once delivery
Best Practices
- Use structured logging: NDJSON for easy parsing
- Enable compression: Reduce bandwidth and storage costs
- Implement buffering: Prevent data loss during outages
- Monitor pipeline health: Track throughput and errors
- Test failover: Ensure redundancy works
- Secure credentials: Use secret management
- Rotate logs: Prevent disk space issues
- Index optimization: Use proper data types in Elasticsearch
- Partition data: By chain, severity, or time
- Archive old data: Move to cold storage after 90 days (see the ILM policy sketch below)
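The Elasticsearch index template earlier references an ILM policy named blocklight-policy. A minimal sketch that moves indices into the cold phase after 90 days and deletes them after a year; the retention values are illustrative:
curl -X PUT "http://elasticsearch:9200/_ilm/policy/blocklight-policy" \
  -H "Content-Type: application/json" \
  -d '{
    "policy": {
      "phases": {
        "hot":    { "actions": {} },
        "cold":   { "min_age": "90d", "actions": { "set_priority": { "priority": 0 } } },
        "delete": { "min_age": "365d", "actions": { "delete": {} } }
      }
    }
  }'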
Next Steps