×

Kafka sinks are a type of event sink that are available if a cluster administrator has enabled Kafka on your cluster. You can send events directly from an event source to a Kafka topic by using a Kafka sink.

Using a Kafka sink

You can create an event sink called a Kafka sink that sends events to a Kafka topic. Creating Knative resources by using YAML files uses a declarative API, which enables you to describe applications declaratively and in a reproducible manner. By default, a Kafka sink uses the binary content mode, which is more efficient than the structured mode. To create a Kafka sink by using YAML, you must create a YAML file that defines a KafkaSink object, then apply it by using the oc apply command.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resource (CR) are installed on your cluster.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

  • You have access to a Red Hat AMQ Streams (Kafka) cluster that produces the Kafka messages you want to import.

  • Install the OpenShift CLI (oc).

Procedure
  1. Create a KafkaSink object definition as a YAML file:

    Kafka sink YAML
    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: <sink-name>
      namespace: <namespace>
    spec:
      topic: <topic-name>
      bootstrapServers:
       - <bootstrap-server>
  2. To create the Kafka sink, apply the KafkaSink YAML file:

    $ oc apply -f <filename>
  3. Configure an event source so that the sink is specified in its spec:

    Example of a Kafka sink connected to an API server source
    apiVersion: sources.knative.dev/v1alpha2
    kind: ApiServerSource
    metadata:
      name: <source-name> (1)
      namespace: <namespace> (2)
    spec:
      serviceAccountName: <service-account-name> (3)
      mode: Resource
      resources:
      - apiVersion: v1
        kind: Event
      sink:
        ref:
          apiVersion: eventing.knative.dev/v1alpha1
          kind: KafkaSink
          name: <sink-name> (4)
    1 The name of the event source.
    2 The namespace of the event source.
    3 The service account for the event source.
    4 The Kafka sink name.

Configuring security for Kafka sinks

Transport Layer Security (TLS) is used by Apache Kafka clients and servers to encrypt traffic between Knative and Kafka, as well as for authentication. TLS is the only supported method of traffic encryption for Knative Kafka.

Simple Authentication and Security Layer (SASL) is used by Apache Kafka for authentication. If you use SASL authentication on your cluster, users must provide credentials to Knative for communicating with the Kafka cluster; otherwise events cannot be produced or consumed.

Prerequisites
  • The OpenShift Serverless Operator, Knative Eventing, and the KnativeKafka custom resources (CRs) are installed on your OpenShift Container Platform cluster.

  • Kafka sink is enabled in the KnativeKafka CR.

  • You have created a project or have access to a project with the appropriate roles and permissions to create applications and other workloads in OpenShift Container Platform.

  • You have a Kafka cluster CA certificate stored as a .pem file.

  • You have a Kafka cluster client certificate and a key stored as .pem files.

  • You have installed the OpenShift (oc) CLI.

  • You have chosen the SASL mechanism to use, for example, PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512.

Procedure
  1. Create the certificate files as a secret in the same namespace as your KafkaSink object:

    Certificates and keys must be in PEM format.

    • For authentication using SASL without encryption:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_PLAINTEXT \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-literal=user=<username> \
        --from-literal=password=<password>
    • For authentication using SASL and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SASL_SSL \
        --from-literal=sasl.mechanism=<sasl_mechanism> \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
        --from-literal=user=<username> \
        --from-literal=password=<password>
      1 The ca.crt can be omitted to use the system’s root CA set if you are using a public cloud managed Kafka service, such as Red Hat OpenShift Streams for Apache Kafka.
    • For authentication and encryption using TLS:

      $ oc create secret -n <namespace> generic <secret_name> \
        --from-literal=protocol=SSL \
        --from-file=ca.crt=<my_caroot.pem_file_path> \ (1)
        --from-file=user.crt=<my_cert.pem_file_path> \
        --from-file=user.key=<my_key.pem_file_path>
      1 The ca.crt can be omitted to use the system’s root CA set if you are using a public cloud managed Kafka service, such as Red Hat OpenShift Streams for Apache Kafka.
  2. Create or modify a KafkaSink object and add a reference to your secret in the auth spec:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
       name: <sink_name>
       namespace: <namespace>
    spec:
    ...
       auth:
         secret:
           ref:
             name: <secret_name>
    ...
  3. Apply the KafkaSink object:

    $ oc apply -f <filename>