Flink 1.14.5 Kafka Connector 案例

Apache Kafka Connector

Flink 提供了一个 Apache Kafka 连接器,用于从 Kafka Topic 读取数据和向 Kafka Topic 写入数据,并保证恰好一次次语义。

Dependency

Apache Flink 附带了一个通用的 Kafka 连接器,它试图跟踪最新版本的 Kafka 客户端。它使用的客户端版本可能会在 Flink 版本之间发生变化。最近的 Kafka 客户端向后兼容 broker 版本 0.10.0 或更高版本。关于 Kafka 兼容性的详细信息,请参考 Kafka 官方 文档。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.0</version>
</dependency>

Flink 的流连接器目前不是二进制发行版的一部分。在此处 查看如何与它们链接以进行集群执行。

Kafka Source

❝本部分介绍基于新 数据源 API 的 Kafka Source。❞

Usage

Kafka Source 提供了一个 builder 类来构建 KafkaSource 的实例。下面的代码片段展示了如何构建一个 KafkaSource 来消费来自主题 “input-topic” 最早偏移量的消息,消费者组是“my-group”,并且仅将消息的值反序列化为字符串。

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

构建 KafkaSource 「需要」以下属性:

  • Bootstrap servers,通过 setBootstrapServers(String)来配置
  • Topics / partitions to subscribe,请参阅以下 主题-分区订阅 以了解更多详细信息。
  • Deserializer to parse Kafka messages,更多详细信息请参见以下 Deserializer。

    Topic-partition Subscription

Kafka 源码提供了 3 种 topic-partition 订阅方式:

主题列表,订阅主题列表中所有分区的消息。例如:

KafkaSource.builder().setTopics("topic-a", "topic-b")

主题模式,从名称与提供的正则表达式匹配的所有主题订阅消息。例如:

KafkaSource.builder().setTopicPattern("topic.*")

分区集,订阅提供的分区集中的分区。例如:

final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
        new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
        new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet)

Deserializer

解析 Kafka 消息需要一个反序列化器。Deserializer(反序列化模式)可以通过 配置setDeserializer(KafkaRecordDeserializationSchema),其中KafkaRecordDeserializationSchema定义了如何反序列化一个 Kafka ConsumerRecord

如果只需要 Kafka ConsumerRecord的值,可以使用 setValueOnlyDeserializer(DeserializationSchema)在 builder 中使用,其中DeserializationSchema定义了如何反序列化 Kafka 消息值的二进制文件。

你还可以使用 Kafka Deserializer 来反序列化 Kafka 消息值. 例如使用 StringDeserializer 将 Kafka 消息值反序列化为字符串:

import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringSerializer.class));

Starting Offset

Kafka Source 能够通过指定 OffsetsInitializer来消费从不同偏移量开始的消息。内置的初始值设定项包括:

KafkaSource.builder()
    // Start from committed offset of the consuming group, without reset strategy
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // Start from the first record whose timestamp is greater than or equals a timestamp
    .setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
    // Start from earliest offset
    .setStartingOffsets(OffsetsInitializer.earliest())
    // Start from latest offset
    .setStartingOffsets(OffsetsInitializer.latest())

如果上面的内置初始值设定项无法满足你的要求,您还可以实现自定义偏移量初始值设定项。
如果未指定 offsets 初始值设定项,「则」默认使用 「OffsetsInitializer.earliest()」

Boundedness

Kafka Source 旨在支持流式和批量运行模式。默认情况下,KafkaSource 设置为以流式方式运行,因此永远不会停止,直到 Flink 作业失败或被取消。您可以使用setBounded(OffsetsInitializer)指定停止偏移量并设置以批处理模式运行的源。当所有分区都达到它们的停止偏移量时,Source 将退出。

您还可以将 KafkaSource 设置为在流模式下运行,但仍然使用setUnbounded(OffsetsInitializer). 当所有分区达到其指定的停止偏移量时,Source 将退出。

Additional Properties

除了上述属性外,您还可以使用setProperties(Properties)和为 KafkaSource 和 KafkaConsumer 设置任意属性setProperty(String, String)。KafkaSource 有以下配置选项:

  • client.id.prefix 定义用于 Kafka 消费者的客户端 ID 的前缀
  • partition.discovery.interval.ms定义 Kafka 源发现新分区的时间间隔 im 毫秒。有关更多详细信息,请参阅下面的 动态分区发现。
  • register.consumer.metrics 指定是否在 Flink 指标组中注册 KafkaConsumer 的指标
  • commit.offsets.on.checkpoint 指定是否在检查点向 Kafka broker 提交消费偏移量
    KafkaConsumer 的配置可以参考 Apache Kafka文档 了解更多。

请注意,即使配置了以下键,构建器也会覆盖它:

  • key.deserializer 始终设置为 ByteArrayDeserializer
  • value.deserializer 始终设置为 ByteArrayDeserializer
  • auto.offset.reset.strategy被起始偏移量覆盖OffsetsInitializer#getAutoOffsetResetStrategy()
  • partition.discovery.interval.ms被调用时被覆盖为 -1setBounded(OffsetsInitializer)
    下面的代码片段显示了配置 KafkaConsumer 以使用“PLAIN”作为 SASL 机制并提供 JAAS 配置:
KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" passw

Dynamic Partition Discovery

为了在不重启 Flink 作业的情况下处理主题扩展或主题创建等场景,可以将 Kafka 源配置为在提供的主题-分区订阅模式下定期发现新分区。要启用分区发现,请为 property 设置一个非负值partition.discovery.interval.ms

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // discover new partitions per 10 seconds

❝默认情况下「禁用」分区发现。您需要明确设置分区发现间隔才能启用此功能。❞

Event Time and Watermarks

默认情况下,记录将使用嵌入在 Kafka 中的时间戳ConsumerRecord作为事件时间。您可以定义自己WatermarkStrategy的从记录本身提取事件时间,并在下游发出水印:

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

本文档 描述了有关如何定义WatermarkStrategy.

Consumer Offset Committing

Kafka source 在 checkpoint 「完成」时提交当前消费的 offset ,以保证 Flink 的 checkpoint 状态和 Kafka brokers 上的 commit offset 的一致性。

如果未启用检查点,则 Kafka 源依赖于 Kafka 消费者内部的自动定期偏移提交逻辑,由Kafka 消费者的属性配置enable.auto.commit并在其属性中配置auto.commit.interval.ms

需要注意的是 Kafka source 不依赖提交偏移量来实现容错。提交偏移量只是为了暴露消费者和消费组的进度以供监控。

Monitoring

Kafka Source 在各自的 范围内 公开以下指标。

Scope of Metric#

ScopeMetricsUser VariablesDescriptionType
OperatorcurrentEmitEventTimeLagn/a从记录事件时间戳到源连接器发出记录的时间跨度¹: currentEmitEventTimeLag = EmitTime - EventTime.Gauge
OperatorwatermarkLagn/a水印滞后于墙时钟时间的时间跨度: watermarkLag = CurrentTime - WatermarkGauge
OperatorsourceIdleTimen/a源没有处理任何记录的时间跨度: sourceIdleTime = CurrentTime - LastRecordProcessTimeGauge
OperatorpendingRecordsn/a源尚未提取的记录数。例如 Kafka 分区中消费者偏移后的可用记录。Gauge
OperatorKafkaSourceReader.commitsSucceededn/a如果偏移提交被打开并且检查点被开启,那么成功的偏移提交到 Kafka 的总数。Counter
OperatorKafkaSourceReader.commitsFailedn/a如果打开偏移提交并启用检查点,则向 Kafka 提交偏移提交失败的总数。请注意,将偏移量提交回 Kafka 只是暴露消费者进度的一种方式,因此提交失败不会影响 Flink 的检查点分区偏移量的完整性。Counter
OperatorKafkaSourceReader.committedOffsetstopic, partition对于每个分区,最后一次成功提交到 Kafka 的偏移量。可以通过主题名称和分区 id 指定特定分区的指标。Gauge
OperatorKafkaSourceReader.currentOffsetstopic, partition每个分区的消费者当前读取偏移量。可以通过主题名称和分区 id 指定特定分区的指标。Gauge

¹ 该指标是为最后处理的记录记录的瞬时值。提供此指标是因为延迟直方图可能很昂贵。瞬时延迟值通常足以很好地指示延迟。

Kafka Consumer Metrics#

Kafka 消费者的所有指标也都注册在 group 下KafkaSourceReader.KafkaConsumer。例如,Kafka 消费者指标“records-consumed-total”将在指标中报告:<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total

您可以通过配置 option 来配置是否注册 Kafka 消费者的指标register.consumer.metrics。默认情况下,此选项将设置为 true。

对于 Kafka 消费者的指标,您可以参考 Apache Kafka 文档 了解更多详细信息。

Behind the Scene

❝如果您对 Kafka Source 在新数据源 API 的设计下如何工作感兴趣,您可能需要阅读此部分作为参考。有关新数据源 API 的详细信息,数据源 文档 和FLIP-27 提供了更多描述性讨论。❞
在新数据源 API 的抽象下,Kafka Source 由以下组件组成:

Source Split#

Kafka Source 中的一个源拆分代表 Kafka 主题的一个分区。Kafka Source 拆分包括:

  • TopicPartition 分裂代表
  • 分区的起始偏移量
  • 停止分区的偏移量,仅在源以有界模式运行时可用
    Kafka source split 的状态也存储了partition 的当前消费 offset,当 Kafka source reader 为 snapshot 时,状态会转换为 immutable split,将当前 offset 赋值给immutable split 的起始偏移量。

您可以查看类 KafkaPartitionSplitKafkaPartitionSplitState`了解更多详情。

Split Enumerator#

Kafka 的拆分枚举器负责在提供的主题分区订阅模式下发现新的拆分(分区),并将拆分分配给读者,以循环方式均匀分布在子任务中。请注意,Kafka Source 的拆分枚举器会急切地将拆分推送到源阅读器,因此它不需要处理来自源阅读器的拆分请求。

Source Reader#

Kafka source 的 source reader 扩展了提供的SourceReaderBase,并使用单线程多路复用线程模型,该模型读取多个分配的拆分(分区),一个 KafkaConsumer 由一个 驱动SplitReader。消息在从 Kafka 中获取后立即反序列化SplitReader。拆分的状态或消息消费的当前进度由 更新KafkaRecordEmitter,它还负责在记录向下游发出时分配事件时间。

Kafka SourceFunction

FlinkKafkaConsumer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSource。❞
对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Sink

KafkaSink 允许将记录流写入一个或多个 Kafka 主题。

Usage

Kafka sink 提供了一个 builder 类来构造一个 KafkaSink 的实例。下面的代码片段显示了如何将字符串记录写入 Kafka 主题,并保证至少一次交付。

DataStream<String> stream = ...
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()
        )
        .build();
        
stream.sinkTo(sink);

构建 KafkaSink 「需要」以下属性:

  • Bootstrap servers, setBootstrapServers(String)
  • Record serializer, setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果您配置交货保证 DeliveryGuarantee.EXACTLY_ONCE 你也必须设置 setTransactionalIdPrefix(String)`

    Serializer

你始终需要提供一个KafkaRecordSerializationSchema以将传入元素从数据流转换为 Kafka 生产者记录。Flink 提供了一个模式构建器来提供一些常见的构建块,即键/值序列化、主题选择、分区。您也可以自行实现接口以施加更多控制。

KafkaRecordSerializationSchema.builder()
    .setTopicSelector((element) -> {<your-topic-selection-logic>})
    .setValueSerializationSchema(new SimpleStringSchema())
    .setKeySerializationSchema(new SimpleStringSchema())
    .setPartitioner(new FlinkFixedPartitioner())
    .build();

「需要」始终设定的值序列化方法和一个主题(选择方法)。此外,还可以通过使用setKafkaKeySerializer(Serializer)或来使用 Kafka 序列化器代替 Flink 序列化器setKafkaValueSerializer(Serializer)

Fault Tolerance

总的来说,KafkaSink支持三种不同的DeliveryGuarantees。ForDeliveryGuarantee.AT_LEAST_ONCEDeliveryGuarantee.EXACTLY_ONCEFlink 的检查点必须启用。默认情况下KafkaSink使用DeliveryGuarantee.NONE. 您可以在下面找到对不同保证的解释。

  • DeliveryGuarantee.NONE 不提供任何保证:如果 Kafka broker 出现问题,消息可能会丢失,如果 Flink 故障,消息可能会重复。
  • DeliveryGuarantee.AT_LEAST_ONCE:接收器将等待 Kafka 缓冲区中所有未完成的记录在检查点上由 Kafka 生产者确认。如果 Kafka 代理出现任何问题,则不会丢失任何消息,但是当 Flink 重新启动时,消息可能会重复,因为 Flink 会重新处理旧的输入记录。
  • DeliveryGuarantee.EXACTLY_ONCE:在这种模式下,KafkaSink 将写入 Kafka 事务中的所有消息,这些消息将在检查点上提交给 Kafka。因此,如果消费者只读取提交的数据(参见 Kafka 消费者配置隔离级别),则在 Flink 重启的情况下不会看到重复数据。但是,这会有效地延迟记录可见性,直到写入检查点,因此相应地调整检查点持续时间。请确保在同一 Kafka 集群上运行的应用程序中使用唯一的 transactionalIdPrefix,这样多个正在运行的作业不会干扰它们的事务!此外,强烈建议调整 Kafka 事务超时(请参阅 Kafka 生产者 transaction.timeout.ms)» 最大检查点持续时间 + 最大重启持续时间或当 Kafka 未提交的事务到期时可能会发生数据丢失。

    Monitoring

Kafka sink 在各自的 scope 中 公开以下指标。

ScopeMetricsUser VariablesDescriptionType
OperatorcurrentSendTimen/a发送最后一条记录所花费的时间。这个度量是为最后处理的记录记录的瞬时值。Gauge

Kafka Producer

FlinkKafkaProducer已弃用,将随 Flink 1.15 一起删除,请使用 KafkaSink。❞
对于较旧的参考,您可以查看 Flink 1.13文档。

Kafka Connector Metrics

Flink 的 Kafka 连接器通过 Flink 的指标系统提供了一些指标来分析连接器的行为。生产者和消费者通过 Flink 的所有支持版本的指标系统导出 Kafka 的内部指标。Kafka 文档在其文档中列出了所有导出的指标。

也可以register.consumer.metrics通过本节概述的 KafkaSource 配置或在使用 KafkaSink 时禁用 Kafka 指标的转发,您可以通过生产者属性将配置设置register.producer.metrics为 false。

Enabling Kerberos Authentication

Flink 通过 Kafka 连接器提供一流的支持,以对为 Kerberos 配置的 Kafka 安装进行身份验证。只需配置 Flinkflink-conf.yaml即可为 Kafka 启用 Kerberos 身份验证,如下所示:

  1. 通过设置以下内容来配置 Kerberos 凭据 -
    • security.kerberos.login.use-ticket-cache:默认情况下,这是trueFlink 将尝试在由kinit. 请注意,在 YARN 上部署的 Flink 作业中使用 Kafka 连接器时,使用票证缓存的 Kerberos 授权将不起作用。
    • security.kerberos.login.keytabsecurity.kerberos.login.principal:要改用 Kerberos 密钥表,请为这两个属性设置值。
  2. 附加KafkaClientsecurity.kerberos.login.contexts:这告诉 Flink 将配置的 Kerberos 凭据提供给 Kafka 登录上下文以用于 Kafka 身份验证。
    启用基于 Kerberos 的 Flink 安全性后,您可以使用 Flink Kafka Consumer 或 Producer 向 Kafka 进行身份验证,只需在传递给内部 Kafka 客户端的提供的属性配置中包含以下两个设置:
  • 设置security.protocolSASL_PLAINTEXT(默认NONE):用于与 Kafka 代理通信的协议。使用独立的 Flink 部署时,也可以使用SASL_SSL; 请在此处查看如何为 SSL 配置 Kafka 客户端。
  • 设置sasl.kerberos.service.namekafka(默认kafka):此值应与用于 Kafka 代理配置的值相匹配sasl.kerberos.service.name。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
    有关 Kerberos 安全性的 Flink 配置的更多信息,请参阅此处。您还可以在此处找到有关 Flink 如何在内部设置基于 Kerberos 的安全性的更多详细信息。

Upgrading to the Latest Connector Version

升级作业和 Flink 版本指南中概述了通用升级步骤。对于 Kafka,您还需要执行以下步骤:

  • 请勿同时升级 Flink 和 Kafka Connector 版本。
  • 确保您为您的消费者配置了一个group.id
  • 在消费者上设置setCommitOffsetsOnCheckpoints(true),以便将读取偏移量提交给 Kafka。在停止并获取保存点之前执行此操作很重要。您可能必须在旧的连接器版本上执行停止/重新启动循环才能启用此设置。
  • 在消费者上设置setStartFromGroupOffsets(true),以便我们从 Kafka 获得读取偏移量。这只有在 Flink 状态下没有读取偏移时才会生效,这也是下一步非常重要的原因。
  • 更改源/接收器的分配uid。这确保新的源/接收器不会从旧的源/接收器操作符读取状态。
  • 开始新作业,--allow-non-restored-state因为我们在保存点中仍然拥有先前连接器版本的状态。

    Troubleshooting

❝如果您在使用 Flink 时遇到 Kafka 问题,请记住,Flink 只包装了KafkaConsumer或KafkaProducer,您的问题可能与 Flink 无关,有时可以通过升级 Kafka brokers、重新配置 Kafka brokers 或重新配置KafkaConsumerKafkaProducerin Flink 来解决。下面列出了一些常见问题的示例。❞

Data loss

根据您的 Kafka 配置,即使在 Kafka 确认写入之后,您仍然可能会遇到数据丢失的情况。特别要记住 Kafka 配置中的以下属性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*
    上述选项的默认值很容易导致数据丢失。更多解释请参考 Kafka 文档。

UnknownTopicOrPartitionException

此错误的一个可能原因是正在进行新的领导者选举时,例如在重新启动 Kafka Broker 之后或期间。这是一个可重试的异常,因此 Flink 作业应该能够重新启动并恢复正常运行。它也可以通过更改retries生产者设置中的属性来规避。然而,这可能会导致消息重新排序,反过来,如果不需要,可以通过设置为 1 来规避max.in.flight.requests.per.connection

ProducerFencedException

此异常的原因很可能是代理端的事务超时。随着KAFKA-6119的实施,(producerId, epoch)将在事务超时后被隔离,并且其所有挂起的事务都被中止(每个transactional.id都映射到一个单独的事务producerId;这在下面的博客文章中有更详细的描述)。

发表回复