Version: 6.4.1

Kafka

Kafka is an event streaming platform built on a commit log: clients publish data to it and subscribe to data from it. It is highly scalable and fault tolerant, and it can also process data as it arrives. Kafka works with Kubernetes orchestration on any cloud provider platform.

Kafka Use Cases in Fabric

In Fabric, Kafka:

  • Adds buffering for Agent invokes to handle peak throughput.
  • Ensures that Agent invokes are delivered at least once, even after Gateway restarts.
  • Allows for autoscaling.

Currently, Kafka is not used to:

  • Manage Skill-to-Skill message throughput within an Agent.
  • Manage data streaming to Profiles.

How Kafka works

Fabric's Processor Gateway now supports "worker" threads for Agent execution. By default, 4 workers are deployed, and each worker handles 4 parallel threads. This separates request handling from Agent execution.

The separation increases CPU usage, which makes autoscaling more consistent. The trade-off is higher overall resource utilization.
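
As a rough illustration of the paragraphs above, the following Python sketch models the split between request handling and Agent execution with a queue and a pool of threads. It is not Fabric's implementation: run_agent, process, and the constant names are hypothetical, and the pool is flattened into 16 threads instead of 4 separate workers. Only the default counts (4 workers, 4 threads each) come from the text.

```python
import queue
import threading

WORKERS = 4             # default number of workers (from the text)
THREADS_PER_WORKER = 4  # parallel threads per worker (from the text)
MAX_PARALLEL = WORKERS * THREADS_PER_WORKER  # 16 concurrent Agent executions


def run_agent(invoke):
    """Hypothetical stand-in for Agent execution."""
    return {"correlationId": invoke["correlationId"], "status": "COMPLETE"}


def process(invokes):
    """Enqueue invokes, execute them on separate threads, return all results."""
    pending = queue.Queue()
    results = []
    lock = threading.Lock()

    def execution_thread():
        while True:
            invoke = pending.get()
            if invoke is None:  # sentinel: no more work for this thread
                break
            result = run_agent(invoke)
            with lock:
                results.append(result)

    threads = [threading.Thread(target=execution_thread)
               for _ in range(MAX_PARALLEL)]
    for thread in threads:
        thread.start()
    # "Request handling": enqueueing does not wait for execution to finish.
    for invoke in invokes:
        pending.put(invoke)
    for _ in threads:
        pending.put(None)  # one sentinel per thread
    for thread in threads:
        thread.join()
    return results
```

Because accepting an invoke only places it on the queue, a burst of requests is absorbed by the queue while execution proceeds at the rate the threads allow.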

Fabric users will not interact with agents differently when Kafka is enabled, but several differences occur under the covers.

  • Invokes are written to inTopic

    • The message format is similar to Fabric's REST agent invoke.

    • Headers are supported for authN and propagation to Skills

      EXAMPLE

      {
        "agentName": "cortex/hello.agent",
        "projectId": "myProject",
        "serviceName": "inputService",
        "correlationId": "ckyoj...",
        "properties": {"..."},
        "sessionId": "",
        "payload": {"numbers": [2, 14, 23]}
      }

      NOTE: agentName, projectId, and serviceName are required.

  • Responses are received from outTopic. The Gateway always sends a response.

    EXAMPLE

    {
      "activationId": "8e88d261-xxx",
      "correlationId": "cxyvxxxx",
      "response": {"sum": 39},
      "status": "COMPLETE"
    }

  • (OPTIONAL) The errorTopic provides insight into errors and exceptions.
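
The invoke and response messages described above can be sketched in Python as follows. This is a minimal illustration, not part of Fabric: build_invoke and parse_response are hypothetical helpers, the empty-object default for properties is an assumption, and treating every status other than "COMPLETE" as a failure is also an assumption, since the text does not list the other status values. Only the field names and the three required fields come from the examples above.

```python
import json

# Fields the NOTE above marks as required.
REQUIRED_FIELDS = ("agentName", "projectId", "serviceName")


def build_invoke(agent_name, project_id, service_name, payload,
                 correlation_id="", session_id="", properties=None):
    """Serialize an invoke message shaped like the inTopic example."""
    message = {
        "agentName": agent_name,
        "projectId": project_id,
        "serviceName": service_name,
        "correlationId": correlation_id,
        "properties": properties or {},  # assumption: an object of properties
        "sessionId": session_id,
        "payload": payload,
    }
    missing = [name for name in REQUIRED_FIELDS if not message[name]]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    return json.dumps(message).encode("utf-8")


def parse_response(raw, expected_correlation_id):
    """Decode an outTopic message; return its response if it matches, else None."""
    message = json.loads(raw)
    if message.get("correlationId") != expected_correlation_id:
        return None  # response belongs to a different invoke
    if message.get("status") != "COMPLETE":
        # Assumption: any other status is treated as a failure in this sketch.
        raise RuntimeError("invoke did not complete: %r" % message.get("status"))
    return message.get("response")
```

A caller would publish the bytes returned by build_invoke to inTopic with whichever Kafka client it uses, then pass each message read from outTopic to parse_response with the correlationId it sent.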


Kafka Setup

Kafka setup must be performed by a System Administrator or DevOps Engineer.

Two requirements must be met to set up Kafka in Kubernetes:

  • Workers must be enabled by setting FEATURE_AGENT-WORKERS=true.

    NOTE: This feature may be enabled without using Kafka, but it is required to use Kafka.

  • A connector config map, gateway-connector-configs, must be provided using podspec.

    EXAMPLE

    {
      "name": "kafkaDefault",
      "type": "kafka",
      "kafka": {
        "config": {
          "clientId": "gateway",
          "brokers": ["192.168.138:30942"]
        }
      }
    }
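
When the connector entry is generated by a script instead of being written by hand, a small helper can catch malformed broker addresses before the config map is deployed. The Python sketch below is only an illustration: make_connector_config is a hypothetical name, the host:port check is an assumption about what a valid broker entry looks like, and the output mirrors the structure of the example above without claiming to cover every option the Gateway accepts.

```python
import json


def make_connector_config(name, client_id, brokers):
    """Build a connector entry shaped like the gateway-connector-configs example."""
    for broker in brokers:
        host, sep, port = broker.rpartition(":")
        # Assumption: each broker is written as host:port with a numeric port.
        if not sep or not host or not port.isdigit():
            raise ValueError("broker must be host:port, got %r" % broker)
    return {
        "name": name,
        "type": "kafka",
        "kafka": {"config": {"clientId": client_id, "brokers": list(brokers)}},
    }


if __name__ == "__main__":
    # Hypothetical broker address, for illustration only.
    config = make_connector_config(
        "kafkaDefault", "gateway", ["kafka.example.internal:9092"])
    print(json.dumps(config, indent=2))
```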

Strimzi

Strimzi provides a way to run an Apache Kafka cluster on Kubernetes in various deployment configurations.

The Strimzi operator can be deployed as an extra Helm chart installation.

Add the Strimzi Helm Chart Repository:

helm repo add strimzi https://strimzi.io/charts/

Install the Strimzi Operator into a kafka namespace:

helm upgrade --install strimzi strimzi/strimzi-kafka-operator -n kafka --version 0.26.1

Create a Kafka instance and topics for Fabric using Strimzi's Custom Resource Definitions. The following example defines a Kafka instance named fabric-cluster and topics named fabric-in and fabric-out:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: fabric-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: fabric-in
  labels:
    strimzi.io/cluster: "fabric-cluster"
spec:
  config:
    retention.ms: 3600000
  partitions: 3
  replicas: 1
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: fabric-out
  labels:
    strimzi.io/cluster: "fabric-cluster"
spec:
  config:
    retention.ms: 3600000
  partitions: 3
  replicas: 1

Save the manifest above as kafka.yaml and apply it with kubectl so that the operator creates the Kafka instance and topics:

kubectl apply -f kafka.yaml -n kafka
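
Once the cluster is running, the Gateway's connector config needs a broker address. The Python sketch below derives an in-cluster address under two assumptions that are not stated in this document: that Strimzi exposes the internal listeners through a Service named <cluster>-kafka-bootstrap, which is its usual naming, and that the Gateway runs in the same Kubernetes cluster and uses the plain listener on port 9092. The helper name internal_bootstrap is hypothetical. Verify the actual Service name with kubectl get svc -n kafka before relying on it.

```python
def internal_bootstrap(cluster, namespace, port=9092):
    """Return the in-cluster bootstrap address for a Strimzi Kafka cluster.

    Assumes Strimzi's usual Service naming: <cluster>-kafka-bootstrap.
    """
    return "%s-kafka-bootstrap.%s.svc:%d" % (cluster, namespace, port)


if __name__ == "__main__":
    # Values taken from the manifest and commands above.
    brokers = [internal_bootstrap("fabric-cluster", "kafka")]
    print(brokers)
```

The resulting list can be used as the brokers value in the gateway-connector-configs entry. A Gateway outside the cluster would need the external nodeport listener instead.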