大数据(9e)Flink侧输出流
admin
2024-03-05 12:42:33
0

文章目录

  • 概述
  • 环境
  • OutputTag介绍
    • 实现分流
    • 处理迟到数据
  • 处理关窗之后到达的数据

概述

窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据

环境

WIN10+IDEA+JDK1.8+FLINK1.14

881.14.62.12

org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}

OutputTag介绍

OutputTag是一种命名标记,用于标记算子中的侧输出

实现分流

ctx.output:向由OutputTag标识的侧输出发出记录

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义输出标签OutputTag o1 = new OutputTag("除以3余1") {};OutputTag o2 = new OutputTag("除以3余2") {};//创建流SingleOutputStreamOperator d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(Integer value, Context ctx, Collector out) {//分流if (value % 3 == 2) {ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else if (value % 3 == 1) {ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else {out.collect(value);}}});//输出s.print("被3整除");s.getSideOutput(o1).print(o1.getId());s.getSideOutput(o2).print(o2.getId());//环境执行env.execute();}
}
测试结果
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9

处理迟到数据

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {//发送水位线ctx.emitWatermark(new Watermark(1999L));//发送2条数据,其中1条迟到ctx.collectWithTimestamp("1998", 1998L);ctx.collectWithTimestamp("2000", 2000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(String value, Context ctx, Collector out) {//获取水位线long watermark = ctx.timerService().currentWatermark();//判断是否迟到if (ctx.timestamp() > watermark) {//冇迟到out.collect(value);} else {//迟到:向outputTag发送数据ctx.output(outputTag, value);}}});//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
发送1999水位线,然后发送两条数据,测试结果如下
侧输出> 1998
主流输出> 2000

处理关窗之后到达的数据

开窗后.sideOutputLateData(outputTag)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {ctx.collectWithTimestamp("a", 4000L);ctx.collectWithTimestamp("b", 5000L);ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭ctx.collectWithTimestamp("c", 5000L);ctx.collectWithTimestamp("d", 5000L);ctx.collectWithTimestamp("e", 6000L);ctx.collectWithTimestamp("f", 7000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d//事件时间滚动窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))//侧输出.sideOutputLateData(outputTag)//拼接字符串.reduce((a, b) -> a + "," + b);//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
中途发送水位线,触发关窗,测试结果如下
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f

相关内容

热门资讯

【MySQL】锁 锁 文章目录锁全局锁表级锁表锁元数据锁(MDL)意向锁AUTO-INC锁...
【内网安全】 隧道搭建穿透上线... 文章目录内网穿透-Ngrok-入门-上线1、服务端配置:2、客户端连接服务端ÿ...
GCN的几种模型复现笔记 引言 本篇笔记紧接上文,主要是上一篇看写了快2w字,再去接入代码感觉有点...
数据分页展示逻辑 import java.util.Arrays;import java.util.List;impo...
Redis为什么选择单线程?R... 目录专栏导读一、Redis版本迭代二、Redis4.0之前为什么一直采用单线程?三、R...
【已解决】ERROR: Cou... 正确指令: pip install pyyaml
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
Lock 接口解读 前置知识点Synchronized synchronized 是 Java 中的关键字,...
Win7 专业版安装中文包、汉... 参考资料:http://www.metsky.com/archives/350.htm...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
大模型未来趋势 大模型是人工智能领域的重要发展趋势之一,未来有着广阔的应用前景和发展空间。以下是大模型未来的趋势和展...
python实战应用讲解-【n... 目录 如何在Python中计算残余的平方和 方法1:使用其Base公式 方法2:使用statsmod...
学习u-boot 需要了解的m... 一、常用函数 1. origin 函数 origin 函数的返回值就是变量来源。使用格式如下...
常用python爬虫库介绍与简... 通用 urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库&...
药品批准文号查询|药融云-中国... 药品批文是国家食品药品监督管理局(NMPA)对药品的审评和批准的证明文件...
【2023-03-22】SRS... 【2023-03-22】SRS推流搭配FFmpeg实现目标检测 说明: 外侧测试使用SRS播放器测...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
初级算法-哈希表 主要记录算法和数据结构学习笔记,新的一年更上一层楼! 初级算法-哈希表...
进程间通信【Linux】 1. 进程间通信 1.1 什么是进程间通信 在 Linux 系统中,进程间通信...
【Docker】P3 Dock... Docker数据卷、宿主机与挂载数据卷的概念及作用挂载宿主机配置数据卷挂载操作示例一个容器挂载多个目...