ChunJun脏数据插件——源码分析
创始人
2024-05-29 10:50:11
0

ChunJun脏数据插件——源码分析

    • 插件配置的Java解析逻辑
    • 脏数据收集逻辑
    • 脏数据记录写入MySQL
    • errorLimit配置不生效
    • DirtyDataCollector的实例化、运行逻辑

  • 版本 ChunJun 1.12

插件配置的Java解析逻辑

  • 在chunjun任务提交-源码分析中,我已经知道了SYNC类型任务脚本(json)的解析方式,而脏数据插件的配置不在json脚本中,需要单独传参,样例如下
./bin/chunjun-standalone.sh -job my-examples/sync_mysql2mysql.json -confProp {"chunjun.dirty-data.output-type":"print", "chunjun.dirty-data.max-rows":"1000", "chunjun.dirty-data.max-collect-failed-rows":"100", "chunjun.dirty-data.jdbc.url":"jdbc:mysql://localhost:3306/tiezhu", "chunjun.dirty-data.jdbc.username":"root", "chunjun.dirty-data.jdbc.password":"abc123", "chunjun.dirty-data.jdbc.database":"tiezhu", "chunjun.dirty-data.jdbc.table":"chunjun_dirty_data","chunjun.dirty-data.jdbc.batch-size":"10", "chunjun.dirty-data.log.print-interval":"10"}
  • 脏数据插件的配置信息会在启动时由-confProp传入
  • 传入后,会在Main.java中由configStreamExecutionEnvironment方法处理
package com.dtstack.chunjun;// import ...public class Main {// code ...private static void configStreamExecutionEnvironment(StreamExecutionEnvironment env, Options options, SyncConf config) {if (config != null) {// code ...} else {// code ...DirtyConf dirtyConf = DirtyConfUtil.parse(options);// code ...}// code ...}// code ...}
  • 再看DirtyConfUtil.java
package com.dtstack.chunjun;// import ...public class DirtyConfUtil {private static final String DEFAULT_TYPE = "default";public static final String DIRTY_CONF_PREFIX = "chunjun.dirty-data.";public static final String TYPE_KEY = "chunjun.dirty-data.output-type";public static final String MAX_ROWS_KEY = "chunjun.dirty-data.max-rows";public static final String MAX_FAILED_ROWS_KEY = "chunjun.dirty-data.max-collect-failed-rows";public static final String PRINT_INTERVAL = "chunjun.dirty-data.log.print-interval";public static final String DIRTY_DIR = "chunjun.dirty-data.dir";public static final String DIRTY_DIR_SUFFIX = "dirty-data-collector";public static DirtyConf parseFromMap(Map confMap) {DirtyConf dirtyConf = new DirtyConf();Properties pluginProperties = new Properties();String type = String.valueOf(confMap.getOrDefault(TYPE_KEY, DEFAULT_TYPE));if (type.equals("jdbc")) {type = "mysql";}long maxConsumed = Long.parseLong(String.valueOf(confMap.getOrDefault(MAX_ROWS_KEY, "0")));long maxFailed =Long.parseLong(String.valueOf(confMap.getOrDefault(MAX_FAILED_ROWS_KEY, "0")));long printRate = Long.parseLong(String.valueOf(confMap.getOrDefault(PRINT_INTERVAL, "1")));String pluginDir = MapUtils.getString(confMap, DIRTY_DIR);confMap.entrySet().stream().filter(item ->item.getKey().toLowerCase(Locale.ROOT).startsWith(DIRTY_CONF_PREFIX)).forEach(item ->pluginProperties.put(item.getKey().toLowerCase(Locale.ROOT).replaceFirst(DIRTY_CONF_PREFIX, "").trim(),item.getValue()));dirtyConf.setType(type);dirtyConf.setMaxConsumed(maxConsumed < 0 ? Long.MAX_VALUE : maxConsumed);dirtyConf.setMaxFailedConsumed(maxFailed < 0 ? Long.MAX_VALUE : maxFailed);dirtyConf.setPrintRate(printRate <= 0 ? Long.MAX_VALUE : printRate);dirtyConf.setPluginProperties(pluginProperties);dirtyConf.setLocalPluginPath(pluginDir);return dirtyConf;}public static DirtyConf parse(Options options) {try {Properties properties = PropertiesUtil.parseConf(options.getConfProp());properties.put(DIRTY_DIR, options.getChunjunDistDir() + File.separator + DIRTY_DIR_SUFFIX);return parse(properties);} catch (Exception e) {throw new NoRestartException(String.format("Parse conf [%s] to DirtyConf failed.", options.getConfProp()),e);}}public static DirtyConf parse(Properties properties) {try {Map confMap = Maps.fromProperties(properties);return parseFromMap(confMap);} catch (Exception e) {throw new NoRestartException(String.format("Parse properties to dirtyConf failed. Properties: %s", properties),e);}}
}
  • 其中DirtyConfUtil.parse(Options options)options中获取了我们前面传入的confProp,然后依次调用了DirtyConfUtil.parse(Options options)DirtyConfUtil.parseFromMap(Map confMap),完成解析,最终生成了DirtyConf实体对象
package com.dtstack.chunjun.dirty;// import ...
public class DirtyConf implements Serializable {private static final long serialVersionUID = 1L;/*** This is the limit on the max consumed-data. The consumer would to be killed with throwing a* {@link NoRestartException} when the consumed-count exceed the limit.*/protected long maxConsumed;/** This is the limit on the max failed-consumed-data. Same as {@link #maxConsumed} */protected long maxFailedConsumed;/** The type of dirty-plugin. */private String type;/** Print dirty-data every ${printRate}. */private Long printRate = 1L;/** Custom parameters of different dirty-plugin. */private Properties pluginProperties = new Properties();/** ChunJun dirty-plugins local plugins path {@link Options#getFlinkLibDir()} */private String localPluginPath;// code ...
}

脏数据收集逻辑

  • 在ChunJun-JDBC轮询增量更新-源码分析中,我们已经知道对于每条数据的读取时的处理逻辑,其中关键的部分如下
package com.dtstack.chunjun.source.format;// import ...public abstract class BaseRichInputFormat extends RichInputFormat {// code ...@Overridepublic RowData nextRecord(RowData rowData) {if (byteRateLimiter != null) {byteRateLimiter.acquire();}RowData internalRow = null;try {internalRow = nextRecordInternal(rowData);} catch (ReadRecordException e) {dirtyManager.collect(e.getRowData(), e, null);}if (internalRow != null) {updateDuration();if (numReadCounter != null) {numReadCounter.add(1);}if (bytesReadCounter != null) {bytesReadCounter.add(rowSizeCalculator.getObjectSize(internalRow));}}return internalRow;}// code ...
}
  • 对于每条数据的处理,会调用nextRecordInternal(rowData),对于不同的数据源会使用不同的BaseRichInputFormat的子实现来处理
  • 每条数据的处理过程中,可能会出错,所以使用了try catch的方式来捕获,而此处catch中则是实现了对于脏数据的捕获收集
package com.dtstack.chunjun.dirty.manager;// import ...public class DirtyManager implements Serializable {// code ...public void collect(Object data, Throwable cause, String field) {if (executor == null) {execute();}DirtyDataEntry entity = new DirtyDataEntry();entity.setJobId(jobId);entity.setJobName(jobName);entity.setOperatorName(operationName);entity.setCreateTime(new Timestamp(System.currentTimeMillis()));entity.setDirtyContent(toString(data));entity.setFieldName(field);entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause));consumer.offer(entity);errorCounter.add(1L);}// code ...}
  • 此处的DirtyDataEntry是用于记录脏数据信息的实体对象,consumer.offer(entity)将脏数据对象提交到了队列,errorCounter.add(1L)记录了脏数据的条数
  • 再看consumer对应的class DirtyDataCollector
package com.dtstack.chunjun.dirty.consumer;// import ...public abstract class DirtyDataCollector implements Runnable, Serializable {protected final LongCounter failedConsumedCounter = new LongCounter(0L);protected final LongCounter consumedCounter = new LongCounter(0L);// code .../** The queue stored the data not yet consumed. */protected LinkedBlockingQueue consumeQueue = new LinkedBlockingQueue<>();/*** Offer data into the blocking-queue.** @param dirty dirty data.*/public synchronized void offer(DirtyDataEntry dirty) {consumeQueue.offer(dirty);addConsumed(1L);}@Overridepublic void run() {while (isRunning.get()) {try {DirtyDataEntry dirty = consumeQueue.take();consume(dirty);} catch (Exception e) {addFailedConsumed(e, 1L);}}}protected void addConsumed(long count) {consumedCounter.add(count);if (consumedCounter.getLocalValue() >= maxConsumed) {throw new NoRestartException(String.format("The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s]",maxConsumed));}}protected void addFailedConsumed(Throwable cause, long failedCount) {failedConsumedCounter.add(failedCount);warn(LOG,"dirty-plugins consume failed.",cause,printRate,failedConsumedCounter.getLocalValue());if (failedConsumedCounter.getLocalValue() >= maxFailedConsumed) {throw new NoRestartException(String.format("The dirty consumer shutdown, due to the failed-consumed count exceed the max-failed-consumed [%s]",maxFailedConsumed));}}// code ...
}
  • 当前面调用consumer.offer(entity)时,脏数据对象被提交进了此处的队列consumeQueue
  • DirtyDataCollector实现了Runnable接口,里面有个run方法,该方法内是个while循环,会不断的读取consumeQueue队列中的数据做处理
    • 关于DirtyDataCollector的启动逻辑,可以看后文
  • 前面调用offer时,同时还调用了addConsumed(1L),此时会将Flink的累加器consumedCounter加1,并判断是否大于了最大值maxConsumedmaxConsumed即是我们传入的配置chunjun.dirty-data.max-rows
  • 写入Sink端和读取Source端的脏数据处理逻辑一致,不再赘述

脏数据记录写入MySQL

  • 接着再看DirtyDataCollector中的run
    @Overridepublic void run() {while (isRunning.get()) {try {DirtyDataEntry dirty = consumeQueue.take();consume(dirty);} catch (Exception e) {addFailedConsumed(e, 1L);}}}
  • 先从队列中取出数据,再调用consume,我们来看其对应MySQL的实现
package com.dtstack.chunjun.dirty.mysql;// import ...public class MysqlDirtyDataCollector extends DirtyDataCollector {// code ...private void flush() {try {for (DirtyDataEntry item : entities) {final String[] dirtyArrays = item.toArray();for (int i = 0; i < TABLE_FIELDS.length; i++) {statement.setObject(i + 1, dirtyArrays[i]);}statement.addBatch();}statement.executeBatch();} catch (SQLException e) {singleFlush();} finally {entities.clear();}}@Overrideprotected void consume(DirtyDataEntry dirty) throws Exception {entities.add(dirty);if (consumedCounter.getLocalValue() % batchSize == 0) {flush();}}// code ...
}
  • 此处先将脏数据对象放入了entities列表中,然后看Flink累加器consumedCounter中记录的值是否是batchSize的整数倍(batchSize对应传入的配置chunjun.dirty-data.jdbc.batch-size),如果是,那么调用flush(),利用JDBC的批量写入机制,将数据入库到配置的MySQL中
    • 此处取余的逻辑比较奇怪(即consumedCounter.getLocalValue() % batchSize == 0),那么是不是只要consumedCounter的值不为batchSize的整数倍,就永远不会执行flush()逻辑呢?
    • 因为此处判断的逻辑和consumedCounter累加的逻辑不在同一线程,所以记录数只会在恰巧的情况下达到batchSize的整数倍且被处理(当脏数据较多时,entities中可能会堆积大量数据,而数据库里也一直看不到记录)
    • 建议:根据entities的大小和时间间隔来决定是否应该调用flush()
  • DirtyDataCollector中的run中还有一个try catch,当前面记录脏数据失败时,则会调用addFailedConsumed(e, 1L)
package com.dtstack.chunjun.dirty.consumer;// import ...public abstract class DirtyDataCollector implements Runnable, Serializable {protected final LongCounter failedConsumedCounter = new LongCounter(0L);// code ...protected void addFailedConsumed(Throwable cause, long failedCount) {failedConsumedCounter.add(failedCount);warn(LOG,"dirty-plugins consume failed.",cause,printRate,failedConsumedCounter.getLocalValue());if (failedConsumedCounter.getLocalValue() >= maxFailedConsumed) {throw new NoRestartException(String.format("The dirty consumer shutdown, due to the failed-consumed count exceed the max-failed-consumed [%s]",maxFailedConsumed));}}// code ...
}
  • 此时将另一个Flink累加器failedConsumedCounter加1,并判断是否超过了最大值maxFailedConsumed(maxFailedConsumed对应传入的配置chunjun.dirty-data.max-collect-failed-rows)。
    • 需要注意的是,每次记录脏数据到MySQL,是批量写入的,而每次写入异常时累加器只会加1,所以配置chunjun.dirty-data.max-collect-failed-rows对应的是脏数据写入次数,而不是条数

errorLimit配置不生效

  • 因为源码中没有解析该配置
  • setting配置样例
    "setting": {"speed": {"channel": 1,"bytes": 0},"errorLimit": {"record": 5,"percentage": 60.0}}
  • 源码中对应的Class SettingConf
package com.dtstack.chunjun.conf;// import ...public class SettingConf implements Serializable {private static final long serialVersionUID = 1L;/** 速率及通道配置 */private SpeedConf speed = new SpeedConf();/** 任务指标插件信息 */private MetricPluginConf metricPluginConf = new MetricPluginConf();/** 断点续传配置 */private RestoreConf restore = new RestoreConf();/** 失败重试配置 */private RestartConf restart = new RestartConf();/** ChunJun日志记录配置 */private LogConf log = new LogConf();// code ...
}

DirtyDataCollector的实例化、运行逻辑

  • BaseRichInputFormat中处理数据源的数据发送异常时,会调用dirtyManager.collect(e.getRowData(), e, null)
  • collect方法中最开始调用了execute()
package com.dtstack.chunjun.dirty.manager;// import...public class DirtyManager implements Serializable {private transient ThreadPoolExecutor executor;// code...public DirtyManager(DirtyConf dirtyConf, RuntimeContext runtimeContext) {this.consumer = DataSyncFactoryUtil.discoverDirty(dirtyConf);// code...}public void collect(Object data, Throwable cause, String field) {if (executor == null) {execute();}// code...}public void execute() {if (executor == null) {executor =new ThreadPoolExecutor(MAX_THREAD_POOL_SIZE,MAX_THREAD_POOL_SIZE,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new ChunJunThreadFactory("dirty-consumer",true,(t, e) -> {LOG.error(String.format("Thread [%s] consume failed.", t.getName()),e);}),new ThreadPoolExecutor.CallerRunsPolicy());}consumer.open();executor.execute(consumer);}// code...
}
  • 此处构建了一个线程池executor,并对consumer进行了执行(记得前面说这个consumer实现了Runnable接口吧)
  • consumer的实例化,则是在DirtyManager得构造器中,调用了DataSyncFactoryUtil.discoverDirty(dirtyConf),利用反射获得脏数据插件对应的Class,并实例化(和connector插件几乎一致)
package com.dtstack.chunjun.util;// import...public class DataSyncFactoryUtil {// code...public static DirtyDataCollector discoverDirty(DirtyConf conf) {try {String pluginName = conf.getType();String pluginClassName = PluginUtil.getPluginClassName(pluginName, OperatorType.dirty);if (pluginName.equals(DEFAULT_DIRTY_TYPE)) {pluginClassName = DEFAULT_DIRTY_CLASS;}ClassLoader classLoader = Thread.currentThread().getContextClassLoader();Class clazz = classLoader.loadClass(pluginClassName);Constructor constructor = clazz.getConstructor();final DirtyDataCollector consumer = (DirtyDataCollector) constructor.newInstance();consumer.initializeConsumer(conf);return consumer;} catch (Exception e) {throw new NoRestartException("Load dirty plugins failed!", e);}}// code...
}

相关内容

热门资讯

安卓系统的地图怎样下载,下载与... 你有没有发现,现在不管去哪里,手机地图都成了我们的好帮手?尤其是安卓系统的地图,功能强大,用起来超级...
安卓9.0系统挂机游戏,轻松享... 你有没有发现,自从安卓9.0系统更新后,手机里的游戏体验简直就像坐上了火箭!今天,就让我带你一起探索...
安卓系统怎么用迅雷下载,安卓系... 你有没有想过,在安卓系统上下载文件竟然也能这么简单?没错,今天就要来给你揭秘,如何用迅雷在安卓系统上...
安卓手机刷成学生系统,探索全新... 你有没有想过,你的安卓手机其实可以变身成一个充满学习氛围的学生系统呢?没错,就是那种看起来简洁、功能...
ios能迁移安卓系统吗,iOS... 你有没有想过,你的iPhone里的那些宝贝应用,能不能搬到安卓手机上继续使用呢?这可是不少手机用户的...
荣耀10安卓11系统,畅享极致... 你知道吗?最近手机界可是热闹非凡呢!荣耀10这款手机,自从升级到了安卓11系统,简直就像脱胎换骨了一...
安卓系统pc版电脑配置,打造流... 你有没有想过,安卓系统竟然也能在电脑上运行呢?没错,就是那个我们手机上常用的安卓系统,现在也能在PC...
tcllinux系统刷安卓系统... 你有没有想过,你的TCL Linux系统竟然也能升级成安卓系统呢?没错,就是那个我们日常使用的安卓系...
安卓13系统更新蓝牙,蓝牙功能... 你有没有发现,最近你的安卓手机好像变得不一样了?没错,就是那个神秘的安卓13系统更新,它悄悄地来到了...
安卓系统钉钉打开声音,安卓系统... 你有没有遇到过这种情况?手机里装了钉钉,可每次打开它,那声音就“嗖”地一下跳出来,吓你一跳。别急,今...
理想汽车操作系统安卓,基于安卓... 你有没有想过,一辆汽车,除了能带你去你想去的地方,还能像智能手机一样,给你带来智能化的体验呢?没错,...
安卓系统越狱还能升级吗,升级之... 你有没有想过,你的安卓手机越狱后,还能不能愉快地升级系统呢?这可是不少手机爱好者关心的大问题。今天,...
安卓系统蓝牙耳机拼多多,畅享无... 你有没有发现,最近蓝牙耳机在市场上可是火得一塌糊涂呢!尤其是安卓系统的用户,对于蓝牙耳机的要求那可是...
安卓变苹果系统桌面,桌面系统变... 你知道吗?最近有个大新闻在科技圈里炸开了锅,那就是安卓用户纷纷转向苹果系统桌面。这可不是闹着玩的,这...
鸿蒙系统怎么下安卓,鸿蒙系统下... 你有没有想过,你的手机里那个神秘的鸿蒙系统,竟然也能和安卓世界来一场亲密接触呢?没错,今天就要来揭秘...
手机安卓系统流程排行,便捷操作... 你有没有发现,现在手机的世界里,安卓系统就像是个大舞台,各种版本层出不穷,让人眼花缭乱。今天,就让我...
安卓系统左上角hd,左上角HD... 你有没有发现,每次打开安卓手机,左上角那个小小的HD标识总是默默地在那里,仿佛在诉说着什么?今天,就...
安卓系统软件文件,架构解析与功... 你有没有发现,手机里的安卓系统软件文件就像是一个神秘的宝库,里面藏着无数的宝藏?今天,就让我带你一起...
安卓系统输入法回车,探索安卓输... 你有没有发现,在使用安卓手机的时候,输入法回车键的奇妙之处?它就像是我们指尖的魔法师,轻轻一点,文字...
安卓修改系统时间的软件,轻松掌... 你有没有想过,有时候手机上的时间不对劲,是不是觉得生活节奏都被打乱了?别急,今天就来给你揭秘那些神奇...