Kafka Architecture
-
Publish-Subscribe Model:
- Kafka follows a publish-subscribe model.
- Producers send messages to topics.
- Consumers subscribe to topics to receive messages.
-
Topics and Partitions:
- Topics categorize messages into logical streams.
- Each topic can have multiple partitions for scalability.
- Partitions enable parallel processing and data distribution.
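As a sketch, a topic with multiple partitions can be created through the AdminClient (topic name, partition count, and replication factor below are illustrative):
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // 3 partitions, replication factor 2 (illustrative values)
    admin.createTopics(List.of(new NewTopic("my-topic", 3, (short) 2))).all().get();
} catch (Exception e) {
    e.printStackTrace();
}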
-
Brokers:
- A Kafka cluster consists of multiple brokers.
- Each broker stores data and serves client requests.
- Brokers collaborate for data replication and failover.
-
Leader and Replicas:
- Each partition has a leader broker and replica brokers.
- Leader handles read and write operations for a partition.
- Replicas ensure data redundancy and fault tolerance.
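As a sketch, partition leaders and replicas can be inspected with the AdminClient ("my-topic" is a placeholder; props holds bootstrap.servers as in the other snippets):
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.List;
try (Admin admin = Admin.create(props)) {
    TopicDescription desc = admin.describeTopics(List.of("my-topic")).all().get().get("my-topic");
    for (TopicPartitionInfo p : desc.partitions()) {
        // One leader per partition; the remaining replicas follow it
        System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                p.partition(), p.leader(), p.replicas(), p.isr());
    }
} catch (Exception e) {
    e.printStackTrace();
}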
-
Producers:
- Producers send messages to Kafka topics.
- Producers can specify which partition to send to.
- Messages are persisted and replicated by Kafka.
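As a sketch (assuming a configured KafkaProducer named producer, set up as in the examples below; topic and values are placeholders):
// Send explicitly to partition 0 of the topic
producer.send(new ProducerRecord<>("my-topic", 0, "key", "value"));
// Without an explicit partition, Kafka hashes the key to choose one
producer.send(new ProducerRecord<>("my-topic", "key", "value"));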
-
Consumers:
- Consumers read messages from topics.
- Consumer groups allow parallel consumption.
- Each message is consumed by only one consumer in a group.
-
Consumer Offsets:
- Kafka tracks consumer progress using offsets.
- Offsets indicate the position of a consumer in a partition.
- Consumers manage their own offsets to control what they read.
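A sketch of manual offset management, building on the consumer setup from the example at the end of this note (process() is a hypothetical handler):
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    process(record); // hypothetical processing step
}
consumer.commitSync(); // commit offsets only after processing succeeds (at-least-once)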
-
Retention and Compaction:
- Kafka retains messages for a configurable time.
- Messages can be compacted to retain only the latest version.
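A sketch of enabling compaction when creating a topic (topic name and sizing are illustrative):
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Map;
// Keep only the latest value per key instead of deleting by age
NewTopic compacted = new NewTopic("user-profiles", 3, (short) 2)
        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));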
-
ZooKeeper (deprecated in recent versions):
- Used for metadata management and coordination.
- Maintains cluster information, leader election, and more.
- Kafka is replacing ZooKeeper with KRaft (Kafka Raft) mode in newer versions.
-
Kafka Connect:
- Framework for integrating Kafka with external systems.
- Handles data import/export between Kafka and databases, storage, etc.
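As a sketch, a standalone file-source connector is configured with a properties file like this (paths and names are placeholders; FileStreamSourceConnector ships with Kafka as a demo connector):
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt
topic=connect-test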
-
Kafka Streams:
- Library for building real-time stream processing applications.
- Enables transformation and analysis of data within Kafka.
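A minimal Streams topology sketch that reads from one topic, transforms values, and writes to another (topic names and application id are placeholders):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Read from "input", uppercase every value, write to "output"
builder.<String, String>stream("input")
       .mapValues(v -> v.toUpperCase())
       .to("output");
new KafkaStreams(builder.build(), props).start();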
-
Exactly-Once Semantics:
- Kafka supports "exactly-once" message processing via idempotent producers and transactions.
- Ensures that messages are neither lost nor duplicated during processing.
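A sketch of a transactional producer (transactional.id and topic are placeholders; bootstrap and serializer settings as in the producer examples below):
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.commitTransaction(); // consumers with read_committed see the batch atomically
} catch (Exception e) {
    producer.abortTransaction(); // roll back on failure
}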
-
Horizontal Scalability:
- Kafka can scale horizontally by adding more brokers.
- Supports high-throughput, fault-tolerant data pipelines.
Producer Acks:
-
Kafka producers provide an acks setting that controls the delivery guarantee between producer and broker:
- acks=0: "Fire-and-forget." The producer sends messages without waiting for any broker response, so messages can be lost.
- acks=1: The producer waits for an acknowledgment from the partition leader (the default before Kafka 3.0). This still does not fully prevent loss: the leader can acknowledge a write and then fail, e.g. due to a disk error, before followers replicate it, making the message unrecoverable.
- acks=all: The strongest setting, preventing message loss. The producer receives a response only after the leader and all in-sync replicas (ISRs) have written the data (the default since Kafka 3.0).
-
Example setting:
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "all");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
Delivery Guarantees:
- At Most Once: messages can be missed if a consumer saves its position and then crashes before processing.
- At Least Once: messages can be processed multiple times if a consumer crashes after processing but before saving its position.
- Exactly Once: Kafka achieves this with transactional producers and by committing consumer positions in the same transaction as the processed messages; transactions keep state consistent across failures, and isolation levels control which messages consumers see.
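For example, a consumer can be restricted to messages from committed transactions (one extra setting on top of the consumer example at the end):
// Aborted transactional messages are filtered out before delivery
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");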
Idempotent Producers:
- An idempotent producer deduplicates broker-side retries, so network retries cannot create duplicate messages. Configuration:
// create Producer properties
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
Data Retention Policy:
- Controls how long messages are kept (default: 1 week, retention.ms).
- Can be set globally on the broker or per topic.
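A sketch of changing retention for a single topic via the AdminClient (topic name and the 7-day value are illustrative; props holds bootstrap.servers as in the other snippets):
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.List;
import java.util.Map;
try (Admin admin = Admin.create(props)) {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
    // retention.ms = 604800000 ms = 7 days
    AlterConfigOp setRetention = new AlterConfigOp(
            new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention))).all().get();
} catch (Exception e) {
    e.printStackTrace();
}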
Ordering:
Kafka does not guarantee ordering of messages across partitions, only within a single partition. A consumer therefore sees messages in order only when they come from the same partition.
If message ordering is required in your use case, give related messages the same key so they are routed to the same partition, as in the sketch below.
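A sketch: all events for one entity share a key, so they land on the same partition and keep their relative order (producer setup as in the examples below; topic, key, and values are placeholders):
// Same key => same partition => per-entity ordering is preserved
producer.send(new ProducerRecord<>("orders", "user-42", "created"));
producer.send(new ProducerRecord<>("orders", "user-42", "paid"));
producer.send(new ProducerRecord<>("orders", "user-42", "shipped"));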
Use Cases:
- Metrics: Kafka is often used for operational monitoring, aggregating statistics from distributed applications into centralized feeds of operational data.
- Log Aggregation: Kafka can collect logs from multiple services across an organization and make them available in a standard format to multiple consumers.
- Stream Processing: frameworks such as Storm and Spark Streaming read data from a topic, process it, and write the processed data to a new topic where it becomes available to users and applications. Kafka's strong durability is also very useful in the context of stream processing.
Example Code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class SimpleProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("acks", "all");
        // Serializers are required; without them the producer constructor throws
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 150; i++) {
                // get() blocks until the broker acknowledges the write
                RecordMetadata ack = kafkaProducer.send(
                        new ProducerRecord<>("replicated_topic", Integer.toString(i), "MyMessage" + i)).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }
}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A random group id means this consumer starts as its own group each run
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to topic
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll(long) is deprecated; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}