Connect Apache Kafka with Greenplum DB

Yaniv Ben Hemo
4 min readJan 31, 2019

The Greenplum Stream Server (GPSS) is an ETL (extract, transform, load) tool. An instance of the GPSS server ingests streaming data from one or more clients, using Greenplum Database readable external tables to transform and insert the data into a target Greenplum table. The data source and the format of the data are specific to the client.

The Greenplum Stream Server includes the gpss command-line utility. When you run gpss, you start an instance of GPSS; this instance waits indefinitely for client data.

The Greenplum Stream Server also includes the gpsscli command-line utility, a client tool for submitting data load jobs to a GPSS instance and managing those jobs.

Limitations

The Greenplum Stream Server does not support loading data from multiple Kafka topics to the same Greenplum Database table. All jobs will hang if GPSS encounters this situation.

GPSS Architecture

Lets start working..

Some Preparations first -

GPMaster Side

  1. Prepare a database + table which will receive the kafka data

testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );

2. Registering the GPSS Extension

You must explicitly register the Greenplum Stream Server extension in each database in which you will use GPSS to write data to Greenplum tables.

$ ssh gpadmin@gpmaster

gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh

mdw$ psql -d testdb

testdb=# CREATE EXTENSION gpss;

Perform steps 3 and 4 for each database in which the Greenplum Stream Server will write client data.

3. Configuring the Greenplum Stream Server over our Master

Our stream server will combine within the same process both gpss listener + gpfdist

  • Create “gpss_config.json” file that responsible to configure our GPSS service

Default host address is — localhost

Example:

{

“ListenAddress”: {

“Host”: “”,

“Port”: 50007,

“SSL”: false

},

“Gpfdist”: {

“Host”: “”,

“Port”: 8319

}

}

gpmaster$ gpss gpss_config.json — log-dir . &

  • Now we have both ‘gpfdist’ listener to dispatch data to our segments nodes + ‘gpss’ listener to receive data from Kafka client node.

4. Create ‘jsonload_cfg.yaml’ file

In this file we configure our data source and where to put it, on which database and table.

https://gpdb.docs.pivotal.io/5110/greenplum-kafka/load-json-example.html

Example:

Client Side

  1. Install Kafka

In this tutorial we will use kafka as our message broker or ETL server

That receive topics or data and send them over to our gpss listener at the master node.

2. Make sure you have a routing between kafka server to gpmaster server on port 9092

3. Create a Kafka topic

Greenplum has a limitation of 1:1 topic to table. It cannot ingress several topics to 1 table.

https://gpdb.docs.pivotal.io/5110/greenplum-kafka/load-json-example.html

Example:

kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh — create \

— zookeeper localhost:2181 — replication-factor 1 — partitions 1 \

— topic topic_json_gpkafka

4. Insert data to the new kafka topic

kafkahost$ vi sample_data.json

Copy and paste:

{ “cust_id”: 1313131, “month”: 12, “expenses”: 1313.13 }

{ “cust_id”: 3535353, “month”: 11, “expenses”: 761.35 }

{ “cust_id”: 7979797, “month”: 10, “expenses”: 4489.00 }

{ “cust_id”: 7979797, “month”: 11, “expenses”: 18.72 }

{ “cust_id”: 3535353, “month”: 10, “expenses”: 6001.94 }

{ “cust_id”: 7979797, “month”: 12, “expenses”: 173.18 }

{ “cust_id”: 1313131, “month”: 10, “expenses”: 492.83 }

{ “cust_id”: 3535353, “month”: 12, “expenses”: 81.12 }

{ “cust_id”: 1313131, “month”: 11, “expenses”: 368.27 }

5. Stream the data into kafka topic we created

kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \

— broker-list localhost:9092 \

— topic topic_json_gpkafka < sample_data.json

6. Verify the data got inserted

kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \

— bootstrap-server localhost:9092 — topic topic_json_gpkafka \

— from-beginning

Submit a job -

  1. On the master side -

gpmaster$ gpsscli submit — name kafkajson2gp — gpss-port 50007 ./jsonload_cfg.yaml

List all the jobs

gpmaster$ gpsscli list — all — gpss-port 50007

Start the job

gpmaster$ gpsscli start kafkajson2gp — gpss-port 50007

To stop receiving and insert the new rows — stop the job

gpmaster$ gpsscli stop kafkajson2gp — gpss-port 50007

2. Examine the ‘gpss’ command output. It should look like that -

… -[INFO]:- … Inserted 9 rows

… -[INFO]:- … Rejected 0 rows

3. View the new content at the table we have created

gpmaster$ psql -d testdb

testdb=# SELECT * FROM json_from_kafka WHERE customer_id=’1313131'

ORDER BY amount_paid;

customer_id | month | amount_paid

— — — — — — -+ — — — -+ — — — — — — -

1313131 | 11 | 368.27

1313131 | 10 | 492.83

1313131 | 12 | 1313.13

--

--

Yaniv Ben Hemo

A developer, technologist, and entrepreneur. Co-Founder and CEO at Memphis.dev. Trying to make developers' lives a bit easier.