DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。
DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云
上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。
datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。
本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发
1. 功能介绍1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,
2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费
3.支持游标的上次记忆功能
2.快速开始 2.1 启动客户端配置阿里云DataHub的endpoint以及AK信息
启动SpringBoot,你会发现datahub客户端已经启动完毕
2.2 获取DataHub客户端 2.3 写数据以上示例代码表示往 projectName为my_test,tp官网下载 topicName为student, tp官方正版下载 shardId 为N的hub里写数据,并且返回插入成功的条数
2.4 读数据读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用
以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。
这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等
3. 核心代码首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。
写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用
读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。
read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:
DataHubHandler
DataHubListener
最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。
启动类:
属性配置类
最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,
大体逻辑就是这样了,你学会了吗? hhhhhhhhh~
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。