ClickHouse & Kafka

ClickHouse ClickHouse (CH) e' un Columnar Database, con interfaccia SQL, distribuito con prestazioni ottime sulle attivita' OLAP (On-Line Analytical Processing) ed in grado di effettuare caricamenti di grandi quantita' di dati.
ClickHouse e' un Open Source con licenza Apache 2.0 e tutte le funzionalita' sono disponibili senza limitazioni. CH e' un'economica ed efficiente alternativa per mantenere ed interrogare velocemente grandi quantita' di dati.

Kafka e' un sistema di messaggistica fault tolerant e scalabile in grado consegnare milioni di messaggi al secondo.
Kakfa e' diventato lo standard di fatto per lo scambio di informazioni nel mondo Big Data.

Sono molti gli ambiti in cui Kafka e ClickHouse possono operare insieme. In questa paginetta vediamo come.

Questo documento presenta gli aspetti dell'integrazione tra ClickHouse e Kafka: ClickHouse , Kafka, Primi passi, Utilizzo, Configurazione e tuning, ... utilizzando un approccio pratico con esempi. Questa pagina e' stata aggiornata alla versione 23.08 di ClickHouse perche' sono state introdotte significative nuove funzionalita'.

ClickHouse

ClickHouse utilizza un approccio differente rispetto alla rappresentazione ISAM e con indici B-Tree tipica dei DB relazionali memorizzando i dati per colonna.
Questo consente di utilizzare algoritmi differenti nell'accesso ai dati che possono essere eseguiti in parallelo. Innanzi si utilizzano tutte le CPU sul nodo ospite, ma e' possibile farlo anche in rete con piu' nodi. Le query di ClickHouse in cluster scalano in modo pressoche' lineare come prestazioni.

L'interfaccia l'SQL rende facilmente utilizzabile ClickHouse a chiunque conosca il linguaggio.

ClickHouse ha ottime prestazioni: e' migliaia di volte piu' veloce nelle query OLAP rispetto ad un DB relazionale tradizionale. ClickHouse non presenta la latenza tipica di alcuni DB NoSQL rispondendo in modo quasi real time alle richieste OLAP. Anche l'inserimento massivo di dati e' ottimizzato consentendo il caricamento di milioni di record al secondo.

ClickHouse, oltre che come database per query analitiche, grazie alla veloce ingestione di dati ed alla loro compressione, puo' essere utilizzato efficacemente per mantenere e consolidare grandi quantita' di dati.

Kafka

Kafka Cluster Kafka e' un sistema di messaggistica sviluppato inizialmente da LinkedIn e quindi mantenuto come progetto Open Source da Apache. Il codice e' principalmente scritto in Scala, un moderno linguaggio ad oggetti che viene compilato in bytecode Java.
Kafka e' distribuito, partizionato, fault-tolerant e puo' scalare per gestire milioni di messaggi al secondo.

Un Kafka cluster e' costituito da piu' Broker, riceve i messaggi dai Producer e li consegna ai Consumer.
I messaggi sono raggruppati in categorie chiamate Topic. Un Topic puo' essere costituito da una o piu' Partition che vengono elaborate in parallelo da broker diversi, eventualmente su server differenti. Le partizioni possono essere replicate per fornire l'HA. Per ogni partizioni replicata viene eletto un leader mentre gli altri broker sono follower. Per l'elezione del leader viene utilizzato Zookeeper [NdA dalla versione 3.3 di Kafka e' stato introdotto il protocollo KRaft in sostituzione di Zookeeper].
I consumer sono raggruppati in Consumer Group che possono avere due politiche di consegna:

Kafka garantisce che nessun messaggio venga perso e che i messaggi vengano consegnati nello stesso ordine con cui sono stati inseriti (per topic/partizione o commit log).

Grazie alla robustezza della soluzione ed alla sua scalabilita' Kafka viene utilizzato in molteplici contesti come Stream Processing, Metrics, Log Aggregation, ... sulle piattaforme di produzione di migliaia di aziende [NdE piu' dell' 80% delle aziende Fortune 100].

Primi passi

ClickHouse dispone di un Engine Kafka che integra l'accesso al sistema di messaggistica. Per utilizzare Kafka ClickHouse bastano... semplici comandi SQL!

Per creare utilizzare l'Engine Kafka la sintassi e':

CREATE TABLE table_name (
...
) Engine = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_schema = '',]
    ...

Tutto qui?
Ora basta leggere dalla tabella per consumare i messaggi provenienti da Kafka:

CREATE TABLE iot_queue ( time_ref DateTime, sensor String, metric String, measure Float64 ) ENGINE = Kafka('localhost:9092', 'topic1', 'group1') SETTINGS kafka_format = 'JSONEachRow', kafka_row_delimiter = '\n'; SET stream_like_engine_allow_direct_select=1; SELECT * FROM iot_queue;

Gia' fatto?!

Si!
La DDL crea una tabella con Engine Kafka accedendo ad un cluster locale (le porte di Kafka di default sono 9092 e 9093 per l'SSL). Con la select si consumano i dati inviati sul topic1. Importante e' la definizione del consumer group che consente di operare in parallelo o in HA. Ma questa era solo una prova...

Utilizzo

Funziona gia' tutto ma in realta' l'utilizzo di una SELECT per leggere i dati da Kafka, se non per effettuare verifiche puntuali sui dati, non e' la modalita' ottimale...

Una funzionalita' molto importante di ClickHouse, che puo' essere sfruttata in questo caso, e' quella delle Materialized View. Kafka with ClickHouse Materialized View L'idea di base e' molto semplice: la tabella creata con l'Engine Kafka raccoglie i dati che vengono automaticamente letti da una Materialized View che li appoggia su una normale tabella Merge Tree. Ecco i comandi per creare la MV agganciata alla tabelle Kafka creata in precedenza:

CREATE TABLE iot_storage (
    time_ref DateTime Codec(DoubleDelta, LZ4),
    sensor String,
    metric String,
    measure Float64 Codec(T64, LZ4)
) ENGINE = MergeTree
  PARTITION BY toYYYYMM(time_ref);

CREATE MATERIALIZED VIEW iot_mv TO iot_storage
    AS SELECT *
    FROM iot_queue;

La Storage Table fornisce mantiene in modo persistente i dati provenienti da Kafka, come tutte le tabelle CH verra' partizionata e conterra' dati fortemente compressi, potra' avere un TTL, ...

Sulla stessa tabella Kafka possono essere definite piu' materialized view, ciascuna con il livello di dettaglio desiderato, che vengono aggiornate automaticamente via via che i dati sono raccolti dalla tabella Consumer.

CREATE MATERIALIZED VIEW daily_stat
    AS SELECT toDate(toDateTime(timestamp)) AS day, sensor, metric,
              avgState(measure) as avg_measure, countState(*) as total
    FROM iot_queue GROUP BY day, sensor, metric;

SELECT sensor, sumMerge(total) FROM daily_stat GROUP BY sensor;

Dal punto di vista di Kafka il contenuto dei messaggi e' completamente libero. CH pero' deve raccogliere il messaggio in una struttura tabellare, quindi il formato dei messaggi ha molta importanza. Come formati CH accetta tutti i formati di INPUT accettati dalla INSERT. L'elenco ufficiale li riporta tutti... tra i principali: TabSeparated, TabSeparatedWithNames, CSV, CSVWithNames, JSONEachRow, Parquet, ... LineAsString.

L'Engine Kafka di CH e' molto veloce ed una tabella con un singolo Consumer e' adatta alla maggioranza dei carichi. Nella definizione della tabella kafka e' possibile utilizzare un numero superiore di Consumer, utilizzando il relativo parametro. E' pero' importante non superare il numero di partizioni definite sul Topic.
Se si utilizza un cluster e si vuole parallelizzare la raccolta dei messaggi da Kafka, e' sufficiente definire lo stesso Consumer Group sulle tabelle Kafka ed una tabella distribuita come base table; anche in questo caso non deve essere superato il numero di partizioni del Topic affinche' i Consumer siano tutti attivi.
La materialized view utilizzata per leggere i messaggi dalla tabella su Engine Kafka bufferizza i messaggi per rendere piu' efficiente l'inserimento dei dati. Questo genera un leggero ritardo ma e' inevitabile ed e' un caso analogo a quando vengono eseguite INSERT singole in ClickHouse.

Per l'Engine Kafka sono disponibili le seguenti colonne virtuali:

Ecco come sfruttare le colonne virtuali per controllare il lag dei caricamenti con Kafka:

CREATE MATERIALIZED VIEW kafka_mv TO kafka_store_tab
(
    `timestamp_k` DateTime, 
    `timestamp_ch` DateTime, 
    `timestamp` DateTime, 
    `Device_ID` String, 
...
) AS
SELECT 
    _timestamp AS timestamp_k, 
    now() AS timestamp_ch, 
    Timestamp,
    Device_ID, 
...
FROM kafka_engine_tab;

SELECT
    timestamp as t,
    avg(timestamp_ch-timestamp_k) KafkaLAG,
    avg(timestamp_ch-Collection_Date_Time) DataLAG
  FROM kafka_store_tab
 -- WHERE condition if needed
 GROUP BY t
 ORDER BY t;

Configurazione

L'Engine Kafka e' implementato utilizzando la libreria librdkafka che e' la libreria C/C++ Apache Kafka di riferimento.

Oltre ai parametri disponibili al momento della creazione delle tabelle e' possibile impostare una serie di impostazioni di configurazione di dettaglio. Come e' noto il file di configurazione di ClickHouse e' /etc/clickhouse-server/config.xml ed e' utilizzato anche per configurare l'Engine Kafka. Sono disponibili due tipologie di parametri con il prefisso: kafka, relativi alle impostazioni globali, e kafka_topic, relativi ai topic. L'elenco completo dei parametri e' pubblicato su documentazione della libreria librdkafka.

La configurazione si effettua nel file config.xml e generalmente le impostazioni di default sono gia' adeguate. La configurazione e' invece necessaria se viene utilizzato l'SSL o vanno impostati dimensioni differenti dei messaggi.
Per configurare il debug (che invia tutti i messaggi nel file stderr.log) la configurazione e' la seguente:

<yandex> ... <kafka> <debug>all</debug> </kafka> ... </yandex>

Questo documento non entra nei dettagli della configurazione dei broker Kafka... tuttavia qualche elemento puo' essere comunque utile!
Per consumare in parallelo dallo stesso topic e' necessario definire un numero sufficiente di partizioni: tipicamente il doppio del numero di consumer. In ogni caso il numero dei consumer in un consumer group non deve superare il numero di partizioni. Su ClickHouse e' opportuno mantenere il parametro kafka_num_consumers=1 e, se e' realmente necessario parallelizzare le operazioni, definire piu' tabelle con Engine Kafka sullo stesso consumer group (sullo stesso nodo o su nodi differenti in cluster con Engine Distribuited). Se il numero di tabelle con Engine=Kafka e' elevato e' possibile che il background_schedule_pool_size non sia adeguato [NdA il default e' 16], la verifica si effettua monitorando il valore della metrica BackgroundSchedulePoolTask.
Per una configurazione in HA di Kafka e' necessario disporre di almeno 3 broker con repl.factor=3 e min.insync.replicas=2. E' possibile scalare aumentando il numero di broker kafka.
Come e' noto una configurazione Kafka richiede ZooKeeper; per una configurazione in HA e' necessario un cluster (ensemble) di 3 nodi ZooKeeper. Con 3 nodi si sopporta la caduta di un nodo, con 5 nodi possono essere fuori linea fino a due nodi. Piu' nodi sono presenti nel cluster Zookeeper piu' rallenta. Zookeeper richiede poca latenza quindi generalmente si installa su server dedicati o che operano con servizi con poco carico.
Per una configurazione in replica o (XOR) in sharding di Clickhouse sono sufficienti due nodi. Clickhouse utilizza ZooKeeper solo per la replica; lo Zookeeper utilizzato da Kafka e da Clickhouse puo' essere lo stesso (tipicamente Zookeeper e' utilizzato da piu' servizi). Come base table per le materialized view su Kafka possono essere utilizzate normali tabelle MergeTree o anche tabelle Distribuited.

Quando sono presenti piu' ambienti puo' essere utile utilizzare le macro per rendere parametrica la definizione delle tabelle con Engine Kafka:

...
ENGINE = Kafka()
SETTINGS kafka_broker_list = '{kafka_cluster}',
         kafka_topic_list = '{env}.TOPIC_I',
         kafka_group_name = '{replica}.CONSUMER_G',
         kafka_format = 'CSV';

Lo skip dei messaggi con formato errato ha un comportamento un poco particolare ma puo' essere utilizzato il parametro kafka_handle_error_mode impostato a stream che consente un maggior controllo.
In alcuni casi puo' essere utile effettuare il tuning dei parametri background_pool_size e background_schedule_pool_size.

Varie ed eventuali

ClickHouse e' un database colonnare Open Source sviluppato da Yandex, con sorgenti su Github dal 2016 distribuito con una licenza molto libera: Apache License 2.0. Ovviamente anche Apache Kafka e' fornito con la licenza Apache.

L'engine Kafka su ClickHouse e' disponibile dal 2017-11: v. 1.1.54310 e viene continuamente aggiornato: multiple topics, custom separators, SETTINGS, background thread, librdkafka 1.1, virtual columns, INSERT support (producer), atomic insert, ... In alcune versioni di ClickHouse l'Engine Kafka presentava problemi ma e' sufficiente utilizzare una versione piu' recente di ClickHouse per evitare errori e disporre di maggiori funzionalita'.
Molteplici Kafka enhancements sono stati effettuati nella versione 20.4 (tra cui il fix di alcuni simpatici problemi come quello del DROP).
Nella 21.12 sono stati introdotti controlli per evitare letture dirette dall'Engine Kafka con i parametri stream_like_engine_allow_direct_select kafka_commit_on_select entrambe impostati a 0 per default. Ma ancora piu' utile e' la possibilita' di utilizzare le named_collection nella configurazione dei parametri di Kafka.
Nella release 22.7 e' stato modificato a 0 il valore di default di format_csv_allow_single_quotes.
Significativi sono stati gli aggiornamenti effettuati nella versione 23 di ClickHouse in cui e' stato effettuato un refactor di tutti gli streaming engines come Kafka e RabbitMQ e l'introduzione dell'utile ma travagliata vista system.kafka_consumers nella 23.8.
Nella versione 24 e' stato unificato il trattamento delle named collection XML ed SQL permettendo anche la creazione di nuove configurazioni XML senza richiedere riavvio.
Nel documento Your Server Stinks! sono mantenute apposite sezioni aggiornate con in principali rilasci di ClickHouse e Kafka.

La documentazione ufficiale ClickHouse sull'Engine Kafka si trova su questo link. Su questo argomento consiglio la lettura di questa pagina: ClickHouse Kafka FAQ.
Un'introduzione a Kafka si trova in questo documento, in realta' ci sono documenti migliori ma il link proposto e' in italiano e poi... l'ho scritto io! Altri documenti su ClickHouse sono Introduzione a ClickHouse, CH Materialized views, CH DBA scripts, Architettura ClickHouse e la versione precedente di questa pagina ClickHouse & Kafka (Legacy).


Titolo: ClickHouse & Kafka
Livello: Medio (2/5)
Data: 2 Agosto 2019
Versione: 1.0.4 - 15 Agosto 2023
Autore: mail [AT] meo.bogliolo.name