Stream SSE messages from Kafka
Get started with Zilla by deploying our Docker Compose stack. Before proceeding, you should have Docker Compose installed.
Running this Zilla sample fans out Hello, World messages from a Kafka topic, with Zilla acting as an SSE server.
Set up SSE Kafka Proxy
Create these files, zilla.yaml, docker-compose.yaml, and index.html, in the same directory.
zilla.yaml
name: SSE-example
bindings:
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port: 7114
    exit: north_http_server
  north_http_server:
    type: http
    kind: server
    options:
      access-control:
        policy: cross-origin
    routes:
      - when:
          - headers:
              :scheme: http
              :authority: localhost:7114
              :path: /events
        exit: north_sse_server
      - when:
          - headers:
              :scheme: http
              :authority: localhost:7114
        exit: east_http_filesystem_mapping
  # UI html file server
  east_http_filesystem_mapping:
    type: http-filesystem
    kind: proxy
    routes:
      - when:
          - path: /{path}
        with:
          path: ${params.path}
        exit: east_filesystem_server
  east_filesystem_server:
    type: filesystem
    kind: server
    options:
      location: /var/www/
  # SSE Server With an exit to Kafka
  north_sse_server:
    type: sse
    kind: server
    exit: north_sse_kafka_mapping
  north_sse_kafka_mapping:
    type: sse-kafka
    kind: proxy
    routes:
      - when:
          - path: /events
        with:
          topic: events
        exit: north_kafka_cache_client
  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - events
    exit: south_kafka_client
  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - ${{env.KAFKA_BOOTSTRAP_SERVER}}
    exit: south_kafka_tcp_client
  south_kafka_tcp_client:
    type: tcp
    kind: client
telemetry:
  exporters:
    stdout_logs_exporter:
      type: stdout
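To make the flow through these bindings easier to follow, the JavaScript sketch below models each binding's exit and traces which bindings a request would pass through. It is a simplified illustration only: the `bindings` table and the `tracePath` helper are hypothetical names, the route conditions are reduced to a path check, and it assumes the first matching route wins. It does not reflect how Zilla is implemented internally.

```javascript
// Simplified model of the bindings in zilla.yaml. Each binding has a default
// exit, a list of routes, or neither (end of the chain). Illustration only.
const bindings = {
  north_tcp_server: { exit: "north_http_server" },
  north_http_server: {
    routes: [
      // Mirrors the route that matches the :path header /events
      { when: (path) => path === "/events", exit: "north_sse_server" },
      // Mirrors the route without a :path condition
      { when: () => true, exit: "east_http_filesystem_mapping" },
    ],
  },
  east_http_filesystem_mapping: { exit: "east_filesystem_server" },
  east_filesystem_server: {},
  north_sse_server: { exit: "north_sse_kafka_mapping" },
  north_sse_kafka_mapping: { exit: "north_kafka_cache_client" },
  north_kafka_cache_client: { exit: "south_kafka_cache_server" },
  south_kafka_cache_server: { exit: "south_kafka_client" },
  south_kafka_client: { exit: "south_kafka_tcp_client" },
  south_kafka_tcp_client: {},
};

// Follow exits from the entrypoint, assuming the first matching route wins.
function tracePath(path, start = "north_tcp_server") {
  const visited = [];
  let name = start;
  while (name) {
    visited.push(name);
    const binding = bindings[name];
    const route = (binding.routes || []).find((r) => r.when(path));
    name = route ? route.exit : binding.exit;
  }
  return visited;
}

console.log(tracePath("/events").join(" -> "));
console.log(tracePath("/index.html").join(" -> "));
```

Under these assumptions, a request for /events ends at south_kafka_tcp_client, while any other path ends at east_filesystem_server, which serves files from /var/www/.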
docker-compose.yaml
version: '3'
services:
  zilla:
    image: ghcr.io/aklivity/zilla:latest
    pull_policy: always
    depends_on:
      - kafka
    ports:
      - 7114:7114
    environment:
      KAFKA_BOOTSTRAP_SERVER: "kafka:29092"
    volumes:
      - ./zilla.yaml:/etc/zilla/zilla.yaml
      - ./index.html:/var/www/index.html
    command: start -v -e
  kafka:
    image: bitnami/kafka:3.5
    hostname: kafka
    ports:
      - 9092:9092
      - 29092:9092
    environment:
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_BROKER_ID: "1"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@127.0.0.1:9093"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CLIENT:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_LOG_DIRS: "/tmp/logs"
      KAFKA_CFG_PROCESS_ROLES: "broker,controller"
      KAFKA_CFG_LISTENERS: "CLIENT://:9092,INTERNAL://:29092,CONTROLLER://:9093"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_CFG_ADVERTISED_LISTENERS: "CLIENT://localhost:9092,INTERNAL://kafka:29092"
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
  kafka-init:
    image: bitnami/kafka:3.5
    command:
      - /bin/sh
      - -c
      - |
        /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 --create --if-not-exists --topic events
    depends_on:
      - kafka
    init: true
  kafka_messenger:
    image: confluentinc/cp-kafkacat:7.1.9
    command:
      - /bin/sh
      - -c
      - |
        while true; do echo "{ \"data\": \"Hello, world $(date)\" }" | kafkacat -P -b kafka:29092 -t events -k 1; echo "message sent, waiting 5 sec"; sleep 5; done
    depends_on:
      - kafka
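The Kafka service advertises two client-facing listeners, and every container in this stack (zilla, kafka-init, kafka_messenger) connects through the INTERNAL one at kafka:29092. As a rough aid for reading the KAFKA_CFG_ADVERTISED_LISTENERS value, the sketch below splits that string into named host and port pairs. The `parseListeners` helper is a hypothetical name used for illustration; it is not part of Kafka or Zilla.

```javascript
// Split a Kafka listener list (NAME://host:port,NAME://host:port) into a map.
// Hypothetical helper for illustration; it only handles the simple form
// used in this docker-compose.yaml.
function parseListeners(value) {
  const result = {};
  for (const entry of value.split(",")) {
    const [name, address] = entry.split("://");
    const idx = address.lastIndexOf(":");
    result[name] = {
      host: address.slice(0, idx),
      port: Number(address.slice(idx + 1)),
    };
  }
  return result;
}

const advertised = parseListeners(
  "CLIENT://localhost:9092,INTERNAL://kafka:29092"
);

// Containers on the Compose network use the INTERNAL listener.
console.log("containers:", `${advertised.INTERNAL.host}:${advertised.INTERNAL.port}`);
// The CLIENT listener is advertised as localhost for tools run on the host.
console.log("host tools:", `${advertised.CLIENT.host}:${advertised.CLIENT.port}`);
```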
index.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Welcome to Zilla!</title>
    <style type="text/css">
      .row {
        overflow: hidden;
        padding: 10px;
        width: 300px;
      }
      .col {
        float: left;
        width: 50%;
      }
    </style>
  </head>
  <body>
    <div class="col">
      <h3>Event Source</h3>
      <div class="row">
        <label for="location">Location</label>
        <input id="location" value="http://localhost:7114/events" style="width: 200px" />
      </div>
      <div class="row">
        <button onclick="javascript:attachEventSource()">Go</button>
      </div>
      <h3>Messages</h3>
      <div id="messages"></div>
    </div>
    <script>
      async function attachEventSource() {
        const location = document.getElementById("location");
        const messages = document.getElementById("messages");
        // Prepend each line so the newest message appears at the top
        const printOutput = (text) => {
          var line = document.createElement("p");
          line.appendChild(document.createTextNode(text ?? ""));
          messages.insertBefore(line, messages.firstChild);
        }
        const es = new EventSource(`${location.value}`);
        es.onmessage = ({ data }) => {
          printOutput(data)
        };
        es.onopen = ({ type }) => {
          printOutput(type)
        };
        // Print the event type rather than the raw Event object
        es.onerror = ({ type }) => {
          printOutput(type)
        };
      }
    </script>
  </body>
</html>
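The page relies on the browser's EventSource to decode the text/event-stream response from /events. For readers who want to see what that decoding involves, here is a minimal sketch of a parser for the standard SSE wire format (fields such as `data:` and `id:`, comment lines starting with `:`, and a blank line ending each event). The `parseEventStream` function is a hypothetical helper, and the sample input is an assumption: this guide does not show the exact frames Zilla emits.

```javascript
// Minimal parser for the text/event-stream format. Illustration only; in the
// browser, EventSource does this work for you.
function parseEventStream(text) {
  const events = [];
  let current = { event: "message", data: [], id: undefined };
  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === "") {
      // A blank line ends the event; events without data are dropped.
      if (current.data.length > 0) {
        events.push({
          event: current.event,
          data: current.data.join("\n"),
          id: current.id,
        });
      }
      // The event type resets, but the last seen id carries over.
      current = { event: "message", data: [], id: current.id };
      continue;
    }
    if (line.startsWith(":")) continue; // comment line, often a keep-alive
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // strip one leading space
    if (field === "data") current.data.push(value);
    else if (field === "event") current.event = value;
    else if (field === "id") current.id = value;
  }
  return events;
}

// Assumed sample input, shaped like the JSON value the messenger publishes.
const sample =
  'id: 1\ndata: { "data": "Hello, world" }\n\n: keep-alive\n\ndata: second\n\n';
for (const evt of parseEventStream(sample)) {
  console.log(evt.event, evt.data);
}
```

In this sketch, the `data` string of each parsed event corresponds to what `es.onmessage` receives as `data` in index.html.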
Run Zilla and Kafka
docker-compose up --detach
- Open the browser
Navigate to http://localhost:7114/index.html in your browser.
- Click Go
With the location input set to http://localhost:7114/events, click the Go button to connect to the SSE server. Messages stream in for as long as the kafka_messenger service is running in Docker, and each message is rendered on the page with the newest at the top.
...
message: Hello, world Wed May 10 14:25:45 UTC 2023
message: Hello, world Wed May 10 14:25:40 UTC 2023
open:
- Remove the running containers
docker-compose down
See more of what Zilla can do
Go deeper into this concept with the sse.kafka.fanout example.
Going Deeper
Try out the other SSE examples: