Simple Knative Eventing in Kubernetes with a Java Listener and Sender

Previously we ran a service using Knative Serving and kicked it off with an HTTP request.

Here we will write our own Knative services that act on events: a listener and a sender, both in Java using Spring Boot.

Set up

Docker Desktop (macOS) 2.3.0.3
K8s 1.16.5
Istio 1.6.4

Install Eventing

From https://knative.dev/development/install/any-kubernetes-cluster/#installing-the-eventing-component

Install the eventing components:

kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-crds.yaml

customresourcedefinition.apiextensions.k8s.io/apiserversources.sources.knative.dev created
customresourcedefinition.apiextensions.k8s.io/brokers.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/channels.messaging.knative.dev created
etc
kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-core.yaml

namespace/knative-eventing created
serviceaccount/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-resolver created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-source-observer created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-sources-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-manipulator created
serviceaccount/pingsource-mt-adapter created
clusterrolebinding.rbac.authorization.k8s.io/knative-eventing-pingsource-mt-adapter created
serviceaccount/eventing-webhook created

etc

Istio Cluster Gateway

Knative Eventing needs a cluster-local gateway so that in-cluster service discovery works when its services start up.

cat << EOF > ./istio-minimal-operator.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
  values:
    global:
      proxy:
        autoInject: disabled
      useMCP: false
      # The third-party-jwt is not enabled on all k8s.
      # See: https://istio.io/docs/ops/best-practices/security/#configure-third-party-service-account-tokens
      jwtPolicy: first-party-jwt

  addonComponents:
    pilot:
      enabled: true
    prometheus:
      enabled: false

  components:
    ingressGateways:
      - name: istio-ingressgateway
        enabled: true
      - name: cluster-local-gateway
        enabled: true
        label:
          istio: cluster-local-gateway
          app: cluster-local-gateway
        k8s:
          service:
            type: ClusterIP
            ports:
            - port: 15020
              name: status-port
            - port: 80
              name: http2
            - port: 443
              name: https
EOF

istioctl manifest apply -f istio-minimal-operator.yaml

Event Display and Ping Source

First we will use a pre-defined Event Display Service.

cat <<EOF | kubectl create -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
EOF
kn service list
NAME            URL                                        LATEST   AGE   CONDITIONS   READY     REASON
event-display   http://event-display.default.example.com            7s    0 OK / 3     Unknown   RevisionMissing : Configuration "event-display" is waiting for a Revision to become ready.

Let's create a standard PingSource:

cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *"
  jsonData: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
EOF

This will send the message "Hello world!" every two minutes.
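To make the schedule concrete, here is a small sketch of how a step expression such as */2 in the minute field maps to fire times. It is plain Java and only models the minute field; the class and method names are made up for illustration, and this is not how Knative itself evaluates the schedule.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: only models a "*/N" step in the cron minute field.
final class MinuteStepSchedule {

    // Returns the first whole minute strictly after 'from' whose
    // minute-of-hour is a multiple of 'step'.
    static LocalDateTime nextFire(LocalDateTime from, int step) {
        if (step < 1 || step > 59) {
            throw new IllegalArgumentException("step must be between 1 and 59");
        }
        LocalDateTime candidate = from.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1);
        while (candidate.getMinute() % step != 0) {
            candidate = candidate.plusMinutes(1);
        }
        return candidate;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2020, 8, 5, 0, 30, 45);
        LocalDateTime first = nextFire(now, 2);
        LocalDateTime second = nextFire(first, 2);
        System.out.println(first);  // 2020-08-05T00:32
        System.out.println(second); // 2020-08-05T00:34
    }
}
```

With a step of 2 the fire times fall on even minutes, two minutes apart.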

kn source list
NAME               TYPE         RESOURCE                          SINK                 READY
test-ping-source   PingSource   pingsources.sources.knative.dev   ksvc:event-display   True

See if the event display service is receiving events.

kubectl logs -l serving.knative.dev/service=event-display -c user-container --since=10m

specversion: 1.0
type: dev.knative.sources.ping
source: /apis/v1/namespaces/default/pingsources/test-ping-source
id: c9270430-a649-4980-87a1-f9a7a985d2ac
time: 2020-08-05T00:32:00.003686838Z
datacontenttype: application/json
Data,
{
"message": "Hello world!"
}

You will see the pod being created and deleted with each ping (every 2 mins).

All good: our Knative Eventing services have been installed and are working.

Clean up

kubectl delete pingsources.sources.knative.dev test-ping-source

or

kn source ping delete test-ping-source

Also delete the event-display service:

kn service delete event-display

Java Based Event Listener

Let's write our own event listener in Java, in the kn-listener sub-project.

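package uk.co.ac.vadalg;

import java.time.LocalDateTime;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@SpringBootApplication
public class KnEventListener {

    public static void main(String[] args) {
        SpringApplication.run(KnEventListener.class, args);
    }

    // Simple status endpoint, used below to check the service is up
    @GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> status() {
        return ResponseEntity.ok("{\"status\": \"UP\"}");
    }

    // Receives an event as a JSON POST and logs the raw body
    @PostMapping(value = "/", consumes = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<Void> event(@RequestHeader Map<String, String> headers, @RequestBody String body) {
        log.info(LocalDateTime.now() + ", " + body);
        return ResponseEntity.accepted().build();
    }

}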
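The handler above receives the request headers but does not use them. In the binary mode of the CloudEvents HTTP binding, event attributes travel as ce- prefixed headers, so they can be pulled out of that map. The following is a minimal sketch in plain Java; the class name is hypothetical and not part of the original project.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the original project.
final class CloudEventHeaders {

    private static final String PREFIX = "ce-";

    // Collects headers starting with "ce-" (case-insensitive) and strips the
    // prefix, e.g. "Ce-Type" -> "type". All other headers are ignored.
    static Map<String, String> attributes(Map<String, String> headers) {
        Map<String, String> attributes = new TreeMap<>();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            String name = header.getKey().toLowerCase(Locale.ROOT);
            if (name.startsWith(PREFIX) && name.length() > PREFIX.length()) {
                attributes.put(name.substring(PREFIX.length()), header.getValue());
            }
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of(
                "Content-Type", "application/json",
                "Ce-Specversion", "1.0",
                "Ce-Type", "dev.knative.sources.ping",
                "Ce-Id", "c9270430-a649-4980-87a1-f9a7a985d2ac");
        // Prints the three ce- attributes, sorted by name
        System.out.println(attributes(headers));
    }
}
```

In the Spring handler this could be called with the headers parameter, for example to log the event type next to the body.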

build.gradle (parent)

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id "io.freefair.lombok" version "5.1.1"
}


subprojects {

    group 'uk.co.vadalg'
    version '1.0-SNAPSHOT'

    apply plugin: 'application'
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.freefair.lombok'

    repositories {
        jcenter()
    }

}

build.gradle (listener)


mainClassName  = "uk.co.ac.vadalg.KnEventListener"

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    testImplementation 'org.junit.jupiter:junit-jupiter:5.4.2'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

bootBuildImage {
    imageName = 'localhost:5007/kn-listener'
}

Create the Docker image (using Gradle):

gradle bootBuildImage

Then push it to the local Docker registry and create the service (see previous blog):

docker push localhost:5007/kn-listener
kn service create knlistener --image localhost:5007/kn-listener

If you have Docker image connection issues, check that you have edited the configmap (see previous blog).

Test if the service is up:

curl -i -H "Host: knlistener.default.example.com" localhost
HTTP/1.1 200 OK
content-length: 16
content-type: application/json
date: Thu, 06 Aug 2020 22:27:39 GMT
x-envoy-upstream-service-time: 5
server: istio-envoy

{"status": "UP"}

Start the ping again, but this time referencing knlistener:

cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/2 * * * *"
  jsonData: '{"message": "Hi earth!"}'
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: knlistener
EOF

Check the logs

kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m

2020-08-06 22:32:03.778  INFO 1 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-08-06 22:32:03.778  INFO 1 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1073 ms
2020-08-06 22:32:04.052  INFO 1 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-06 22:32:04.271  INFO 1 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-06 22:32:04.329  INFO 1 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-06 22:32:04.342  INFO 1 --- [           main] uk.co.ac.vadalg.KnEventListener          : Started KnEventListener in 2.058 seconds (JVM running for 2.479)
2020-08-06 22:32:04.544  INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-06 22:32:04.545  INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-08-06 22:32:04.550  INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet        : Completed initialization in 5 ms
2020-08-06 22:32:04.641  INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener          : 2020-08-06T22:32:04.641, {"message":"Hi earth!"}

Boing, spring event listener. Nice.

Delete the test source

kn source ping delete test-ping-source

You can also test the kn service listener with a curl pod:

kubectl run curl \
    --image=curlimages/curl --rm=true --restart=Never -ti -- \
    -X POST -v \
    -H "content-type: application/json"  \
    -H "ce-specversion: 1.0"  \
    -H "ce-source: http://curl-command"  \
    -H "ce-type: curl.demo"  \
    -H "ce-id: ekbetron"  \
    -d '{"name":"lightphos"}' \
    http://knlistener.default.svc

Java Based Event Sender

How about writing our own Kn event source in Java?

We need three things: a sender, a sink binding and a cronjob. We will keep the existing knlistener we wrote above to receive the events.

Simple Java Sender

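package uk.co.ac.vadalg;

import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@Slf4j
@RestController
@SpringBootApplication
public class KnEventSender {

    public static void main(String[] args) {
        SpringApplication.run(KnEventSender.class, args);
    }

    // Sink URL, taken from the ksink property (see application.yml)
    @Value("${ksink}")
    private String sink;

    // Sends one event at startup, then exits so the Job pod can complete
    @PostConstruct
    public void fire() {
        try {
            log.info("Fire {} ", send());
            System.exit(0);
        } catch (Exception e) {
            log.error("sink {}", sink);
            log.error("error ", e);
        }
    }

    @GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
    public ResponseEntity<String> send() {
        log.info("Sink {}", sink);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        // Single quotes make this not strict JSON; the listener logs the body as a plain String
        HttpEntity<String> request = new HttpEntity<>("{ 'test': 'from kn sender " + Math.random() + "'}", headers);

        return restTemplate.postForEntity(sink, request, String.class);
    }

}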
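The sender above posts JSON without any CloudEvents headers, which is enough for our listener. If a receiver needs the event attributes, a binary-mode request can carry them as ce- headers. The following is a minimal sketch using the JDK 11 java.net.http client rather than RestTemplate; the class name and the ce-type and ce-source values are placeholders, not taken from the original project.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.UUID;

// Hypothetical sketch; the ce-type and ce-source values are placeholders.
final class CloudEventRequest {

    // Builds a POST carrying the required CloudEvents attributes
    // (specversion, id, type, source) as "ce-" headers.
    static HttpRequest build(URI sink, String jsonBody) {
        return HttpRequest.newBuilder(sink)
                .header("content-type", "application/json")
                .header("ce-specversion", "1.0")
                .header("ce-id", UUID.randomUUID().toString())
                .header("ce-type", "uk.co.vadalg.demo")
                .header("ce-source", "urn:example:kn-sender")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // K_SINK is the environment variable the sender reads its sink URL from
        String sink = System.getenv("K_SINK");
        if (sink == null || sink.isBlank()) {
            System.out.println("K_SINK is not set, nothing sent");
            return;
        }
        HttpRequest request = build(URI.create(sink), "{\"test\": \"from kn sender\"}");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Sink responded with HTTP " + response.statusCode());
    }
}
```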

application.yml (note the K_SINK environment variable)

spring.application.name: kneventsender

ksink: ${K_SINK}

build.gradle (kn-sender)


mainClassName  = "uk.co.ac.vadalg.KnEventSender"

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    testImplementation 'org.junit.jupiter:junit-jupiter:5.4.2'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

bootBuildImage {
    imageName = 'localhost:5007/kn-sender'
}

Build the image and push it to the local Docker registry:

gradle bootBuildImage
docker push localhost:5007/kn-sender

There is no need to create a Knative service for the sender; a cronjob will run it and fire events periodically.

Sink Binding

This selects the jobs created by the cronjob (knsender-cron) through their app label and binds them to our kn listener as the sink.

apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
  name: knsender-heartbeat
spec:
  subject:
    apiVersion: batch/v1
    kind: Job
    selector:
      matchLabels:
        app: knsender-cron

  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: knlistener
  ceOverrides:
    extensions:
      sink: bound

Apply the manifest above:

kubectl apply -f <above>
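The selector in the binding picks its subjects by label. With matchLabels, an object matches when every key/value pair in the selector is present in the object's labels; extra labels on the object do not matter. The following plain Java sketch mirrors only that matching rule, not the SinkBinding implementation; the class name and the extra "team" label are made up for illustration.

```java
import java.util.Map;

// Hypothetical illustration of Kubernetes matchLabels semantics.
final class LabelSelector {

    // True when every selector pair appears, with the same value,
    // in the object's labels.
    static boolean matches(Map<String, String> matchLabels, Map<String, String> objectLabels) {
        return objectLabels.entrySet().containsAll(matchLabels.entrySet());
    }

    public static void main(String[] args) {
        Map<String, String> selector = Map.of("app", "knsender-cron");
        // "team" is an invented extra label, present only to show it is ignored
        Map<String, String> jobLabels = Map.of("app", "knsender-cron", "team", "demo");
        Map<String, String> otherLabels = Map.of("app", "something-else");

        System.out.println(matches(selector, jobLabels));   // true
        System.out.println(matches(selector, otherLabels)); // false
    }
}
```

This is why the cronjob's job template needs to carry the app: knsender-cron label for the binding to apply to its jobs.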

CronJob Trigger

This will set up a cronjob that runs every minute and kicks off our Java sender, in concert with the sink binding above.

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: knsender-cron
spec:
  # Run every minute
  schedule: "* * * * *"
  jobTemplate:
    metadata:
      labels:
        app: knsender-cron
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: knsender-heartbeat
              image: localhost:5007/kn-sender

Apply the manifest above:

kubectl apply -f <above>

Check out Jobs

kubectl get cronjob
NAME            SCHEDULE    SUSPEND   ACTIVE   LAST SCHEDULE   AGE
knsender-cron   * * * * *   False     0        28s             30m
kubectl get job
NAME                       COMPLETIONS   DURATION   AGE
knsender-cron-1597849380   1/1           4s         3m2s
knsender-cron-1597849440   1/1           4s         2m2s
knsender-cron-1597849500   1/1           9s         62s
knsender-cron-1597849560   0/1           2s         2s
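The numeric suffix on each job name looks like a Unix timestamp. Assuming it is the scheduled run time in epoch seconds (an assumption about how this Kubernetes version names cronjob runs; other versions may use a different scheme), it can be decoded as in the sketch below. The class name is hypothetical.

```java
import java.time.Instant;

// Hypothetical helper; assumes the job-name suffix is epoch seconds.
final class CronJobName {

    // Parses the digits after the last '-' as seconds since the Unix epoch.
    static Instant scheduledTime(String jobName) {
        int dash = jobName.lastIndexOf('-');
        if (dash < 0 || dash == jobName.length() - 1) {
            throw new IllegalArgumentException("no numeric suffix in " + jobName);
        }
        return Instant.ofEpochSecond(Long.parseLong(jobName.substring(dash + 1)));
    }

    public static void main(String[] args) {
        System.out.println(scheduledTime("knsender-cron-1597849380")); // 2020-08-19T15:03:00Z
        System.out.println(scheduledTime("knsender-cron-1597849440")); // 2020-08-19T15:04:00Z
    }
}
```

The decoded times are one minute apart, which matches the * * * * * schedule.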

Listener

The listener should be getting invoked and receiving events:

kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m
2020-08-19 15:00:07.106  INFO 1 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1064 ms
2020-08-19 15:00:07.432  INFO 1 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-19 15:00:07.632  INFO 1 --- [           main] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-19 15:00:07.700  INFO 1 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-19 15:00:07.733  INFO 1 --- [           main] uk.co.ac.vadalg.KnEventListener          : Started KnEventListener in 2.14 seconds (JVM running for 2.508)
2020-08-19 15:00:07.753  INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-19 15:00:07.753  INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2020-08-19 15:00:07.758  INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet        : Completed initialization in 5 ms
2020-08-19 15:00:07.834  INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener          : 2020-08-19T15:00:07.834, { 'test': 'from kn sender 0.9763648332730366'}
2020-08-19 15:01:03.744  INFO 1 --- [nio-8080-exec-9] uk.co.ac.vadalg.KnEventListener          : 2020-08-19T15:01:03.744, { 'test': 'from kn sender 0.8349076066212481'}

There you are: a Java sender and listener via Knative.

Delete

kubectl delete cronjob knsender-cron
cronjob.batch "knsender-cron" deleted
kubectl get po
NAME                                             READY   STATUS        RESTARTS   AGE
knlistener-rzcpv-1-deployment-789df7845b-bgrmn   2/2     Terminating   0          4m3s

Conclusion

We installed the Knative Eventing dependencies, tested the installation using the standard event resources, then created our own Java listener and Java sender to receive and send JSON-based events in a 'serverless' manner (the pods are removed after 30 seconds of inactivity).

Event Sources

Knative ships with core event sources such as:

  • ApiServerSource, which emits events when Kubernetes resources are created, updated or deleted
  • PingSource, which sends a fixed payload on a cron schedule
  • ContainerSource, which runs a container image as an event source
  • KafkaSource

See https://knative.dev/docs/eventing/sources/

Source Code

All source code can be found here:

https://gitlab.com/lightphos/spring/vadalg
