Kafka
# Kafka
# 什么是Kafka
Kafka 是一个分布式流处理平台,它以高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。它能够支撑海量数据的数据传递,在离线和实时的消息处理业务系统中都有广泛的应用。
Kafka 的应用场景包括:
- 日志收集:可以用 Kafka 收集各种服务的 log,通过 Kafka 以统一接口服务的方式开放给各种 consumer,例如 Hadoop、Hbase、Solr 等。
- 消息系统:解耦生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 Kafka 的 topic 中,然后消费者通过订阅这些 topic 来做实时的监控分析,亦可保存到数据库。
- 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如 spark streaming 和 storm。
是的,您提到的这些都是 Kafka 的基础概念。下面是一个简单的 Java 代码示例,它演示了如何使用 Kafka 的 Producer 和 Consumer API 来发送和接收消息。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Collections;
import java.util.Properties;
public class KafkaExample {
public static void main(String[] args) {
// 设置 Kafka 服务器地址
String bootstrapServers = "localhost:9092";
// 设置 topic 名称
String topic = "my-topic";
// 创建 Producer 配置
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", bootstrapServers);
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 KafkaProducer 实例
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
producer.send(record);
// 关闭 KafkaProducer
producer.close();
// 创建 Consumer 配置
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("group.id", "my-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建 KafkaConsumer 实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// 订阅 topic
consumer.subscribe(Collections.singletonList(topic));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ": " + record.value());
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
在这个示例中,我们首先创建了一个 KafkaProducer
实例,用来向指定的 topic 发送消息。然后,我们创建了一个 KafkaConsumer
实例,并订阅了指定的 topic。最后,我们使用 poll
方法来拉取并处理消息。
# Kafka的一些基础概念
- Broker:Kafka 集群中的一个服务器节点。
- Topic:消息的类别,生产者将消息发送到特定的 topic,消费者订阅特定的 topic 来接收消息。
- Partition:topic 的分区,每个 topic 可以分为多个 partition,每个 partition 可以在不同的 broker 上存储。
- Offset:消息在 partition 中的唯一标识,Kafka 通过它来保证消息在 partition 内的顺序性。
- Producer:生产者,负责向 Kafka 发送消息。
- Consumer:消费者,负责从 Kafka 接收消息并进行处理。
- Consumer Group:消费者组,由多个消费者组成。每个消费者可以消费不同 partition 的数据,实现负载均衡。
- Zookeeper:Kafka 使用 Zookeeper 来存储集群元数据和进行协调。
# 为什么offset可以保证在分区的顺序
Offset 是消息在 partition 中的唯一标识,它表示了消息在 partition 中的位置,相当于消息的位置下标。Kafka 保证在同一个 partition 中,消息的 offset 是单调递增的。也就是说,后面发送的消息的 offset 一定比前面发送的消息的 offset 大。
例如,假设我们向一个 partition 发送了三条消息,它们的 offset 分别为 0、1 和 2。消费者在消费这些消息时,会按照 offset 的顺序进行消费,也就是先消费 offset 为 0 的消息,然后消费 offset 为 1 的消息,最后消费 offset 为 2 的消息。
因此,通过维护消息的 offset,Kafka 能够保证在同一个 partition 中,消息是有序的。
# 生产者只设置Topic、key、value,消费者会出现顺序问题嘛
当您在生产消息时,只需要指定 topic、key 和 value 即可。Kafka 会根据您指定的 key 来决定将消息发送到哪个 partition。如果您没有指定 key,Kafka 会使用轮询算法来选择 partition。
在同一个 partition 中,Kafka 能够保证消息的顺序。也就是说,如果您的消费者只消费一个 partition 的数据,那么它接收到的消息顺序与生产者发送的消息顺序是一致的。
但是,如果您的 topic 有多个 partition,那么不同 partition 之间的消息顺序是无法保证的。也就是说,如果您的消费者消费多个 partition 的数据,那么它接收到的消息顺序可能与生产者发送的消息顺序不一致。
下面是一个简单的 Java 代码示例,它演示了如何使用 Kafka 的 Producer API 来发送消息:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 设置 Kafka 服务器地址
String bootstrapServers = "localhost:9092";
// 设置 topic 名称
String topic = "my-topic";
// 创建 Producer 配置
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建 KafkaProducer 实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key-" + i, "value-" + i);
producer.send(record);
}
// 关闭 KafkaProducer
producer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
在这个示例中,我们创建了一个 KafkaProducer
实例,并向指定的 topic 发送了 10 条消息。每条消息都有一个 key 和一个 value。
# Kafka根据key是怎么确定放在哪个分区的啊
当您在发送消息时指定了 key,Kafka 会使用默认的分区器(DefaultPartitioner
)来决定将消息发送到哪个 partition。默认分区器的工作原理是对 key 进行哈希运算,然后对 partition 的数量取模,得到最终的 partition 编号。
例如,假设您的 topic 有 3 个 partition,编号分别为 0、1 和 2。当您发送一条带有 key 的消息时,Kafka 会对 key 进行哈希运算,然后对 3 取模。如果结果为 0,那么消息将被发送到 partition 0;如果结果为 1,那么消息将被发送到 partition 1;如果结果为 2,那么消息将被发送到 partition 2。
这种方式能够保证具有相同 key 的消息总是被发送到同一个 partition。这在某些场景下非常有用,例如订单处理系统中,我们希望同一个订单的所有消息都能够被顺序处理。
需要注意的是,您也可以自定义分区器来决定消息应该发送到哪个 partition。只需要在创建 KafkaProducer
实例时指定 partitioner.class
配置项即可。
# 为什么会有消费者组啊
消费者组(Consumer Group)是 Kafka 中的一个概念,它表示一组消费者,这些消费者共同消费一个 topic 的数据。消费者组中的每个消费者都可以消费 topic 的不同 partition 的数据,实现负载均衡。
消费者组的作用主要有两个方面:
实现负载均衡:当一个 topic 有多个 partition 时,可以使用多个消费者来并行消费数据,提高处理速度。Kafka 会自动将每个 partition 分配给消费者组中的一个消费者,确保每个 partition 都能被消费。
实现容错:当消费者组中的一个消费者宕机时,Kafka 会自动将它负责的 partition 分配给其他消费者,确保数据能够继续被消费。
例如,假设您有一个 topic,它有 3 个 partition。您可以创建一个包含 3 个消费者的消费者组来并行消费这些数据。Kafka 会自动将每个 partition 分配给一个消费者,这样每个消费者都能够独立地消费一个 partition 的数据。
select id,name,age,city from table group by city order by age desc limit 1
# 为什么会有Zookeeper
Zookeeper 是一个分布式协调服务,它在 Kafka 中扮演着重要的角色。Kafka 使用 Zookeeper 来存储集群的元数据信息,并进行故障转移和领导者选举。
下面是一些 Zookeeper 在 Kafka 中的具体用途:
- 存储集群元数据:Kafka 使用 Zookeeper 来存储集群的元数据信息,例如 broker 列表、topic 和 partition 信息等。
- 领导者选举:当 Kafka 集群中的一个 broker 宕机时,Zookeeper 会负责协调其他 broker 进行领导者选举,以确保集群能够继续正常运行。
- 故障转移:当 Kafka 集群中的一个 broker 宕机时,Zookeeper 会通知其他 broker 进行故障转移,以确保数据不会丢失。
- 消费者组协调:Kafka 使用 Zookeeper 来协调消费者组中的消费者,确保每个 partition 都能被正确地分配给消费者。
总之,Zookeeper 在 Kafka 中扮演着重要的角色,它负责存储集群的元数据信息,并协调集群中各个组件的运行。
# Kafka高性能的原因
Kafka 能够提供高性能,主要是因为它采用了一些优化技术来提高吞吐量和降低延迟。下面是一些 Kafka 采用的优化技术:
- 零拷贝(Zero-Copy):Kafka 使用零拷贝技术来避免不必要的数据拷贝,从而提高数据传输的效率。
- 批量发送和压缩:Kafka 支持将多条消息打包成一个批次进行发送,并且可以对消息进行压缩,以减少网络传输的开销。
- 分区和多副本:Kafka 支持将 topic 分区到多个 broker 上,并且支持多副本,这样可以实现负载均衡和容错。
- 消费者组:Kafka 支持消费者组,允许多个消费者并行消费数据,提高处理速度。
例如,假设您有一个 Kafka 集群,它包含 3 个 broker。您可以创建一个包含 3 个 partition 的 topic,并将每个 partition 分配给一个 broker。这样,生产者可以并行地向 3 个 broker 发送数据,消费者也可以并行地从 3 个 broker 拉取数据,从而实现负载均衡。
此外,您还可以使用消费者组来进一步提高性能。例如,您可以创建一个包含 3 个消费者的消费者组来并行消费数据。Kafka 会自动将每个 partition 分配给一个消费者,这样每个消费者都能够独立地消费一个 partition 的数据。
# Kafka是拉还是推
Kafka 的消费者使用拉(pull)模式来获取数据。也就是说,消费者需要主动向 Kafka 发起请求来获取数据,而不是等待 Kafka 将数据推送给消费者。
消费者可以使用 poll
方法来拉取数据。这个方法会向 Kafka 发起请求,获取指定数量的消息。如果当前没有足够的消息可供消费,poll
方法会阻塞等待,直到有新的消息到达或超时。
下面是一个简单的 Java 代码示例,它演示了如何使用 Kafka 的 Consumer API 来拉取并处理消息:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 设置 Kafka 服务器地址
String bootstrapServers = "localhost:9092";
// 设置 topic 名称
String topic = "my-topic";
// 创建 Consumer 配置
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建 KafkaConsumer 实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅 topic
consumer.subscribe(Collections.singletonList(topic));
// 拉取并处理消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ": " + record.value());
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
在这个示例中,我们创建了一个 KafkaConsumer
实例,并订阅了指定的 topic。然后我们使用 poll
方法来拉取并处理消息。poll
方法会阻塞等待,直到有新的消息到达或超时。
Kafka 之所以使用拉模式来获取数据,是因为这种方式能够更好地支持消费者的多样性。不同的消费者可能有不同的消费速率和需求,使用拉模式能够让消费者根据自己的需求来获取数据,而不是被动地接收数据。
# 如何判断Broker存活
在Kafka中,可以使用AdminClient
类来检查Broker是否存活。AdminClient
类提供了一个名为describeCluster
的方法,该方法返回一个DescribeClusterResult
对象,其中包含有关集群的信息,包括活动节点列表。您可以检查此列表中是否包含您要检查的Broker。
Kafka集群中的每个Broker都会定期向ZooKeeper发送心跳以表明它们仍然存活。如果ZooKeeper在一段时间内未收到来自某个Broker的心跳,它会认为该Broker已经死亡,并将其从活动节点列表中删除。因此,AdminClient
类的describeCluster
方法返回的活动节点列表仅包含已向ZooKeeper发送心跳并被认为仍然存活的Broker。
下面是一个简单的Java示例,演示如何使用AdminClient
检查Broker是否存活:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;
import java.util.Properties;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
public class KafkaBrokerCheck {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 配置 AdminClient
Properties config = new Properties();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(config);
// 获取集群信息
DescribeClusterResult clusterResult = admin.describeCluster();
Collection<Node> nodes = clusterResult.nodes().get();
// 检查 Broker 是否存活
int brokerId = 1; // 要检查的 Broker ID
boolean isBrokerAlive = nodes.stream().anyMatch(node -> node.id() == brokerId);
System.out.println("Is Broker " + brokerId + " alive: " + isBrokerAlive);
admin.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
上面的代码创建了一个AdminClient
实例,并使用它来获取集群信息。然后,它检查活动节点列表中是否包含要检查的Broker。如果列表中包含该Broker,则认为它是存活的。
# 为什么零拷贝可以提高传输效率
在Kafka中,使用零拷贝技术是为了提高数据传输的效率,减少不必要的数据拷贝和上下文切换。零拷贝技术主要包括两种:mmap
和sendfile
。
mmap
是一种内存映射文件的技术,它可以将磁盘文件映射到内存中,让应用程序可以直接操作内存而不需要通过系统调用。这样可以避免从内核缓冲区到用户缓冲区的数据拷贝,提高数据读取的速度。Kafka使用mmap
技术来实现消息的持久化,将消息写入到内存映射文件中,然后由操作系统负责将数据刷新到磁盘。
sendfile
是一种利用DMA(直接内存访问)技术来实现数据传输的技术,它可以将数据从一个文件描述符(如磁盘文件)直接发送到另一个文件描述符(如网络套接字),而不需要经过用户空间的缓冲区。这样可以避免从用户缓冲区到内核缓冲区的数据拷贝,提高数据发送的速度。Kafka使用sendfile
技术来实现消息的传输,将消息从磁盘文件直接发送到网络套接字,而不需要经过Broker进程的缓冲区。
# 为什么使用批量发送和压缩啊
在Kafka中,使用批量发送和压缩技术是为了提高数据传输的效率和降低网络带宽的使用。
批量发送是指将多条消息组合成一个批次,然后一次性发送到Broker。这样可以减少网络请求的次数,降低网络延迟,提高数据传输的效率。批量发送可以通过配置batch.size
和linger.ms
参数来实现。
压缩是指在发送消息之前,对消息进行压缩以减小消息的大小。这样可以减少网络带宽的使用,提高数据传输的速度。Kafka支持多种压缩算法,包括GZIP、Snappy和LZ4。压缩可以通过配置compression.type
参数来实现。