怎么在SpringBoot中使用KafkaAdminClient集群管理工具

怎么在Spring Boot中使用KafkaAdminClient集群管理工具?相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

創新互聯是少有的網站設計制作、成都網站設計、營銷型企業網站、小程序開發、手機APP,開發、制作、設計、賣友情鏈接、推廣優化一站式服務網絡公司,自2013年起,堅持透明化,價格低,無套路經營理念。讓網頁驚喜每一位訪客多年來深受用戶好評

原理介紹

在Kafka官網中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

  • 創建Topic:createTopics(Collection<NewTopic> newTopics)

  • 刪除Topic:deleteTopics(Collection<String> topics)

  • 羅列所有Topic:listTopics()

  • 查詢Topic:describeTopics(Collection<String> topicNames)

  • 查詢集群信息:describeCluster()

  • 查詢ACL信息:describeAcls(AclBindingFilter filter)

  • 創建ACL信息:createAcls(Collection<AclBinding> acls)

  • 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)

  • 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)

  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)

  • 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)

  • 查詢節點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)

  • 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)

  • 增加分區:createPartitions(Map<String, NewPartitions> newPartitions)

其內部原理是使用Kafka自定義的一套二進制協議來實現,詳細可以參見Kafka協議。主要實現步驟:

客戶端根據方法的調用創建相應的協議請求,比如創建Topic的createTopics方法,其內部就是發送CreateTopicRequest請求。
客戶端發送請求至Kafka Broker。

Kafka Broker處理相應的請求并回執,比如與CreateTopicRequest對應的是CreateTopicResponse。
客戶端接收相應的回執并進行解析處理。

和協議有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。

代碼如下

@Component
public class KafkaConfig{

   // 配置Kafka
  public Properties getProps(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
/*    props.put("retries", 2); // 重試次數
    props.put("batch.size", 16384); // 批量發送大小
    props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置
    props.put("linger.ms", 1000); // 發送頻率,滿足任務一個條件發送*/
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

}
@RestController
public class KafkaTopicManager {

  @Autowired
  private KafkaConfig kafkaConfig;

  @GetMapping("createTopic")
  public void createTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    NewTopic newTopic = new NewTopic("test1",4, (short) 1);
    Collection<NewTopic> newTopicList = new ArrayList<>();
    newTopicList.add(newTopic);
    adminClient.createTopics(newTopicList);

    adminClient.close();
  }
  @GetMapping("deleteTopic")
  public void deleteTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    adminClient.deleteTopics(Arrays.asList("test1"));
    adminClient.close();
  }
  @GetMapping("listAllTopic")
  public void listAllTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    ListTopicsResult result = adminClient.listTopics();
    KafkaFuture<Set<String>> names = result.names();
    try {
      names.get().forEach((k)->{
        System.out.println(k);
      });
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    adminClient.close();
  }
  @GetMapping("getTopic")
  public void getTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

    Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

    if(values.isEmpty()){
      System.out.println("找不到描述信息");
    }else{
      for (KafkaFuture<TopicDescription> value : values) {
        System.out.println(value);
      }
    }
    adminClient.close();
  }
}

看完上述內容,你們掌握怎么在Spring Boot中使用KafkaAdminClient集群管理工具的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注創新互聯行業資訊頻道,感謝各位的閱讀!

文章名稱:怎么在SpringBoot中使用KafkaAdminClient集群管理工具
文章源于:http://m.kartarina.com/article8/pihsop.html

成都網站建設公司_創新互聯,為您提供網站營銷動態網站虛擬主機響應式網站外貿建站定制網站

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

成都app開發公司
主站蜘蛛池模板: 无码精品国产VA在线观看| 无码日本电影一区二区网站| 无码激情做a爰片毛片AV片| 亚洲av无码天堂一区二区三区 | 久久亚洲中文字幕无码| 亚洲最大天堂无码精品区| av无码东京热亚洲男人的天堂 | 亚洲国产综合无码一区二区二三区| 久久亚洲中文字幕无码| 亚洲ⅴ国产v天堂a无码二区| 亚洲av无码一区二区三区四区| 亚洲精品无码久久久久久| 最新无码A∨在线观看| 无码射肉在线播放视频| 亚洲AV无码欧洲AV无码网站| 无码人妻精品丰满熟妇区| 亚洲av永久无码精品古装片| 无码毛片一区二区三区中文字幕 | 久久久久久国产精品免费无码| 精品国产一区二区三区无码| 狠狠躁天天躁无码中文字幕图| 无码中文字幕乱在线观看| 亚洲精品人成无码中文毛片 | 亚洲动漫精品无码av天堂| 无码av无码天堂资源网| 精品国产一区二区三区无码| 亚洲精品无码午夜福利中文字幕| 亚洲精品中文字幕无码AV| 久久精品成人无码观看56| 亚洲精品无码AV中文字幕电影网站| 亚洲av无码成人黄网站在线观看 | 国产成人无码AV片在线观看| 亚洲AV无码乱码在线观看富二代 | 中文字幕无码一区二区免费| 下载天堂国产AV成人无码精品网站| 亚洲国产精品无码久久SM| 亚洲色偷拍另类无码专区| 十八禁视频在线观看免费无码无遮挡骂过 | 亚洲一级特黄大片无码毛片| 伊人久久一区二区三区无码| 无码国产精品一区二区免费虚拟VR|