springboot-集成flink最佳实践和打包部署
admin
2024-02-20 23:21:38
0

引入flink依赖

//stream api和table api
org.apache.flinkflink-table-api-java-bridge_2.111.14.2provided

org.apache.flinkflink-clients_2.111.14.2provided

编写入口

目录结构

  • com.example.demo
    • auto
      • ChildApplication
    • task
      • Task
      • AbstractTask
      • TaskManager
    • time
      • TimeSource
      • TimeTask
    • Demo2Application

子容器初始化类

@EnableAutoConfiguration
public class ChildApplication {
}

任务接口

public interface Task {void run(String... args) throws Exception;
}

抽象任务类

@Slf4j
public abstract class AbstractTask implements Task {@Overridepublic void run(String... args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ParameterTool parameterTool = ParameterTool.fromArgs(args);configTask(env, parameterTool);JobClient jobClient = env.executeAsync(getClass().getSimpleName());if (jobClient instanceof WebSubmissionJobClient) {return;}jobClient.getJobExecutionResult().whenComplete(new BiConsumer() {@Overridepublic void accept(JobExecutionResult jobExecutionResult, Throwable throwable) {log.error("time {}", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));}});}public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);}

任务管理器

@Slf4j
@Service
public class TaskManager implements CommandLineRunner {@ResourceList taskList;@Overridepublic void run(String... args) throws Exception {ParameterTool parameterTool = ParameterTool.fromArgs(args);log.info("程序参数 {}", parameterTool);String runTaskName = parameterTool.get("task");if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {return;}for (Task task : taskList) {if (Objects.equals(runTaskName, task.getClass().getName())) {task.run(args);}}}}

一个计时任务数据源

@Slf4j
@Service
public class TimeSource extends RichSourceFunction {volatile boolean running = true;private JdbcTemplate jdbcTemplate;@Overridepublic void open(Configuration parameters) throws Exception {//创建一个容器,并拿到需要的beanString[] args = new String[]{String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s", getClass().getName()),String.format("--spring.jmx.default-domain=%s", getClass().getName())};ConfigurableApplicationContext applicationContext = SpringApplication.run(ChildApplication.class, args);jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);}@Overridepublic void run(SourceContext ctx) throws Exception {while (running) {Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));ctx.collect(date);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {running = false;}}

写这个数据源类花了很长时间,期间报了很多错,一直不符合预期:

  • xxx is not serializable:flink的算子可能会在不同的机器上运行,所以类信息会序列化之后传输。所以算子不能有任何不能序列化的字段(字段为null除外)
  • 有些需要的字段没有实现Serializable,但是又确实要用到,比如JdbcTemplate,如果是mybatis的话,就是各种mapper;像这些字段,只能在open方法里面初始化。有两种方法做这个初始化:一是,通过全局参数把一些连接信息传到算子,然后在open方法中初始化JdbcTemplate;二是,在open方法中重新创建一个容器,然后从容器中拿到JdbcTemplate。第一种方法,比较容易实现,但是要手动装配JdbcTemplate;第二种方法,要重新创建一个容器,装配的任务全都交给容器;想法是很nice,但在一个容器中创建另一个容器,比想象中的要复杂一些。
  • 在一个容器中初始化另一个容器:
    • 需要一个容器初始化类:因为毕竟不需要注入所有对象,所以不能用主程序启动类Demo2Application;但是又要autoconfigure里面的很多对象,所以考虑加@EnableAutoConfiguration注解,同时放入单独的auto包,避免扫到不需要的bean定义;如果需要mybatis的mapper,考虑加@MapperScan注解
    • 定义好容器初始化类之后,启动报错:Error creating bean with name ‘springApplicationAdminRegistrar’ defined in class path resource [org/springframework/boot/autoconfigure/admin/SpringApplicationAdminJmxAutoConfiguration.class]: Invocation of init method failed; nested exception is javax.management.InstanceAlreadyExistsException: org.springframework.boot:type=Admin,name=SpringApplication。看错误信息是实例重复了,这个有两种解决办法:
      • 容器初始化类直接排除掉SpringApplicationAdminJmxAutoConfiguration.class:@EnableAutoConfiguration(exclude = {SpringApplicationAdminJmxAutoConfiguration.class})
      • 子容器启动时修改spring.application.admin.jmx-name:–spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s
    • 再启动,还是报错:Unable to register MBean [HikariDataSource (HikariPool-2)] with key ‘dataSource’; nested exception is javax.management.InstanceAlreadyExistsException: com.zaxxer.hikari:name=dataSource,type=HikariDataSource。又是个实例重复的问题,这个问题百度了下,需要给spring.jmx.default-domain配置个新的值:–spring.jmx.default-domain=%s
    • 再启动,子容器正常创建,程序运行发现ok
    • 打包上传flink web,提交运行,正常!

一个计时任务

@Slf4j
@Service
public class TimeTask extends AbstractTask {@Resourceprivate TimeSource timeSource;@Overridepublic void configTask(StreamExecutionEnvironment env, ParameterTool tool) {env.getConfig().setAutoWatermarkInterval(0);env.addSource(timeSource).setParallelism(1).print().setParallelism(1);}}

主程序启动类

@SpringBootApplication
public class Demo2Application {public static void main(String[] args) {SpringApplication.run(Demo2Application.class, args);}}

打包程序

设置parent

org.springframework.bootspring-boot-starter-parent2.7.5 

直接使用spring-boot-maven-plugin?

org.springframework.bootspring-boot-maven-plugin

因为spring-boot-maven-plugin打包区分了main-class和start-class,打包之后main-class是org.springframework.boot.loader.JarLauncher引导类,上传到flink web执行报错。

考虑使用maven-shade-plugin

参考SpringBoot超详细讲解集成Flink的部署与打包方法的方法二写了一版:

org.apache.maven.pluginsmaven-shade-plugin3.3.0packageshadefalsecom.google.code.findbugs:jsr305org.slf4j:*log4j:**:*module-info.classMETA-INF/*.SFMETA-INF/*.DSAMETA-INF/*.RSAMETA-INF/spring.handlersreference.confMETA-INF/spring.factoriesMETA-INF/spring.schemas${start-class}

结果报错:

Cannot find ‘resource’ in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer

纠结了半天,也没找到原因

再试试maven-assembly-plugin

  org.apache.maven.pluginsmaven-assembly-plugin3.3.0${start-class}jar-with-dependenciesmake-assemblypackagesingle

可以正常打包,本地也能运行,但是上传到flink web报错

LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)

很明显,日志相关的jar冲突了。那么问题就是怎么配置maven-assembly-plugin,打包的时候移出org.apache.logging.log4j或ch.qos.logback?这个也比较困难,需要自定义assembly.xml文件,相对来说成本比较大。

重回maven-shade-plugin

找到很多资料,包括flink官方的maven打包方式也是用maven-shade-plugin,所以决定还是使用maven-shade-plugin。

那怎么解决Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer的问题呢?

恰好最近在看maven pom文件的相关知识,不小心打开了spring-boot-starter-parentpluginManagement,发现里面定义很多插件,其中就包括maven-shade-plugin

按照pom依赖的逻辑,只要在build->plugins声明maven-shade-plugin就行:

org.apache.maven.pluginsmaven-shade-plugin

mvn clean package

打包成功了!

仔细翻看spring-boot-starter-parent声明的maven-shade-plugin,发现executions->execution->configuration->transformers的内容在spring-boot的不同版本是不同的。难怪找不到resource。

后续打包上传到flink web,也是报日志相关的jar冲突,不过maven-shade-plugin打包排除依赖比maven-assembly-plugin简单多了。由于flink运行时包含/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar,所以果断排除logback,完整plugin配置如下:

org.apache.maven.pluginsmaven-shade-plugincom.google.code.findbugs:jsr305ch.qos.logback:*

相关内容

热门资讯

安卓7.0系统速度咋样,速度与... 你有没有发现,自从手机更新到安卓7.0系统后,感觉整个手机都焕然一新了呢?今天,就让我来给你详细聊聊...
安卓文件系统隔离,Androi... 你知道吗?在安卓的世界里,有一个神奇的小秘密,那就是安卓文件系统隔离。听起来是不是有点高大上?别急,...
电脑板安卓系统下载,轻松实现多... 你有没有想过,你的电脑板突然间变得如此强大,竟然能运行安卓系统?没错,这就是科技的魅力!今天,就让我...
安卓系统双开app排行,热门双... 安卓系统双开App排行大揭秘在数字化时代,手机已经成为我们生活中不可或缺的一部分。而安卓系统,作为全...
安卓原生系统谁在开发,谷歌主导... 你有没有想过,那个陪伴你每天刷抖音、玩游戏、办公的安卓系统,究竟是谁在背后默默开发呢?今天,就让我带...
vive属于安卓系统吗,揭秘V... 你有没有想过,那个让人眼前一亮的VR设备Vive,它到底是不是安卓系统的呢?这个问题,估计不少对VR...
ios系统和安卓系统传递文件,... 你有没有想过,当你把一张照片或者一个文件从你的手机传给朋友时,背后其实是一个复杂的系统在默默工作呢?...
安卓刷鸿蒙系统工具,一招解锁全... 你有没有想过,让你的安卓手机也来一场华丽的变身?没错,就是那种从安卓系统切换到鸿蒙系统的神奇之旅。今...
安卓系统看不到系统文件,为何无... 你是不是也遇到了这样的烦恼?手机里明明有好多文件,可就是找不到它们的小身影,安卓系统竟然看不到系统文...
安卓系统升级模式,迭代创新与用... 你知道吗?安卓系统升级模式,这可是个让人又爱又恨的话题。每次手机收到升级通知,心里既期待又紧张,就像...
安装安卓虚拟系统教程,安卓虚拟... 你有没有想过在电脑上也能体验安卓系统的乐趣呢?没错,就是那种随时随地都能玩手机游戏、刷抖音的感觉,现...
叶茂然安卓系统下载,畅享智能生... 你有没有听说最近叶茂然安卓系统下载成了热门话题?没错,就是那个叶茂然安卓系统!今天,我就要带你全方位...
安卓子系统下载地址,深度解析下... 你有没有想过,你的安卓手机里那些神秘的子系统其实也可以下载下来,自己动手安装呢?没错,今天就要来给你...
安卓手机系统铃声替换,唤醒你的... 你有没有发现,每次手机响个不停,是不是总想给它换个铃声,让它听起来更个性、更有范儿?没错,安卓手机系...
登陆系统和安卓系统区别,差异解... 你有没有想过,为什么你的手机里装了那么多应用,却还是觉得登陆系统有点儿麻烦呢?今天,就让我带你一起探...
安卓系统手表壁纸华为,科技美学... 你有没有发现,最近安卓系统手表的江湖风头正劲呢?尤其是华为家的那些宝贝,简直让人爱不释手。今天,就让...
安卓系统闹钟组件设置,轻松打造... 你有没有发现,每天早上闹钟响起的时候,是不是总感觉时间过得飞快,一转眼又是一个新的一天呢?今天,就让...
安卓系统的几大组件,组件架构与... 你有没有发现,你的安卓手机里藏着许多神奇的“小精灵”呢?它们默默无闻地工作,让你的手机变得如此强大和...
安卓系统关闭app流量,轻松关... 手机里的APP们是不是有时候让你觉得流量消耗得有点儿太快了呢?别急,今天就来教你几招,让你的安卓手机...
安卓系统无尽之海,安卓系统中的... 安卓系统,无尽之海中的航行者想象你正站在一望无际的海洋边,海风轻拂,波光粼粼。这片海洋,深邃而神秘,...