Kafka in Kubernetes with Spring

In this post we install Kafka on Kubernetes with the Istio service mesh, and build a Spring Boot application to send and receive messages with Kafka.

What does Kafka give you?
Speed and high volume across commodity hardware. Guaranteed ordering (within a partition). Message replay. Horizontal (distributed) scaling. Replication for resilience.
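The ordering guarantee works per partition: the default partitioner sends every message with the same key to the same partition. A simplified standalone sketch of that idea follows; note the real default partitioner hashes the serialized key with murmur2, not String.hashCode(), so this is illustrative only.

```java
public class PartitionSketch {

    // Simplified stand-in for Kafka's default partitioner.
    // Math.floorMod keeps the result in [0, numPartitions) even for
    // negative hash codes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // Same key -> same partition, every time: this is what gives
        // Kafka its per-key ordering guarantee.
        int p1 = partitionFor("order-42", partitions);
        int p2 = partitionFor("order-42", partitions);
        System.out.println(p1 == p2);                   // true
        System.out.println(p1 >= 0 && p1 < partitions); // true
    }
}
```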

Where to use it?
Event-driven applications. Event sourcing with event logs. Metrics. Real-time stream processing. In general, anywhere RabbitMQ and AMQP cannot handle the volume and speed needed.

Our Setup
Docker Desktop on macOS, with its built-in Kubernetes. Set the resources to at least 6 GB of memory and 2+ cores.

Installing Kafka on K8s


This is based on https://strimzi.io/quickstarts/

If you have followed my previous posts on Istio, the following namespace should already have been created:

kubectl create namespace vadal

and this namespace has already been assigned to Istio for sidecar injection.

kubectl label namespace vadal istio-injection=enabled

Kafka Setup

Install the Strimzi operator and its required CRDs:

kubectl apply -f 'https://strimzi.io/install/latest?namespace=vadal' -n vadal

Create a persistent Kafka cluster:

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n vadal

Wait for it to become ready:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s -n vadal

Check the pods (kc is an alias for kubectl):

kc get pods -n vadal | grep cluster
my-cluster-entity-operator-6b7d7657dd-2pv6s   4/4     Running   1          76s
my-cluster-kafka-0                            3/3     Running   0          99s
my-cluster-zookeeper-0                        2/2     Running   0          2m10s
strimzi-cluster-operator-6c9d899778-ql9nn     2/2     Running   0          4m49s

Check Connectivity

Send

kubectl -n vadal run kafka-producer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic

If you don't see a command prompt, try pressing enter.

hi there
>

Receive

kubectl -n vadal run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
{my-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

hi there

(The LEADER_NOT_AVAILABLE warning is transient: the topic is auto-created on first use, and the warning disappears once its metadata has propagated. The message still comes through.)

View in Kiali


This will only work if you have Istio installed with all the goodies, and you will need to have used a namespace labelled for Istio injection (see above).

istioctl dashboard kiali

There is a lot going on here, as you can see: the bootstrap, broker and ZooKeeper pods are all at play.

Spring Kafka

Application

@SpringBootApplication
@RestController
@Slf4j
public class VadalApplication {

    public static void main(String[] args) {
        SpringApplication.run(VadalApplication.class, args);
    }

    private static final String TOPIC = "my-topic";

    // Synchronized: written by the Kafka listener thread, read and cleared by web threads.
    private final List<String> CONSUMED_MESSAGES = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = TOPIC, groupId = "group_id")
    public void consume(String m) {
        log.info("Message consumed: {}", m);
        CONSUMED_MESSAGES.add(m);
    }

    @GetMapping("/pub/{m}")
    public void produce(@PathVariable String m) {
        log.info("Message produced: {}", m);
        kafkaTemplate.send(TOPIC, m);
    }

    @GetMapping("/get")
    public List<String> get() {
        log.info("Get consumed messages");
        List<String> l = new ArrayList<>(CONSUMED_MESSAGES);
        CONSUMED_MESSAGES.clear();
        return l;
    }


}

With this wiring the topic is created automatically on first use, as long as the broker's auto.create.topics.enable is true (the default), so a NewTopic bean is not required, although you can add one.
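If you do want explicit topic provisioning, a NewTopic bean is the way: Spring's KafkaAdmin creates any such beans at startup. A minimal sketch, where the partition and replica counts are illustrative (replicas cannot exceed the single broker we deployed):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;

// Declare the topic explicitly so it is created with the partition and
// replica counts you want, rather than the broker's auto-create defaults.
@Bean
public NewTopic myTopic() {
    return TopicBuilder.name("my-topic")
            .partitions(1)
            .replicas(1)
            .build();
}
```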

The /pub endpoint publishes the message string; the KafkaListener receives messages and stores them in a list; the /get endpoint drains that list. This is obviously a contrived example to demonstrate Kafka interaction with Spring.
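The copy-then-clear in /get is really one logical operation, so it should happen atomically. As a standalone sketch, the same drain pattern can be written with explicit synchronization; class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DrainBuffer {

    private final List<String> messages =
            Collections.synchronizedList(new ArrayList<>());

    // Called by the Kafka listener thread.
    public void consume(String m) {
        messages.add(m);
    }

    // Called by the /get endpoint: return everything seen so far, at most once.
    public List<String> drain() {
        synchronized (messages) { // guard the copy-and-clear pair as one unit
            List<String> copy = new ArrayList<>(messages);
            messages.clear();
            return copy;
        }
    }

    public static void main(String[] args) {
        DrainBuffer buffer = new DrainBuffer();
        buffer.consume("test-message");
        System.out.println(buffer.drain()); // [test-message]
        System.out.println(buffer.drain()); // []
    }
}
```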

If you run multiple instances, you would need a common store for the messages, such as a database or a data grid.

POM

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

I've left spring-boot-starter-data-rest in from previous applications; it can be replaced by spring-boot-starter-web if you wish. The parent POM has the main Spring, Lombok etc. dependencies (see the full code here).

application.yml

server.port: 7777

spring:
  kafka:
    consumer:
      bootstrap-servers: my-cluster-kafka-bootstrap:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: my-cluster-kafka-bootstrap:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer


management:
  endpoints:
    web:
      exposure:
        include: "*"

Note that bootstrap-servers points at the bootstrap service Strimzi created earlier.

Deploy and Serve

kubectl create deployment vkafka --image vadal-kafka-message:0.0.1-SNAPSHOT -n vadal
kubectl expose deployment vkafka --type NodePort --port 8888 --target-port 7777 -n vadal
kc get svc -n vadal
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
my-cluster-kafka-bootstrap    ClusterIP   10.107.228.196   <none>        9091/TCP,9092/TCP,9093/TCP   38h
my-cluster-kafka-brokers      ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   38h
my-cluster-zookeeper-client   ClusterIP   10.111.83.76     <none>        2181/TCP                     38h
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   38h
vkafka                        NodePort    10.102.114.125   <none>        8888:32758/TCP               37h

Send a message through our application

curl -i localhost:32758/pub/test-message
HTTP/1.1 200 OK
content-length: 0
date: Mon, 20 Jul 2020 13:32:56 GMT
x-envoy-upstream-service-time: 122
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*

Check the logs

kc get po -n vadal
NAME                                          READY   STATUS    RESTARTS   AGE
vkafka-546f454db9-kpkv9                       2/2     Running   1          36h
kc logs vkafka-546f454db9-kpkv9  -n vadal vadal-kafka-message
2020-07-20 13:32:57.153  INFO 1 --- [nio-7777-exec-2] u.c.a.vadalkafka.VadalApplication        : Message produced: test-message
2020-07-20 13:32:57.162  INFO 1 --- [nio-7777-exec-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-07-20 13:32:57.186  INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.0
2020-07-20 13:32:57.186  INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2020-07-20 13:32:57.186  INFO 1 --- [nio-7777-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1595251977185
2020-07-20 13:32:57.201  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: tao-huzJTTGmMCqwhtQF1Q
2020-07-20 13:32:57.280  INFO 1 --- [ntainer#0-0-C-1] u.c.a.vadalkafka.VadalApplication        : Message consumed: test-message

We can see the listener consuming the message.

Lets grab it via our get endpoint

curl -i localhost:32758/get
HTTP/1.1 200 OK
content-type: application/json
date: Mon, 20 Jul 2020 13:38:37 GMT
x-envoy-upstream-service-time: 12
server: istio-envoy
x-envoy-decorator-operation: vkafka.vadal.svc.cluster.local:8888/*
transfer-encoding: chunked
["test-message"]

Kafka GUI

There is a Kafka GUI, AKHQ, which can be deployed to K8s.

See:
https://github.com/spring-projects/spring-kafka/tree/master/samples

helm repo add akhq https://akhq.io/
helm install --name akhq akhq/akhq --namespace vadal

(That is Helm 2 syntax; with Helm 3, drop the --name flag: helm install akhq akhq/akhq --namespace vadal.)

Edit the secret to change the bootstrap service from kafka:9092 to my-cluster-kafka-bootstrap:9092 (you can do it by hand or use Lens https://k8slens.dev/)

Edit the service to provide a NodePort, say 30011, then delete the pod and let it restart. You can then navigate to:

http://localhost:30011/my-cluster-plain-text/topic

Conclusion

There we go: Kafka running in K8s, a Spring Boot application that reads from and writes to it, and a GUI to boot.

Further Reading

https://strimzi.io/docs/operators/latest/overview.html#configuration-points-connect_str
