大数据(9e)Flink侧输出流
admin
2024-03-05 12:42:33
0

文章目录

  • 概述
  • 环境
  • OutputTag介绍
    • 实现分流
    • 处理迟到数据
  • 处理关窗之后到达的数据

概述

窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据

环境

WIN10+IDEA+JDK1.8+FLINK1.14

881.14.62.12

org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}

OutputTag介绍

OutputTag是一种命名标记,用于标记算子中的侧输出

实现分流

ctx.output:向由OutputTag标识的侧输出发出记录

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义输出标签OutputTag o1 = new OutputTag("除以3余1") {};OutputTag o2 = new OutputTag("除以3余2") {};//创建流SingleOutputStreamOperator d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(Integer value, Context ctx, Collector out) {//分流if (value % 3 == 2) {ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else if (value % 3 == 1) {ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else {out.collect(value);}}});//输出s.print("被3整除");s.getSideOutput(o1).print(o1.getId());s.getSideOutput(o2).print(o2.getId());//环境执行env.execute();}
}
测试结果
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9

处理迟到数据

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {//发送水位线ctx.emitWatermark(new Watermark(1999L));//发送2条数据,其中1条迟到ctx.collectWithTimestamp("1998", 1998L);ctx.collectWithTimestamp("2000", 2000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(String value, Context ctx, Collector out) {//获取水位线long watermark = ctx.timerService().currentWatermark();//判断是否迟到if (ctx.timestamp() > watermark) {//冇迟到out.collect(value);} else {//迟到:向outputTag发送数据ctx.output(outputTag, value);}}});//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
发送1999水位线,然后发送两条数据,测试结果如下
侧输出> 1998
主流输出> 2000

处理关窗之后到达的数据

开窗后.sideOutputLateData(outputTag)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {ctx.collectWithTimestamp("a", 4000L);ctx.collectWithTimestamp("b", 5000L);ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭ctx.collectWithTimestamp("c", 5000L);ctx.collectWithTimestamp("d", 5000L);ctx.collectWithTimestamp("e", 6000L);ctx.collectWithTimestamp("f", 7000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d//事件时间滚动窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))//侧输出.sideOutputLateData(outputTag)//拼接字符串.reduce((a, b) -> a + "," + b);//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
中途发送水位线,触发关窗,测试结果如下
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f

相关内容

热门资讯

手机系统安卓和ios系统下载地... 你有没有发现,现在手机的世界里,安卓和iOS两大系统就像是一对双胞胎,各有各的特色,让人爱不释手。今...
安卓系统最早开发公司,从安卓起... 你有没有想过,我们每天离不开的安卓系统,它究竟是由哪家公司最早开发的呢?没错,就是谷歌(Google...
安卓系统平板推荐学生用,学生适... 作为一名热爱学习的学生,你是不是也在寻找一款既实用又好用的平板电脑呢?平板电脑在学习和生活中可是个得...
安卓5.0系统多大容量,存储容... 你有没有想过,你的安卓手机里那个神秘的安卓5.0系统到底有多大容量呢?别急,今天就来给你揭秘这个谜团...
芒果嗨是安卓系统吗,揭秘这款热... 你有没有听说过“芒果嗨”这个名字?最近,这个名词在手机圈里可是火得一塌糊涂。不过,别急,今天咱们就来...
安卓系统锁屏如何破,破解攻略全... 你是不是也遇到了安卓手机锁屏的烦恼?每次解锁都要输入复杂的密码或者滑动图案,有时候真是急得团团转。别...
安卓系统app开机自启,深度解... 你有没有发现,每次手机开机,那些APP就像一群调皮的小精灵,迫不及待地跳出来和你打招呼?没错,说的就...
安卓系统拨号连接在哪,拨号连接... 你是不是也和我一样,有时候在使用安卓手机时,突然想连接一下网络,却发现不知道怎么操作?别急,今天就来...
安卓系统为什么会赢,技术革新与... 你有没有想过,为什么安卓系统在智能手机市场上如此火爆,几乎成了“手机必备”的存在呢?今天,就让我带你...
电脑可以做安卓系统么,电脑上运... 你有没有想过,电脑能不能装上安卓系统呢?这听起来是不是有点像科幻电影里的情节?别急,让我带你一探究竟...
国产安卓系统碎片化软件,软件生... 你有没有发现,现在手机上的安卓系统越来越丰富多样了?没错,这就是我们今天要聊的话题——国产安卓系统的...
安卓系统的蚂蚁花呗,蚂蚁花呗在... 你知道吗?在安卓系统的世界里,有一个超级方便的支付工具,那就是蚂蚁花呗。它就像你的贴心小助手,让你在...
安卓2系统彩蛋在哪找,揭秘隐藏... 你有没有发现,安卓2系统里竟然隐藏着一些神秘的彩蛋?没错,就是那些让你忍不住想要一探究竟的小惊喜。今...
全球最大的安卓系统,全球最大移... 你知道吗?在智能手机的世界里,有一个系统可是当之无愧的“王者”——那就是安卓系统!它就像一位全能的魔...
安卓系统就没有碎片了,迈向无缝... 你知道吗?最近在科技圈里,安卓系统可是掀起了一阵不小的波澜呢!有人说,安卓系统再也没有碎片化了?这可...
安卓系统平板电脑评测,安卓平板... 你有没有想过,在这个信息爆炸的时代,拥有一台性能卓越的安卓系统平板电脑,简直就是移动办公和娱乐的完美...
双系统安卓自动关机,双系统安卓... 你有没有遇到过这样的情况:手机里装了双系统安卓,一边是工作用的,一边是娱乐用的,结果有时候不小心,手...
圣地安列斯安卓9系统,圣地安列... 亲爱的读者,你是否也像我一样,对科技新动态充满好奇?今天,我要和你分享一个超级有趣的话题——圣地安列...
平果有安卓系统的吗,畅享智能生... 你有没有想过,手机的世界里,竟然还有这样一个有趣的现象?那就是——平果手机,竟然也有安卓系统!是不是...
vivoy27安卓系统下载,畅... 你有没有听说最近Vivo Y27这款手机的新鲜事儿?没错,就是它的安卓系统下载!今天,我就要给你来个...