发布者认证信息(营业执照和身份证)未完善,请登录后完善信息登录
 终于明白自研 Pulsar Starter:Winfun-Pulsar-Spring-Boot-Starter - 客集网
Hi,你好,欢迎来到客集网
  • 产品
  • 求购
  • 公司
  • 展会
  • 招商
  • 资讯
  • 解梦
当前位置: 首页 » 资讯 » 农业食品 找商家、找信息优选VIP,安全更可靠!
终于明白自研 Pulsar Starter:Winfun-Pulsar-Spring-Boot-Starter
发布日期:2023-01-15 08:45:20  浏览次数:11

 里程碑 版本 功能点 作者 完成 支持PulsarTemplate发送消息&支持自定义注解实例化Consumer监听消息 howinfun ✅ 支持动态开启/关闭Consumer消费线程池、支持自定义配置Consuemr消费线程池参数 howinfun ✅ 支持Spring容器停止时,释放Pulsar所有相关资源 howinfun TODO 支持多Pulsar数据源 howinfun TODO
  一、背景

Pulsar 作为新生代云原生消息队列,越来越受到开发者的热爱;而我们现在基本上的项目都是基于 SpringBoot 上开发的,但是我们可以发现,至今都没有比较大众和成熟的关于 Pulsar 的 Starter,所以我们需要自己整一个,从而避免常规使用 Pulsar API 时产生大量的重复代码。

二、设计思路

由于是第一版的设计,所以我们是从简单开始,不会一开始就设计得很复杂,尽量保留 Pulsar API 原生的功能。

、PulsarClient

我们都知道,不管是 Producer 还是 Consumer,都是由 PulsarClient 创建的。

当然了,PulsarClient 可以根据业务需要自定义很多参数,但是第一版的设计只会支持比较常用的参数。

我们这个组件支持下面功能点:

支持 PulsarClient 参数配置外部化,参数可配置在 中。 支持 提供配置提示信息。 读取外部配置文件,根据参数实例化 PulsarClient,并注入到 IOC 容器中。 、Producer

Producer是发送消息的组件。

这里我们提供一个模版类,可以根据需求创建对应的 Producer 实例。 支持将 TopicProducer 关系缓存起来,避免重复创建 Producer 实例。 支持同步/异步发送消息。 、Consumer

Consumer是消费消息的组件。

这里我们提供一个抽象类,开发者只需要集成此实现类并实现 doReceive 方法即可,即消费消息的逻辑方法。 接着还提供一个自定义注解,自定义注解支持自定义 Consmuer 配置,例如Topic、Tenant、Namespace等。 实现类加入上述自定义注解后,组件将会自动识别并且生成对应的 Consumer 实例。 支持同步/线程池异步消费。 三、使用例子 、引入依赖          winfun-pulsar-spring-boot-starter        、加入配置 -url=pulsar://.1:6650 =winfun =study -timeout=30 -threads=10 -threads=10  、发送消息  @RestController @RequestMapping("msg") public class MessageController {      @Autowired     private PulsarTemplate pulsarTemplate;     @Autowired     private PulsarProperties pulsarProperties;           @GetMapping("/{topic}/{msg}")     public String send(@PathVariable("topic") String topic,@PathVariable("msg") String msg) throws Exception {         ().persistent()                 .tenant(())                 .namespace(())                 .topic(topic)                 .send(msg);         return "success";     } }  、消费消息  @Slf4j @PulsarListener(topics = {"test-topic2"},                 threadPool = @ThreadPool(                                         coreThreads = 2,                                         maxCoreThreads = 3,                                          threadPoolName = "test-thread-pool")) public class ConsumerListener extends baseMessageListener {           @Override     protected void doReceived(Consumer consumer, Message msg) {         ("成功消费消息:{}",());         try {             (msg);         } catch (PulsarClientException e) {             ();         }     }           @Override     public Boolean enableAsync() {         return ;     } }  四、源码

源码就不放在这里分析了,大家可到Github上看看,如果有什么代码上面的建议或意见,欢迎大家提MR。

 

VIP企业最新发布
最新VIP企业
背景开启

客集网是一个开放的平台,信息全部为用户自行注册发布!并不代表本网赞同其观点或证实其内容的真实性,需用户自行承担信息的真实性,图片及其他资源的版权责任! 本站不承担此类作品侵权行为的直接责任及连带责任。

如若本网有任何内容侵犯您的权益,请联系 QQ: 1130861724

网站首页 | 信息删除 | 付款方式 | 关于我们 | 联系方式 | 使用协议 | 版权隐私 | 网站地图 (c)2014-2024 Rights Reserved 鄂公网安备42018502007153 SITEMAPS 联系我们 | 鄂ICP备14015623号-21

返回顶部