
Knative Kafka provides integration options for you to use supported versions of the Apache Kafka message streaming platform with OpenShift Serverless. Kafka provides options for event source, channel, broker, and event sink capabilities.

Knative Kafka functionality is available in an OpenShift Serverless installation if a cluster administrator has installed the KnativeKafka custom resource.

Knative Kafka provides additional options, such as:

  • Kafka source

  • Kafka channel

  • Kafka broker (Technology Preview)

  • Kafka sink (Technology Preview)

Kafka event delivery and retries

Using Kafka components in an event-driven architecture provides "at least once" event delivery: operations are retried until a return code value is received. This makes applications more resilient to lost events, but it can also result in duplicate events being sent.

For the Kafka event source, there is a fixed number of retries for event delivery by default. For Kafka channels, retries are only performed if they are configured in the Kafka channel Delivery spec.

See the Event delivery documentation for more information about delivery guarantees.
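
Because "at least once" delivery can produce duplicates, a receiver usually needs to tolerate the same event arriving twice. The following is a minimal sketch of one common approach, not part of Knative itself: it remembers each event by its CloudEvents source and id and skips repeats. The names SEEN_FILE and handle_once are hypothetical, and the temporary file stands in for whatever durable store a real service would use.

```shell
# Sketch only: SEEN_FILE and handle_once are illustrative names, not Knative APIs.
# A temporary file stands in for a durable store of already-processed events.
SEEN_FILE="$(mktemp)"

handle_once() {
  # $1 = CloudEvents "source" attribute, $2 = CloudEvents "id" attribute
  key="$1|$2"
  if grep -qxF -e "$key" "$SEEN_FILE"; then
    echo "skipped duplicate: $key"
    return 0
  fi
  printf '%s\n' "$key" >> "$SEEN_FILE"
  echo "processed: $key"
}

# A retried delivery carries the same source and id as the first attempt,
# so the second call below is recognized as a duplicate.
handle_once "example-kafka-source#example-topic" "partition:0/offset:0"
handle_once "example-kafka-source#example-topic" "partition:0/offset:0"
handle_once "example-kafka-source#example-topic" "partition:0/offset:1"
```

The source and id pair is used as the key because, for events from a Kafka source, the id encodes the partition and offset of the original message.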

Kafka source

You can create a Kafka source that reads events from an Apache Kafka cluster and passes these events to a sink. You can create a Kafka source by using the Red Hat OpenShift Service on AWS web console, the Knative (kn) CLI, or by creating a KafkaSource object directly as a YAML file and using the OpenShift (oc) CLI to apply it.

Creating a Kafka event source by using the web console

After Knative Kafka is installed on your cluster, you can create a Kafka source by using the web console. Using the Red Hat OpenShift Service on AWS web console provides a streamlined and intuitive user interface to create a Kafka source.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.

  • You have logged in to the web console.

  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

Procedure
  1. In the Developer perspective, navigate to the +Add page and select Event Source.

  2. In the Event Sources page, select Kafka Source in the Type section.

  3. Configure the Kafka Source settings:

    1. Add a comma-separated list of Bootstrap Servers.

    2. Add a comma-separated list of Topics.

    3. Add a Consumer Group.

    4. Select the Service Account Name for the service account that you created.

    5. Select the Sink for the event source. A Sink can be either a Resource, such as a channel, broker, or service, or a URI.

    6. Enter a Name for the Kafka event source.

  4. Click Create.

Verification

You can verify that the Kafka event source was created and is connected to the sink by viewing the Topology page.

  1. In the Developer perspective, navigate to Topology.

  2. View the Kafka event source and sink.

    View the Kafka source and service in the Topology view

Creating a Kafka event source by using the Knative CLI

You can use the kn source kafka create command to create a Kafka source by using the kn CLI. Using the kn CLI to create event sources provides a more streamlined and intuitive user interface than modifying YAML files directly.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, Knative Serving, and the KnativeKafka custom resource (CR) are installed on your cluster.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

  • You have installed the Knative (kn) CLI.

  • Optional: You have installed the OpenShift (oc) CLI if you want to use the verification steps in this procedure.

Procedure
  1. To verify that the Kafka event source is working, create a Knative service that dumps incoming events into the service logs:

    $ kn service create event-display \
        --image quay.io/openshift-knative/knative-eventing-sources-event-display
  2. Create a KafkaSource CR:

    $ kn source kafka create <kafka_source_name> \
        --servers <cluster_kafka_bootstrap>.kafka.svc:9092 \
        --topics <topic_name> --consumergroup my-consumer-group \
        --sink event-display

    Replace the placeholder values in this command with values for your source name, bootstrap servers, and topics.

    The --servers, --topics, and --consumergroup options specify the connection parameters to the Kafka cluster. The --consumergroup option is optional.

  3. Optional: View details about the KafkaSource CR you created:

    $ kn source kafka describe <kafka_source_name>
    Example output
    Name:              example-kafka-source
    Namespace:         kafka
    Age:               1h
    BootstrapServers:  example-cluster-kafka-bootstrap.kafka.svc:9092
    Topics:            example-topic
    ConsumerGroup:     example-consumer-group
    
    Sink:
      Name:       event-display
      Namespace:  default
      Resource:   Service (serving.knative.dev/v1)
    
    Conditions:
      OK TYPE            AGE REASON
      ++ Ready            1h
      ++ Deployed         1h
      ++ SinkProvided     1h
Verification steps
  1. Trigger the Kafka instance to send a message to the topic:

    $ oc -n kafka run kafka-producer \
        -ti --image=quay.io/strimzi/kafka:latest-kafka-2.7.0 --rm=true \
        --restart=Never -- bin/kafka-console-producer.sh \
        --broker-list <cluster_kafka_bootstrap>:9092 --topic my-topic

    Enter the message in the prompt. This command assumes that:

    • The Kafka cluster is installed in the kafka namespace.

    • The KafkaSource object has been configured to use the my-topic topic.

  2. Verify that the message arrived by viewing the logs:

    $ oc logs $(oc get pod -o name | grep event-display) -c user-container
    Example output
    ☁️  cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/example-kafka-source#example-topic
      subject: partition:46#0
      id: partition:46/offset:0
      time: 2021-03-10T11:21:49.4Z
    Extensions,
      traceparent: 00-161ff3815727d8755848ec01c866d1cd-7ff3916c44334678-00
    Data,
      Hello!
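
If you want to script the log check instead of reading the output by eye, a small filter can count the Kafka events that the event-display service has logged. This is a sketch under the assumption that the log format matches the example output above; count_kafka_events is a hypothetical helper name.

```shell
# Sketch only: count_kafka_events is an illustrative helper, not part of oc or kn.
# It reads event-display log text on stdin and prints how many lines carry
# the Kafka source event type.
count_kafka_events() {
  # grep -c exits non-zero when the count is 0, so keep the exit status clean.
  grep -cF 'type: dev.knative.kafka.event' || true
}

# Example with log text shaped like the output above.
# On a cluster, pipe the oc logs command from the verification step instead:
#   oc logs $(oc get pod -o name | grep event-display) -c user-container | count_kafka_events
count_kafka_events <<'EOF'
Context Attributes,
  specversion: 1.0
  type: dev.knative.kafka.event
  id: partition:46/offset:0
Data,
  Hello!
EOF
```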

Knative CLI sink flag

When you create an event source by using the Knative (kn) CLI, you can use the --sink flag to specify the sink that receives events from that resource. The sink can be any addressable or callable resource that can receive incoming events from other resources.

The following example creates a sink binding that uses a service, http://event-display.svc.cluster.local, as the sink:

Example command using the sink flag
$ kn source binding create bind-heartbeat \
  --namespace sinkbinding-example \
  --subject "Job:batch/v1:app=heartbeat-cron" \
  --sink http://event-display.svc.cluster.local \ (1)
  --ce-override "sink=bound"
1 svc in http://event-display.svc.cluster.local determines that the sink is a Knative service. Other default sink prefixes include channel and broker.

Creating a Kafka event source by using YAML

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. To create a Kafka source by using YAML, you must create a YAML file that defines a KafkaSource object, then apply it by using the oc apply command.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your cluster.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

  • You have installed the OpenShift (oc) CLI.

Procedure
  1. Create a KafkaSource object as a YAML file:

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: <source_name>
    spec:
      consumerGroup: <group_name> (1)
      bootstrapServers:
      - <list_of_bootstrap_servers>
      topics:
      - <list_of_topics> (2)
      sink:
        <sink> (3)
    1 A consumer group is a group of consumers that use the same group ID and consume data from a topic.
    2 A topic provides a destination for the storage of data. Each topic is split into one or more partitions.
    3 A sink specifies where the source sends events. It is a single destination, defined by a ref or a uri, rather than a list.

    Only the v1beta1 version of the API for KafkaSource objects on OpenShift Serverless is supported. Do not use the v1alpha1 version of this API, as this version is now deprecated.

    Example KafkaSource object
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
  2. Apply the KafkaSource YAML file:

    $ oc apply -f <filename>
Verification
  • Verify that the Kafka event source was created by entering the following command:

    $ oc get pods
    Example output
    NAME                                    READY     STATUS    RESTARTS   AGE
    kafkasource-kafka-source-5ca0248f-...   1/1       Running   0          13m
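
When the same source definition is needed for different topics or environments, the manifest can be generated instead of edited by hand. The following sketch prints a KafkaSource manifest shaped like the example object above and turns a comma-separated topic string into a YAML list. The function name render_kafka_source and the topic another-topic are hypothetical, and the sink is assumed to be a Knative service.

```shell
# Sketch only: render_kafka_source is an illustrative helper.
# $1 = source name, $2 = consumer group, $3 = bootstrap server,
# $4 = comma-separated topics, $5 = Knative service used as the sink
render_kafka_source() {
  cat <<EOF
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: $1
spec:
  consumerGroup: $2
  bootstrapServers:
  - $3
  topics:
EOF
  # One YAML list item per topic; surrounding spaces and empty items are dropped.
  printf '%s\n' "$4" | tr ',' '\n' |
    sed -e 's/^ *//' -e 's/ *$//' -e '/^$/d' -e 's/^/  - /'
  cat <<EOF
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: $5
EOF
}

# Print a manifest for two topics (another-topic is a made-up example name).
render_kafka_source kafka-source knative-group \
  my-cluster-kafka-bootstrap.kafka:9092 \
  "knative-demo-topic, another-topic" event-display
```

To apply the result directly, the output can be piped to oc apply -f -, which reads the manifest from standard input.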

Kafka broker

If a cluster administrator has configured your OpenShift Serverless deployment to use Kafka as the default broker type, creating a broker by using the default settings creates a Kafka-based Broker object. If your OpenShift Serverless deployment is not configured to use Kafka broker as the default broker type, you can use one of the following procedures to create a Kafka-based broker.

Kafka broker is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see https://access.redhat.com/support/offerings/techpreview/.

The Kafka broker, which is currently in Technology Preview, is not supported on FIPS.

Creating a Kafka broker by using YAML

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. To create a Kafka broker by using YAML, you must create a YAML file that defines a Broker object, then apply it by using the oc apply command.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your Red Hat OpenShift Service on AWS cluster.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

  • You have installed the OpenShift (oc) CLI.

Procedure
  1. Create a Kafka-based broker as a YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka (1)
      name: example-kafka-broker
    spec:
      config:
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config (2)
        namespace: knative-eventing
    1 The broker class. If not specified, brokers use the default class as configured by cluster administrators. To use the Kafka broker, this value must be Kafka.
    2 The default config map for Knative Kafka brokers. This config map is created when the Kafka broker functionality is enabled on the cluster by a cluster administrator.
  2. Apply the Kafka-based broker YAML file:

    $ oc apply -f <filename>

Creating a Kafka broker that uses an externally managed Kafka topic

If you want to use a Kafka broker without allowing it to create its own internal topic, you can use an externally managed Kafka topic instead. To do this, you must create a Kafka Broker object that uses the kafka.eventing.knative.dev/external.topic annotation.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your Red Hat OpenShift Service on AWS cluster.

  • You have access to a Kafka instance such as Red Hat AMQ Streams, and have created a Kafka topic.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

  • You have installed the OpenShift (oc) CLI.

Procedure
  1. Create a Kafka-based broker as a YAML file:

    apiVersion: eventing.knative.dev/v1
    kind: Broker
    metadata:
      annotations:
        eventing.knative.dev/broker.class: Kafka (1)
        kafka.eventing.knative.dev/external.topic: <topic_name> (2)
    ...
    1 The broker class. If not specified, brokers use the default class as configured by cluster administrators. To use the Kafka broker, this value must be Kafka.
    2 The name of the Kafka topic that you want to use.
  2. Apply the Kafka-based broker YAML file:

    $ oc apply -f <filename>
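
The two broker procedures differ only in the external topic annotation, so one template can cover both. The following is a sketch, with the assumption (not confirmed by the truncated example above) that the externally managed variant uses the same name and spec.config fields as the standard Kafka broker. The function name render_kafka_broker is hypothetical.

```shell
# Sketch only: render_kafka_broker is an illustrative helper.
# Assumption: the external-topic broker reuses the spec.config of the
# standard Kafka broker example; adjust if your cluster differs.
# $1 = broker name, $2 = optional externally managed topic name
render_kafka_broker() {
  cat <<EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
EOF
  # Add the external topic annotation only when a topic name is given.
  if [ -n "${2:-}" ]; then
    printf '    kafka.eventing.knative.dev/external.topic: %s\n' "$2"
  fi
  cat <<EOF
  name: $1
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
EOF
}

# Broker that manages its own topic:
render_kafka_broker example-kafka-broker
# Broker that uses an existing topic (my-external-topic is a made-up name):
render_kafka_broker example-kafka-broker my-external-topic
```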

Creating a Kafka channel by using YAML

Creating Knative resources by using YAML files uses a declarative API, which enables you to describe channels declaratively and in a reproducible manner. You can create a Knative Eventing channel that is backed by Kafka topics by creating a Kafka channel. To create a Kafka channel by using YAML, you must create a YAML file that defines a KafkaChannel object, then apply it by using the oc apply command.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource are installed on your Red Hat OpenShift Service on AWS cluster.

  • You have installed the OpenShift (oc) CLI.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

Procedure
  1. Create a KafkaChannel object as a YAML file:

    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    metadata:
      name: example-channel
      namespace: default
    spec:
      numPartitions: 3
      replicationFactor: 1

    Only the v1beta1 version of the API for KafkaChannel objects on OpenShift Serverless is supported. Do not use the v1alpha1 version of this API, as this version is now deprecated.

  2. Apply the KafkaChannel YAML file:

    $ oc apply -f <filename>
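
Kafka channels retry event delivery only when retries are configured in the channel's delivery spec. The following sketch prints a KafkaChannel manifest like the one above with a delivery section added. It assumes the standard Knative Eventing delivery fields (retry, backoffPolicy, backoffDelay); the function name render_kafka_channel and the chosen backoff values are illustrative, so check them against the Event delivery documentation for your version.

```shell
# Sketch only: render_kafka_channel is an illustrative helper.
# $1 = channel name, $2 = number of delivery retries
render_kafka_channel() {
  cat <<EOF
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: $1
spec:
  numPartitions: 3
  replicationFactor: 1
  delivery:
    retry: $2
    backoffPolicy: exponential
    backoffDelay: PT0.5S
EOF
}

# Channel that retries each failed delivery up to 5 times.
render_kafka_channel example-channel 5
```

The manifest omits metadata.namespace, so applying it creates the channel in the current project.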

Kafka sink

A Kafka sink is a type of event sink that is available if a cluster administrator has enabled Kafka on your cluster. You can send events directly from an event source to a Kafka topic by using a Kafka sink.

Kafka sink is a Technology Preview feature only. Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend using them in production. These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.

For more information about the support scope of Red Hat Technology Preview features, see https://access.redhat.com/support/offerings/techpreview/.

Using a Kafka sink

You can create an event sink called a Kafka sink that sends events to a Kafka topic. Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. To create a Kafka sink by using YAML, you must create a YAML file that defines a KafkaSink object, then apply it by using the oc apply command.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) are installed on your cluster.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in Red Hat OpenShift Service on AWS.

  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

  • You have installed the OpenShift (oc) CLI.

Procedure
  1. Create a KafkaSink object definition as a YAML file:

    Kafka sink YAML
    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
      - <bootstrap-server>
  2. To create the Kafka sink, apply the KafkaSink YAML file:

    $ oc apply -f <filename>
  3. Configure an event source so that the sink is specified in its spec:

    Example of a Kafka sink connected to an API server source
    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> (1)
      namespace: <namespace> (2)
    spec:
      serviceAccountName: <service-account-name> (3)
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> (4)
    1 The name of the event source.
    2 The namespace of the event source.
    3 The service account for the event source.
    4 The Kafka sink name.