compose: update Clickhouse schema to match casing of proto (#133)

This commit resolves an issue with the mapping between the ClickHouse
schema and the flow.proto schema.

In 3326554, the casing of the proto fields was updated, but the
ClickHouse column names were not updated to match, resulting in the
ClickHouse Kafka engine only being able to successfully deserialize
fields that did not contain an underscore.

It also updates the provisioned dashboards and bumps the versions of the tools used in the compose setup.

Co-authored-by: lspgn <lspgn@users.noreply.github.com>
Author: Brooks Swinnerton
Date: 2023-03-06 23:39:33 -05:00
Committed by: GitHub
Parent: 176eb8772f
Commit: d53e5f9b5a
6 changed files with 105 additions and 181 deletions
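
For context, the ClickHouse Kafka table engine matches incoming message fields to columns by name, which is why the flows table must use exactly the same snake_case identifiers as flow.proto. A minimal sketch of the idea; the topic, consumer group, and schema/message names below are illustrative assumptions, not taken from this commit:

    -- Columns are matched to proto fields by name, so they must mirror flow.proto.
    CREATE TABLE IF NOT EXISTS flows_example
    (
        time_received   UInt64,          -- proto field: time_received
        sampler_address FixedString(16), -- proto field: sampler_address
        bytes           UInt64,          -- proto field: bytes
        packets         UInt64           -- proto field: packets
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'kafka:9092',
        kafka_topic_list = 'flows',              -- hypothetical topic name
        kafka_group_name = 'clickhouse-flows',   -- hypothetical consumer group
        kafka_format = 'Protobuf',
        kafka_schema = 'flow.proto:FlowMessage'; -- assumed schema:message pair

With PascalCase columns such as Bytes or SamplerAddress, this by-name match fails against the snake_case proto fields, which is exactly the breakage described above.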

View File

@@ -1,9 +1,13 @@
version: "3"
services:
goflow:
goflow2:
build:
context: ../../
dockerfile: Dockerfile
args:
VERSION: compose
LDFLAGS: -X main.version=compose
image: netsampler/goflow2
user: root # because docker-compose mount as root
ports:
- '8080:8080'
@@ -11,7 +15,7 @@ services:
- '2055:2055/udp'
command:
- -transport=file
- -transport.file=/var/log/goflow/goflow.log
- -transport.file=/var/log/goflow/goflow2.log
- -format=json
restart: always
logging:

View File

@@ -4,7 +4,7 @@ set -e
clickhouse client -n <<-EOSQL
CREATE DATABASE IF NOT EXISTS dictionaries;
CREATE DICTIONARY IF NOT EXISTS dictionaries.protocols (
proto UInt8,
name String,
@@ -17,27 +17,27 @@ clickhouse client -n <<-EOSQL
CREATE TABLE IF NOT EXISTS flows
(
TimeReceived UInt64,
TimeFlowStart UInt64,
time_received UInt64,
time_flow_start UInt64,
SequenceNum UInt32,
SamplingRate UInt64,
SamplerAddress FixedString(16),
sequence_num UInt32,
sampling_rate UInt64,
sampler_address FixedString(16),
SrcAddr FixedString(16),
DstAddr FixedString(16),
src_addr FixedString(16),
dst_addr FixedString(16),
SrcAS UInt32,
DstAS UInt32,
src_as UInt32,
dst_as UInt32,
EType UInt32,
Proto UInt32,
etype UInt32,
proto UInt32,
SrcPort UInt32,
DstPort UInt32,
src_port UInt32,
dst_port UInt32,
Bytes UInt64,
Packets UInt64
bytes UInt64,
packets UInt64
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
@@ -48,78 +48,78 @@ clickhouse client -n <<-EOSQL
CREATE TABLE IF NOT EXISTS flows_raw
(
Date Date,
TimeReceived DateTime,
TimeFlowStart DateTime,
date Date,
time_received DateTime,
time_flow_start DateTime,
SequenceNum UInt32,
SamplingRate UInt64,
SamplerAddress FixedString(16),
sequence_num UInt32,
sampling_rate UInt64,
sampler_address FixedString(16),
SrcAddr FixedString(16),
DstAddr FixedString(16),
src_addr FixedString(16),
dst_addr FixedString(16),
SrcAS UInt32,
DstAS UInt32,
src_as UInt32,
dst_as UInt32,
EType UInt32,
Proto UInt32,
etype UInt32,
proto UInt32,
SrcPort UInt32,
DstPort UInt32,
src_port UInt32,
dst_port UInt32,
Bytes UInt64,
Packets UInt64
bytes UInt64,
packets UInt64
) ENGINE = MergeTree()
PARTITION BY Date
ORDER BY TimeReceived;
PARTITION BY date
ORDER BY time_received;
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_raw_view TO flows_raw
AS SELECT
toDate(TimeReceived) AS Date,
toDate(time_received) AS date,
*
FROM flows;
CREATE TABLE IF NOT EXISTS flows_5m
(
Date Date,
Timeslot DateTime,
date Date,
timeslot DateTime,
SrcAS UInt32,
DstAS UInt32,
src_as UInt32,
dst_as UInt32,
ETypeMap Nested (
EType UInt32,
Bytes UInt64,
Packets UInt64,
Count UInt64
etypeMap Nested (
etype UInt32,
bytes UInt64,
packets UInt64,
count UInt64
),
Bytes UInt64,
Packets UInt64,
Count UInt64
bytes UInt64,
packets UInt64,
count UInt64
) ENGINE = SummingMergeTree()
PARTITION BY Date
ORDER BY (Date, Timeslot, SrcAS, DstAS, \`ETypeMap.EType\`);
PARTITION BY date
ORDER BY (date, timeslot, src_as, dst_as, \`etypeMap.etype\`);
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m
CREATE MATERIALIZED VIEW IF NOT EXISTS flows_5m_view TO flows_5m
AS
SELECT
Date,
toStartOfFiveMinute(TimeReceived) AS Timeslot,
SrcAS,
DstAS,
date,
toStartOfFiveMinute(time_received) AS timeslot,
src_as,
dst_as,
[EType] AS \`ETypeMap.EType\`,
[Bytes] AS \`ETypeMap.Bytes\`,
[Packets] AS \`ETypeMap.Packets\`,
[Count] AS \`ETypeMap.Count\`,
[etype] AS \`etypeMap.etype\`,
[bytes] AS \`etypeMap.bytes\`,
[packets] AS \`etypeMap.packets\`,
[count] AS \`etypeMap.count\`,
sum(Bytes) AS Bytes,
sum(Packets) AS Packets,
count() AS Count
sum(bytes) AS bytes,
sum(packets) AS packets,
count() AS count
FROM flows_raw
GROUP BY Date, Timeslot, SrcAS, DstAS, \`ETypeMap.EType\`;
GROUP BY date, timeslot, src_as, dst_as, \`etypeMap.etype\`;
EOSQL
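
A quick way to sanity-check the new schema once the compose stack is up is to query flows_raw through the snake_case columns; this is a sketch for verification only, not part of the commit:

    SELECT
        toStartOfFiveMinute(time_received) AS timeslot,
        sum(bytes * sampling_rate) AS estimated_bytes, -- scale sampled bytes back up
        count() AS flow_count
    FROM flows_raw
    WHERE date = today()
    GROUP BY timeslot
    ORDER BY timeslot;

If the Kafka engine is deserializing correctly, every column should be populated rather than defaulting to zero.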

View File

@@ -1,14 +1,14 @@
version: "3"
services:
zookeeper:
image: bitnami/zookeeper:3.6.3
image: bitnami/zookeeper:3.7.1
ports:
- 2181:2181
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
restart: always
kafka:
image: bitnami/kafka:2.8.0
image: bitnami/kafka:3.4.0
ports:
- 9092:9092
environment:
@@ -19,10 +19,11 @@ services:
depends_on:
- zookeeper
grafana:
image: grafana/grafana:9.1.7
image: grafana/grafana:9.4.3
environment:
- GF_INSTALL_PLUGINS=grafana-clickhouse-datasource
- GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=vertamedia-clickhouse-datasource
- GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource
# - GF_INSTALL_PLUGINS=grafana-clickhouse-datasource
# - GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS=vertamedia-clickhouse-datasource
ports:
- 3000:3000
restart: always
@@ -31,7 +32,7 @@ services:
- ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yml
- ./grafana/dashboards:/var/lib/grafana/dashboards
prometheus:
image: prom/prometheus:v2.27.0
image: prom/prometheus:v2.37.6
ports:
- 9090:9090
restart: always
@@ -41,6 +42,10 @@ services:
build:
context: ../../
dockerfile: Dockerfile
args:
VERSION: compose
LDFLAGS: -X main.version=compose
image: netsampler/goflow2
depends_on:
- kafka
ports:
@@ -55,7 +60,7 @@ services:
- -format=pb
- -format.protobuf.fixedlen=true
db:
image: clickhouse/clickhouse-server:22.6.9.11-alpine
image: clickhouse/clickhouse-server:22.8.14.53-alpine
ports:
- 8123:8123
volumes:

View File

@@ -52,6 +52,7 @@
"lines": true,
"linewidth": 1,
"links": [],
"maxDataPoints": 200,
"nullPointMode": "null",
"options": {
"dataLinks": []
@@ -67,20 +68,16 @@
"targets": [
{
"database": "default",
"dateColDataType": "Date",
"dateColDataType": "date",
"dateLoading": false,
"dateTimeColDataType": "TimeFlowStart",
"dateTimeColDataType": "time_flow_start",
"dateTimeType": "DATETIME",
"datetimeLoading": false,
"format": "time_series",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"group": [],
"intervalFactor": 1,
"metricColumn": "none",
"query": "SELECT\n toUInt64(toStartOfMinute($dateTimeCol))*1000 as t,\n sum(Bytes*SamplingRate) as sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY t\nORDER BY t",
"rawQuery": "SELECT toUInt64(toStartOfMinute(TimeFlowStart))*1000 as t, sum(Bytes*SamplingRate) as sumbytes FROM default.flows_raw WHERE Date >= toDate(1585445405) AND TimeFlowStart >= toDateTime(1585445405) GROUP BY t ORDER BY t",
"rawSql": "SELECT\n (cast(extract(epoch from time_flow) as integer)/30)*30 AS \"time\",\n sum(bytes*sampling_rate*8)/30\nFROM flows\nWHERE\n $__timeFilter(date_inserted)\nGROUP BY \"time\"\nORDER BY \"time\"",
"refId": "A",
"query": "SELECT\n t,\n sum(sumbytes) AS sumbytes\nFROM (\n SELECT\n $timeSeries AS t,\n sum(bytes*sampling_rate) as sumbytes\n FROM $table\n WHERE $timeFilter\n GROUP BY t\n\n UNION ALL\n\n SELECT\n intDiv($from+number*$interval, $interval)*$interval*1000 AS t,\n 0 AS sumbytes\n FROM numbers(intDiv($to-$from, $interval))\n)\nGROUP BY t\nORDER BY t", "refId": "A",
"round": "0s",
"select": [
[
@@ -105,46 +102,8 @@
]
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Instant traffic",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "bps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
"type": "timeseries"
},
{
"columns": [],
@@ -182,7 +141,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -199,7 +157,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -216,7 +173,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -232,19 +188,16 @@
"targets": [
{
"database": "default",
"dateColDataType": "",
"dateColDataType": "date",
"dateLoading": false,
"dateTimeColDataType": "TimeFlowStart",
"dateTimeColDataType": "time_flow_start",
"dateTimeType": "DATETIME",
"datetimeLoading": false,
"format": "table",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"group": [],
"intervalFactor": 1,
"metricColumn": "none",
"query": "SELECT\n if(EType = 0x800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13,4))), IPv6NumToString(SrcAddr)) as srcip,\n sum(Bytes*SamplingRate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY srcip\nORDER BY sumbytes DESC",
"rawQuery": "SELECT toUInt64(toStartOfMinute(TimeFlowStart))*1000 as t, sum(Bytes*SamplingRate) as sumbytes FROM default.flows_raw WHERE Date >= toDate(1593315015) AND TimeFlowStart >= toDateTime(1593315015) GROUP BY t ORDER BY t",
"rawSql": "SELECT src_ip, count(*), sum(bytes) AS sumBytes FROM flows GROUP BY src_ip",
"query": "SELECT\n if(etype = 0x800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(src_addr), 13,4))), IPv6NumToString(src_addr)) as srcip,\n sum(bytes*sampling_rate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY srcip\nORDER BY sumbytes DESC",
"refId": "A",
"round": "0s",
"select": [
@@ -271,7 +224,7 @@
],
"title": "Top source IPs",
"transform": "table",
"type": "table-old"
"type": "table"
},
{
"columns": [],
@@ -311,7 +264,6 @@
"$$hashKey": "object:1507",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -329,7 +281,6 @@
"$$hashKey": "object:1508",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -347,7 +298,6 @@
"$$hashKey": "object:1509",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -363,20 +313,17 @@
"targets": [
{
"database": "default",
"dateColDataType": "",
"dateColDataType": "date",
"dateLoading": false,
"dateTimeColDataType": "TimeFlowStart",
"dateTimeColDataType": "time_flow_start",
"dateTimeType": "DATETIME",
"datetimeLoading": false,
"extrapolate": true,
"format": "table",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"group": [],
"intervalFactor": 1,
"metricColumn": "none",
"query": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(Proto)) AS protoName\nSELECT\n if(protoName = '', toString(Proto), protoName) || '/' || toString(SrcPort) as port,\n sum(Bytes*SamplingRate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY port\nORDER BY sumbytes DESC",
"rawQuery": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(Proto)) AS protoName SELECT if(protoName = '', toString(Proto), protoName) || '/' || toString(SrcPort) as port, sum(Bytes*SamplingRate) AS sumbytes FROM default.flows_raw WHERE TimeFlowStart >= toDateTime(1593319741) GROUP BY port ORDER BY sumbytes DESC",
"rawSql": "SELECT src_ip, count(*), sum(bytes) AS sumBytes FROM flows GROUP BY src_ip",
"query": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(proto)) AS protoName\nSELECT\n if(protoName = '', toString(proto), protoName) || '/' || toString(src_port) as port,\n sum(bytes*sampling_rate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY port\nORDER BY sumbytes DESC",
"refId": "A",
"round": "0s",
"select": [
@@ -403,7 +350,7 @@
],
"title": "Top source ports",
"transform": "table",
"type": "table-old"
"type": "table"
},
{
"columns": [],
@@ -441,7 +388,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -458,7 +404,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -475,7 +420,6 @@
{
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -491,19 +435,16 @@
"targets": [
{
"database": "default",
"dateColDataType": "",
"dateColDataType": "date",
"dateLoading": false,
"dateTimeColDataType": "TimeFlowStart",
"dateTimeColDataType": "time_flow_start",
"dateTimeType": "DATETIME",
"datetimeLoading": false,
"format": "table",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"group": [],
"intervalFactor": 1,
"metricColumn": "none",
"query": "SELECT\n if(EType = 0x800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13,4))), IPv6NumToString(DstAddr)) as dstip,\n sum(Bytes*SamplingRate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY dstip\nORDER BY sumbytes DESC",
"rawQuery": "SELECT toUInt64(toStartOfMinute(TimeFlowStart))*1000 as t, sum(Bytes*SamplingRate) as sumbytes FROM default.flows_raw WHERE Date >= toDate(1593317660) AND TimeFlowStart >= toDateTime(1593317660) GROUP BY t ORDER BY t",
"rawSql": "SELECT src_ip, count(*), sum(bytes) AS sumBytes FROM flows GROUP BY src_ip",
"query": "SELECT\n if(etype = 0x800, IPv4NumToString(reinterpretAsUInt32(substring(reverse(dst_addr), 13,4))), IPv6NumToString(dst_addr)) as dstip,\n sum(bytes*sampling_rate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY dstip\nORDER BY sumbytes DESC",
"refId": "A",
"round": "0s",
"select": [
@@ -530,7 +471,7 @@
],
"title": "Top destination IPs",
"transform": "table",
"type": "table-old"
"type": "table"
},
{
"columns": [],
@@ -570,7 +511,6 @@
"$$hashKey": "object:1429",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -588,7 +528,6 @@
"$$hashKey": "object:1430",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -606,7 +545,6 @@
"$$hashKey": "object:1431",
"alias": "",
"align": "auto",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
"rgba(237, 129, 40, 0.89)",
@@ -622,20 +560,17 @@
"targets": [
{
"database": "default",
"dateColDataType": "",
"dateColDataType": "date",
"dateLoading": false,
"dateTimeColDataType": "TimeFlowStart",
"dateTimeColDataType": "time_flow_start",
"dateTimeType": "DATETIME",
"datetimeLoading": false,
"extrapolate": true,
"format": "table",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"group": [],
"intervalFactor": 1,
"metricColumn": "none",
"query": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(Proto)) AS protoName\nSELECT\n if(protoName = '', toString(Proto), protoName) || '/' || toString(DstPort) as port,\n sum(Bytes*SamplingRate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY port\nORDER BY sumbytes DESC",
"rawQuery": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(Proto)) AS protoName SELECT if(protoName = '', toString(Proto), protoName) || '/' || toString(DstPort) as port, sum(Bytes*SamplingRate) AS sumbytes FROM default.flows_raw WHERE TimeFlowStart >= toDateTime(1593319708) GROUP BY port ORDER BY sumbytes DESC",
"rawSql": "SELECT src_ip, count(*), sum(bytes) AS sumBytes FROM flows GROUP BY src_ip",
"query": "WITH dictGetString('dictionaries.protocols', 'name', toUInt64(proto)) AS protoName\nSELECT\n if(protoName = '', toString(proto), protoName) || '/' || toString(dst_port) as port,\n sum(bytes*sampling_rate) AS sumbytes\nFROM $table\nWHERE $timeFilter\nGROUP BY port\nORDER BY sumbytes DESC",
"refId": "A",
"round": "0s",
"select": [
@@ -662,10 +597,10 @@
],
"title": "Top destination ports",
"transform": "table",
"type": "table-old"
"type": "table"
}
],
"refresh": false,
"refresh": "",
"schemaVersion": 25,
"style": "dark",
"tags": [],

View File

@@ -24,23 +24,3 @@ datasources:
secureJsonFields: {}
version: 3
readOnly: false
- name: ClickHouse-new
type: grafana-clickhouse-datasource
typeLogoUrl: ''
access: proxy
url: http://db:8123
password: ''
user: ''
database: ''
basicAuth: false
basicAuthUser: ''
basicAuthPassword: ''
withCredentials: false
isDefault: true
jsonData:
port: 8123
protocol: http
server: db
secureJsonFields: {}
version: 3
readOnly: false

View File

@@ -11,4 +11,4 @@ rule_files:
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090', 'goflow:8080']
- targets: ['localhost:9090', 'goflow2:8080']