Simple Knative Eventing in Kubernetes with a Java Listener and Sender

Previously we ran a service using Knative Server and kicked it off with a http request.
Here we will write our own Knative services that act on events a listener and a sender both in Java using Spring Boot.
Set up
Docker Desktop (macos) 2.3.03
K8s 1.16.5
Istio 1.6.4
Install Eventing
From https://knative.dev/development/install/any-kubernetes-cluster/#installing-the-eventing-component
Install the eventing components:
kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-crds.yaml
customresourcedefinition.apiextensions.k8s.io/apiserversources.sources.knative.dev created
customresourcedefinition.apiextensions.k8s.io/brokers.eventing.knative.dev created
customresourcedefinition.apiextensions.k8s.io/channels.messaging.knative.dev created
etc
kubectl apply --filename https://github.com/knative/eventing/releases/download/v0.16.0/eventing-core.yaml
namespace/knative-eventing created
serviceaccount/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-resolver created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-source-observer created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-sources-controller created
clusterrolebinding.rbac.authorization.k8s.io/eventing-controller-manipulator created
serviceaccount/pingsource-mt-adapter created
clusterrolebinding.rbac.authorization.k8s.io/knative-eventing-pingsource-mt-adapter created
serviceaccount/eventing-webhook created
etc
Istio Cluster Gateway
This is needed for correct service discovery in Knative Eventing service startup.
cat << EOF > ./istio-minimal-operator.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
values:
global:
proxy:
autoInject: disabled
useMCP: false
# The third-party-jwt is not enabled on all k8s.
# See: https://istio.io/docs/ops/best-practices/security/#configure-third-party-service-account-tokens
jwtPolicy: first-party-jwt
addonComponents:
pilot:
enabled: true
prometheus:
enabled: false
components:
ingressGateways:
- name: istio-ingressgateway
enabled: true
- name: cluster-local-gateway
enabled: true
label:
istio: cluster-local-gateway
app: cluster-local-gateway
k8s:
service:
type: ClusterIP
ports:
- port: 15020
name: status-port
- port: 80
name: http2
- port: 443
name: https
EOF
istioctl manifest apply -f istio-minimal-operator.yaml
Event Display and Ping Source
First we will use a pre-defined Event Display Service.
cat <<EOF | kubectl create -f -
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: event-display
spec:
template:
spec:
containers:
- image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
EOF
kn service list
NAME URL LATEST AGE CONDITIONS READY REASON
event-display http://event-display.default.example.com 7s 0 OK / 3 Unknown RevisionMissing : Configuration "event-display" is waiting for a Revision to become ready.
Let's create a standard Ping Source
cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
name: test-ping-source
spec:
schedule: "*/2 * * * *"
jsonData: '{"message": "Hello world!"}'
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
EOF
This will send the message "Hello World" every two minutes.
kn source list
NAME TYPE RESOURCE SINK READY
test-ping-source PingSource pingsources.sources.knative.dev ksvc:event-display True
See if the event display service is receiving events.
kubectl logs -l serving.knative.dev/service=event-display -c user-container --since=10m
specversion: 1.0
type: dev.knative.sources.ping
source: /apis/v1/namespaces/default/pingsources/test-ping-source
id: c9270430-a649-4980-87a1-f9a7a985d2ac
time: 2020-08-05T00:32:00.003686838Z
datacontenttype: application/json
Data,
{
"message": "Hello world!"
}
You will see the pod being created and deleted which each ping (every 2 mins)
All good our Knative eventing services have been installed and are working.
Clean up
kubectl delete pingsources.sources.knative.dev test-ping-source
or
kn source ping delete test-ping-source
And also delete the event-display
kn service delete event-display
Java Based Event Listener
Let's write our own event listener in Java, in kn-listener sub project.
@Slf4j
@RestController
@SpringBootApplication
public class KnEventListener {
public static void main(String[] args) {
SpringApplication.run(KnEventListener.class, args);
}
@GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<String> status() {
return ResponseEntity.ok("{\"status\": \"UP\"}");
}
@PostMapping(value = "/", consumes = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<Void> event(@RequestHeader Map<String, Object> headers, @RequestBody String body) throws Exception {
log.info(LocalDateTime.now() + ", " + body);
return ResponseEntity.accepted().build();
}
}
build.gradle (parent)
plugins {
id 'org.springframework.boot' version '2.3.2.RELEASE'
id "io.freefair.lombok" version "5.1.1"
}
subprojects {
group 'uk.co.vadalg'
version '1.0-SNAPSHOT'
apply plugin: 'application'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'io.freefair.lombok'
repositories {
jcenter()
}
}
build.gradle (listener)
mainClassName = "uk.co.ac.vadalg.KnEventListener"
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.junit.jupiter:junit-jupiter:5.4.2'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
test {
useJUnitPlatform()
}
bootBuildImage {
imageName = 'localhost:5007/kn-listener'
}
Create docker image (using gradle)
gradle bootBuildImage
then push to local docker registry and create the service (see previous blog)
docker push localhost:5007/kn-listener
kn service create knlistener --image localhost:5007/kn-listener
If you have docker image connection issues you may need to check you have edited the configmap (see previous blog )
Test if the service is up:
curl -i -H "Host: knlistener.default.example.com" localhost
HTTP/1.1 200 OK
content-length: 16
content-type: application/json
date: Thu, 06 Aug 2020 22:27:39 GMT
x-envoy-upstream-service-time: 5
server: istio-envoy
{"status": "UP"}
Start the ping again but this time referencing knlistener
cat <<EOF | kubectl create -f -
apiVersion: sources.knative.dev/v1alpha2
kind: PingSource
metadata:
name: test-ping-source
spec:
schedule: "*/2 * * * *"
jsonData: '{"message": "Hi earth!"}'
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: knlistener
EOF
Check the logs
kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m
2020-08-06 22:32:03.778 INFO 1 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-08-06 22:32:03.778 INFO 1 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1073 ms
2020-08-06 22:32:04.052 INFO 1 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-06 22:32:04.271 INFO 1 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-06 22:32:04.329 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-06 22:32:04.342 INFO 1 --- [ main] uk.co.ac.vadalg.KnEventListener : Started KnEventListener in 2.058 seconds (JVM running for 2.479)
2020-08-06 22:32:04.544 INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-06 22:32:04.545 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-08-06 22:32:04.550 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2020-08-06 22:32:04.641 INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener : 2020-08-06T22:32:04.641, {"message":"Hi earth!"}
Boing, spring event listener. Nice.
Delete the test source
kn source ping delete test-ping-source
Can also test the kn service listener with a curl pod:
kubectl run curl \
--image=curlimages/curl --rm=true --restart=Never -ti -- \
-X POST -v \
-H "content-type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-source: http://curl-command" \
-H "ce-type: curl.demo" \
-H "ce-id: ekbetron" \
-d '{"name":"lightphos"}' \
http://knlistener.default.svc
Java Based Event Sender
How about writing our own Kn event source in Java?
We need three things a sender, sink binding and a cronjob. We will keep the existing knlistener we wrote above to receive the events.
Simple Java Sender
@Slf4j
@RestController
@SpringBootApplication
public class KnEventSender {
public static void main(String[] args) {
SpringApplication.run(KnEventSender.class, args);
}
@Value("${ksink}")
private String sink;
@PostConstruct
public void fire() {
try {
log.info("Fire {} ", send());
System.exit(0);
}
catch (Exception e ) {
log.error("sink {}", sink);
log.error("error ", e);
}
}
@GetMapping(value = "/", produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<String> send() {
log.info("Sink {}", sink);
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> request = new HttpEntity<>("{ 'test': 'from kn sender " + Math.random() + "'}", headers);
return restTemplate.postForEntity(sink, request, String.class);
}
}
application.yml (note the K_SINK environment variable)
spring.application.name: kneventsender
ksink: ${K_SINK}
build.gradle (kn-sender)
mainClassName = "uk.co.ac.vadalg.KnEventSender"
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.junit.jupiter:junit-jupiter:5.4.2'
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
}
test {
useJUnitPlatform()
}
bootBuildImage {
imageName = 'localhost:5007/kn-sender'
}
Build and push to local docker registry
gradle bootBuildImage
docker push localhost:5007/kn-sender
No need to create a sender service as we will use a cronjob to do that and fire events periodically.
Sink Binding
This references the cronjob (knsender-cron) and binds the service created by it to our kn listener.
apiVersion: sources.knative.dev/v1alpha1
kind: SinkBinding
metadata:
name: knsender-heartbeat
spec:
subject:
apiVersion: batch/v1
kind: Job
selector:
matchLabels:
app: knsender-cron
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: knlistener
ceOverrides:
extensions:
sink: bound
kubectl apply -f <above>
CronJob Trigger
This will set up a cron job to run every minute or so and kick off our java sender in concert with the sink bind above.
apiVersion: batch/v1beta1
kind: CronJob
metadata:
name: knsender-cron
spec:
# Run every minute
schedule: "* * * * *"
jobTemplate:
metadata:
labels:
app: knsender-cron
spec:
template:
spec:
restartPolicy: Never
containers:
- name: knsender-heartbeat
image: localhost:5007/kn-sender
kubectl apply -f <above>
Check out Jobs
kubectl get cronjob
NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE
knsender-cron * * * * * False 0 28s 30m
kubectl get job
NAME COMPLETIONS DURATION AGE
knsender-cron-1597849380 1/1 4s 3m2s
knsender-cron-1597849440 1/1 4s 2m2s
knsender-cron-1597849500 1/1 9s 62s
knsender-cron-1597849560 0/1 2s 2s
Listener
The listener should be getting invoked and receiving events:
kubectl logs -l serving.knative.dev/service=knlistener -c user-container --since=10m
2020-08-19 15:00:07.106 INFO 1 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1064 ms
2020-08-19 15:00:07.432 INFO 1 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-08-19 15:00:07.632 INFO 1 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2020-08-19 15:00:07.700 INFO 1 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-08-19 15:00:07.733 INFO 1 --- [ main] uk.co.ac.vadalg.KnEventListener : Started KnEventListener in 2.14 seconds (JVM running for 2.508)
2020-08-19 15:00:07.753 INFO 1 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-08-19 15:00:07.753 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-08-19 15:00:07.758 INFO 1 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 5 ms
2020-08-19 15:00:07.834 INFO 1 --- [nio-8080-exec-3] uk.co.ac.vadalg.KnEventListener : 2020-08-19T15:00:07.834, { 'test': 'from kn sender 0.9763648332730366'}
2020-08-19 15:01:03.744 INFO 1 --- [nio-8080-exec-9] uk.co.ac.vadalg.KnEventListener : 2020-08-19T15:01:03.744, { 'test': 'from kn sender 0.8349076066212481'}
There you are, java sender and listener via knative.
Delete
kubectl delete cronjob knsender-cron
cronjob.batch "knsender-cron" deleted
kubectl get po
NAME READY STATUS RESTARTS AGE
knlistener-rzcpv-1-deployment-789df7845b-bgrmn 2/2 Terminating 0 4m3s
Conclusion
We installed our knative event dependencies, tested the installation using standard event resources then created our own Java listener and Java sender to receive and send json based events in a 'serverless' manner (the pods are removed after inactivity of 30 secs).
Event Sources
Knative comes built with core sources of events such as:
- ApiServerSource, fired when a k8s resource is created
- PingSource (cronjob)
- ContainerSource, container images as sources
- KafkaSource
See https://knative.dev/docs/eventing/sources/
Source Code
All source code can be found here: