Apache Kafka is a distributed event streaming platform used for high-performance data pipelines, streaming analytics, and data integration. Apache Kafka Connect is a tool to scalably and reliably stream data between Apache Kafka® and other data systems. Kafka Connect is an ecosystem of pre-written and maintained Kafka Producers (source connectors) and Kafka Consumers (sink connectors) for data products and platforms like databases and message brokers.
This guide explains how to set up Kafka and Kafka Connect to stream data from a Kafka topic into your Timescale Cloud service.
To follow the procedure on this page, you need to:
Create a target Timescale Cloud service
You need your connection details to follow the steps in this page. This procedure also works for self-hosted TimescaleDB.
- Java8 or higher to run Apache Kafka
To install and configure Apache Kafka:
Extract the Kafka binaries to a local folder
curl https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz | tar -xzf -cd kafka_2.13-3.9.0From now on, the folder where you extracted the Kafka binaries is called
.Configure and run Apache Kafka
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"./bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/kraft/reconfig-server.properties./bin/kafka-server-start.sh config/kraft/reconfig-server.propertiesUse the
flag to run this process in the background.Create Kafka topics
In another Terminal window, navigate to <KAFKA_HOME>, then call
and create the following topics:accounts
: publishes JSON messages that are consumed by the timescale-sink connector and inserted into your Timescale Cloud service.deadletter
: stores messages that cause errors and that Kafka Connect workers cannot process.
./bin/kafka-topics.sh \--create \--topic accounts \--bootstrap-server localhost:9092 \--partitions 10./bin/kafka-topics.sh \--create \--topic deadletter \--bootstrap-server localhost:9092 \--partitions 10Test that your topics are working correctly
- Run
to send messages to theaccounts
topic:bin/kafka-console-producer.sh --topic accounts --bootstrap-server localhost:9092 - Send some events. For example, type the following:>Timescale Cloud>How Cool
- In another Terminal window, navigate to <KAFKA_HOME>, then run
to consume the events you just sent:You seebin/kafka-console-consumer.sh --topic accounts --from-beginning --bootstrap-server localhost:9092Timescale CloudHow Cool
- Run
Keep these terminals open, you use them to test the integration later.
To set up Kafka Connect server, plugins, drivers, and connectors:
Install the PostgreSQL connector
In another Terminal window, navigate to <KAFKA_HOME>, then download and configure the PostgreSQL sink and driver.
mkdir -p "plugins/camel-postgresql-sink-kafka-connector"curl https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-postgresql-sink-kafka-connector/3.21.0/camel-postgresql-sink-kafka-connector-3.21.0-package.tar.gz \| tar -xzf - -C "plugins/camel-postgresql-sink-kafka-connector" --strip-components=1curl -H "Accept: application/zip" https://jdbc.postgresql.org/download/postgresql-42.7.5.jar -o "plugins/camel-postgresql-sink-kafka-connector/postgresql-42.7.5.jar"echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-distributed.properties"echo "plugin.path=`pwd`/plugins/camel-postgresql-sink-kafka-connector" >> "config/connect-standalone.properties"Start Kafka Connect
export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*./bin/connect-standalone.sh config/connect-standalone.propertiesUse the
flag to run this process in the background.Verify Kafka Connect is running
In yet another another Terminal window, run the following command:
curl http://localhost:8083You see something like:
To prepare your Timescale Cloud service for Kafka integration:
Connect to your Timescale Cloud service
Create a table to ingest Kafka events
CREATE TABLE accounts (created_at TIMESTAMPTZ DEFAULT NOW(),name TEXT,city TEXT);Turn the table into a hypertable
SELECT create_hypertable('accounts', 'created_at');
To create a Timescale Cloud sink in Apache Kafka:
Create the connection configuration
In the terminal running Kafka Connect, stop the process by pressing
.Write the following configuration to
, then update the<properties>
with your connection details.name=timescale-standalone-sinkconnector.class=org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnectorerrors.tolerance=allerrors.deadletterqueue.topic.name=deadlettertasks.max=10value.converter=org.apache.kafka.connect.storage.StringConverterkey.converter=org.apache.kafka.connect.storage.StringConvertertopics=accountscamel.kamelet.postgresql-sink.databaseName=<dbname>camel.kamelet.postgresql-sink.username=<user>camel.kamelet.postgresql-sink.password=<password>camel.kamelet.postgresql-sink.serverName=<host>camel.kamelet.postgresql-sink.serverPort=<port>camel.kamelet.postgresql-sink.query=INSERT INTO accounts (name,city) VALUES (:#name,:#city)Restart Kafka Connect with the new configuration:
export CLASSPATH=`pwd`/plugins/camel-postgresql-sink-kafka-connector/*./bin/connect-standalone.sh config/connect-standalone.properties config/timescale-standalone-sink.properties
Test the connection
To see your sink, query the
route in a GET request:curl -X GET http://localhost:8083/connectorsYou see:
To test this integration, send some messages onto the accounts
topic. You can do this using the kafkacat or kcat utility.
In the terminal running
enter the following json strings{"name":"Lola","city":"Copacabana"}{"name":"Holly","city":"Miami"}{"name":"Jolene","city":"Tennessee"}{"name":"Barbara Ann ","city":"California"}Look in your terminal running
to see the messages being processed.Query your Timescale Cloud service for all rows in the
tableSELECT * FROM accounts;You see something like:
created_at name city 2025-02-18 13:55:05.147261+00 Lola Copacabana 2025-02-18 13:55:05.216673+00 Holly Miami 2025-02-18 13:55:05.283549+00 Jolene Tennessee 2025-02-18 13:55:05.35226+00 Barbara Ann California
You have successfully integrated Apache Kafka with Timescale Cloud.
Found an issue on this page?Report an issue or Edit this page in GitHub.