kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用於在Spring項目里快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了很多高級功能,下面我們就來一一探秘這些用法。

項目地址:github.com/spring-proje

簡單集成

引入依賴

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>

添加配置

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

測試發送和接收

/**
* @author: kl @kailing.pub
* @date: 2019/5/30
*/
@SpringBootApplication
@RestController
public class Application {

private final Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Autowired
private KafkaTemplate<Object, Object> template;

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}
@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
logger.info("input value: {}" , input);
}
}

啟動應用後,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制台看到有日誌輸出了:input value: "kl"。基礎的使用就這麼簡單。發送消息時注入一個KafkaTemplate,接收消息時添加一個@KafkaListener註解即可。

Spring-kafka-test嵌入式Kafka Server

不過上面的代碼能夠啟動成功,前提是你已經有了Kafka Server的服務環境,我們知道Kafka是由Scala + Zookeeper構建的,可以從官網下載部署包在本地部署。但是,我想告訴你,為了簡化開發環節驗證Kafka相關功能,Spring-Kafka-Test已經封裝了Kafka-test提供了註解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文後面的所有測試用例的Kafka都是使用這種嵌入式服務提供的。

引入依賴

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.2.6.RELEASE</version>
<scope>test</scope>
</dependency>

啟動服務

下面使用Junit測試用例,直接啟動一個Kafka Server服務,包含四個Broker節點。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads()throws IOException {
System.in.read();
}
}

如上:只需要一個註解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務,是不是很酷。默認只寫註解不加參數的情況下,是創建一個隨機埠的Broker,在啟動的日誌中會輸出具體的埠以及默認的一些配置項。不過這些我們在Kafka安裝包配置文件中的配置項,在註解參數中都可以配置,下面詳解下@EmbeddedKafka註解中的可設置參數 :

  • value:broker節點數量
  • count:同value作用一樣,也是配置的broker的節點數量
  • controlledShutdown:控制關閉開關,主要用來在Broker意外關閉時減少此Broker上Partition的不可用時間

Kafka是多Broker架構的高可用服務,一個Topic對應多個partition,一個Partition可以有多個副本Replication,這些Replication副本保存在多個Broker,用於高可用。但是,雖然存在多個分區副本集,當前工作副本集卻只有一個,默認就是首次分配的副本集【首選副本】為Leader,負責寫入和讀取數據。當我們升級Broker或者更新Broker配置時需要重啟服務,這個時候需要將partition轉移到可用的Broker。下面涉及到三種情況

  1. 直接關閉Broker:當Broker關閉時,Broker集群會重新進行選主操作,選出一個新的Broker來作為Partition Leader,選舉時此Broker上的Partition會短時不可用
  2. 開啟controlledShutdown:當Broker關閉時,Broker本身會先嘗試將Leader角色轉移到其他可用的Broker上
  3. 使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動觸發PartitionLeader角色轉移
  • ports:埠列表,是一個數組。對應了count參數,有幾個Broker,就要對應幾個埠號
  • brokerProperties:Broker參數設置,是一個數組結構,支持如下方式進行Broker參數設置:

@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})

  • okerPropertiesLocation:Broker參數文件設置

功能同上面的brokerProperties,只是Kafka Broker的可設置參數達182個之多,都像上面這樣配置肯定不是最優方案,所以提供了載入本地配置文件的功能,如:

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

創建新的Topic

默認情況下,如果在使用KafkaTemplate發送消息時,Topic不存在,會創建一個新的Topic,默認的分區數和副本數為如下Broker參數來設定

num.partitions = 1 #默認Topic分區數
num.replica.fetchers = 1 #默認副本數

程序啟動時創建Topic

/**
* @author: kl @kailing.pub
* @date: 2019/5/31
*/
@Configuration
public class KafkaConfig {
@Bean
public KafkaAdmin admin(KafkaProperties properties){
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
admin.setFatalIfBrokerNotAvailable(true);
return admin;
}
@Bean
public NewTopic topic2() {
return new NewTopic("topic-kl", 1, (short) 1);
}
}

如果Kafka Broker支持(1.0.0或更高版本),則如果發現現有Topic的Partition 數少於設置的Partition 數,則會新增新的Partition分區。關於KafkaAdmin有幾個常用的用法如下:

setFatalIfBrokerNotAvailable(true):默認這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業務需要顯示的將這個值設置為True

setAutoCreate(false) : 默認值為True,也就是Kafka實例化後會自動創建已經實例化的NewTopic對象

initialize():當setAutoCreate為false時,需要我們程序顯示的調用admin的initialize()方法來初始化NewTopic對象

代碼邏輯中創建

有時候我們在程序啟動時並不知道某個Topic需要多少Partition數合適,但是又不能一股腦的直接使用Broker的默認設置,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:

@Autowired
private KafkaProperties properties;
@Test
public void testCreateToipc(){
AdminClient client = AdminClient.create(properties.buildAdminProperties());
if(client !=null){
try {
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic("topic-kl",1,(short) 1));
client.createTopics(newTopics);
}catch (Throwable e){
e.printStackTrace();
}finally {
client.close();
}
}
}

ps:其他的方式創建Topic

上面的這些創建Topic方式前提是你的spring boot版本到2.x以上了,因為spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程序中通過Kafka_2.10創建Topic的方式

引入依賴

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>

api方式創建

@Test
public void testCreateTopic()throws Exception{
ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
String topicName = "topic-kl";
int partitions = 1;
int replication = 1;
AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
}

注意下ZkClient最後一個構造入參,是一個序列化反序列化的介面實現,博主測試如果不填的話,創建的Topic在ZK上的數據是有問題的,默認的Kafka實現也很簡單,就是做了字元串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經實現好的一個介面實例,是一個Scala的伴生對象,在Java中直接調用點MODULE$就可以得到一個實例

命令方式創建

@Test
public void testCreateTopic(){
String [] options= new String[]{
"--create",
"--zookeeper","127.0.0.1:2181",
"--replication-factor", "3",
"--partitions", "3",
"--topic", "topic-kl"
};
TopicCommand.main(options);
}

消息發送之KafkaTemplate探秘

獲取發送結果

非同步獲取

template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
......
}

@Override
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
....
}
});

同步獲取

ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
try {
SendResult<Object,Object> result = future.get();
}catch (Throwable e){
e.printStackTrace();
}

kafka事務消息

默認情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務消息發送能力的。需要使用如下配置激活事務特性。事務激活後,所有的消息發送只能在發生事務的方法內執行了,不然就會拋一個沒有事務交易的異常

spring.kafka.producer.transaction-id-prefix=kafka_tx.

當發送消息有事務要求時,比如,當所有消息發送成功才算成功,如下面的例子:假設第一條消費發送後,在發第二條消息前出現了異常,那麼第一條已經發送的消息也會回滾。而且正常情況下,假設在消息一發送後休眠一段時間,在發送第二條消息,消費端也只有在事務方法執行完成後才會接收到消息

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
template.executeInTransaction(t ->{
t.send("topic_input","kl");
if("error".equals(input)){
throw new RuntimeException("failed");
}
t.send("topic_input","ckl");
return true;
});
}

當事務特性激活時,同樣,在方法上面加@Transactional註解也會生效

@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) {
template.send("topic_input", "kl");
if ("error".equals(input)) {
throw new RuntimeException("failed");
}
template.send("topic_input", "ckl");
}

Spring-Kafka的事務消息是基於Kafka提供的事務消息功能的。而Kafka Broker默認的配置針對的三個或以上Broker高可用服務而設置的。這邊在測試的時候為了簡單方便,使用了嵌入式服務新建了一個單Broker的Kafka服務,出現了一些問題:如

1、事務日誌副本集大於Broker數量,會拋如下異常:

Number of alive brokers 1 does not meet the required replication factor 3
for the transactions state topic (configured via transaction.state.log.replication.factor).
This error can be ignored if the cluster is starting up and not all brokers are up yet.

默認Broker的配置transaction.state.log.replication.factor=3,單節點只能調整為1

2、副本數小於副本同步隊列數目,會拋如下異常

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

默認Broker的配置transaction.state.log.min.isr=2,單節點只能調整為1

ReplyingKafkaTemplate獲得消息回復

ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類的方法,新增了一個方法sendAndReceive,實現了消息發送回復語義

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

也就是我發送一條消息,能夠拿到消費者給我返回的結果。就像傳統的RPC交互那樣。當消息的發送者需要知道消息消費者的具體的消費情況,非常適合這個api。如,一條消息中發送一批數據,需要知道消費者成功處理了哪些數據。下面代碼演示了怎麼集成以及使用ReplyingKafkaTemplate

/**
* @author: kl @kailing.pub
* @date: 2019/5/30
*/
@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate(pf, repliesContainer);
}

@Bean
public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate(pf);
}

@Autowired
private ReplyingKafkaTemplate template;

@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
System.err.println("Return value: " + consumerRecord.value());
}

@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}
}

Spring-kafka消息消費用法探秘

@KafkaListener的使用

前面在簡單集成中已經演示過了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:

  • 顯示的指定消費哪些Topic和分區的消息,
  • 設置每個Topic以及分區初始化的偏移量,
  • 設置消費線程並發度
  • 設置消息異常處理器

@KafkaListener(id = "webGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
},concurrency = "6",errorHandler = "myErrorHandler")
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}

其他的註解參數都很好理解,errorHandler需要說明下,設置這個參數需要實現一個介面KafkaListenerErrorHandler。而且註解里的配置,是你自定義實現實例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應該存在這樣一個實例:

/**
* @author: kl @kailing.pub
* @date: 2019/5/31
*/
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
Logger logger =LoggerFactory.getLogger(getClass());
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
logger.info(message.getPayload().toString());
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
logger.info(message.getPayload().toString());
return null;
}
}

手動Ack模式

手動ACK模式,由業務邏輯控制提交偏移量。比如程序在消費時,有這種語義,特別異常情況下不確認ack,也就是不提交偏移量,那麼你只能使用手動Ack模式來做了。開啟手動首先需要關閉自動提交,然後設置下consumer的消費模式

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

上面的設置好後,在消費時,只需要在@KafkaListener監聽方法的入參加入Acknowledgment 即可,執行到ack.acknowledge()代表提交了偏移量

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input, Acknowledgment ack) {
logger.info("input value: {}", input);
if ("kl".equals(input)) {
ack.acknowledge();
}
return "successful";
}

@KafkaListener註解監聽器生命周期

@KafkaListener註解的監聽器的生命周期是可以控制的,默認情況下,@KafkaListener的參數autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來干預他的生命周期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續。如下代碼詳細演示了這種功能。

/**
* @author: kl @kailing.pub
* @date: 2019/5/30
*/
@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Autowired
private KafkaTemplate template;

@GetMapping("/send/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public void sendFoo(@PathVariable String input) throws Exception {
ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
template.send(record);
}

@Autowired
private KafkaListenerEndpointRegistry registry;

@GetMapping("/stop/{listenerID}")
public void stop(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).pause();
}
@GetMapping("/resume/{listenerID}")
public void resume(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).resume();
}
@GetMapping("/start/{listenerID}")
public void start(@PathVariable String listenerID){
registry.getListenerContainer(listenerID).start();
}
@KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
public String listen(String input) {
logger.info("input value: {}", input);
return "successful";
}
}

在上面的代碼中,listenerID就是@KafkaListener中的id值「webGroup」。項目啟動好後,分別執行如下url,就可以看到效果了。

先發送一條消息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以並不會看到有消息進入監聽器。

接著啟動監聽器:http://localhost:8081/start/webGroup。可以看到有一條消息進來了。

暫停和繼續消費的效果使用類似方法就可以測試出來了。

SendTo消息轉發

前面的消息發送響應應用裡面已經見過@SendTo,其實除了做發送響應語義外,@SendTo註解還可以帶一個參數,指定轉發的Topic隊列。常見的場景如,一個消息需要做多重加工,不同的加工耗費的cup等資源不一致,那麼就可以通過跨不同Topic和部署在不同主機上的consumer來解決了。如:

@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
logger.info("input value: {}", input);
return input + "hello!";
}

@KafkaListener(id = "webGroup1", topics = "topic-ckl")
public void listen2(String input) {
logger.info("input value: {}", input);
}

消息重試和死信隊列的應用

除了上面談到的通過手動Ack模式來控制消息偏移量外,其實Spring-kafka內部還封裝了可重試消費消息的語義,也就是可以設置為當消費數據出現異常時,重試這個消息。而且可以設置重試達到多少次後,讓消息進入預定好的Topic。也就是死信隊列里。下面代碼演示了這種效果:

@Autowired
private KafkaTemplate template;

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//最大重試三次
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
return factory;
}

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
template.send("topic-kl", input);
}

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
logger.info("input value: {}", input);
throw new RuntimeException("dlt");
}

@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
logger.info("Received from DLT: " + input);
}

上面應用,在topic-kl監聽到消息會,會觸發運行時異常,然後監聽器會嘗試三次調用,當到達最大的重試次數後。消息就會被丟掉重試死信隊列裡面去。死信隊列的Topic的規則是,業務Topic名字+「.DLT」。如上面業務Topic的name為「topic-kl」,那麼對應的死信隊列的Topic就是「topic-kl.DLT」

文末結語

最近業務上使用了kafka用到了Spring-kafka,所以系統性的探索了下Spring-kafka的各種用法,發現了很多好玩很酷的特性,比如,一個註解開啟嵌入式的Kafka服務、像RPC調用一樣的發送響應語義調用、事務消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。

來源:開源中國

原文:my.oschina.net/keking/b

福利:

配套資料:

以及在線視頻教程:

SpringBoot框架從入門到實踐|Spring Boot視頻課程 - 蛙課視頻?

www.wkcto.com
圖標

推薦閱讀:
相关文章