Appearance
汇总
- 创建Topic:
adminClient.createTopics()
- 获取Topic:
adminClient.listTopics()
- 删除Topic:
adminClient.deleteTopics()
- 查询详情:
adminClient.describeTopics()
创建Topic
java
@Slf4j
@SpringBootTest
public class KafkaTest {
/** 设置Admin客户端 */
public static AdminClient initAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");
return AdminClient.create(properties);
}
@Test
public void createTopic() {
AdminClient adminClient = initAdminClient();
// 2个分区,1个副本
NewTopic newTopic = new NewTopic(TOPIC_NAME, 2 , (short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
//future等待创建,成功不会有任何报错,如果创建失败和超时会报错。
try {
createTopicsResult.all().get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("创建新的topic");
}
}
创建多个Topic
场景说明:项目中使用枚举统一管理Topic,在项目创建的时候需要检查Topic是否创建,如果没有则创建。
java
@ToString
@AllArgsConstructor
public enum TopicEnums {
GE_TUI_MESSAGE("getui_msg", "用于直接向个推平台推送消息"),
GE_TUI_PUSH_DELAY("ge_tui_push_delay", "定时消息,用于向个推平台推送消息");
/** MQ的Topic(主题)值 */
@Getter
private final String value;
/** 简单的业务用处说明 */
private final String desc;
}
实现思路:遍历枚举类,按个尝试创建,如果已经创建会抛出异常,异常不做处理;
java
@Test
public void createTopic() {
AdminClient adminClient = initAdminClient();
// 1.获取枚举类中所有Topic名称
List<String> topicStrList = Arrays.stream(TopicEnums.values()).map(TopicEnums::getValue).collect(Collectors.toList());
// 2.创建Topic
for (String topicStr : topicStrList) {
NewTopic newTopic = new NewTopic(topicStr, 2, (short) 1);
try {
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
createTopicsResult.all().get();
log.warn( "Kafka Topic 创建成功,Topic Name:{}", topicStr);
}catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof TopicExistsException) {
log.info("Kafka Topic 已存在,Topic Name:{}", topicStr);
} else {
e.printStackTrace();
}
}
}
log.info("Kafka检查创建Topic任务完成");
}
操作和查看Topic
获取所有Topic
java
@Test
public void listTopic() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
ListTopicsResult listTopics = adminClient.listTopics(new ListTopicsOptions());
Set<String> topics = listTopics.names().get();
topics.forEach(topic -> log.warn("Kafka topic:{}", topic));
}
删除Topic
java
@Test
public void delTopicTest() {
AdminClient adminClient = initAdminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList("test"));
try {
deleteTopicsResult.all().get();
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
log.warn("Kafka Topic 不存在,可能已被删除");
}
throw new RuntimeException(e);
}
}
查看topic详情
java
@Test
public void getTopicInfo() throws Exception {
// 初始化AdminClient
AdminClient adminClient = initAdminClient();
// 描述Topic
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList("getui_msg"));
// 获取Topic描述信息
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
// 遍历Topic描述信息
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.forEach((entry) -> log.warn("name:{} , desc: {}", entry.getKey(), entry.getValue()));
}
增加分区数量
如果当主题中的消息包含有key时(即key不为null),根据key来计算分区的行为就会有所影响消息顺序性。
java
@Test
public void incrPartitionsTest() throws Exception {
Map<String, NewPartitions> infoMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(5);
AdminClient adminClient = initAdminClient();
infoMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
createPartitionsResult.all().get();
}
DANGER
注意:Kafka中的分区数只能增加不能减少,减少的话数据不知怎么处理