大数据(9e)Flink侧输出流
admin
2024-03-05 12:42:33
0

文章目录

  • 概述
  • 环境
  • OutputTag介绍
    • 实现分流
    • 处理迟到数据
  • 处理关窗之后到达的数据

概述

窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据

环境

WIN10+IDEA+JDK1.8+FLINK1.14

881.14.62.12

org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}

OutputTag介绍

OutputTag是一种命名标记,用于标记算子中的侧输出

实现分流

ctx.output:向由OutputTag标识的侧输出发出记录

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义输出标签OutputTag o1 = new OutputTag("除以3余1") {};OutputTag o2 = new OutputTag("除以3余2") {};//创建流SingleOutputStreamOperator d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(Integer value, Context ctx, Collector out) {//分流if (value % 3 == 2) {ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else if (value % 3 == 1) {ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else {out.collect(value);}}});//输出s.print("被3整除");s.getSideOutput(o1).print(o1.getId());s.getSideOutput(o2).print(o2.getId());//环境执行env.execute();}
}
测试结果
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9

处理迟到数据

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {//发送水位线ctx.emitWatermark(new Watermark(1999L));//发送2条数据,其中1条迟到ctx.collectWithTimestamp("1998", 1998L);ctx.collectWithTimestamp("2000", 2000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(String value, Context ctx, Collector out) {//获取水位线long watermark = ctx.timerService().currentWatermark();//判断是否迟到if (ctx.timestamp() > watermark) {//冇迟到out.collect(value);} else {//迟到:向outputTag发送数据ctx.output(outputTag, value);}}});//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
发送1999水位线,然后发送两条数据,测试结果如下
侧输出> 1998
主流输出> 2000

处理关窗之后到达的数据

开窗后.sideOutputLateData(outputTag)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {ctx.collectWithTimestamp("a", 4000L);ctx.collectWithTimestamp("b", 5000L);ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭ctx.collectWithTimestamp("c", 5000L);ctx.collectWithTimestamp("d", 5000L);ctx.collectWithTimestamp("e", 6000L);ctx.collectWithTimestamp("f", 7000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d//事件时间滚动窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))//侧输出.sideOutputLateData(outputTag)//拼接字符串.reduce((a, b) -> a + "," + b);//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
中途发送水位线,触发关窗,测试结果如下
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f

相关内容

热门资讯

电视安卓系统哪个品牌好,哪家品... 你有没有想过,家里的电视是不是该升级换代了呢?现在市面上电视品牌琳琅满目,各种操作系统也是让人眼花缭...
安卓会员管理系统怎么用,提升服... 你有没有想过,手机里那些你爱不释手的APP,背后其实有个强大的会员管理系统在默默支持呢?没错,就是那...
安卓系统软件使用技巧,解锁软件... 你有没有发现,用安卓手机的时候,总有一些小技巧能让你玩得更溜?别小看了这些小细节,它们可是能让你的手...
安卓系统提示音替换 你知道吗?手机里那个时不时响起的提示音,有时候真的能让人心情大好,有时候又让人抓狂不已。今天,就让我...
安卓开机不了系统更新 手机突然开不了机,系统更新还卡在那里,这可真是让人头疼的问题啊!你是不是也遇到了这种情况?别急,今天...
安卓系统中微信视频,安卓系统下... 你有没有发现,现在用手机聊天,视频通话简直成了标配!尤其是咱们安卓系统的小伙伴们,微信视频功能更是用...
安卓系统是服务器,服务器端的智... 你知道吗?在科技的世界里,安卓系统可是个超级明星呢!它不仅仅是个手机操作系统,竟然还能成为服务器的得...
pc电脑安卓系统下载软件,轻松... 你有没有想过,你的PC电脑上安装了安卓系统,是不是瞬间觉得世界都大不一样了呢?没错,就是那种“一机在...
电影院购票系统安卓,便捷观影新... 你有没有想过,在繁忙的生活中,一部好电影就像是一剂强心针,能瞬间让你放松心情?而我今天要和你分享的,...
安卓系统可以写程序? 你有没有想过,安卓系统竟然也能写程序呢?没错,你没听错!这个我们日常使用的智能手机操作系统,竟然有着...
安卓系统架构书籍推荐,权威书籍... 你有没有想过,想要深入了解安卓系统架构,却不知道从何下手?别急,今天我就要给你推荐几本超级实用的书籍...
安卓系统看到的炸弹,技术解析与... 安卓系统看到的炸弹——揭秘手机中的隐形威胁在数字化时代,智能手机已经成为我们生活中不可或缺的一部分。...
鸿蒙系统有安卓文件,畅享多平台... 你知道吗?最近在科技圈里,有个大新闻可是闹得沸沸扬扬的,那就是鸿蒙系统竟然有了安卓文件!是不是觉得有...
宝马安卓车机系统切换,驾驭未来... 你有没有发现,现在的汽车越来越智能了?尤其是那些豪华品牌,比如宝马,它们的内饰里那个大屏幕,简直就像...
p30退回安卓系统 你有没有听说最近P30的用户们都在忙活一件大事?没错,就是他们的手机要退回安卓系统啦!这可不是一个简...
oppoa57安卓原生系统,原... 你有没有发现,最近OPPO A57这款手机在安卓原生系统上的表现真是让人眼前一亮呢?今天,就让我带你...
安卓系统输入法联想,安卓系统输... 你有没有发现,手机上的输入法真的是个神奇的小助手呢?尤其是安卓系统的输入法,简直就是智能生活的点睛之...
怎么进入安卓刷机系统,安卓刷机... 亲爱的手机控们,你是否曾对安卓手机的刷机系统充满好奇?想要解锁手机潜能,体验全新的系统魅力?别急,今...
安卓系统程序有病毒 你知道吗?在这个数字化时代,手机已经成了我们生活中不可或缺的好伙伴。但是,你知道吗?即使是安卓系统,...
奥迪中控安卓系统下载,畅享智能... 你有没有发现,现在汽车的中控系统越来越智能了?尤其是奥迪这种豪华品牌,他们的中控系统简直就是科技与艺...