Confluent Kafka — Study Notes for CCDAK

Amandeep Midha
Sep 28, 2020 · 16 min read


Here are the study notes I prepared and used to clear my CCDAK certification. They may appear haphazard, but they carry the meaningful knowledge. Although the certification is issued by Confluent, it is for “Apache Kafka”, so the Apache Kafka documentation is the ultimate source of truth, not just Confluent’s :-)

1. PRODUCER CONFIGURATIONS

  1. acks (how many acknowledgements the producer requires before a write is considered successful; acks=-1 means ALL)
  2. increasing batching with linger.ms (instead of sending as soon as data is available, the producer waits up to this long to batch more records together)
  3. optimizing batch.size (maximum batch size in bytes per partition)
  4. max.in.flight.requests.per.connection (>1 can affect ordering unless enable.idempotence is true)
  5. enable.idempotence (prevents duplicates on retries, a building block for exactly-once)
  6. buffer.memory (default 32 MB)
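
Putting these together, a minimal producer sketch (assuming a local broker at localhost:9092 and a hypothetical topic “demo”; the values are illustrative, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumption: local broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");                            // same as acks=-1
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");             // exactly-once building block
props.put(ProducerConfig.LINGER_MS_CONFIG, "20");                        // wait up to 20 ms to fill a batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(32 * 1024));  // 32 KB batches

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("demo", "key", "value"));
producer.close();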

Retriable & Non-Retriable Errors

  1. Not_Leader_For_Partition (RETRIABLE, Producer automatically recovers)
  2. Message_Too_Large (Fix config or message size first!)
  3. Topic_Authorization_Failed (Fix authorization or ACLs first!)
  4. Not_Enough_Replicas (RETRIABLE)
  5. Invalid_Required_Acks (Fix configuration first!)
  6. Leader_Not_Available (RETRIABLE, Producer automatically recovers)

Full List -> https://kafka.apache.org/protocol#protocol_error_codes

2. CONSUMER CONFIGURATIONS

Consumers tend to be more complicated than producers because

  1. They have to take care of OFFSETS
  2. There is the concept of a Consumer Group

GroupCoordinator is a service running on the broker side, responsible for managing consumers & offsets. Every broker starts a GroupCoordinator service when it starts; the broker hosting the leader of the group’s __consumer_offsets partition acts as the GroupCoordinator for that group. When creating a KafkaConsumer, a ConsumerCoordinator is created first, which is responsible for communicating with the GroupCoordinator

__consumer_offsets is the topic used internally by Kafka to store group offsets. By default it has 50 partitions, each with 3 replicas

Subscription Types

  1. Default, AUTO_TOPICS
  2. Pattern Mode, AUTO_PATTERN (subscription by regex pattern of topic names)
  3. Assign Mode, USER_ASSIGNED (partitions assigned in further detail to a consumer)

What happens inside consumer poll(…)

  1. Check if heartbeat thread is running normally
  2. Subscribe to topic. If ConsumerCoordinator is unknown, send FindCoordinator request to establish connection
  3. Determine if you need to (re)join the group. If the subscribed (or assigned) partitions have changed, you need to rejoin: send a JoinGroup request followed by a SyncGroup request, and finally ensureActiveGroup(..)

Auto-Commit: If enable.auto.commit is TRUE and the auto.commit.interval.ms has elapsed, offsets are committed automatically during poll()
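
A rough sketch of the subscribe/poll/commit loop (broker address, topic, and group id are placeholders; auto-commit is disabled here to show a manual commit):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption: local broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // hypothetical group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // commit manually below

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("demo"));              // AUTO_TOPICS subscription
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s-%d@%d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
        }
        consumer.commitSync();                                          // commit offsets after processing
    }
}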

What can trigger Consumer Rebalancing

  1. New Consumer joins Consumer Group
  2. Consumer goes offline ( heartbeat not sent for long)
  3. Consumer actively withdraws from Consumer Group via unsubscribe(…)
  4. GroupCoordinator node corresponding to Consumer Group has changed
  5. The number of partitions of any topic subscribed to by the Consumer Group has changed (partitions can only be increased via the kafka-topics CLI, never decreased)

Note: subscribe() & assign() cannot both be called by the same consumer; doing so leads to an IllegalStateException

How to make a consumer read messages from a specific partition of a given topic? (A: assign() with a partition, or list of partitions, as parameter; see the sketch below)
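
For example, a sketch reusing the consumer above (topic name and partition number are hypothetical):

import java.util.Collections;
import org.apache.kafka.common.TopicPartition;

// USER_ASSIGNED mode: read only partition 0 of topic "demo", no group rebalancing
TopicPartition p0 = new TopicPartition("demo", 0);
consumer.assign(Collections.singletonList(p0));
consumer.seekToBeginning(Collections.singletonList(p0));  // optional: start from the earliest offset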

To delay rebalance use group.initial.rebalance.delay.ms configuration

Detecting Consumer Failures

  1. max.poll.interval.ms → By increasing it you give the consumer more time to handle the batch of records returned from poll(). But increasing it may delay consumer group rebalances, since rebalancing happens inside the call to poll()
  2. max.poll.records → To limit total number of records returned from single call to poll()

Multi-Threading : Kafka Producer is thread-safe, but Consumer is NOT thread-safe

  1. Ensuring safe multi-threaded access is the responsibility of the user
  2. wakeup() can be safely called from an external thread to interrupt an active operation (see the pattern below)
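
A common shutdown sketch (reusing the consumer and imports from the poll-loop sketch above): another thread calls wakeup(), and the polling thread catches the resulting WakeupException:

// In a shutdown hook or another thread:
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

// In the polling thread:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // process records ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected: wakeup() was called, fall through to close
} finally {
    consumer.close();
}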

One Consumer Per Thread Approach:

  1. Pros: Easy to implement, Fastest and no inter-thread coordination, and In-order processing on per-partition basis is easy
  2. Cons: More consumers mean more TCP connections to cluster, More requests being sent to server and slightly less batching of data, and Number of threads will still be limited by the number of partitions

Decoupled Consumption & Processing:

Pros: Independent scaling number of consumers and processors

Cons:

  1. Guaranteeing order across processing threads requires care. For processing that requires no ordering, it is not a problem
  2. Manual committing of positions becomes harder, as it requires all threads to coordinate to ensure processing is complete for that partition

What is the default consumer group id assignment in the Kafka console consumer CLI? (A: It creates a random consumer group id, unless one is specified via --group or --consumer-property group.id=your_group; example below)
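
For example (broker address, topic, and group name are placeholders):

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --consumer-property group.id=your_group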

3. PARTITIONS & ASSIGNMENTS

Partitions are made of segments (.log files), and each segment comes with two indices (.index, a position index, and .timeindex, which helps find messages by timestamp)

If a producer writes at 1 GB/sec and a single consumer consumes at 250 MB/sec, then at least 4 partitions (and 4 consumers) are required. Partition assignors can be of the following types:

  1. Range
  2. RoundRobin
  3. Sticky
  4. Message key to partition assignment can be controlled via a Custom Partitioner (see the sketch after this list)
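
A minimal custom Partitioner sketch (class name and routing rule are hypothetical); it is wired into the producer with partitioner.class:

import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) return 0;                                   // sketch: null keys go to partition 0
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;  // illustrative hash-based routing
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

// Producer side: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());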

It is not possible to delete a partition of topic once created

Kafka provides a CLI, kafka-reassign-partitions.sh, which can take two input types: a “Topics-to-Move JSON” (specifies the topics that you want to reassign) or a “Reassignment Configuration JSON”. Purposes: 1. Reassign partitions between brokers 2. Reassign partitions to another log directory on the same broker 3. Reassign partitions between log directories across multiple brokers 4. Change the partition assignment order, e.g. for electing a new leader
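
For example, a sketch of a Topics-to-Move JSON and the generate step (topic name, file name, and broker ids are placeholders):

{ "version": 1, "topics": [ { "topic": "demo-topic" } ] }

$ kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3"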

To find partitions without leader: $ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --unavailable-partitions

4. BROKER CONFIGURATIONS

  1. default.replication.factor : applies to auto-created topics. Should be set to at least 2 or 3 (default 1)
  2. min.insync.replicas : Matters only when acks=all. This is the minimum number of replicas in the ISR needed to commit a produce request. Set it to 1 on a single-node cluster, otherwise producers will fail (NOT_ENOUGH_REPLICAS). Also available as a topic-level setting
  3. unclean.leader.election.enable : Whether to enable replicas not in ISR set to be elected as leader as the last resort, even though might result in data-loss
  4. num.replication.fetchers: defines number of threads that will be replicating data from leader to the follower (default 1 )
  5. replica.fetch.max.bytes: how much data to fetch from any partition in each fetch request. A higher value helps followers create replicas faster
  6. replica.socket.receive.buffer.bytes: If replication thread is slow compared to incoming message rate, it will help to hold more data
  7. num.partitions : Level of parallelism and writing data in parallel that will increase the throughput. However if system does not have sufficient threads or disks, then does not make sense to create more partitions
  8. num.io.threads : the number of disks in the cluster typically guides the value for I/O threads
  9. advertised.listeners: Critical setting to ensure cluster availability in private network or whole internet. If clients are on private network, set either internal private IP address, or internal private DNS hostname. If on public network, then external IP address, or external public DNS hostname pointing to public IP
  10. log.dirs : comma-separated list of log directories (typically spread across different disks)
  11. delete.topic.enable: Enable topic deletion, default false
  12. log.retention.hours: Minimum age of log file before deleting, 168 default
  13. log.segment.bytes : Size of log file, default 1G
  14. log.retention.check.interval.ms: Interval at which log segments are checked to see if they can be deleted
  15. message.max.bytes: 1000012 (1M by default)
  16. num.io.threads : default 8
  17. num.recovery.threads.per.data.dir=1 (set to number of disks )
  18. offsets.retention.minutes: default 1440, i.e. offsets of inactive groups are deleted after 24 hours
  19. zookeeper.connect: comma-separated list of ZK cluster nodes, optionally ending with a chroot path such as “/kafka”
  20. zookeeper.connection.timeout.ms : default 6000. i.e. 6 seconds
  21. quota.producer.default : default 10485760 (i.e. 10 MB/s)
  22. quota.consumer.default: default 10485760 (i.e. 10 MB/s)
  23. broker.rack = null (set as per availability zones / location of broker)

Kafka Disk Configurations: Modern Linux distributions come with a default of only 1024 file descriptors allowed per process. This is too low for Kafka; allow at least 100,000

Kafka Memory Usage Configurations: Minimum 8 GB RAM for a broker, with about 4 GB for the Kafka Java process heap; the rest goes to the OS page cache! (KAFKA_HEAP_OPTS=”-Xmx4g”). Prefer not to set a min heap size

Disable swapping for Kafka entirely i.e. vm.swappiness=0 (Default is 60)

What are dynamic broker configurations and where they are stored? (Answer: in Zookeeper)

What is the total number of brokers that can go down while acks=all is set up? (Answer: replication factor minus min.insync.replicas. E.g. with replication factor 3 and min.insync.replicas=2, one broker can be down and producers still succeed)

5. ZOOKEEPER

  • Distributed Config Management by odd number of servers in quorum
  • Self-election & Consensus Building
  • Coordination & Locks
  • Key Value Stores
  • A Zookeeper cluster needs to have an odd number of servers, and must maintain a majority of servers up to be able to vote. Therefore, a cluster of 2N+1 zookeeper servers can survive N of them being down

Role in Kafka:

  1. Broker Registration with heartbeat mechanism to keep list current
  2. Maintaining a list of topics, along with a. Configuration of Topics, b. List of ISRs
  3. Performing Leader Election, in case brokers go down
  4. Storing Kafka Cluster ID (generated random at first start of cluster)
  5. Storing ACLs towards Topics, Consumer Groups, and Users
  6. Quota Configurations, but Consumer offsets no longer in ZK
  7. ClientPort: 2181, Peer Port: 2888, Leader Election Port: 3888

ZK Properties:

dataDir : Location of the data directory

maxClientCnxns: Zero means unlimited

tickTime : e.g. 2000, Heartbeat interval

initLimit : e.g. 10. How many ticks (multiples of tickTime) a follower may take to connect and sync to the leader, else it is reported as failed

syncLimit: How many ticks may pass between a request & its ack (e.g. if set to 5 and 5 x tickTime passes, the follower is reported as failed)

autopurge.snapRetainCount=3

autopurge.purgeInterval=24

(Purge all but 3 snapshots, every 24 hours)

Four Letter Words:

For these 4lw commands to work, this property in zoo.cfg must be set

4lw.commands.whitelist=*

ruok ( R you ok ? ), conf (print serving configuration) , cons (print list of connections ), crst (print connection/session statistics), envi (print details of environment variables ), srst (Reset server statistics), mntr (list of variables that can be used in monitoring), dump (only works on leader Node )

e.g. echo “ruok” | nc localhost 2181; echo

Persistent znodes

Persistent znodes are permanent and have to be deleted explicitly by the client. They stay even after the session that created the znode is terminated.

Ephemeral znodes

Ephemeral znodes are temporary. These znodes are deleted automatically when the client session creating them ends. Ephemeral znodes are used to detect the termination of a client.

6. TOPIC CONFIGURATIONS

cleanup.policy : “compact” for enabling log compaction

min.cleanable.dirty.ratio : (default 0.5) Low value to ensure log compactions

segment.ms : the time (in milliseconds) after which Kafka rolls a new segment even if it is not full

message.timestamp.type : either CreateTime (message creation time by producer) , or LogAppendTime ( when received at broker)
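
For example, applying these to an existing topic (topic name and ZooKeeper address are placeholders; newer Kafka versions use --bootstrap-server with kafka-configs instead):

$ kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my_topic --add-config cleanup.policy=compact,min.cleanable.dirty.ratio=0.1,segment.ms=3600000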

7. KAFKA STREAMS

The Streams DSL & the Processor API are the two options; the first is recommended, while the second requires more manual work

Stream processing code is most commonly written with the Java builder pattern: a StreamsBuilder describes the series of operations, its build() method returns a Topology, from which a KafkaStreams instance is created, started, and given a shutdown hook (see the sketch below)
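
A minimal sketch of that pattern (application id, broker address, and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-streams-app");   // placeholder application.id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");
source.mapValues(v -> v.toUpperCase()).to("output-topic");

Topology topology = builder.build();                                  // the Topology object
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));     // shutdown hook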

Stateless Operations: branch, filter, filterNot (inverse filter), flatMap, flatMapValues, foreach, groupByKey, groupBy, map, mapValues

Stateful Operations: join, aggregate, count, reduce, windowing

TimeStampExtractor Contract

TimeStamp Extractors:

  1. FailOnInvalidTimestamp
  2. LogAndSkipOnInvalidTimestamp
  3. UsePreviousTimeOnInvalidTimestamp
  • Event Time: use CustomTimeStampExtractor to read time from message value
  • Ingestion Time: Configure topic for message.timestamp.type=LogAppendTime, and use above extractors
  • Processing Time: use WallclockTimestampExtractor

Windowing Types:

Tumbling window

  • Time based, Fixed Size, Non overlapping, Gap less windows
  • E.g. if window-size=5min and advance-interval =5min then it looks like [0–5min] [5min-10min] [10min-15min]

Hopping Window

  • Time based, Fixed Size, Overlapping
  • E.g if window-size=5min and advance-interval=3min then it looks like [0–5min][3–8min][6–11min]

Sliding Window

  • Fixed Size, Overlapping, works on differences between timestamps
  • Used for “join” operations alone

Session Window

  • Session based, dynamically sized, Non-overlapping
  • Used only for “aggregate” key based events into a session
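
Sketches of the corresponding DSL calls (assuming a grouped stream named “grouped”; factory method names vary slightly across Kafka versions):

import java.time.Duration;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

// Tumbling: window size == advance interval
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count();
// Hopping: 5-minute windows advancing every 3 minutes (overlapping)
grouped.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(3))).count();
// Session: windows close after 5 minutes of inactivity per key
grouped.windowedBy(SessionWindows.with(Duration.ofMinutes(5))).count();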

Running Kafka Streams may create internal topics (prefixed by application.id)

a. Repartitioning Topics: in case you start transforming key of your stream, a re-partitioning will happen at some processor

b. ChangeLog Topics: In case you perform aggregation, Kafka streams will store compacted data in these topics

KStreams support only inserts, as in a log aka data stream where everything is appended

KTables support upserts on non-null values & deletes on null values, like a table or a log-compacted topic (insert, update, delete are all possible)

Which one to use?

  • Use KStream when reading from a topic that is not compacted; use KTable when reading from a log-compacted topic (e.g. for aggregations)
  • Use KStream if every new record is partial or transactional information; use KTable when every update is self-sufficient

Stateless versus Stateful Operations of KStreams/KTables

Stateless: means that the result of the transformation only depends on the data point you process (no other prior info required, e.g. “multiply by 5” is stateless)

Stateful: means that the result of the transformation also depends on external information i.e. “the state” (e.g. a “word count” operation requires the app to know all the words that have occurred since the start)

MapValues

only affecting values, does not change key, does not trigger re-partitioning, both for KStreams & KTables (Stateless)

e.g. upperCased = stream.mapValues(value -> value.toUpperCase());

Map

affects key and value both, triggers re-partitioning, for KStreams only
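
For example, a sketch that swaps key and value (and therefore triggers re-partitioning downstream):

KStream<String, String> swapped = stream.map((key, value) -> KeyValue.pair(value, key));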

Filter / FilterNot

takes one record & produces zero or one record, does not change key/value, hence does not trigger repartitioning, both for KStreams & KTables

e.g. KStream<String, Long> onlyPos =stream.filter((key,value)-> value>0);

FlatMap / FlatMapValues

FlatMapValues does not change keys (hence no re-partitioning) ,for Kstreams. Flattens/splits the values into multiple records with same key

FlatMap changes both key & value (re-partitioning) , for KStreams

words=sentences.flatMapValues(value->Arrays.asList(value.split(“\\s+”)));

Branch

Split a KStream based on one or more predicates. If no predicates match, record is dropped. You get multiple KStreams as result

e.g. KStream<String, Long>[] branches = streams.branch(
    (key, value) -> value > 100,
    (key, value) -> value > 10,
    (key, value) -> value > 0);

SelectKey

Assigns new key to the record in KStream

e.g. rekeyed = streams.selectKey((key,value)->key.substring(0,1));

Peek

Allows you to apply side-effect operations to a KStream and get the same KStream as result, e.g. printing the stream to console, or statistics collection
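
For example (a side-effect-only sketch):

stream.peek((key, value) -> System.out.println(key + " -> " + value));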

stream.to(…) is a terminal operation that writes a KStream to a Kafka topic

stream.through(…) writes to a topic & returns a stream/table reading from that topic

Log Compaction (a topic-level cleanup policy, closely tied to KTable semantics)

a. Any consumer reading from head of log will still see ALL messages

b. Ordering of messages is kept; log compaction deletes messages but doesn’t reorder them

c. The offset of a message is immutable; offsets are simply skipped if a message is no longer available

d. Deleted records can still be seen for delete.retention.ms (default 24 hrs)

KStreams & KTable Duality

Streams as a Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table

Table as a Stream: A table can be considered a snapshot of the latest value for each key in a stream

KTable to KStream:

KStream<> stream = ktable.toStream()

KStream to KTable:

KTable<> ktable = stream.toTable(Materialized.as(“stream-conv-to-table”))

or, write to kafka intermediate topic, and read back as KTable

Exactly-Once

StreamsConfig.PROCESSING_GUARANTEE_CONFIG = StreamsConfig.EXACTLY_ONCE

Since v0.11, producers can be idempotent, i.e. if an ack is not received in time and the message is resent, Kafka will make sure to keep only one copy of it. In addition, you can now write multiple messages to different topics as one transaction (either all are written, or none)

ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG=true (Producer Properties)

State Store

Stateful operations require some intermediate state store; the options are:

  1. Fault-tolerant in-memory SS , or 2. Recoverable persistent SS

while Stores can be each of type

  1. KeyValueStore, 2. SessionStore, 3. WindowStore, or 4. ReadOnlyStore

While using stores, it is extremely important how message keys are defined, otherwise inconsistent local processing & local storage can happen (different tasks over different partitions each see only the keys routed to them). If the key of the original messages is already fixed, assigning a new key (which triggers re-partitioning based on the new key) can be a solution, via an intermediate processor in the topology (i.e. the through() processor)

Behind the scenes, even local state stores are backed up in a Kafka topic referred to as the ‘state store changelog’

Aggregation

  • count() — simply count, nothing else
  • reduce() : No need to initialize values & state store as all types are known
  • aggregate() : Explicit initialization needed for values & the state store (see the word-count sketch below)
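
A classic sketch combining grouping and counting is a word count (topic names are hypothetical; “builder” is a StreamsBuilder as in the topology sketch above):

import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

KStream<String, String> lines = builder.stream("sentences");
KTable<String, Long> counts = lines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
        .groupBy((key, word) -> word)   // re-keys by word -> repartition topic
        .count();                       // stateful, backed by a changelog topic
counts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));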

8. AVRO

SerDe Data-types

byte[]
ByteBuffer
Double
Integer
Long
String

Avro Schemas

Primitive Types: null, boolean, int (32-bit), long (64-bit), float (32-bit), double (64-bit), bytes (sequence of 8-bit unsigned bytes), string (unicode character sequence)

Complex Types: Enums, Arrays, Maps, Unions, other schemas as Types

Enums: are the fields whose values can be enumerated (e.g. customer type)

Arrays: List of undefined size of the same schema (e.g. customer emails)

Maps: Key Values (e.g. secret questions)

Unions: Can allow a field to take different types. For example, if a field that could be either an int or null, then the union is represented as [“int”, “null”]. If defaults are defined, they must correspond to the first type in the union

Logical Types: To give more meaning to primitive types, e.g. decimal (bytes), date (int, number of days since epoch), time-millis (long)

Avro Record Schemas Fields

  1. Name: name of your schema
  2. Namespace: namespace of your schema, equivalent to a package in Java
  3. Doc: documentation to explain the schema
  4. Aliases: OPTIONAL, other names for the schema
  5. Fields: Array of fields (see the example schema after this list), each containing:
  • Name: name of the field
  • Doc: documentation of the field
  • Type: type of the field (primitive, complex, or logical)
  • Default: default value of that field
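
For example, a hypothetical Customer record schema:

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example",
  "doc": "Illustrative customer record",
  "fields": [
    { "name": "name", "type": "string", "doc": "Full name" },
    { "name": "age", "type": "int" },
    { "name": "email", "type": ["null", "string"], "default": null }
  ]
}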

Schema Evolution

Avro enables us to evolve our schema over time, e.g. today we might capture some information about a customer, tomorrow we might capture a few more items

Backward Schema Evolution: New schema can read old data (Adding new field in new schema with default value helps to accomplish that)

Forward Schema Evolution: Old schema can read new data (Deleting fields without default is NOT forward compatible change )

Full Schema Evolution: Backward & Forward both (Only add fields with defaults, only remove fields that have defaults )

Breaking Schema Evolution: None of two (e.g. adding/removing elements from enum, renaming a field that has no default, changing data type of a field).

Note: If you do not provide a default value for a field, you cannot delete that field from your schema without breaking

Confluent Schema Registry

  • Store/retrieve schemas for producer/consumers
  • Enforce backward/forward/full schema compatibility on topics
  • Decrease size of payload sent to Kafka
  • Schema does not live in Kafka but in CSR, only Schema ID lives in Kafka

To write an Avro producer, you need to pull in Confluent Avro serializer dependencies and specify serializer being KafkaAvroSerializer

If you are reading an Avro topic and want to deserialise it into a SpecificRecord, set specific.avro.reader=true

Schema is sent to CSR, Content without Schema but reference to Schema ID is sent to Kafka
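
A sketch of the relevant client properties (the Schema Registry URL is a placeholder):

// Producer side
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class.getName());
props.put("value.serializer", io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");   // placeholder CSR endpoint

// Consumer side, to get SpecificRecord instances back
props.put("value.deserializer", io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
props.put("specific.avro.reader", "true");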

Client protocols used by Schema Registry: Http & Https

Confluent REST Proxy

Content-Type: application/vnd.kafka.avro.v2+json

Accept: application/vnd.kafka.v2+json

JSON, Binary, Or Protobuf : What data formats are not natively available with the Confluent REST Proxy ? (A: protobuf )

How to handle: org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires “schema” and “payload” fields and may not contain additional fields?

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

9. KSQL

The auto.offset.reset property is “latest” by default, i.e. a newly created stream only reads messages that arrive after its creation. It can be changed to “earliest”

The ksql.streams.state.dir property defines the RocksDB storage directory for stateful operations

Different ways to create stream in KSQL

  • From existing Topic (Formats: AVRO, DELIMITED, JSON, JSON_SR, PROTOBUF )
  • Via Count / join
  • Persistent Streaming Query

Additional Fields in Every KSQL Stream

ROWTIME — Timestamp of Message

ROWKEY — Key of the Topic

Re-Keying a Stream & Materialising it to support Querying :

CREATE STREAM TEST_TOPIC_REKEY AS SELECT * FROM TEST_TOPIC PARTITION BY field_name ;

CREATE TABLE MATERIALIZED_TOPIC WITH (KAFKA_TOPIC=’TEST_TOPIC_REKEY’, VALUE_FORMAT =’DELIMITED’, KEY=’field_name’);

CREATE STREAM MY_STREAM_REKEYED WITH (partitions=1) AS SELECT * FROM MY_STREAM; (helpful when joins require co-partitioning)

Pull & Push Queries

Pull queries are typically ANSI SQL compliant, can only be run against materialized tables (not plain streams), and their WHERE clause supports only key-column constraints

Push queries are like a subscription to future query results; they end with EMIT CHANGES, support all SQL filtering, and do not persist their result to a topic by default. For persistence, additionally use “CREATE [STREAM | TABLE] AS SELECT … EMIT CHANGES” (pull and push examples below)
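
For example, reusing the objects created above (‘some_key’ is a placeholder):

-- Pull query: point-in-time lookup against a materialized table
SELECT * FROM MATERIALIZED_TOPIC WHERE ROWKEY = 'some_key';

-- Push query: continuous results as new rows arrive
SELECT * FROM TEST_TOPIC_REKEY EMIT CHANGES;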

Merging Two Streams

CREATE STREAM WORLD_STREAM AS SELECT 'Europe' AS DATA_SRC, * FROM EUROPE_STREAM;

INSERT INTO WORLD_STREAM SELECT 'USA' AS DATA_SRC, * FROM USA_STREAM;

Windowing in KSQL

The TOPK function allows you to select the top K values for a given key in a given window. For example, to compute the 5 highest-value orders per zip code per hour:

SELECT order_zipcode, TOPK(order_total, 5) FROM orders WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode;

The TOPKDISTINCT function is similar to TOPK, except that it outputs the top K distinct values for a given key in a given window. For example, to print the 5 latest page view times for each page:

SELECT pageid, TOPKDISTINCT(viewtime, 5) FROM pageviews_users GROUP BY pageid;

GEO_DISTANCE is a filtering construct for geospatial data provided in KSQL. For example:

SELECT user, GEO_DISTANCE(from_latitude, from_longitude, to_latitude, to_longitude, 'km') AS dist FROM … ;

10. KAFKA CONNECT

Kafka Connect is driven by REST APIs, which are used to (examples after this list):

  1. Get Worker information
  2. List Connectors available on a Worker
  3. Ask about Active Connectors
  4. Get Information about Connector Task & Config
  5. Get Connector Status
  6. Pause/Resume a Connector
  7. Delete a Connector
  8. Create a Connector
  9. Update Connector Configuration
  10. Get Connector Configuration
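
For example, against a Connect worker’s REST endpoint (host, port, and connector name are placeholders):

$ curl http://localhost:8083/connectors                             # list active connectors
$ curl http://localhost:8083/connectors/my-connector/status         # connector status
$ curl -X PUT http://localhost:8083/connectors/my-connector/pause   # pause it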

Tasks are essentially consumer threads; if there are 10 partitions & tasks.max=5, then each task receives 2 partitions

Internal TOPICS of Kafka Connect:

  1. Connector Config
  2. Task Config
  3. Offset
  4. Status

Source partitions & source offsets allow Kafka Connect to know which source you have been reading from & to track how far you have read. They are different from Kafka’s own partitions & offsets; they belong to the Kafka Connect source

Common Worker Configurations

bootstrap.servers , key.converter , value.converter , internal.key.converter , internal.value.converter , offset.flush.interval.ms , offset.flush.timeout.ms , plugin.path , rest.advertised.host.name , rest.advertised.listener, response.http.headers.config , task.shutdown.graceful.timeout.ms , key.converter.schema.registry.url , value.converter.schema.registry.url

Distributed Worker Configurations

group.id , config.storage.topic , config.storage.replication.factor , offset.storage.topic , offset.storage.replication.factor , offset.storage.partitions , status.storage.topic , status.storage.replication.factor , status.storage.partitions , connections.max.idle.ms (default 540000) , receive.buffer.bytes (default 32768) , request.timeout.ms (default 40000) , send.buffer.bytes (default 131072) , worker.sync.timeout.ms (default 3000) , worker.unsync.backoff.ms (default 300000), metadata.max.age.ms (default 300000), reconnect.backoff.ms (wait before trying to reconnect to a host, default 50) , retry.backoff.ms (wait before attempting to retry a failed fetch from a topic partition, default 100)

JDBC Sink Configuration Options

connection.url , connection.user, connection.password , insert.mode (valid values insert | upsert ), batch.size , table.name.format (default ${topic} ) , pk.mode ( valid values none | kafka | record_key primitive/struct | record_value struct ), fields.whitelist , auto.create (of destination table), auto.evolve ( upon schema change, trigger ALTER SQL automatically) , max.retries , retry.backoff.ms
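
An illustrative JDBC sink connector configuration (all names and credentials are placeholders):

{
  "name": "jdbc-sink-demo",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "topics": "orders",
    "connection.url": "jdbc:postgresql://localhost:5432/demo",
    "connection.user": "demo",
    "connection.password": "demo",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}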

11. KAFKA SECURITY

Authentication can take a few forms

  1. SSL Authentication: Client authenticate to Kafka using SSL certificates
  2. SASL Authentication: PLAIN (client authenticate using username/password — weak & easy to setup ), KERBEROS (such as GSSAPI Microsoft AD groups — strong & hard to setup), and SCRAM (username & password — strong & medium to setup)
  3. Authentication in future: OAUTHBEARER for OAuth2 support

SASL Kafka Server Configurations

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093, SASL_SSL://0.0.0.0:9094

advertised.listeners=PLAINTEXT://<public_dns>:9092,SSL://<public_dns>:9093, SASL_SSL://<public_dns>:9094

sasl.enabled.mechanisms=GSSAPI

sasl.kerberos.service.name=kafka

ssl.keystore.location=

ssl.keystore.password=

ssl.key.password=

ssl.truststore.location=<server_truststore.jks>

zookeeper.set.acl=true #Does not affect topics already created

SASL Kafka Consumer Configurations

security.protocol=SASL_SSL

sasl.kerberos.service.name=kafka

ssl.truststore.location=<client_truststore.jks>

ssl.truststore.password=

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;

Kafka Client JAAS Config

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab=”/tmp/reader.user.keytab”
principal=”reader@KAFKA.SECURE”;
};

Access Control Lists (ACLs)

(Support restricting access to cluster, topics separately. Stored in ZooKeeper)

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

super.users=User:admin;User:kafka

allow.everyone.if.no.acl.found=false

security.inter.broker.protocol=SASL_SSL

Listing all ACLs

$ kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper:2181

Adding an ACL

$ kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host Host1 --allow-host Host2 --operation Read --operation Write --topic Test-topic

Zookeeper Configurations for ACLs

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

jaasLoginRenew=3600000

kerberos.removeHostFromPrincipal=true

kerberos.removeRealmFromPrincipal=true

For topics created earlier, zookeeper-security-migration can be used to transform non-secured topic znodes into secured ones

— — —

That’s all :)
