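<!-- Stream API and Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.14.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.14.2</version>
    <scope>provided</scope>
</dependency>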
@EnableAutoConfiguration
public class ChildApplication {
}
public interface Task {

    void run(String... args) throws Exception;
}
@Slf4j
public abstract class AbstractTask implements Task {

    @Override
    public void run(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        configTask(env, parameterTool);
        JobClient jobClient = env.executeAsync(getClass().getSimpleName());
        // A job submitted through the Flink web UI cannot have its result fetched from this client
        if (jobClient instanceof WebSubmissionJobClient) {
            return;
        }
        jobClient.getJobExecutionResult().whenComplete(new BiConsumer<JobExecutionResult, Throwable>() {
            @Override
            public void accept(JobExecutionResult jobExecutionResult, Throwable throwable) {
                // jobExecutionResult is null when the job failed, so check the throwable first
                if (throwable != null) {
                    log.error("job failed", throwable);
                    return;
                }
                log.error("time {}", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));
            }
        });
    }

    public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);
}
@Slf4j
@Service
public class TaskManager implements CommandLineRunner {

    @Resource
    List<Task> taskList;

    @Override
    public void run(String... args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        log.info("program arguments {}", parameterTool);
        String runTaskName = parameterTool.get("task");
        if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {
            return;
        }
        // Run the task whose fully qualified class name matches the "task" argument
        for (Task task : taskList) {
            if (Objects.equals(runTaskName, task.getClass().getName())) {
                task.run(args);
            }
        }
    }
}
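Note that TaskManager compares the task argument with task.getClass().getName(), so the value passed on the command line has to be the fully qualified class name, not the simple name. The following is a minimal stand-alone sketch of that matching, using only the JDK; TaskDispatchSketch and its methods are hypothetical names, and the argument parsing handles only the "--task value" form rather than everything ParameterTool accepts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Stand-alone sketch of the --task lookup; not the TaskManager class itself. */
final class TaskDispatchSketch {

    /** Returns the value that follows "--task", or empty if the flag is absent. */
    static Optional<String> taskArgument(String... args) {
        for (int i = 0; i < args.length - 1; i++) {
            if ("--task".equals(args[i])) {
                return Optional.of(args[i + 1]);
            }
        }
        return Optional.empty();
    }

    /** Picks the first candidate whose fully qualified class name equals the requested name. */
    static Optional<Object> select(List<?> candidates, String requestedName) {
        for (Object candidate : candidates) {
            if (candidate.getClass().getName().equals(requestedName)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Plain JDK objects stand in for the Task beans here
        List<Object> tasks = Arrays.asList("a string", Integer.valueOf(1));
        String byFullName = taskArgument("--task", "java.lang.Integer").orElse("");
        String bySimpleName = taskArgument("--task", "Integer").orElse("");
        System.out.println("full name matches:   " + select(tasks, byFullName).isPresent());
        System.out.println("simple name matches: " + select(tasks, bySimpleName).isPresent());
    }
}
```

Matching on the simple name instead would be shorter to type, but it becomes ambiguous as soon as two tasks in different packages share a class name.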
@Slf4j
@Service
public class TimeSource extends RichSourceFunction<Date> {

    volatile boolean running = true;
    private JdbcTemplate jdbcTemplate;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create a Spring container and fetch the bean we need
        String[] args = new String[]{
                String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s", getClass().getName()),
                String.format("--spring.jmx.default-domain=%s", getClass().getName())
        };
        ConfigurableApplicationContext applicationContext = SpringApplication.run(ChildApplication.class, args);
        jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);
    }

    @Override
    public void run(SourceContext<Date> ctx) throws Exception {
        // Emit the database time once per second until the source is cancelled
        while (running) {
            Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));
            ctx.collect(date);
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
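Writing this source class took a long time. It threw many errors along the way and kept behaving differently from what I expected.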
@Slf4j
@Service
public class TimeTask extends AbstractTask {

    @Resource
    private TimeSource timeSource;

    @Override
    public void configTask(StreamExecutionEnvironment env, ParameterTool tool) {
        env.getConfig().setAutoWatermarkInterval(0);
        env.addSource(timeSource).setParallelism(1).print().setParallelism(1);
    }
}
@SpringBootApplication
public class Demo2Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo2Application.class, args);
    }
}
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.5</version>
</parent>
<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
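spring-boot-maven-plugin distinguishes Main-Class from Start-Class when it packages the jar: the Main-Class of the resulting jar is the bootstrap class org.springframework.boot.loader.JarLauncher, and the application's own entry point is recorded as Start-Class. Uploading that jar to the Flink web UI and running it therefore fails.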
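To see the difference concretely, the two manifest attributes can be read with the JDK's java.util.jar.Manifest. This is only an illustrative sketch: ManifestSketch is a hypothetical helper, the manifest text is built in memory rather than read from a real jar, and the package com.example.demo is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

/** Reads Main-Class and Start-Class from manifest text; a sketch, not part of the project. */
final class ManifestSketch {

    /** Parses manifest text held in memory. */
    static Manifest parse(String text) {
        try {
            return new Manifest(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** The class the JVM launches with "java -jar". */
    static String mainClass(Manifest manifest) {
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    /** The application class that the Spring Boot launcher delegates to. */
    static String startClass(Manifest manifest) {
        return manifest.getMainAttributes().getValue("Start-Class");
    }

    public static void main(String[] args) {
        // Roughly what a repackaged Spring Boot jar declares; the package name is hypothetical
        Manifest manifest = parse("Manifest-Version: 1.0\r\n"
                + "Main-Class: org.springframework.boot.loader.JarLauncher\r\n"
                + "Start-Class: com.example.demo.Demo2Application\r\n"
                + "\r\n");
        System.out.println("Main-Class:  " + mainClass(manifest));
        System.out.println("Start-Class: " + startClass(manifest));
    }
}
```

For a real jar, the same attributes are available through new JarFile(path).getManifest().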
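Following method two of the article "SpringBoot超详细讲解集成Flink的部署与打包方法" (a detailed walkthrough of deploying and packaging Spring Boot with Flink), I wrote a first version: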
org.apache.maven.plugins maven-shade-plugin 3.3.0 package shade false com.google.code.findbugs:jsr305 org.slf4j:* log4j:* *:* module-info.class META-INF/*.SF META-INF/*.DSA META-INF/*.RSA META-INF/spring.handlers reference.conf META-INF/spring.factories META-INF/spring.schemas ${start-class}
The build failed with:
Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer
I struggled with this for a long while without finding the cause, so I tried maven-assembly-plugin instead:
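<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.3.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>${start-class}</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>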
This packages fine and the jar also runs locally, but after uploading it to the Flink web UI it fails with:
LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)
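Clearly, the logging jars conflict. The question then becomes how to configure maven-assembly-plugin so that org.apache.logging.log4j or ch.qos.logback is left out of the package. That is fairly awkward: it requires a custom assembly.xml file, which is a comparatively high cost.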
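One way to confirm such a conflict is to ask the class loader for every copy of the SLF4J binder class. This sketch assumes SLF4J 1.7-style bindings, where each backend (Logback as well as log4j-slf4j-impl) ships org/slf4j/impl/StaticLoggerBinder.class; Slf4jBindingSketch is a hypothetical helper, not something from the project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Lists every copy of a classpath resource; more than one SLF4J binder means competing backends. */
final class Slf4jBindingSketch {

    /** Resource name of the SLF4J 1.7 binder class that each logging backend provides. */
    static final String BINDER = "org/slf4j/impl/StaticLoggerBinder.class";

    /** Returns the URLs of all copies of the resource visible to the given class loader. */
    static List<URL> locate(ClassLoader loader, String resource) {
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> binders = locate(Thread.currentThread().getContextClassLoader(), BINDER);
        System.out.println(binders.size() + " SLF4J binding(s) found");
        for (URL url : binders) {
            // Each URL points into the jar that contributes a binding
            System.out.println("  " + url);
        }
    }
}
```

Run inside the job, two results (one from the application jar and one from /opt/flink/lib) would correspond to the error above.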
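Much of the material I found, including Flink's official Maven packaging setup, uses maven-shade-plugin, so I decided to stay with maven-shade-plugin.
How, then, to solve the Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer problem?
As it happened, I had recently been reading up on Maven pom files and opened the pluginManagement section of spring-boot-starter-parent by chance. It defines many plugins, and maven-shade-plugin is one of them.
By the rules of pom inheritance, it is enough to declare maven-shade-plugin under build->plugins: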
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
</plugin>
mvn clean package
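The build succeeded!
Reading the maven-shade-plugin declaration in spring-boot-starter-parent more closely, I found that the content of executions->execution->configuration->transformers differs between Spring Boot versions. No wonder resource could not be found.
Uploading the new jar to the Flink web UI again failed with the logging jar conflict, but excluding a dependency with maven-shade-plugin is much simpler than with maven-assembly-plugin. Since the Flink runtime already contains /opt/flink/lib/log4j-slf4j-impl-2.16.0.jar, I simply excluded Logback. The complete plugin configuration is: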
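<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <configuration>
        <artifactSet>
            <excludes>
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>ch.qos.logback:*</exclude>
            </excludes>
        </artifactSet>
    </configuration>
</plugin>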
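Before uploading, it can be worth checking that the exclusion actually took effect. The sketch below lists the entries of a jar under a given path prefix using the JDK's JarFile; ShadedJarCheckSketch is a hypothetical helper and the jar path is whatever the build produced.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

/** Checks a shaded jar for leftover classes of an excluded dependency. */
final class ShadedJarCheckSketch {

    /** Returns the names of all jar entries that start with the given prefix. */
    static List<String> entriesUnder(Path jar, String prefix) {
        try (JarFile file = new JarFile(jar.toFile())) {
            return file.stream()
                    .map(entry -> entry.getName())
                    .filter(name -> name.startsWith(prefix))
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: ShadedJarCheckSketch <path-to-jar>");
            return;
        }
        Path jar = Paths.get(args[0]);
        // Logback classes live under this package path inside the jar
        List<String> leftovers = entriesUnder(jar, "ch/qos/logback/");
        if (leftovers.isEmpty()) {
            System.out.println("no Logback classes in " + jar);
        } else {
            System.out.println(leftovers.size() + " Logback entries still in " + jar);
        }
    }
}
```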