窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据
WIN10+IDEA+JDK1.8+FLINK1.14
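pom.xml 配置如下:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>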
OutputTag是一种命名标记,用于标记算子中的侧输出
ctx.output:向由OutputTag标识的侧输出发出记录
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义输出标签OutputTag o1 = new OutputTag("除以3余1") {};OutputTag o2 = new OutputTag("除以3余2") {};//创建流SingleOutputStreamOperator d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(Integer value, Context ctx, Collector out) {//分流if (value % 3 == 2) {ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else if (value % 3 == 1) {ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录} else {out.collect(value);}}});//输出s.print("被3整除");s.getSideOutput(o1).print(o1.getId());s.getSideOutput(o2).print(o2.getId());//环境执行env.execute();}
}
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {//发送水位线ctx.emitWatermark(new Watermark(1999L));//发送2条数据,其中1条迟到ctx.collectWithTimestamp("1998", 1998L);ctx.collectWithTimestamp("2000", 2000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d.process(new ProcessFunction() {@Overridepublic void processElement(String value, Context ctx, Collector out) {//获取水位线long watermark = ctx.timerService().currentWatermark();//判断是否迟到if (ctx.timestamp() > watermark) {//冇迟到out.collect(value);} else {//迟到:向outputTag发送数据ctx.output(outputTag, value);}}});//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}
开窗后调用 .sideOutputLateData(outputTag),即可把窗口关闭之后才到达的迟到数据发送到由 outputTag 标识的侧输出流
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;public class Hi {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//定义测输出流的输出标签OutputTag outputTag = new OutputTag("迟到标签") {};//创建流,添加自定义数据源SingleOutputStreamOperator d = env.addSource(new SourceFunction() {@Overridepublic void run(SourceContext ctx) {ctx.collectWithTimestamp("a", 4000L);ctx.collectWithTimestamp("b", 5000L);ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭ctx.collectWithTimestamp("c", 5000L);ctx.collectWithTimestamp("d", 5000L);ctx.collectWithTimestamp("e", 6000L);ctx.collectWithTimestamp("f", 7000L);}@Overridepublic void cancel() {}});//处理SingleOutputStreamOperator s = d//事件时间滚动窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))//侧输出.sideOutputLateData(outputTag)//拼接字符串.reduce((a, b) -> a + "," + b);//输出s.print("主流输出");s.getSideOutput(outputTag).print("侧输出");//环境执行env.execute();}
}