×

Event processing usually completes within a short time frame, such as a few minutes. This ensures that the HTTP connection remains open and the service does not scale down prematurely.

Maintaining long-running connections increases the risk of failure, potentially leading to processing restarts and repeated request retries.

You can use JobSink to support long-running asynchronous jobs and tasks using the full Kubernetes batch/v1 Job resource and features and Kubernetes job queuing systems such as Kueue.

Using JobSink

When an event is sent to a JobSink, Eventing creates a Job and mounts the received event as JSON file at /etc/jobsink-event/event.

Procedure
  1. Create a JobSink object definition as a YAML file:

    JobSink YAML
    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"
  2. Apply the JobSink YAML file:

    $ oc apply -f <job-sink-file.yaml>
  3. Verify JobSink is ready:

    $ oc get jobsinks.sinks.knative.dev

    Example output:

    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
  4. Trigger a JobSink. JobSink can be triggered by any event source or trigger.

    $ oc run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
       -H "content-type: application/json"  \
       -H "ce-specversion: 1.0" \
       -H "ce-source: my/curl/command" \
       -H "ce-type: my.demo.event" \
       -H "ce-id: 123" \
       -d '{"details":"JobSinkDemo"}' \
       http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
  5. Verify a Job is created:

    $ oc logs job-sink-loggerszoi6-dqbtq

    Example output:

    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}

JobSink creates a Job for each unique event it receives.

An event is uniquely identified by the combination of its source and id attributes.

If an event with the same attributes is received while a Job for that event already exists, another Job will not be created.

Reading the Job event file

Procedure
  • Read the event file and deserialize it by using any CloudEvents JSON deserializer. The following example demonstrates how to read and process an event using CloudEvents Go SDK:

    package mytask
    
    import (
        "encoding/json"
        "fmt"
        "os"
    
        cloudevents "github.com/cloudevents/sdk-go/v2"
    )
    
    func handleEvent() error {
        eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
        if err != nil {
            return err
        }
    
        event := &cloudevents.Event{}
        if err := json.Unmarshal(eventBytes, event); err != nil {
            return err
        }
    
        fmt.Println(event)
    
        return nil
    }

Setting custom event file mount path

You can set a custom event file mount path in your JobSink definition.

Procedure
  • Inside your container definition, include the volumeMounts configuration and set as required.

    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-custom-mount-path
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5
    
                  # The event will be available in a file at `/etc/custom-path/event`
                  volumeMounts:
                    - name: "jobsink-event"
                      mountPath: "/etc/custom-path"
                      readOnly: true

Cleaning up finished jobs

You can clean up finished jobs by setting a ttlSecondsAfterFinished value in your JobSink definition. For example, setting the value to 600 removes completed jobs 600 seconds (10 minutes) after they finish.

Procedure
  • In your definition, set the value of ttlSecondsAfterFinished to the required amount.

    Example of ttlSecondsAfterFinished set to 600
    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-example
    spec:
      job:
        spec:
          ttlSecondsAfterFinished: 600

Simulating FailJob action

Procedure
  • Trigger a FailJob action by including a bug simulating command in your JobSink definition.

    Example of JobSink failure
    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-failure
    spec:
      job:
        metadata:
          labels:
            my-label: my-value
        spec:
          completions: 12
          parallelism: 3
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
                  args:
                    - -c
                    - echo "Hello world!" && sleep 5 && exit 42
          backoffLimit: 6
          podFailurePolicy:
            rules:
              - action: FailJob
                onExitCodes:
                  containerName: main      # optional
                  operator: In             # one of: In, NotIn
                  values: [ 42 ]
              - action: Ignore             # one of: Ignore, FailJob, Count
                onPodConditions:
                  - type: DisruptionTarget   # indicates Pod disruption