Confluent Kafka — Study Notes for CCDAK
Here are the study notes I prepared and used to clear my CCDAK certification. They may appear haphazard, but they carry the meaningful knowledge. Although it is a Confluent certification, it is about "Apache Kafka", so the Apache Kafka documentation remains the ultimate source of truth, not just Confluent's :-)
1. PRODUCER CONFIGURATIONS
- acks (how many acknowledgments the producer requires before a write is considered successful; acks=-1 means ALL; a config sketch follows this list)
- increasing batching with linger.ms (instead of sending as soon as data is available, the producer waits up to this long to build a bigger batch)
- optimizing batch.size
- max.inflight.requests.per.connection (>1 can affect ordering unless enable.idempotence is true)
- enable.idempotence (no duplicates on retries; a building block for exactly-once)
- buffer.memory (default 32M)
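A minimal hedged sketch of how these settings come together in a Java producer (broker address, topic name, and the tuning values are illustrative assumptions, not recommendations):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // durability & ordering
        props.put(ProducerConfig.ACKS_CONFIG, "all");                             // same as acks=-1
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");              // no duplicates on retry
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");     // safe with idempotence on
        // throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, "20");                         // wait up to 20 ms to fill a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batches
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(32 * 1024 * 1024)); // default 32 MB
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));    // assumed topic
        }
    }
}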
Retriable & Non-Retriable Errors
- Not_Leader_For_Partition (RETRIABLE, Producer automatically recovers)
- Message_Too_Large (Fix config or message size first!)
- Topic_Authorization_Failed (Fix authorization or ACLs first!)
- Not_Enough_Replicas (RETRIABLE)
- Invalid_Required_Acks (Fix configuration first!)
- Leader_Not_Available (RETRIABLE, Producer automatically recovers)
Full List -> https://kafka.apache.org/protocol#protocol_error_codes
2. CONSUMER CONFIGURATIONS
Consumers tend to be more complicated than producers because
- they have to take care of OFFSETS
- there is the concept of a Consumer Group
The GroupCoordinator is a service running on the broker side, responsible for consumer group membership and offset management. Every broker starts a GroupCoordinator service; the broker hosting the leader of the group's __consumer_offsets partition acts as the coordinator for that group. When a KafkaConsumer is created, a ConsumerCoordinator is created first, which is responsible for communicating with the GroupCoordinator.
__consumer_offsets is the topic used internally by Kafka to store group consumption offsets. By default it has 50 partitions with a replication factor of 3.
Subscription Types
- Default, AUTO_TOPICS
- Pattern Mode, AUTO_PATTERN (subscription by regex pattern of topic names)
- Assign Mode, USER_ASSIGNED (specific partitions are assigned manually to the consumer)
What happens inside consumer poll(…)
- Check if heartbeat thread is running normally
- Subscribe to the topic. If the GroupCoordinator is unknown, send a FindCoordinator request to establish a connection
- Determine whether the group needs to be re-joined. If the subscribed (or assigned) partitions change, a rejoin is needed: send a JoinGroup request followed by a SyncGroup request, and finally ensureActiveGroup(..)
Auto-Commit: if enable.auto.commit is TRUE and auto.commit.interval.ms has elapsed, offsets are committed automatically during poll()
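A minimal hedged poll-loop sketch with auto-commit disabled and a manual commitSync() after processing each batch (broker address, topic, and group id are assumptions):
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // commit manually instead
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));        // assumed topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                consumer.commitSync();   // commit offsets only after the batch is processed
            }
        }
    }
}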
What can trigger Consumer Rebalancing
- New Consumer joins Consumer Group
- Consumer goes offline ( heartbeat not sent for long)
- Consumer actively withdraws from Consumer Group via unsubscribe(…)
- GroupCoordinator node corresponding to Consumer Group has changed
- Number of partitions of any topic subscribed to by the Consumer Group has changed (partitions can only be increased via the kafka-topics CLI, not decreased)
Note: subscribe() & assign() both cannot be called by same consumer and will lead to IllegalStateException
How to make a consumer read messages from a specific partition of a given topic? (A: use assign() with a partition, or list of partitions, as parameter)
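For illustration, a hedged sketch of assigning one partition directly; the topic name and partition number are assumptions, and the consumer is built with the usual properties:
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// reads only partition 0 of an assumed topic, bypassing consumer-group management
static void readSinglePartition(KafkaConsumer<String, String> consumer) {
    TopicPartition partition0 = new TopicPartition("demo-topic", 0);
    consumer.assign(Collections.singletonList(partition0));
    consumer.seekToBeginning(Collections.singletonList(partition0)); // optional: start from the earliest offset
}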
To delay the initial rebalance use the group.initial.rebalance.delay.ms broker configuration
Detecting Consumer Failures
- max.poll.interval.ms → by increasing it you give the consumer more time to handle the batch of records returned from poll(). But increasing it may also delay a consumer group rebalance, since rebalances happen inside the call to poll()
- max.poll.records → To limit total number of records returned from single call to poll()
Multi-Threading : Kafka Producer is thread-safe, but Consumer is NOT thread-safe
- Ensuring multi-thread access is responsibility of user
- wakeup() can be safely called from an external thread to interrupt an active operation (see the pattern below)
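A common hedged pattern, assuming 'consumer' and 'mainThread' are in scope: call wakeup() from a shutdown hook and treat the resulting WakeupException as the signal to leave the poll loop:
// sketch only: 'consumer' is a KafkaConsumer, 'mainThread' is the thread running the poll loop
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();        // the one Consumer method that is safe to call from another thread
    try {
        mainThread.join();    // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));
try {
    while (true) {
        consumer.poll(java.time.Duration.ofMillis(500));   // throws WakeupException once wakeup() was called
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to handle
} finally {
    consumer.close();
}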
One Consumer Per Thread Approach:
- Pros: Easy to implement, Fastest and no inter-thread coordination, and In-order processing on per-partition basis is easy
- Cons: More consumers mean more TCP connections to cluster, More requests being sent to server and slightly less batching of data, and Number of threads will still be limited by the number of partitions
Decoupled Consumption & Processing:
Pros: Independent scaling number of consumers and processors
Cons:
- Guaranteeing order across processors requires care. For processing that requires no ordering, it is not a problem
- Manual commit of position becomes harder as it requires that all threads coordinate to ensure processing complete for that partition
What is the default consumer group id assignment in the Kafka Consumer CLI? (A: it generates a random consumer group id, unless specified via --consumer-property group.id=your_group)
3. PARTITIONS & ASSIGNMENTS
Partitions are made of segments (.log file), and each segment comes with two indices (.index as position index, and .timeindex to help find message by timestamp )
If a producer writes at 1 GB/sec and each consumer consumes at 250 MB/sec, then at least 4 partitions are required (so 4 consumers can keep up). Partition assignors can be of the following types:
- Range
- RoundRobin
- Sticky
- Message key to partition assignment: how does it work and how can it be controlled? (A: a custom Partitioner; see the sketch after this list)
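A minimal custom Partitioner sketch (the class name and the "vip" routing rule are made up for illustration); it would be enabled with the partitioner.class producer property:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class VipFirstPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if ("vip".equals(key)) {
            return 0;   // pin the assumed "vip" key to partition 0
        }
        // hash the key bytes over the remaining partitions (null keys and 1-partition topics are not handled in this sketch)
        return 1 + Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}
The default partitioner already murmur2-hashes the key, so a custom class is only needed when that key-to-partition mapping must change.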
It is not possible to delete a partition of topic once created
Kafka provides a CLI kafka-reassign-partitions.sh which can take two input types viz. “Topics-to-Move JSON” (specifies the topics that you want to reassign) or “Reassignment Configuration JSON” (Purposes: 1. Reassign partitions between brokers 2. Reassign partitions to another log directory on the same broker 3. Reassign partitions between log directories across multiple brokers 4. Change partition assignment order i.e. for electing a new leader )
To find partitions without leader: $bin/kafka-topics.sh --zookeeper localhost:2181 --describe --unavailable-partitions
4. BROKER CONFIGURATIONS
- default.replication.factor: applies to auto-created topics (default 1); should be set to at least 2 or 3
- min.insync.replicas: matters only when acks=all. This is the minimum number of replicas in the ISR needed to commit a produce request. Set it to 1 on a single-node cluster, else the producer will fail (Not_Enough_Replicas). Also available as a topic-level setting
- unclean.leader.election.enable : Whether to enable replicas not in ISR set to be elected as leader as the last resort, even though might result in data-loss
- num.replication.fetchers: defines number of threads that will be replicating data from leader to the follower (default 1 )
- replica.fetch.max.bytes: how much data to fetch from any partition in each fetch request. A higher value helps followers create replicas faster
- replica.socket.receive.buffer.bytes: if the replication thread is slow compared to the incoming message rate, a larger buffer helps hold more data
- num.partitions: level of parallelism; writing data in parallel increases throughput. However, if the system does not have sufficient threads or disks, creating more partitions does not help
- num.io.threads: the number of disks in the cluster guides the value for I/O threads
- advertised.listeners: Critical setting to ensure cluster availability in private network or whole internet. If clients are on private network, set either internal private IP address, or internal private DNS hostname. If on public network, then external IP address, or external public DNS hostname pointing to public IP
- log.dirs: comma-separated list of directories (ideally on different disks) where the broker stores its log data
- delete.topic.enable: Enable topic deletion, default false
- log.retention.hours: Minimum age of log file before deleting, 168 default
- log.segment.bytes : Size of log file, default 1G
- log.retention.check.interval.ms: Interval at which log segments are checked to see if they can be deleted
- message.max.bytes: 1000012 (1M by default)
- num.io.threads : default 8
- num.recovery.threads.per.data.dir=1 (set to number of disks )
- offsets.retention.minutes: default 1440, i.e. consumer group offsets are deleted after 24 hours (raised to 10080, i.e. 7 days, in Kafka 2.0+)
- zookeeper.connect: comma-separated ZooKeeper nodes, optionally ending with a chroot path such as "/kafka"
- zookeeper.connection.timeout.ms : default 6000. i.e. 6 seconds
- quota.producer.default : default 10485760 (i.e. 10 MB/s)
- quota.consumer.default: default 10485760 (i.e. 10 MB/s)
- broker.rack = null (set as per availability zones / location of broker)
Kafka Disk Configurations: modern Linux distributions come with a default of only 1024 file descriptors allowed per process. This is too low for Kafka; allow at least 100,000
Kafka Memory Usage Configurations: minimum RAM 8 GB for a broker, with 4 GB for the Kafka Java process heap; the rest goes to the OS page cache! (KAFKA_HEAP_OPTS="-Xmx4g"). Prefer not to set a min heap size
Disable swapping for Kafka entirely i.e. vm.swappiness=0 (Default is 60)
What are dynamic broker configurations and where they are stored? (Answer: in Zookeeper)
What is the total number of brokers that can go down while acks=all is set up? (Answer: replication factor minus min.insync.replicas; e.g. with RF=3 and min.insync.replicas=2, one broker can be down and producers with acks=all still succeed)
5. ZOOKEEPER
- Distributed Config Management by odd number of servers in quorum
- Self-election & Consensus Building
- Coordination & Locks
- Key Value Stores
- A Zookeeper cluster needs to have an odd number of servers, and must maintain a majority of servers up to be able to vote. Therefore, a 2N+1 Zookeeper cluster can survive N Zookeepers being down
Role in Kafka:
- Broker Registration with heartbeat mechanism to keep list current
- Maintaining the list of topics, along with (a) topic configurations and (b) the list of ISRs
- Performing Leader Election, in case brokers go down
- Storing the Kafka Cluster ID (generated randomly at the first start of the cluster)
- Storing ACLs towards Topics, Consumer Groups, and Users
- Quota Configurations, but Consumer offsets no longer in ZK
- ClientPort: 2181, Peer Port: 2888, Leader Port: 3888
ZK Properties:
dataDir: location of the data directory
maxClientCnxns: maximum client connections (0 means unlimited)
tickTime: e.g. 2000, heartbeat interval in ms
initLimit: e.g. 10; how many ticks a follower may take to connect and sync to the leader, else it fails
syncLimit: how many ticks may pass between a request and an acknowledgment (e.g. if set to 5, once 5 x tickTime has passed the follower is reported as out of sync)
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
(Purge all but 3 snapshots, every 24 hours)
Four Letter Words:
For these 4lw commands to work, this property in zoo.cfg must be set
4lw.commands.whitelist=*
ruok ( R you ok ? ), conf (print serving configuration) , cons (print list of connections ), crst (print connection/session statistics), envi (print details of environment variables ), srst (Reset server statistics), mntr (list of variables that can be used in monitoring), dump (only works on leader Node )
e.g. echo "ruok" | nc localhost 2181; echo
Persistent znodes
Persistent znodes are permanent and have to be deleted explicitly by the client. They stay even after the session that created the znode is terminated.
Ephemeral znodes
Ephemeral znodes are temporary. These znodes are deleted automatically when the client session creating them ends. Ephemeral znodes are used to detect the termination of a client.
6. TOPIC CONFIGURATIONS
cleanup.policy : “compact” for enabling log compaction
min.cleanable.dirty.ratio : (default 0.5) Low value to ensure log compactions
segment.ms : the time after which Kafka rolls a new segment even if it is not full
message.timestamp.type : either CreateTime (message creation time by producer) , or LogAppendTime ( when received at broker)
7. KAFKA STREAMS
Streams DSL & the Processor API are the two options; the first is recommended, the second requires more manual work
A Streams application is most commonly expressed with the Java builder pattern: a StreamsBuilder chains a series of operations, build() produces a Topology, the KafkaStreams instance is started, and a shutdown hook closes it (see the sketch below)
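A minimal hedged word-count style topology showing the builder, build(), start() and the shutdown hook (topic names and application.id are assumptions):
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TopologySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-sketch");   // assumed application.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> sentences = builder.stream("sentences-topic"); // assumed input topic
        sentences.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\s+")))
                 .groupBy((key, word) -> word)   // changing the key triggers a repartition topic
                 .count()                        // stateful: backed by a changelog topic
                 .toStream()
                 .to("word-counts-topic", Produced.with(Serdes.String(), Serdes.Long())); // assumed output topic

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}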
Stateless Operations: branch, filter, inverseFilter, flatMap, flatMapValues, foreach, groupByKey, groupBy, map, mapValues
Stateful Operations: join, aggregate, count, reduce, windowing
TimestampExtractor Contract & Built-in Extractors:
- FailOnInvalidTimestamp
- LogAndSkipOnInvalidTimestamp
- UsePreviousTimeOnInvalidTimestamp
- Event Time: implement a custom TimestampExtractor to read the time from the message value
- Ingestion Time: configure the topic with message.timestamp.type=LogAppendTime, and use the above extractors
- Processing Time: use WallclockTimestampExtractor (a config sketch follows this list)
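A hedged sketch of wiring an extractor into the Streams configuration (MyEventTimeExtractor is a hypothetical custom class, not part of Kafka):
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties props = new Properties();
// processing time: use the wall clock of the Streams instance
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());
// event time would instead point at a custom extractor, e.g.:
// props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());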
Windowing Types:
Tumbling window
- Time based, Fixed Size, Non overlapping, Gap less windows
- E.g. if window-size=5min and advance-interval =5min then it looks like [0–5min] [5min-10min] [10min-15min]
Hopping Window
- Time based, Fixed Size, Overlapping
- E.g if window-size=5min and advance-interval=3min then it looks like [0–5min][3–8min][6–11min]
Sliding Window
- Fixed Size, Overlapping, works on differences between timestamps
- Used for “join” operations alone
Session Window
- Session based, dynamically sized, Non-overlapping
- Used only to aggregate key-based events into a session (a windowing sketch follows this list)
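A hedged tumbling-window count sketch over an assumed KStream<String, String> named 'events', using the 5-minute window size from the example above:
import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// 'events' is an assumed KStream<String, String> built from a topic as usual
KTable<Windowed<String>, Long> counts = events
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))   // tumbling: advance interval == window size
        .count();
// a hopping window would add .advanceBy(Duration.ofMinutes(3)) to the TimeWindows definition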
Running Kafka Streams may create internal topics(prefixed by application.id)
a. Repartitioning Topics: in case you start transforming key of your stream, a re-partitioning will happen at some processor
b. ChangeLog Topics: In case you perform aggregation, Kafka streams will store compacted data in these topics
KStream supports inserts only, like a log (a data stream: everything is appended)
KTable supports upserts: non-null values update/insert, null values delete, like a table or a log-compacted topic (insert, update, delete are all possible)
Which one to use?
- Use KStream when reading from a topic that is not compacted; use KTable when the topic is log-compacted (e.g. for aggregations)
- Use KStream if every new record is partial or transactional information; use KTable when every update is self-sufficient
Stateless versus Stateful Operations of KStreams/KTables
Stateless: the result of the transformation only depends on the data point you process (no other prior info required, e.g. "multiply by 5" is stateless)
Stateful: the result of the transformation also depends on external information, i.e. "the state" (e.g. a "word count" operation requires the app to know all words that occurred since the start)
MapValues
only affecting values, does not change key, does not trigger re-partitioning, both for KStreams & KTables (Stateless)
e.g. upperCased = stream.mapValues(value -> value.toUpperCase());
Map
affects key and value both, triggers re-partitioning, for KStreams only
Filter / FilterNot
takes one record & produces zero or one record, does not change key/value, hence does not trigger repartitioning, both for KStreams & KTables
e.g. KStream<String, Long> onlyPos =stream.filter((key,value)-> value>0);
FlatMap / FlatMapValues
FlatMapValues does not change keys (hence no re-partitioning), for KStreams. Flattens/splits the values into multiple records with the same key
FlatMap changes both key & value (re-partitioning), for KStreams
e.g. words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
Branch
Split a KStream based on one or more predicates. If no predicates match, record is dropped. You get multiple KStreams as result
e.g. KStream<String, Long> [] branches = streams.branch(
(key, value) -> value > 100,
(key,value) -> value > 10,
(key,value) -> value > 0 );
SelectKey
Assigns new key to the record in KStream
e.g. rekeyed = streams.selectKey((key,value)->key.substring(0,1));
Peek
Allows you to apply side-effect operations to a KStream and get the same KStream as result, e.g. printing the stream to console, or statistics collection
stream.to(…) is used to write a KStream to a Kafka topic (a terminal operation)
stream.through(…) writes to a topic & returns a stream/table reading from that topic
Log Compaction naturally maps to KTables (the latest value per key is kept)
a. Any consumer reading from head of log will still see ALL messages
b. Ordering of messages is kept, LC deletes messages, but doesn’t reorder
c. The offset of a message is immutable; offsets are simply skipped if a message has been compacted away
d. Deleted records (tombstones) can still be seen for delete.retention.ms (default 24 hrs)
KStreams & KTable Duality
Streams as a Table: a stream can be considered as the changelog of a table, where each data record in the stream captures a state change of the table
Table as a Stream: A table can be considered as a snapshot, of the latest value of each key in stream
KTable to KStream:
KStream<> stream = ktable.toStream()
KStream to KTable:
KTable<> ktable = stream.toTable(Materialized.as("stream-conv-to-table"))
or, write to kafka intermediate topic, and read back as KTable
Exactly-Once
StreamsConfig.PROCESSING_GUARANTEE_CONFIG = StreamsConfig.EXACTLY_ONCE
With v0.11, producers are idempotent, i.e. if an ack is not received in time and the message is resent, Kafka will make sure to keep only one copy of it. Plus you can now write multiple messages to different topics as one transaction (either all are written, or none)
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG=true (Producer Properties)
State Store
Stateful operations alone require some intermediate state store; the options are:
1. Fault-tolerant in-memory state store, or 2. Recoverable persistent state store
while each store can be of type
1. KeyValueStore, 2. SessionStore, 3. WindowStore, or 4. ReadOnlyStore
When using stores, it is extremely important how the keys of messages are defined; otherwise different tasks (working over the different partitions the keys were sent to) may process and store data inconsistently. If the key of the original messages is already fixed, assigning a new key (which triggers re-partitioning based on the new key) can be a solution, via an intermediate processor in the topology (i.e. the through() processor)
Behind the scenes even local state stores are backed up in a Kafka topic referred to as the 'state store changelog'
Aggregation
- count() — simply count, nothing else
- reduce() : No need to initialize values & state store as all types are known
- aggregate(): explicit initialization needed for values & the state store (a sketch follows this list)
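A hedged aggregate() sketch summing Long values per key; the grouped stream and the serde choices are assumptions:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// 'grouped' is an assumed KGroupedStream<String, Long>
KTable<String, Long> totals = grouped.aggregate(
        () -> 0L,                                            // initializer: starting value per key
        (key, newValue, aggregate) -> aggregate + newValue,  // aggregator
        Materialized.with(Serdes.String(), Serdes.Long()));  // explicit serdes for the state store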
8. AVRO
SerDe Data-types
byte[]
ByteBuffer
Double
Integer
Long
String
Avro Schemas
Primitive Types: null, boolean, int (32 bit), long (64 bit), float (32 bit), double (64 bit), bytes (sequence of 8-bit unsigned bytes), string (unicode character sequence)
Complex Types: Enums, Arrays, Maps, Unions, other schemas as Types
Enums: are the fields whose values can be enumerated (e.g. customer type)
Arrays: list of undefined size of the same schema (e.g. customer emails)
Maps: Key Values (e.g. secret questions)
Unions: Can allow a field to take different types. For example, if a field that could be either an int or null, then the union is represented as [“int”, “null”]. If defaults are defined, they must correspond to the first type in the union
Logical Types: To give more meaning to primitive types, e.g. decimal (bytes), date (int, number of days since epoch), time-millis (long)
Avro Record Schemas Fields
- Name: name of your schema
- Namespace: name space of your schema, equivalent to package in Java
- Doc: documentation to explain schema
- Aliases: OPTIONAL, optional other names of schema
- Fields: Array of fields, each would contain:
- Name: name of field
- Doc: documentation of the field
- Type: Primitive Type of Field
- Default: default value of that field (an example record schema follows this list)
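A hedged example record schema using the fields above, parsed with Avro's Java Schema.Parser (the Customer schema itself is made up for illustration):
import org.apache.avro.Schema;

String customerSchemaJson =
      "{"
    + "  \"type\": \"record\","
    + "  \"name\": \"Customer\","
    + "  \"namespace\": \"com.example\","
    + "  \"doc\": \"A hypothetical customer record\","
    + "  \"fields\": ["
    + "    {\"name\": \"first_name\", \"type\": \"string\", \"doc\": \"Customer first name\"},"
    + "    {\"name\": \"age\", \"type\": \"int\", \"default\": -1},"
    + "    {\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\": null}"
    + "  ]"
    + "}";
Schema customerSchema = new Schema.Parser().parse(customerSchemaJson);  // throws if the schema is malformed
Note how the union field defaults to null, matching the first type listed in the union as required.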
Schema Evolution
Avro enables us to evolve our schema over time, e.g. today we might capture some information about a customer, tomorrow a few more items
Backward Schema Evolution: New schema can read old data (Adding new field in new schema with default value helps to accomplish that)
Forward Schema Evolution: Old schema can read new data (Deleting fields without default is NOT forward compatible change )
Full Schema Evolution: Backward & Forward both (Only add fields with defaults, only remove fields that have defaults )
Breaking Schema Evolution: None of two (e.g. adding/removing elements from enum, renaming a field that has no default, changing data type of a field).
Note: If you do not provide a default value for a field, you cannot delete that field from your schema without breaking
Confluent Schema Registry
- Store/retrieve schemas for producer/consumers
- Enforce backward/forward/full schema compatibility on topics
- Decrease size of payload sent to Kafka
- Schema does not live in Kafka but in CSR, only Schema ID lives in Kafka
To write an Avro producer, you need to pull in Confluent Avro serializer dependencies and specify serializer being KafkaAvroSerializer
If you are reading an Avro topic and want to deserialise it into a SpecificRecord, set specific.avro.reader=true
Schema is sent to CSR, Content without Schema but reference to Schema ID is sent to Kafka
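A hedged producer-properties sketch for this Avro + Schema Registry flow (broker and registry addresses are assumptions):
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // assumed broker
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "io.confluent.kafka.serializers.KafkaAvroSerializer");               // from the Confluent serializer dependency
props.put("schema.registry.url", "http://localhost:8081");                     // assumed CSR address
// on the consumer side, KafkaAvroDeserializer plus specific.avro.reader=true yields SpecificRecord instances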
Client protocols used by Schema Registry: Http & Https
Confluent REST Proxy
Content-Type: application/vnd.kafka.avro.v2+json
Accept: application/vnd.kafka.v2+json
JSON, Binary, Or Protobuf : What data formats are not natively available with the Confluent REST Proxy ? (A: protobuf )
How to handle: org.apache.kafka.connect.errors.DataException: JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields? (A: disable schemas in the converters)
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
9. KSQL
auto.offset.reset property, by default "latest", i.e. a newly created stream only reads messages that arrive after its creation. Can be changed to "earliest"
ksql.streams.state.dir property helps to define rocksDB storage directory for stateful operations
Different ways to create stream in KSQL
- From existing Topic (Formats: AVRO, DELIMITED, JSON, JSON_SR, PROTOBUF )
- Via Count / join
- Persistent Streaming Query
Additional Fields in Every KSQL Stream
ROWTIME — Timestamp of Message
ROWKEY — Key of the Topic
Re-Keying a Stream & Materialising it to support Querying :
CREATE STREAM TEST_TOPIC_REKEY AS SELECT * FROM TEST_TOPIC PARTITION BY field_name;
CREATE TABLE MATERIALIZED_TOPIC WITH (KAFKA_TOPIC='TEST_TOPIC_REKEY', VALUE_FORMAT='DELIMITED', KEY='field_name');
CREATE STREAM MY_STREAM_REKEYED WITH (PARTITIONS=1) AS SELECT * FROM MY_STREAM (helpful when joins require co-partitioning)
Pull & Push Queries
Pull queries are typically ANSI SQL compliant, can only be run against materialized tables (not plain streams), and the WHERE clause supports only key column constraints
Push queries are like subscription to a query result in future, ends with EMIT CHANGES, supports all SQL filtering, does not by default persist result in topic, for persistence use “CREATE [STREAM | TABLE ] AS SELECT … EMIT CHANGES” in addition
Merging Two Streams
CREATE STREAM WORLD_STREAM AS SELECT 'Europe' AS DATA_SRC, * FROM EUROPE_STREAM;
INSERT INTO WORLD_STREAM SELECT 'USA' AS DATA_SRC, * FROM USA_STREAM;
Windowing in KSQL
The TOPK function allows you to select the top K values for a given key for a given window. For example, to compute the 5 highest value orders per zip code per hour:
SELECT order_zipcode, TOPK(order_total, 5) FROM orders WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode;
TOPKDISTINCT function is similar to the TOPK function, except that it will output the topK distinct values for a given key for a given window. For example, to print the 5 latest page views for each page:
SELECT pageid, TOPKDISTINCT(viewtime, 5) FROM pageviews_users GROUP BY pageid;
GEO_DISTANCE is a filtering construct for geospatial data, unique to KSQL. For example:
SELECT user, GEO_DISTANCE(from_latitude, from_longitude, to_latitude, to_longitude, 'km') AS dist ;
10. KAFKA CONNECT
is managed through REST APIs, which serve the following purposes (a sample call follows this list):
- Get Worker information
- List Connectors available on a Worker
- Ask about Active Connectors
- Get Information about Connector Task & Config
- Get Connector Status
- Pause/Resume a Connector
- Delete a Connector
- Create a Connector
- Update Connector Configuration
- Get Connector Configuration
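A hedged sketch of calling one of these endpoints with Java's built-in HTTP client (the worker address is an assumption; 8083 is the usual Connect REST default):
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// lists the active connectors on an assumed local worker
static String listConnectors() throws IOException, InterruptedException {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8083/connectors"))
            .GET()
            .build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    return response.body();   // JSON array of connector names
}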
Tasks are essentially consumer threads; if there are 10 partitions and tasks.max=5, then each task will receive 2 partitions
Internal TOPICS of Kafka Connect:
- Connector Config
- Task Config
- Offset
- Status
Source partitions & source offsets allow Kafka Connect to know which source you have been reading & to track up to where you have been reading. They are different from Kafka's own partitions & offsets; they belong to the Kafka Connect source
Common Worker Configurations
bootstrap.servers , key.converter , value.converter , internal.key.converter , internal.value.converter , offset.flush.interval.ms , offset.flush.timeout.ms , plugin.path , rest.advertised.host.name , rest.advertised.listener, response.http.headers.config , task.shutdown.graceful.timeout.ms , key.converter.schema.registry.url , value.converter.schema.registry.url
Distributed Worker Configurations
group.id , config.storage.topic , config.storage.replication.factor , offset.storage.topic , offset.storage.replication.factor , offset.storage.partitions , status.storage.topic , status.storage.replication.factor , status.storage.partitions , connections.max.idle.ms (default 540000) , receive.buffer.bytes (default 32768 ) , request.timeout.ms (default 40000) , send.buffer.bytes (default 131072) , worker.sync.timeout.ms (default 3000 ) , worker.unsync.backoff.ms (default 300000 ), metadata.max.age.ms (default 300000), reconnect.backoff.ms (Wait before trying to reconnect host, default 50 ) , retry.backoff.ms (Wait before attempting to retry failed fetch from topic partition, default 100 )
JDBC Sink Configuration Options
connection.url , connection.user, connection.password , insert.mode (valid values insert | upsert ), batch.size , table.name.format (default ${topic} ) , pk.mode ( valid values none | kafka | record_key primitive/struct | record_value struct ), fields.whitelist , auto.create (of destination table), auto.evolve ( upon schema change, trigger ALTER SQL automatically) , max.retries , retry.backoff.ms
11. KAFKA SECURITY
Authentication can take a few forms
- SSL Authentication: Client authenticate to Kafka using SSL certificates
- SASL Authentication: PLAIN (clients authenticate using username/password; weak & easy to set up), KERBEROS (e.g. GSSAPI with Microsoft AD; strong & hard to set up), and SCRAM (username & password; strong & medium effort to set up)
- Authentication in future: OAUTHBEARER for OAuth2 support
SASL Kafka Server Configurations
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093, SASL_SSL://0.0.0.0:9094
advertised.listeners=PLAINTEXT://<public_dns>:9092,SSL://<public_dns>:9093, SASL_SSL://<public_dns>:9094
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
ssl.keystore.location=
ssl.keystore.password=
ssl.key.password=
ssl.truststore.location=<server_truststore.jks>
zookeeper.set.acl=true #Does not affect topics already created
SASL Kafka Consumer Configurations
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=<client_truststore.jks>
ssl.truststore.password=
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;
Kafka Client JAAS Config
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/tmp/reader.user.keytab"
principal="reader@KAFKA.SECURE";
};
Access Control Lists (ACLs)
(Support restricting access to cluster, topics separately. Stored in ZooKeeper)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:kafka
allow.everyone.if.no.acl.found=false
security.inter.broker.protocol=SASL_SSL
Listing all ACLs
$ kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper:2181
Adding an ACL
$ kafka-acls.sh --add --allow-principal User:Bob --allow-principal User:Alice --allow-host Host1 --allow-host Host2 --operation Read --operation Write --topic Test-topic
Zookeeper Configurations for ACLs
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
For topics created earlier, the zookeeper-security-migration tool can be used to transform non-secured znodes into secured ones
— — —
That’s all :)